Agent Discrete

DataPipes used by agents operating in a discrete action space.

ArgMaxer

 ArgMaxer (*args, **kwds)

Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.

All subclasses should overwrite :meth:__iter__, which would return an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. The common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation of a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipe have internal buffers, whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see GitHub IterDataPipe Single Iterator Issue_.

These DataPipes can be invoked in two ways, using the class constructor or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipe together to form a pipeline that will perform multiple operations in succession.
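The chaining idea described above can be sketched without torchdata at all. The classes below (`Source`, `MapStage`, `FilterStage`) are hypothetical stand-ins written only for illustration; they are not torchdata classes, and they skip the reset and single-iterator handling that a real IterDataPipe provides. Each stage simply wraps the previous one and pulls items from it lazily.

```python
class Source:
    """Hypothetical stand-in for a source pipe: wraps any iterable."""
    def __init__(self, items):
        self.items = items

    def __iter__(self):
        yield from self.items


class MapStage:
    """Hypothetical stage that applies fn to every item of the wrapped stage."""
    def __init__(self, source, fn):
        self.source, self.fn = source, fn

    def __iter__(self):
        for item in self.source:
            yield self.fn(item)


class FilterStage:
    """Hypothetical stage that keeps only the items for which fn is truthy."""
    def __init__(self, source, fn):
        self.source, self.fn = source, fn

    def __iter__(self):
        for item in self.source:
            if self.fn(item):
                yield item


# Each stage wraps the one before it, in the same way the agent
# pipelines in this page are built up one assignment at a time.
pipeline = Source(range(10))
pipeline = MapStage(pipeline, lambda x: x + 1)
pipeline = FilterStage(pipeline, lambda x: x % 2 == 0)
result = list(pipeline)  # nothing is computed until iteration starts
```

Nothing runs until `list(pipeline)` iterates the outermost stage, which is the lazy behaviour the docstring describes.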

.. _GitHub IterDataPipe Single Iterator Issue: https://github.com/pytorch/data/issues/45

Note: When a subclass is used with :class:~torch.utils.data.DataLoader, each item in the DataPipe will be yielded from the :class:~torch.utils.data.DataLoader iterator. When :attr:num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. :func:~torch.utils.data.get_worker_info, when called in a worker process, returns information about the worker. It can be used in either the dataset’s :meth:__iter__ method or the :class:~torch.utils.data.DataLoader ’s :attr:worker_init_fn option to modify each copy’s behavior.

Examples:

General Usage:
    >>> # xdoctest: +SKIP
    >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
    >>> dp = IterableWrapper(range(10))
    >>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
    >>> map_dp_2 = dp.map(lambda x: x + 1)  # Using functional form (recommended)
    >>> list(map_dp_1)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    >>> list(map_dp_2)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
    >>> list(filter_dp)
    [2, 4, 6, 8, 10]

Single Iterator Constraint Example:
    >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
    >>> source_dp = IterableWrapper(range(10))
    >>> it1 = iter(source_dp)
    >>> list(it1)
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    >>> it1 = iter(source_dp)
    >>> it2 = iter(source_dp)  # The creation of a new iterator invalidates it1
    >>> next(it2)
    0
    >>> next(it1)  # Further usage of it1 will raise a RuntimeError

# class DQN(Module):
#     def __init__(self,state_sz:int,action_sz:int,hidden=512):
#         self.layers=Sequential(
#             Linear(state_sz,hidden),
#             ReLU(),
#             Linear(hidden,action_sz),
#         )
#     def forward(self,x): return self.layers(x)
from fastrl.agents.dqn.basic import DQN
# Set up the core NN
torch.manual_seed(0)
model = DQN(4,2)
# Setup the agent
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
agent = AgentHead(agent)

for action in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
    print(action)

traverse(agent)
tensor([[1, 0]])
{140047797327504: (AgentHead,
  {140047624286480: (ArgMaxer,
    {140047623235216: (SimpleModelRunner,
      {140047623536912: (StepFieldSelector,
        {140047623234960: (AgentBase, {})}),
       140047623234960: (AgentBase, {})})}),
   140047623234960: (AgentBase, {})})}
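The printed `tensor([[1, 0]])` suggests that ArgMaxer turns each row of model outputs into a one-hot vector marking the largest value. The pure-Python sketch below illustrates that idea only; `argmax_one_hot` is a hypothetical helper, the Q-values are made up, and the behaviour attributed to `only_idx` (returning the index instead of the one-hot row) is an assumption based on how that argument is used later on this page, not a statement of fastrl's implementation.

```python
def argmax_one_hot(rows, only_idx=False):
    """Return a one-hot row (or just the index) of the largest value per row.

    Hypothetical helper: ties resolve to the first maximum.
    """
    out = []
    for row in rows:
        best = max(range(len(row)), key=row.__getitem__)
        if only_idx:
            out.append(best)
        else:
            out.append([1 if i == best else 0 for i in range(len(row))])
    return out


q_values = [[0.7, -0.2]]  # made-up model outputs for a single state
one_hot = argmax_one_hot(q_values)
indices = argmax_one_hot(q_values, only_idx=True)
```

Here `one_hot` has the same shape as the input, while `indices` holds one integer per row.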

EpsilonSelector

 EpsilonSelector (*args, **kwds)

Iterable-style DataPipe.

Check that when min_epsilon=1, the actions have a 100% likelihood of randomness being applied (even though some might not change because the random action matches the chosen action). Check that this works on a large batch of 200 steps.

agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=1,ret_mask=True)
agent = AgentHead(selector)

for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]*200))]):
    test_eq(mask.sum(),200)
    test_ne(action.sum(0)[0],200) # Only some of the "left" actions should be 1
    test_ne(action.sum(0)[1],0) # Only some of the "right" actions should be 0
    test_eq(selector.epsilon,1)
    test_eq(selector.step,1)

traverse(agent)
{140047623236880: (AgentHead,
  {140047623235856: (EpsilonSelector,
    {140047623234448: (ArgMaxer,
      {140047623234512: (SimpleModelRunner,
        {140047623234704: (StepFieldSelector,
          {140047623237328: (AgentBase, {})}),
         140047623237328: (AgentBase, {})})}),
     140047623237328: (AgentBase, {})}),
   140047623237328: (AgentBase, {})})}
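The checks above rely on epsilon-greedy selection: with probability epsilon a row's action is replaced by a uniformly random one, and the mask records which rows were replaced. The sketch below is a pure-Python illustration under that assumption; `epsilon_select` is a hypothetical helper and is not fastrl's EpsilonSelector.

```python
import random


def epsilon_select(actions, epsilon, rng):
    """Replace each one-hot action with a random one with probability epsilon.

    Hypothetical helper. Returns (selected_actions, mask), where mask[i] is
    True when row i was replaced by a random action.
    """
    n_actions = len(actions[0])
    selected, mask = [], []
    for one_hot in actions:
        explore = rng.random() < epsilon  # rng.random() lies in [0, 1)
        mask.append(explore)
        if explore:
            idx = rng.randrange(n_actions)
            selected.append([1 if i == idx else 0 for i in range(n_actions)])
        else:
            selected.append(list(one_hot))
    return selected, mask


rng = random.Random(0)
greedy = [[1, 0]] * 200  # a batch of 200 identical greedy actions

all_random, mask_all = epsilon_select(greedy, 1.0, rng)    # epsilon = 1
none_random, mask_none = epsilon_select(greedy, 0.0, rng)  # epsilon = 0
left_count = sum(row[0] for row in all_random)
```

With epsilon at 1 every row is masked, yet some rows still end up as `[1, 0]` because the random draw can match the greedy action, which is why the tests use `test_ne` on the column sums rather than an exact count.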

Check that when min_epsilon=1, the actions have a 100% likelihood of randomness being applied (even though some might not change because the random action matches the chosen action). Check that this works on single batches over 200 steps.

ArgMaxer.debug=False
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent,axis=1)
selector = EpsilonSelector(agent,min_epsilon=1,ret_mask=True)
agent = AgentHead(selector)

actions = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
        test_eq(mask.sum(),1)
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))
test_ne(actions.sum(0)[0],200) # Only some of the "left" actions should be 1
test_ne(actions.sum(0)[1],0) # Only some of the "right" actions should be 0
test_eq(selector.epsilon,1)
test_eq(selector.step,200)
traverse(agent)
{140047623595664: (AgentHead,
  {140047623595216: (EpsilonSelector,
    {140047623594704: (ArgMaxer,
      {140047623595728: (SimpleModelRunner,
        {140047623597712: (StepFieldSelector,
          {140047623594896: (AgentBase, {})}),
         140047623594896: (AgentBase, {})})}),
     140047623594896: (AgentBase, {})}),
   140047623594896: (AgentBase, {})})}

Check that when min_epsilon=0 and max_epsilon=0, the actions have a 0% likelihood of randomness being applied. Check that this works on a large batch of 200 steps.

agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=0,ret_mask=True)
agent = AgentHead(selector)

for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]*200))]):
    test_eq(mask.sum(),0)
    test_eq(action.sum(0)[0],200) # All the "left" actions should be 1
    test_eq(action.sum(0)[1],0) # All the "right" actions should be 0
    test_eq(selector.epsilon,0)
    test_eq(selector.step,1)
traverse(agent)
{140047796955600: (AgentHead,
  {140047796956048: (EpsilonSelector,
    {140047796954256: (ArgMaxer,
      {140047796956688: (SimpleModelRunner,
        {140047796956496: (StepFieldSelector,
          {140047796954896: (AgentBase, {})}),
         140047796954896: (AgentBase, {})})}),
     140047796954896: (AgentBase, {})}),
   140047796954896: (AgentBase, {})})}

Check that when min_epsilon=0 and max_epsilon=0, the actions have a 0% likelihood of randomness being applied. Check that this works on single batches over 200 steps.

agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=0,ret_mask=True)
agent = AgentHead(selector)

actions = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
        test_eq(mask.sum(),0)
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))
test_eq(actions.sum(0)[0],200) # All the "left" actions should be 1
test_eq(actions.sum(0)[1],0) # All the "right" actions should be 0
test_eq(selector.epsilon,0)
test_eq(selector.step,200)
traverse(agent)
{140047799059920: (AgentHead,
  {140047623596816: (EpsilonSelector,
    {140047623597008: (ArgMaxer,
      {140047623595024: (SimpleModelRunner,
        {140047623594320: (StepFieldSelector,
          {140047623597328: (AgentBase, {})}),
         140047623597328: (AgentBase, {})})}),
     140047623597328: (AgentBase, {})}),
   140047623597328: (AgentBase, {})})}

Check that when min_epsilon=0 and max_epsilon=1, the actions become less random as the steps go on. Check that this works on a large batch of 200 steps.

epsilon should be 0 at the end of this…

agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=1,max_steps=100,ret_mask=True)
agent = AgentHead(selector)

actions = None
masks = None
epsilons = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]*200))]):
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))
        if masks is None: masks = mask
        else:             masks = torch.hstack((masks,mask))
        if epsilons is None: epsilons = tensor([selector.epsilon])
        else:                epsilons = torch.hstack((epsilons,tensor([selector.epsilon])))
        
test_ne(masks[:((200*200)//2)].sum(),200) # We do not expect this to equal a perfect 200...
test_ne(masks[:((200*200)//2)].sum(),0)   # ... but we also don't expect it to be 0
assert 1000<masks[:((200*200)//2)].sum()<10_000,\
        """We expect this to be somewhere between 1000 and 10,000, generally in the 9000 range, since 
           we run 200 inputs on each of the 200 steps"""
test_eq(masks[((200*200)//2):].sum(),0) # We fully expect this to be 0 after the halfway point
test_ne(actions.sum(0)[0],200) # The "left" actions generally shouldn't all be 1
test_ne(actions.sum(0)[1],0) # The "right" actions generally shouldn't all be 0
test_eq(selector.epsilon,0)
test_eq(selector.step,200)
# Since max_steps is 100 and we run for 200 steps, the first 100 epsilon entries shouldn't all be 0
test_ne(epsilons[:100].sum(),0) 
# In fact the first 100 should sum up to somewhere between 40 and 50. (expected 49.5)
test_eq(40<epsilons[:100].sum()<50,True) 
# Everything after 100 should be 0
test_eq(epsilons[100:].sum(),0)
traverse(agent)
{140047622145488: (AgentHead,
  {140047622147472: (EpsilonSelector,
    {140047622146256: (ArgMaxer,
      {140047622146128: (SimpleModelRunner,
        {140047622147600: (StepFieldSelector,
          {140047622146192: (AgentBase, {})}),
         140047622146192: (AgentBase, {})})}),
     140047622146192: (AgentBase, {})}),
   140047622146192: (AgentBase, {})})}
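The "expected 49.5" figure in the comments above can be reproduced with a linear decay schedule. The sketch below assumes epsilon is computed as `max(min_epsilon, max_epsilon - step / max_steps)` and recorded after the step counter has been incremented, so the recorded steps run from 1 to 200; that formula is an assumption chosen because it matches the stated expectation, not a quotation of fastrl's code, and `linear_epsilon` is a hypothetical helper.

```python
import math


def linear_epsilon(step, min_epsilon=0.0, max_epsilon=1.0, max_steps=100):
    """Assumed linear decay: falls from max_epsilon and is clamped at min_epsilon."""
    return max(min_epsilon, max_epsilon - step / max_steps)


# Epsilon as it would be recorded after each of 200 steps (steps 1..200).
epsilons = [linear_epsilon(step) for step in range(1, 201)]

first_100 = sum(epsilons[:100])  # 0.99 + 0.98 + ... + 0.00
rest = sum(epsilons[100:])       # clamped at min_epsilon from step 100 onward
```

Under this assumption the first 100 recorded values sum to 49.5 and every later value is 0. For the batched case with 200 inputs per step, that gives roughly 200 × 49.5 = 9900 expected random actions in the first half, which sits inside the 1000 to 10,000 bound asserted above.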

Check that when min_epsilon=0 and max_epsilon=1, the actions become less random as the steps go on. Check that this works on single batches over 200 steps.

agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=1,max_steps=100,ret_mask=True)
agent = AgentHead(selector)

actions = None
masks = None
epsilons = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))
        if masks is None: masks = mask
        else:             masks = torch.hstack((masks,mask))
        if epsilons is None: epsilons = tensor([selector.epsilon])
        else:                epsilons = torch.hstack((epsilons,tensor([selector.epsilon])))
        
test_ne(masks[:(200//2)].sum(),200) # We do not expect this to equal a perfect 200...
test_ne(masks[:(200//2)].sum(),0)   # ... but we also don't expect it to be 0
assert 40<masks[:(200//2)].sum()<60,'We expect this to be somewhere between 40 and 60, generally in the ~50 range'
test_eq(masks[(200//2):].sum(),0) # We fully expect this to be 0 after the halfway point
test_ne(actions.sum(0)[0],200) # The "left" actions generally shouldn't all be 1
test_ne(actions.sum(0)[1],0) # The "right" actions generally shouldn't all be 0
test_eq(selector.epsilon,0)
test_eq(selector.step,200)
# Since max_steps is 100 and we run for 200 steps, the first 100 epsilon entries shouldn't all be 0
test_ne(epsilons[:100].sum(),0) 
# In fact the first 100 should sum up to somewhere between 40 and 50. (expected 49.5)
test_eq(40<epsilons[:100].sum()<50,True) 
# Everything after 100 should be 0
test_eq(epsilons[100:].sum(),0)

traverse(agent)
{140047623236112: (AgentHead,
  {140047623236496: (EpsilonSelector,
    {140047623237136: (ArgMaxer,
      {140047623236944: (SimpleModelRunner,
        {140047623234128: (StepFieldSelector,
          {140047623233616: (AgentBase, {})}),
         140047623233616: (AgentBase, {})})}),
     140047623233616: (AgentBase, {})}),
   140047623233616: (AgentBase, {})})}

EpsilonCollector

 EpsilonCollector (*args, **kwds)

LogCollector specifically manages finding and attaching itself to LoggerBases found earlier in the pipeline.


PyPrimativeConverter

 PyPrimativeConverter (*args, **kwds)

Iterable-style DataPipe.

pipe = PyPrimativeConverter([np.array([0.5])])
L(pipe)
(#1) [0.5]
pipe = PyPrimativeConverter([np.array([1])])
L(pipe)
(#1) [1]
pipe = PyPrimativeConverter([np.array([True])])
L(pipe)
(#1) [True]
from multiprocessing import get_start_method
get_start_method()
'fork'
logger_base = ProgressBarLogger()
traverse(logger_base)
{140047796619344: (ProgressBarLogger, {})}
# Set up the core NN
torch.manual_seed(0)
model = DQN(4,2)

agent = AgentBase(model)
agent = LoggerBasePassThrough(agent,[logger_base])
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=1,max_steps=100)
agent = EpsilonCollector(selector)
agent = ArgMaxer(agent,only_idx=True)
agent = NumpyConverter(agent)
agent = PyPrimativeConverter(agent)
agent = AgentHead(agent)

traverse(agent)
{140047622144464: (AgentHead,
  {140047622144656: (PyPrimativeConverter,
    {140047622145360: (NumpyConverter,
      {140047622144208: (ArgMaxer,
        {140047622147152: (EpsilonCollector,
          {140047622147088: (EpsilonSelector,
            {140047622147728: (ArgMaxer,
              {140047622811472: (SimpleModelRunner,
                {140047796723664: (StepFieldSelector,
                  {140047796643280: (LoggerBasePassThrough,
                    {140047624034640: (AgentBase, {}),
                     140047796619344: (ProgressBarLogger, {})})}),
                 140047624034640: (AgentBase, {})})}),
             140047624034640: (AgentBase, {})})})})})}),
   140047624034640: (AgentBase, {})})}
for action in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]*800):
    pass # print(action)

epsilon_logs = list(logger_base.dequeue())
test_eq(len(epsilon_logs),801)