DQN Async

Components that allow for syncing multiple DQN agents running on multiple processes.

There is a little weirdness when using CUDA with the ‘spawn’ start method. PyTorch has a bug (https://github.com/pytorch/pytorch/issues/30401), so queue usage isn't so simple.
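One common way to keep queue traffic simple under ‘spawn’ is to send only CPU tensors between processes. The snippet below is a minimal sketch of that idea, not fastrl's implementation: `cpu_state_dict` is a hypothetical helper, and whether this sidesteps the linked issue in every setup is an assumption.

```python
import torch
from torch import nn


def cpu_state_dict(model: nn.Module) -> dict:
    """Hypothetical helper: return a detached CPU copy of all parameters and buffers."""
    return {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}


# A stand-in model; it is moved to the GPU only when one is available.
model = nn.Linear(4, 2)
if torch.cuda.is_available():
    model = model.cuda()

# The snapshot holds only CPU tensors, so no CUDA memory handles
# would need to cross a process boundary if it were put on a queue.
snapshot = cpu_state_dict(model)

# The receiving side loads the snapshot into its own copy of the model.
receiver = nn.Linear(4, 2)
receiver.load_state_dict(snapshot)
```

In a multi-process setup, `snapshot` is the object that would be put on a `torch.multiprocessing` queue, and the receiving process would move its model to the target device after loading.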

Training DataPipes


ModelSubscriber

 ModelSubscriber (*args, **kwds)

If an agent is passed to another process and the ‘spawn’ start method is used, then this module is needed.


ModelPublisher

 ModelPublisher (*args, **kwds)

Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.

All subclasses should overwrite __iter__(), which would return an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. The common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation of a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipe have internal buffers, whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see the GitHub IterDataPipe Single Iterator Issue (https://github.com/pytorch/data/issues/45).

These DataPipes can be invoked in two ways, using the class constructor or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipe together to form a pipeline that will perform multiple operations in succession.

Note: When a subclass is used with torch.utils.data.DataLoader, each item in the DataPipe will be yielded from the DataLoader iterator. When num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. torch.utils.data.get_worker_info(), when called in a worker process, returns information about the worker. It can be used in either the dataset’s __iter__ method or the DataLoader’s worker_init_fn option to modify each copy’s behavior.
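The per-worker configuration described in the note above can be sketched as follows. This is an illustrative example rather than anything fastrl ships: `ShardedRange` is a hypothetical class, and it subclasses `IterableDataset` for simplicity, on the assumption that the same `get_worker_info()` call applies inside a DataPipe's `__iter__`.

```python
import math

from torch.utils.data import IterableDataset, get_worker_info


class ShardedRange(IterableDataset):
    """Hypothetical dataset yielding the integers in [start, end)."""

    def __init__(self, start, end):
        self.start, self.end = start, end

    def __iter__(self):
        info = get_worker_info()
        if info is None:
            # Main process (num_workers=0): yield the full range.
            lo, hi = self.start, self.end
        else:
            # Worker process: yield only this worker's contiguous slice.
            per_worker = math.ceil((self.end - self.start) / info.num_workers)
            lo = self.start + info.id * per_worker
            hi = min(lo + per_worker, self.end)
        return iter(range(lo, hi))


print(list(ShardedRange(0, 5)))  # [0, 1, 2, 3, 4]
```

When iterated directly, as here, `get_worker_info()` returns None and the whole range is produced; under a DataLoader with several workers, each copy would yield a disjoint slice instead of duplicating the data.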

Examples:

General Usage:

    >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
    >>> dp = IterableWrapper(range(10))
    >>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
    >>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
    >>> list(map_dp_1)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    >>> list(map_dp_2)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
    >>> list(filter_dp)
    [2, 4, 6, 8, 10]

Single Iterator Constraint Example:

    >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
    >>> source_dp = IterableWrapper(range(10))
    >>> it1 = iter(source_dp)
    >>> list(it1)
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    >>> it1 = iter(source_dp)
    >>> it2 = iter(source_dp)  # The creation of a new iterator invalidates it1
    >>> next(it2)
    0
    >>> next(it1)  # Further usage of it1 will raise a RuntimeError
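Taken together, ModelPublisher and ModelSubscriber suggest a publish/subscribe pattern for model weights: the learner side pushes a snapshot every few batches, and the agent side picks up the newest one. The sketch below illustrates that pattern only. `WeightPublisher` and `WeightSubscriber` are hypothetical stand-ins, plain dicts stand in for model weights, and the standard-library `queue.Queue` stands in for a cross-process queue; none of this is fastrl's actual implementation.

```python
import queue


class WeightPublisher:
    """Hypothetical stand-in: publishes a snapshot every `publish_freq` batches."""

    def __init__(self, weights_queue, publish_freq=10):
        self.queue = weights_queue
        self.publish_freq = publish_freq
        self.n_batches = 0

    def step(self, weights):
        self.n_batches += 1
        if self.n_batches % self.publish_freq == 0:
            # Copy so later in-place updates do not leak into the snapshot.
            self.queue.put(dict(weights))


class WeightSubscriber:
    """Hypothetical stand-in: keeps the newest snapshot found on the queue."""

    def __init__(self, weights_queue, weights):
        self.queue = weights_queue
        self.weights = dict(weights)

    def sync(self):
        """Drain the queue; return True if the weights were updated."""
        updated = False
        while True:
            try:
                self.weights = self.queue.get_nowait()
                updated = True
            except queue.Empty:
                return updated


q = queue.Queue()
publisher = WeightPublisher(q, publish_freq=2)
subscriber = WeightSubscriber(q, {"w": 0.0})

# Five training batches: snapshots are published after batches 2 and 4.
for batch in range(1, 6):
    publisher.step({"w": float(batch)})

subscriber.sync()
print(subscriber.weights)  # {'w': 4.0}
```

Draining the queue rather than reading a single item means a slow subscriber skips stale snapshots and always ends up with the most recent published weights.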

Try training with basic defaults…

from fastcore.all import *  # provides L
import torch
from torch.nn import *
import torch.nn.functional as F
from fastrl.loggers.core import *
from fastrl.loggers.jupyter_visualizers import *
from fastrl.learner.core import *
from fastrl.data.block import *
from fastrl.envs.gym import *
from fastrl.agents.core import *
from fastrl.agents.discrete import *
from fastrl.agents.dqn.basic import *
from fastrl.agents.dqn.asynchronous import *  # provides ModelSubscriber, ModelPublisher

logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                 batch_on_pipe=BatchCollector)

# Setup the core NN
torch.manual_seed(0)
model = DQN(4,2).cuda()
# model.share_memory() # This will not work in spawn
# Setup the Agent
agent = DQNAgent(model,max_steps=4000,device='cuda',
                 dp_augmentation_fns=[ModelSubscriber.insert_dp()])
# Setup the DataBlock
block = DataBlock(
    GymTransformBlock(agent=agent,nsteps=1,nskips=1,firstlast=False),
    GymTransformBlock(agent=agent,nsteps=1,nskips=1,firstlast=False,include_images=True)
)
dls = L(block.dataloaders(['CartPole-v1']*1,num_workers=1))
# Setup the Learner
learner = DQNLearner(model,dls,batches=1000,logger_bases=[logger_base],bs=128,max_sz=100_000,device='cuda',
                     dp_augmentation_fns=[ModelPublisher.insert_dp()]
                     )
# learner.fit(2)
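
The same setup as a standalone script (written to ../../external_run_scripts/agents_dqn_async_35.py), using the ‘spawn’ start method: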

if __name__=='__main__':
    from torch.multiprocessing import Pool, Process, set_start_method
    
    try:
        set_start_method('spawn')
    except RuntimeError:
        pass
    
    from fastcore.all import *
    import torch
    from torch.nn import *
    import torch.nn.functional as F
    from fastrl.loggers.core import *
    from fastrl.loggers.jupyter_visualizers import *
    from fastrl.learner.core import *
    from fastrl.data.block import *
    from fastrl.envs.gym import *
    from fastrl.agents.core import *
    from fastrl.agents.discrete import *
    from fastrl.agents.dqn.basic import *
    from fastrl.agents.dqn.asynchronous import *
    
    from torchdata.dataloader2 import DataLoader2
    from torchdata.dataloader2.graph import traverse
    from fastrl.data.dataloader2 import *
    
    logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                    batch_on_pipe=BatchCollector)

    # Setup the core NN
    torch.manual_seed(0)
    model = DQN(4,2).cuda()
    # model.share_memory() # This will not work in spawn
    # Setup the Agent
    agent = DQNAgent(model,max_steps=4000,device='cuda',
                    dp_augmentation_fns=[ModelSubscriber.insert_dp()])
    # Setup the DataBlock
    block = DataBlock(
        GymTransformBlock(agent=agent,nsteps=1,nskips=1,firstlast=False),
        GymTransformBlock(agent=agent,nsteps=1,nskips=1,firstlast=False,include_images=True)
    )
    dls = L(block.dataloaders(['CartPole-v1']*1,num_workers=1))
    # Setup the Learner
    learner = DQNLearner(model,dls,batches=1000,logger_bases=[logger_base],bs=128,max_sz=100_000,device='cuda',
                        dp_augmentation_fns=[ModelPublisher.insert_dp(publish_freq=10)])
    # print(traverse(learner))
    learner.fit(20)