DQN Basic

Core DQN modules, pipes, and tooling

Model


DQN

 DQN (state_sz:int, action_sz:int, hidden=512,
      head_layer:fastrl.torch_core.Module=<class
      'torch.nn.modules.linear.Linear'>,
      activition_fn:fastrl.torch_core.Module=<class
      'torch.nn.modules.activation.ReLU'>)

Same as nn.Module, but no need for subclasses to call super().__init__

Type Default Details
state_sz int The input dim of the state
action_sz int The output dim of the actions
hidden int 512 Number of neurons connected between the 2 input/output layers
head_layer Module Linear DQN extensions such as Dueling DQNs have custom heads
activition_fn Module ReLU The activiation fn used by DQN

Agent


DQNAgent

 DQNAgent (model, logger_bases=None, min_epsilon=0.02, max_epsilon=1,
           max_steps=1000, device='cpu', dp_augmentation_fns:Union[List[Ca
           llable[[Union[torch.utils.data.datapipes.datapipe.IterDataPipe,
           torch.utils.data.datapipes.datapipe.MapDataPipe]],Union[torch.u
           tils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.data
           pipes.datapipe.MapDataPipe,NoneType]]],NoneType]=None)
torch.manual_seed(0)
model = DQN(4,2)

agent = DQNAgent(model)
input_tensor = tensor([1,2,3,4]).float()
step = SimpleStep(state=input_tensor)

for action in agent([step]):
    print(action)
    
test_eq(input_tensor,tensor([1., 2., 3., 4.]))
1
from fastrl.envs.gym import *
AgentHead.debug=True
# Setup Logger
logger_base = ProgressBarLogger()

# Setup up the core NN
torch.manual_seed(0)
model = DQN(4,2)

agent = DQNAgent(model,[logger_base])

block = DataBlock(
    GymTransformBlock(agent,n=10)
)
# dls = L(block.dataloaders(['CartPole-v1']*1,n=10,bs=1))
pipes = L(block.datapipes(['CartPole-v1']*1))

# list(dls[0])
list(pipes[0])
traverse(agent)
{139835997488656: (AgentHead,
  {139835997488848: (PyPrimativeConverter,
    {139835997489168: (NumpyConverter,
      {139835997490512: (ArgMaxer,
        {139835997489104: (EpsilonCollector,
          {139835791724432: (EpsilonSelector,
            {139835791724368: (ArgMaxer,
              {139835791808976: (SimpleModelRunner,
                {139835791808912: (InputInjester,
                  {139835791808784: (StepFieldSelector,
                    {139837965380880: (AgentBase,
                      {139835791808400: (ProgressBarLogger, {})})})}),
                 139837965380880: (AgentBase,
                  {139835791808400: (ProgressBarLogger, {})})})}),
             139837965380880: (AgentBase,
              {139835791808400: (ProgressBarLogger, {})})})})})})}),
   139837965380880: (AgentBase, {139835791808400: (ProgressBarLogger, {})})})}

Training DataPipes


QCalc

 QCalc (*args, **kwds)

Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.

All subclasses should overwrite :meth:__iter__, which would return an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. The common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipe have internal buffers, whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see GitHub IterDataPipe Single Iterator Issue_.

These DataPipes can be invoked in two ways, using the class constructor or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipe together to form a pipeline that will perform multiple operations in succession.

.. _GitHub IterDataPipe Single Iterator Issue: https://github.com/pytorch/data/issues/45

Note: When a subclass is used with :class:~torch.utils.data.DataLoader, each item in the DataPipe will be yielded from the :class:~torch.utils.data.DataLoader iterator. When :attr:num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. :func:~torch.utils.data.get_worker_info, when called in a worker process, returns information about the worker. It can be used in either the dataset’s :meth:__iter__ method or the :class:~torch.utils.data.DataLoader ’s :attr:worker_init_fn option to modify each copy’s behavior.

Examples: General Usage: >>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10] Single Iterator Constraint Example: >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates it1 >>> next(it2) 0 >>> next(it1) # Further usage of it1 will raise a RunTimeError


