Agent Discrete

class DQN(Module):
    def __init__(self,state_sz:int,action_sz:int,hidden=512):
        self.layers=Sequential(
            Linear(state_sz,hidden),
            ReLU(),
            Linear(hidden,action_sz),
        )
    def forward(self,x): return self.layers(x)
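As a quick sanity check of the model's shapes, a minimal sketch (assuming torch is available and DQN is imported from fastrl.agents.dqn.basic as shown further down this page):

import torch
model = DQN(4,2)                    # 4-dim state in, 2 discrete actions out
q_values = model(torch.randn(1,4))  # one state -> one row of Q-values
print(q_values.shape)               # torch.Size([1, 2])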
ArgMaxer
ArgMaxer (*args, **kwds)
Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipe is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. An IterDataPipe is lazily initialized and its elements are computed only when next() is called on its iterator.

All subclasses should overwrite __iter__, which returns an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. Common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation of a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipes have internal buffers whose states can become invalid if there are multiple iterators. The code example below shows how this constraint looks in practice. If you have any feedback related to this constraint, please see the GitHub IterDataPipe Single Iterator Issue: https://github.com/pytorch/data/issues/45

These DataPipes can be invoked in two ways: using the class constructor, or by applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipes together to form a pipeline that performs multiple operations in succession.

Note: When a subclass is used with torch.utils.data.DataLoader, each item in the DataPipe will be yielded from the DataLoader iterator. When num_workers > 0, each worker process has a different copy of the DataPipe object, so it is often desirable to configure each copy independently to avoid duplicate data being returned from the workers. torch.utils.data.get_worker_info(), when called in a worker process, returns information about the worker. It can be used in the dataset's __iter__ method or in the DataLoader's worker_init_fn option to modify each copy's behavior.

Examples:

General Usage:
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> dp = IterableWrapper(range(10))
>>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
>>> map_dp_2 = dp.map(lambda x: x + 1)      # Using functional form (recommended)
>>> list(map_dp_1)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> list(map_dp_2)
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
>>> list(filter_dp)
[2, 4, 6, 8, 10]

Single Iterator Constraint Example:
>>> from torchdata.datapipes.iter import IterableWrapper, Mapper
>>> source_dp = IterableWrapper(range(10))
>>> it1 = iter(source_dp)
>>> list(it1)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> it1 = iter(source_dp)
>>> it2 = iter(source_dp)  # The creation of a new iterator invalidates it1
>>> next(it2)
0
>>> next(it1)  # Further usage of it1 will raise a RuntimeError
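In the pipelines on this page, ArgMaxer follows SimpleModelRunner and converts each row of Q-values into a one-hot action selection (the tensor([[1, 0]]) output below), or into a bare action index when only_idx=True. A plain-torch illustration of that conversion, not the fastrl implementation itself:

import torch
q_values = torch.tensor([[0.3, -1.2]])               # model output for a single state
idx = q_values.argmax(dim=1, keepdim=True)           # tensor([[0]])
one_hot = torch.zeros_like(q_values, dtype=torch.long).scatter_(1, idx, 1)
print(one_hot)                                        # tensor([[1, 0]]), as in the example below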
from fastrl.agents.dqn.basic import DQN
# Set up the core NN
torch.manual_seed(0)
model = DQN(4,2)
# Set up the agent
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
agent = AgentHead(agent)

for action in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
    print(action)

traverse(agent)
tensor([[1, 0]])
{140047797327504: (AgentHead,
{140047624286480: (ArgMaxer,
{140047623235216: (SimpleModelRunner,
{140047623536912: (StepFieldSelector,
{140047623234960: (AgentBase, {})}),
140047623234960: (AgentBase, {})})}),
140047623234960: (AgentBase, {})})}
EpsilonSelector
EpsilonSelector (*args, **kwds)
Iterable-style DataPipe.
Check that when min_epsilon=1, the actions have a 100% likelihood of randomness applied (even though some might not change, since the random action can match the originally chosen action). Check that this works on a large batch of 200 steps…
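What the mask means here: alongside each action, EpsilonSelector (with ret_mask=True) returns a boolean mask marking which entries had a random action substituted. A rough epsilon-greedy sketch of that idea in plain torch, illustrative only (the fastrl implementation may differ in details):

import torch
torch.manual_seed(0)
epsilon = 1.0
greedy  = torch.tensor([0, 0, 0, 0])            # argmax actions for a batch of 4 states
mask    = torch.rand(4) < epsilon               # True where a random action replaces the greedy one
random_actions = torch.randint(0, 2, (4,))
actions = torch.where(mask, random_actions, greedy)
# With epsilon=1 every entry is masked, yet some actions may still equal the greedy choice.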
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=1,ret_mask=True)
agent = AgentHead(selector)

for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]*200))]):
    test_eq(mask.sum(),200)
    test_ne(action.sum(0)[0],200) # Only some of the actions should be 1
    test_ne(action.sum(0)[1],0)   # Only some of the actions should be 0

test_eq(selector.epsilon,1)
test_eq(selector.step,1)

traverse(agent)
{140047623236880: (AgentHead,
{140047623235856: (EpsilonSelector,
{140047623234448: (ArgMaxer,
{140047623234512: (SimpleModelRunner,
{140047623234704: (StepFieldSelector,
{140047623237328: (AgentBase, {})}),
140047623237328: (AgentBase, {})})}),
140047623237328: (AgentBase, {})}),
140047623237328: (AgentBase, {})})}
Check that when min_epsilon=1, the actions have a 100% likelihood of randomness applied (even though some might not change, since the random action can match the originally chosen action). Check that this works on single batches over 200 steps…
ArgMaxer.debug=False

agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent,axis=1)
selector = EpsilonSelector(agent,min_epsilon=1,ret_mask=True)
agent = AgentHead(selector)

actions = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
        test_eq(mask.sum(),1)
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))

test_ne(actions.sum(0)[0],200) # Only some of the actions should be 1
test_ne(actions.sum(0)[1],0)   # Only some of the actions should be 0
test_eq(selector.epsilon,1)
test_eq(selector.step,200)

traverse(agent)
{140047623595664: (AgentHead,
{140047623595216: (EpsilonSelector,
{140047623594704: (ArgMaxer,
{140047623595728: (SimpleModelRunner,
{140047623597712: (StepFieldSelector,
{140047623594896: (AgentBase, {})}),
140047623594896: (AgentBase, {})})}),
140047623594896: (AgentBase, {})}),
140047623594896: (AgentBase, {})})}
Check that when min_epsilon=0 and max_epsilon=0, the actions have a 0% likelihood of randomness applied. Check that this works on a large batch of 200 steps…
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=0,ret_mask=True)
agent = AgentHead(selector)

for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]*200))]):
    test_eq(mask.sum(),0)
    test_eq(action.sum(0)[0],200) # All the "left" actions should be 1
    test_eq(action.sum(0)[1],0)   # All the "right" actions should be 0

test_eq(selector.epsilon,0)
test_eq(selector.step,1)

traverse(agent)
{140047796955600: (AgentHead,
{140047796956048: (EpsilonSelector,
{140047796954256: (ArgMaxer,
{140047796956688: (SimpleModelRunner,
{140047796956496: (StepFieldSelector,
{140047796954896: (AgentBase, {})}),
140047796954896: (AgentBase, {})})}),
140047796954896: (AgentBase, {})}),
140047796954896: (AgentBase, {})})}
Check that when min_epsilon=0 and max_epsilon=0, the actions have a 0% likelihood of randomness applied. Check that this works on single batches over 200 steps…
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=0,ret_mask=True)
agent = AgentHead(selector)

actions = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
        test_eq(mask.sum(),0)
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))

test_eq(actions.sum(0)[0],200) # All the "left" actions should be 1
test_eq(actions.sum(0)[1],0)   # All the "right" actions should be 0
test_eq(selector.epsilon,0)
test_eq(selector.step,200)

traverse(agent)
{140047799059920: (AgentHead,
{140047623596816: (EpsilonSelector,
{140047623597008: (ArgMaxer,
{140047623595024: (SimpleModelRunner,
{140047623594320: (StepFieldSelector,
{140047623597328: (AgentBase, {})}),
140047623597328: (AgentBase, {})})}),
140047623597328: (AgentBase, {})}),
140047623597328: (AgentBase, {})})}
Check that when min_epsilon=0 and max_epsilon=1, the actions should become less random as the steps go on. Check that this works on a large batch of 200 steps…

epsilon should be 0 at the end of this…
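The "expected 49.5" in the test below follows from the (assumed) linear annealing schedule: epsilon falls from max_epsilon to min_epsilon over max_steps calls and then stays at min_epsilon. A small sketch of that arithmetic; the exact fastrl formula may differ slightly:

def epsilon_at(step, min_eps=0.0, max_eps=1.0, max_steps=100):
    # linear decay, clamped at min_eps once max_steps is reached
    return max(min_eps, max_eps - (max_eps - min_eps) * step / max_steps)

eps = [epsilon_at(i) for i in range(1, 201)]  # epsilon recorded after each of 200 steps
print(sum(eps[:100]))                          # ~49.5 -> the "expected 49.5" below
print(sum(eps[100:]))                          # 0.0   -> zero after max_steps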
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=1,max_steps=100,ret_mask=True)
agent = AgentHead(selector)

actions = None
masks = None
epsilons = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]*200))]):
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))
        if masks is None:   masks = mask
        else:               masks = torch.hstack((masks,mask))
        if epsilons is None: epsilons = tensor([selector.epsilon])
        else:                epsilons = torch.hstack((epsilons,tensor([selector.epsilon])))

test_ne(masks[:((200*200)//2)].sum(),200) # We do not expect this to equal a perfect 200...
test_ne(masks[:((200*200)//2)].sum(),0)   # ...but we also don't expect it to be 0
assert 1000<masks[:((200*200)//2)].sum()<10_000,\
    """We expect this to be somewhere between 1000 and 10,000, generally in the 9000 range since
    for 200 steps, we are running 200 inputs"""
test_eq(masks[((200*200)//2):].sum(),0)   # We fully expect this to be 0 after the halfway point
test_ne(actions.sum(0)[0],200) # The "left" actions generally shouldn't all be 1
test_ne(actions.sum(0)[1],0)   # The "right" actions generally shouldn't all be 0
test_eq(selector.epsilon,0)
test_eq(selector.step,200)
# Since the max steps are 100, and we go for 200 steps, the first 100 epsilon entries shouldn't be 0
test_ne(epsilons[:100].sum(),0)
# In fact the first 100 should sum up to somewhere between 40 and 50 (expected 49.5)
test_eq(40<epsilons[:100].sum()<50,True)
# Everything after 100 should be 0
test_eq(epsilons[100:].sum(),0)

traverse(agent)
{140047622145488: (AgentHead,
{140047622147472: (EpsilonSelector,
{140047622146256: (ArgMaxer,
{140047622146128: (SimpleModelRunner,
{140047622147600: (StepFieldSelector,
{140047622146192: (AgentBase, {})}),
140047622146192: (AgentBase, {})})}),
140047622146192: (AgentBase, {})}),
140047622146192: (AgentBase, {})})}
Check that when min_epsilon=0 and max_epsilon=1, the actions should become less random as the steps go on. Check that this works on single batches over 200 steps…
agent = AgentBase(model)
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=1,max_steps=100,ret_mask=True)
agent = AgentHead(selector)

actions = None
masks = None
epsilons = None
for i in range(200):
    for action,mask in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]):
        if actions is None: actions = action
        else:               actions = torch.vstack((actions,action))
        if masks is None:   masks = mask
        else:               masks = torch.hstack((masks,mask))
        if epsilons is None: epsilons = tensor([selector.epsilon])
        else:                epsilons = torch.hstack((epsilons,tensor([selector.epsilon])))

test_ne(masks[:(200//2)].sum(),200) # We do not expect this to equal a perfect 200...
test_ne(masks[:(200//2)].sum(),0)   # ...but we also don't expect it to be 0
assert 40<masks[:(200//2)].sum()<60,'We expect this to be somewhere between 40 and 60, generally in the ~50 range'
test_eq(masks[(200//2):].sum(),0)   # We fully expect this to be 0 after the halfway point
test_ne(actions.sum(0)[0],200) # The "left" actions generally shouldn't all be 1
test_ne(actions.sum(0)[1],0)   # The "right" actions generally shouldn't all be 0
test_eq(selector.epsilon,0)
test_eq(selector.step,200)
# Since the max steps are 100, and we go for 200 steps, the first 100 epsilon entries shouldn't be 0
test_ne(epsilons[:100].sum(),0)
# In fact the first 100 should sum up to somewhere between 40 and 50 (expected 49.5)
test_eq(40<epsilons[:100].sum()<50,True)
# Everything after 100 should be 0
test_eq(epsilons[100:].sum(),0)
traverse(agent)
{140047623236112: (AgentHead,
{140047623236496: (EpsilonSelector,
{140047623237136: (ArgMaxer,
{140047623236944: (SimpleModelRunner,
{140047623234128: (StepFieldSelector,
{140047623233616: (AgentBase, {})}),
140047623233616: (AgentBase, {})})}),
140047623233616: (AgentBase, {})}),
140047623233616: (AgentBase, {})})}
EpsilonCollector
EpsilonCollector (*args, **kwds)
LogCollector specifically manages finding and attaching itself to LoggerBase instances found earlier in the pipeline.
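Conceptually (a sketch only, not the fastrl source): a collector is a pass-through datapipe that remembers the loggers it attached to and pushes roughly one record to each of them per item that flows through it; EpsilonCollector does this with selector.epsilon, which is what logger_base.dequeue() drains at the end of this page.

# Conceptual sketch: a pass-through iterable that appends to any attached loggers.
class TinyCollector:
    def __init__(self, source, loggers):
        self.source, self.loggers = source, loggers
    def __iter__(self):
        for value in self.source:
            for log in self.loggers:
                log.append(value)      # e.g. EpsilonCollector would record the current epsilon
            yield value

history = []
list(TinyCollector([1.0, 0.9, 0.8], [history]))
print(history)                          # [1.0, 0.9, 0.8]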
PyPrimativeConverter
PyPrimativeConverter (*args, **kwds)
Iterable-style DataPipe.
pipe = PyPrimativeConverter([np.array([0.5])])
L(pipe)
(#1) [0.5]
pipe = PyPrimativeConverter([np.array([1])])
L(pipe)
(#1) [1]
pipe = PyPrimativeConverter([np.array([True])])
L(pipe)
(#1) [True]
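The three cells above show single-element numpy arrays coming out as plain Python float, int, and bool values. A minimal sketch of that kind of conversion (illustrative only, not the fastrl source):

import numpy as np

def to_primitive(o):
    # single-element numpy arrays collapse to float/int/bool via .item()
    return o.item() if isinstance(o, np.ndarray) and o.size == 1 else o

print(to_primitive(np.array([0.5])), to_primitive(np.array([1])), to_primitive(np.array([True])))
# 0.5 1 True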
from multiprocessing import get_start_method
get_start_method()
'fork'
logger_base = ProgressBarLogger()
traverse(logger_base)
{140047796619344: (ProgressBarLogger, {})}
# Set up the core NN
torch.manual_seed(0)
model = DQN(4,2)

agent = AgentBase(model)
agent = LoggerBasePassThrough(agent,[logger_base])
agent = StepFieldSelector(agent,field='state')
agent = SimpleModelRunner(agent)
agent = ArgMaxer(agent)
selector = EpsilonSelector(agent,min_epsilon=0,max_epsilon=1,max_steps=100)
agent = EpsilonCollector(selector)
agent = ArgMaxer(agent,only_idx=True)
agent = NumpyConverter(agent)
agent = PyPrimativeConverter(agent)
agent = AgentHead(agent)
traverse(agent)
{140047622144464: (AgentHead,
{140047622144656: (PyPrimativeConverter,
{140047622145360: (NumpyConverter,
{140047622144208: (ArgMaxer,
{140047622147152: (EpsilonCollector,
{140047622147088: (EpsilonSelector,
{140047622147728: (ArgMaxer,
{140047622811472: (SimpleModelRunner,
{140047796723664: (StepFieldSelector,
{140047796643280: (LoggerBasePassThrough,
{140047624034640: (AgentBase, {}),
140047796619344: (ProgressBarLogger, {})})}),
140047624034640: (AgentBase, {})})}),
140047624034640: (AgentBase, {})})})})})}),
140047624034640: (AgentBase, {})})}
for action in agent([SimpleStep.random(state=tensor([[1.,2.,3.,4.]]))]*800):
    pass # print(action)

epsilon_logs = list(logger_base.dequeue())
test_eq(len(epsilon_logs),801)