```python
torch.manual_seed(0)
model = DQN(4,2)
agent = DQNAgent(model)
```
DQN Basic
Model
DQN
DQN (state_sz:int, action_sz:int, hidden=512, head_layer:fastrl.torch_core.Module=<class 'torch.nn.modules.linear.Linear'>, activition_fn:fastrl.torch_core.Module=<class 'torch.nn.modules.activation.ReLU'>)
Same as nn.Module, but no need for subclasses to call super().__init__
| | Type | Default | Details |
|---|---|---|---|
| state_sz | int | | The input dim of the state |
| action_sz | int | | The output dim of the actions |
| hidden | int | 512 | Number of neurons connecting the two input/output layers |
| head_layer | Module | Linear | DQN extensions such as Dueling DQNs have custom heads |
| activition_fn | Module | ReLU | The activation fn used by the DQN |
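As a quick sanity check of the table above (a minimal sketch, assuming DQN is in scope as in the cells on this page), the model maps a batch of 4-dim states to one Q-value per action:

```python
import torch

torch.manual_seed(0)
model = DQN(4,2)                    # 4-dim state, 2 discrete actions (CartPole-like)
q_values = model(torch.randn(1,4))  # forward pass on a single random state
print(q_values.shape)               # expected: torch.Size([1, 2]) -- one Q-value per action
```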
Agent
DQNAgent
DQNAgent (model, logger_bases=None, min_epsilon=0.02, max_epsilon=1, max_steps=1000, device='cpu', dp_augmentation_fns:Union[List[Callable[[Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe]],Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe,NoneType]]],NoneType]=None)
```python
input_tensor = tensor([1,2,3,4]).float()
step = SimpleStep(state=input_tensor)

for action in agent([step]):
    print(action)

test_eq(input_tensor,tensor([1., 2., 3., 4.]))
```

1
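Under the hood the agent pipeline (see the traverse output further down) runs the model, applies epsilon-greedy exploration via EpsilonSelector, and argmaxes the Q-values. The sketch below illustrates the idea only; the linear annealing from max_epsilon to min_epsilon over max_steps is an assumption, not necessarily EpsilonSelector's exact schedule:

```python
import torch

def epsilon_greedy(q_values, step, min_epsilon=0.02, max_epsilon=1.0, max_steps=1000):
    "Hypothetical helper: anneal epsilon linearly, then pick a random or greedy action."
    epsilon = max(min_epsilon, max_epsilon - (max_epsilon - min_epsilon) * step / max_steps)
    if torch.rand(1).item() < epsilon:
        return torch.randint(q_values.shape[-1], (1,))  # explore: random action
    return q_values.argmax(dim=-1)                      # exploit: greedy action (what ArgMaxer does)
```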
```python
from fastrl.envs.gym import *
AgentHead.debug=True
```
```python
# Setup Logger
logger_base = ProgressBarLogger()

# Setup up the core NN
torch.manual_seed(0)
model = DQN(4,2)

agent = DQNAgent(model,[logger_base])

block = DataBlock(
    GymTransformBlock(agent,n=10)
)
# dls = L(block.dataloaders(['CartPole-v1']*1,n=10,bs=1))
pipes = L(block.datapipes(['CartPole-v1']*1))

# list(dls[0])
list(pipes[0])

traverse(agent)
```
{139835997488656: (AgentHead,
{139835997488848: (PyPrimativeConverter,
{139835997489168: (NumpyConverter,
{139835997490512: (ArgMaxer,
{139835997489104: (EpsilonCollector,
{139835791724432: (EpsilonSelector,
{139835791724368: (ArgMaxer,
{139835791808976: (SimpleModelRunner,
{139835791808912: (InputInjester,
{139835791808784: (StepFieldSelector,
{139837965380880: (AgentBase,
{139835791808400: (ProgressBarLogger, {})})})}),
139837965380880: (AgentBase,
{139835791808400: (ProgressBarLogger, {})})})}),
139837965380880: (AgentBase,
{139835791808400: (ProgressBarLogger, {})})})})})})}),
139837965380880: (AgentBase, {139835791808400: (ProgressBarLogger, {})})})}
Training DataPipes
QCalc
QCalc (*args, **kwds)
Iterable-style DataPipe.

All DataPipes that represent an iterable of data samples should subclass this. This style of DataPipe is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. An IterDataPipe is lazily initialized and its elements are computed only when next() is called on its iterator.

All subclasses should overwrite __iter__, which returns an iterator of samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its method reset(), which by default performs no operation. When writing a custom IterDataPipe, users should override reset() if necessary. Common usages include resetting buffers, pointers, and various state variables within the custom IterDataPipe.

Note: Only one iterator can be valid for each IterDataPipe at a time, and the creation of a second iterator will invalidate the first one. This constraint is necessary because some IterDataPipes have internal buffers whose states can become invalid if there are multiple iterators. The code example below presents details on how this constraint looks in practice. If you have any feedback related to this constraint, please see the GitHub IterDataPipe Single Iterator Issue: https://github.com/pytorch/data/issues/45

These DataPipes can be invoked in two ways: using the class constructor, or applying their functional form onto an existing IterDataPipe (recommended, available to most but not all DataPipes). You can chain multiple IterDataPipes together to form a pipeline that will perform multiple operations in succession.

Note: When a subclass is used with DataLoader, each item in the DataPipe will be yielded from the DataLoader iterator. When num_workers > 0, each worker process will have a different copy of the DataPipe object, so it is often desired to configure each copy independently to avoid having duplicate data returned from the workers. get_worker_info, when called in a worker process, returns information about the worker. It can be used in either the dataset's __iter__ method or the DataLoader's worker_init_fn option to modify each copy's behavior.

Examples:

General Usage:

    >>> # xdoctest: +SKIP
    >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
    >>> dp = IterableWrapper(range(10))
    >>> map_dp_1 = Mapper(dp, lambda x: x + 1)  # Using class constructor
    >>> map_dp_2 = dp.map(lambda x: x + 1)      # Using functional form (recommended)
    >>> list(map_dp_1)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    >>> list(map_dp_2)
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
    >>> list(filter_dp)
    [2, 4, 6, 8, 10]

Single Iterator Constraint Example:

    >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
    >>> source_dp = IterableWrapper(range(10))
    >>> it1 = iter(source_dp)
    >>> list(it1)
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    >>> it1 = iter(source_dp)
    >>> it2 = iter(source_dp)  # The creation of a new iterator invalidates it1
    >>> next(it2)
    0
    >>> next(it1)  # Further usage of it1 will raise a RuntimeError
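The docstring above is the generic IterDataPipe one inherited from torchdata and says nothing DQN-specific. Conceptually, a Q-calculation stage runs the model over the batch: Q-values for the current states (the predictions to be trained) and the max Q-value of the next states (for bootstrapping). A plain-PyTorch sketch of that idea, not fastrl's implementation:

```python
import torch

def q_calc(model, states, next_states):
    "Sketch only: the two forward passes a basic DQN update needs."
    pred = model(states)                                             # Q(s, a) for every action
    with torch.no_grad():
        next_q = model(next_states).max(dim=1, keepdim=True).values  # max_a Q(s', a)
    return pred, next_q
```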
TargetCalc
TargetCalc (*args, **kwds)
Iterable-style DataPipe. (The inherited IterDataPipe docstring is identical to the one shown under QCalc above.)
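Conceptually, the target-calculation stage builds the one-step TD target r + γ·max_a Q(s', a), drops the bootstrap term on terminal steps, and writes the result into the slot of the action that was actually taken. A hedged sketch of that standard computation, not fastrl's exact code:

```python
import torch

def target_calc(pred, next_q, actions, rewards, dones, gamma=0.99):
    "Sketch only: one-step TD targets for the taken actions."
    next_q = next_q * (1 - dones.float())           # zero the bootstrap term on terminal steps
    targets = rewards + gamma * next_q              # r + gamma * max_a Q(s', a)
    target_qs = pred.detach().clone()
    target_qs.scatter_(1, actions.long(), targets)  # only the taken action's Q-value changes
    return target_qs
```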
LossCalc
LossCalc (*args, **kwds)
Iterable-style DataPipe. (The inherited IterDataPipe docstring is identical to the one shown under QCalc above.)
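Conceptually, the loss stage just compares the predicted Q-values against the targets with the learner's loss_func (MSELoss by default, per the DQNLearner signature below). A tiny self-contained sketch with dummy values:

```python
import torch
import torch.nn as nn

loss_func = nn.MSELoss()                            # DQNLearner's default loss_func
pred      = torch.tensor([[0.5, 1.0], [0.2, 0.8]])  # predicted Q-values (QCalc-style step)
target_qs = torch.tensor([[0.5, 1.3], [0.4, 0.8]])  # TD targets (TargetCalc-style step)
loss = loss_func(pred, target_qs)
```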
ModelLearnCalc
ModelLearnCalc (*args, **kwds)
Iterable-style DataPipe. (The inherited IterDataPipe docstring is identical to the one shown under QCalc above.)
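Conceptually, the model-learn stage closes the loop: backpropagate the loss and step the optimizer (AdamW with lr=0.005 are the DQNLearner defaults shown below). A stand-in sketch using a plain linear model rather than fastrl's pipeline:

```python
import torch
import torch.nn as nn

model = nn.Linear(4, 2)                                # stand-in for the DQN
opt = torch.optim.AdamW(model.parameters(), lr=0.005)  # DQNLearner's default opt and lr

loss = nn.MSELoss()(model(torch.randn(8, 4)), torch.zeros(8, 2))  # dummy loss
opt.zero_grad()
loss.backward()  # backprop through the predicted Q-values
opt.step()       # update the network weights
```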
LossCollector
LossCollector (*args, **kwds)
LogCollector specifically manages finding and attaching itself to LoggerBase instances found earlier in the pipeline.
DQNLearner
DQNLearner (model, dls, logger_bases=None, loss_func=MSELoss(), opt=<class 'torch.optim.adamw.AdamW'>, lr=0.005, bs=128, max_sz=10000, nsteps=1, device=None, batches=None, dp_augmentation_fns:Union[List[Callable[[Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe]],Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe,NoneType]]],NoneType]=None)
Try training with basic defaults…
```python
# Setup Loggers
logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                                batch_on_pipe=BatchCollector)

# Setup up the core NN
torch.manual_seed(0)
model = DQN(4,2).cuda()
# Setup the Agent
agent = DQNAgent(model,[logger_base],max_steps=4000,device='cuda')
# Setup the DataBlock
block = DataBlock(
    GymTransformBlock(agent=agent,nsteps=1,nskips=1,firstlast=False,bs=1)
)
dls = L(block.dataloaders(['CartPole-v1']*1,num_workers=0))

# Setup the Learner
learner = DQNLearner(model,dls,batches=4,#1000,
                     logger_bases=[logger_base],
                     bs=2,#128,
                     max_sz=100_000,device='cuda')
# learner.fit(3)
learner.fit(2)
```
| loss | episode | rolling_reward | epoch | batch | epsilon |
|---|---|---|---|---|---|
| 0.44844115 | 1 | None | 1 | 5 | 0.998500 |
| 0.41163164 | 1 | None | 1 | 5 | 0.997500 |
If we try a regular DQN with nsteps/nskips, it doesn't really converge after 130. We can't expect stability at all. nsteps (correctly) tries to reduce the number of duplicated states so that the agent can sample more unique state transitions. The problem is that the base DQN is not stable, so feeding it lots of "new" transitions may not help: it will forget old experience very quickly, and having duplicate states helps "remind" it.
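As a rough illustration of what "merging 2 steps into 1" means (a sketch only, not GymTransformBlock's implementation): a 2-step transition keeps the first state and action, the next-state from two steps ahead, and a discounted sum of the intermediate rewards.

```python
# Sketch only: merging two consecutive transitions into one 2-step transition,
# roughly what nsteps=2 with firstlast=True produces.
gamma = 0.99
step1 = dict(state=[0.1]*4, action=0, reward=1.0, next_state=[0.2]*4, done=False)
step2 = dict(state=[0.2]*4, action=1, reward=1.0, next_state=[0.3]*4, done=False)

merged = dict(
    state=step1["state"],                            # first state
    action=step1["action"],                          # first action
    reward=step1["reward"] + gamma*step2["reward"],  # discounted 2-step return
    next_state=step2["next_state"],                  # state two steps ahead
    done=step2["done"],
)
```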
```python
logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                                batch_on_pipe=BatchCollector)

# Setup up the core NN
torch.manual_seed(0)
model = DQN(4,2)
# Setup the Agent
agent = DQNAgent(model,[logger_base],max_steps=10000)
# Setup the DataBlock
block = DataBlock(
    GymTransformBlock(agent=agent,nsteps=2,nskips=2,firstlast=True) # We basically merge 2 steps into 1 and skip.
)
# pipes = L(block.datapipes(['CartPole-v1']*1,n=10))
dls = L(block.dataloaders(['CartPole-v1']*1))
# Setup the Learner
learner = DQNLearner(model,dls,batches=1000,logger_bases=[logger_base],bs=128,max_sz=20_000,nsteps=2,lr=0.001)
# learner.fit(3)
# learner.fit(20)
```
```python
import pandas as pd
from fastrl.pipes.core import *
from fastrl.pipes.map.transforms import *
from fastrl.envs.gym import GymTypeTransform,GymStepper
from fastrl.loggers.core import *
from fastrl.loggers.jupyter_visualizers import *
from fastrl.loggers.vscode_visualizers import *
```
```python
# Setup Loggers
logger_base = ProgressBarLogger(epoch_on_pipe=EpocherCollector,
                                batch_on_pipe=BatchCollector)

# Setup up the core NN
torch.manual_seed(0)
model = DQN(8,4)
# Setup the Agent
agent = DQNAgent(model,[logger_base])
# Setup the DataBlock
block = DataBlock(
    GymTransformBlock(agent=agent,bs=1,n=1000)
)
dls = L(block.dataloaders(['LunarLander-v2']*1))
# Setup the Learner
learner = DQNLearner(model,dls,logger_bases=[logger_base])
learner.fit(3)
# learner.fit(30)
```
| loss | episode | rolling_reward | epoch | batch | epsilon |
|---|---|---|---|---|---|
| 0.44493392 | 9 | -174.366881 | 1 | 1000 | 0.020000 |
| 0.57215977 | 11 | -149.441252 | 2 | 1000 | 0.020000 |
| 0.37252918 | 11 | -149.441252 | 2 | 1000 | 0.020000 |
```python
import pandas as pd
from fastrl.pipes.core import *
from fastrl.pipes.map.transforms import *
from fastrl.envs.gym import GymTypeTransform,GymStepper
from fastrl.loggers.vscode_visualizers import VSCodeTransformBlock
```