TargetCalc

 TargetCalc (*args, **kwds)

Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.

All subclasses should overwrite :meth:__iter__, which would return an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. The common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipe have internal buffers, whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see GitHub IterDataPipe Single Iterator Issue_.

These DataPipes can be invoked in two ways, using the class constructor or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipe together to form a pipeline that will perform multiple operations in succession.

.. _GitHub IterDataPipe Single Iterator Issue: https://github.com/pytorch/data/issues/45

Note: When a subclass is used with :class:~torch.utils.data.DataLoader, each item in the DataPipe will be yielded from the :class:~torch.utils.data.DataLoader iterator. When :attr:num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. :func:~torch.utils.data.get_worker_info, when called in a worker process, returns information about the worker. It can be used in either the dataset’s :meth:__iter__ method or the :class:~torch.utils.data.DataLoader ’s :attr:worker_init_fn option to modify each copy’s behavior.

Examples: General Usage: >>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10] Single Iterator Constraint Example: >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates it1 >>> next(it2) 0 >>> next(it1) # Further usage of it1 will raise a RunTimeError


LossCalc

 LossCalc (*args, **kwds)

Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.

All subclasses should overwrite :meth:__iter__, which would return an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. The common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipe have internal buffers, whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see GitHub IterDataPipe Single Iterator Issue_.

These DataPipes can be invoked in two ways, using the class constructor or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipe together to form a pipeline that will perform multiple operations in succession.

.. _GitHub IterDataPipe Single Iterator Issue: https://github.com/pytorch/data/issues/45

Note: When a subclass is used with :class:~torch.utils.data.DataLoader, each item in the DataPipe will be yielded from the :class:~torch.utils.data.DataLoader iterator. When :attr:num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. :func:~torch.utils.data.get_worker_info, when called in a worker process, returns information about the worker. It can be used in either the dataset’s :meth:__iter__ method or the :class:~torch.utils.data.DataLoader ’s :attr:worker_init_fn option to modify each copy’s behavior.

Examples: General Usage: >>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10] Single Iterator Constraint Example: >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates it1 >>> next(it2) 0 >>> next(it1) # Further usage of it1 will raise a RunTimeError


ModelLearnCalc

 ModelLearnCalc (*args, **kwds)

Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.

All subclasses should overwrite :meth:__iter__, which would return an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. The common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipe have internal buffers, whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see GitHub IterDataPipe Single Iterator Issue_.

These DataPipes can be invoked in two ways, using the class constructor or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipe together to form a pipeline that will perform multiple operations in succession.

.. _GitHub IterDataPipe Single Iterator Issue: https://github.com/pytorch/data/issues/45

Note: When a subclass is used with :class:~torch.utils.data.DataLoader, each item in the DataPipe will be yielded from the :class:~torch.utils.data.DataLoader iterator. When :attr:num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. :func:~torch.utils.data.get_worker_info, when called in a worker process, returns information about the worker. It can be used in either the dataset’s :meth:__iter__ method or the :class:~torch.utils.data.DataLoader ’s :attr:worker_init_fn option to modify each copy’s behavior.

Examples: General Usage: >>> # xdoctest: +SKIP >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended) >>> list(map_dp_1) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list(map_dp_2) [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0) >>> list(filter_dp) [2, 4, 6, 8, 10] Single Iterator Constraint Example: >>> from torchdata.datapipes.iter import IterableWrapper, Mapper >>> dp = IterableWrapper(range(10)) >>> it1 = iter(source_dp) >>> list(it1) [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] >>> it1 = iter(source_dp) >>> it2 = iter(source_dp) # The creation of a new iterator invalidates it1 >>> next(it2) 0 >>> next(it1) # Further usage of it1 will raise a RunTimeError


LossCollector

 LossCollector (*args, **kwds)

LogCollector specifically manages finding and attaching itself to LoggerBases found earlier in the pipeline.


DQNLearner

 DQNLearner (model, dls, logger_bases=None, loss_func=MSELoss(),
             opt=<class 'torch.optim.adamw.AdamW'>, lr=0.005, bs=128,
             max_sz=10000, nsteps=1, device=None, batches=None, dp_augment
             ation_fns:Union[List[Callable[[Union[torch.utils.data.datapip
             es.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.
             MapDataPipe]],Union[torch.utils.data.datapipes.datapipe.IterD
             ataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe,NoneT
             ype]]],NoneType]=None)

Try training with basic defaults…

# Setup Loggers
logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                 batch_on_pipe=BatchCollector)

# Setup up the core NN
torch.manual_seed(0)
model = DQN(4,2).cuda()
# Setup the Agent
agent = DQNAgent(model,[logger_base],max_steps=4000,device='cuda')
# Setup the DataBlock
block = DataBlock(
    GymTransformBlock(agent=agent,nsteps=1,nskips=1,firstlast=False,bs=1)
)

dls = L(block.dataloaders(['CartPole-v1']*1,num_workers=0))

# Setup the Learner
learner = DQNLearner(model,dls,batches=4,#1000,
                     logger_bases=[logger_base],
                      bs=2,#128,
                      max_sz=100_000,device='cuda')
# learner.fit(3)
learner.fit(2)
loss episode rolling_reward epoch batch epsilon
0.44844115 1 None 1 5 0.998500
0.41163164 1 None 1 5 0.997500

If we try a regular DQN with nsteps/nskips it doesnt really converge after 130. We cant expect stability at all, and im pretty sure that nsteps (correctly) tries to reduce to number of duplicated states so that the agent can sample more unique state transitions. The problem with this is that the base dqn is not stable, so giving it lots of “new” stuff, im not sure helps. In otherwords, its going to forget the old stuff very quickly, and having duplicate states helps “remind it”

logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                 batch_on_pipe=BatchCollector)

# Setup up the core NN
torch.manual_seed(0)
model = DQN(4,2)
# Setup the Agent
agent = DQNAgent(model,[logger_base],max_steps=10000)
# Setup the DataBlock
block = DataBlock(
    GymTransformBlock(agent=agent,nsteps=2,nskips=2,firstlast=True) # We basically merge 2 steps into 1 and skip. 
)
# pipes = L(block.datapipes(['CartPole-v1']*1,n=10))
dls = L(block.dataloaders(['CartPole-v1']*1))
# Setup the Learner
learner = DQNLearner(model,dls,batches=1000,logger_bases=[logger_base],bs=128,max_sz=20_000,nsteps=2,lr=0.001)
# learner.fit(3)
# learner.fit(20)
import pandas as pd
from fastrl.pipes.core import *
from fastrl.pipes.map.transforms import *
from fastrl.envs.gym import GymTypeTransform,GymStepper
from fastrl.loggers.core import *
from fastrl.loggers.jupyter_visualizers import *
from fastrl.loggers.vscode_visualizers import *
# Setup Loggers
logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                 batch_on_pipe=BatchCollector)

# Setup up the core NN
torch.manual_seed(0)
model = DQN(8,4)
# Setup the Agent
agent = DQNAgent(model,[logger_base])
# Setup the DataBlock
block = DataBlock(
    GymTransformBlock(agent=agent,bs=1,n=1000)
)
dls = L(block.dataloaders(['LunarLander-v2']*1))
# Setup the Learner
learner = DQNLearner(model,dls,logger_bases=[logger_base])
learner.fit(3)
# learner.fit(30)
loss episode rolling_reward epoch batch epsilon
0.44493392 9 -174.366881 1 1000 0.020000
0.57215977 11 -149.441252 2 1000 0.020000
0.37252918 11 -149.441252 2 1000 0.020000
import pandas as pd
from fastrl.pipes.core import *
from fastrl.pipes.map.transforms import *
from fastrl.envs.gym import GymTypeTransform,GymStepper
from fastrl.loggers.vscode_visualizers import VSCodeTransformBlock