import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper
from fastrl.pipes.iter.nstep import *
from fastrl.pipes.map.transforms import *
FirstLast
DataPipe for merging multiple `StepTypes` into a single one by keeping the first and last steps.
FirstLastMerger
FirstLastMerger (*args, **kwds)
Takes multiple steps and converts them into a single step consisting of properties from the first and last steps. Reward is recalculated to factor in the multiple steps.
Below we see an example where we collect 2 steps for each env, then yield them. This is useful for training models on larger chunks of env step output.
def first_last_test(envs,total_steps,n=1,seed=0):
    """Build an n-step + first/last-merge pipeline over `envs` and collect its output.

    Args:
        envs: Sequence of environment ids (e.g. ``['CartPole-v1']*3``) wrapped by
            ``dp.map.Mapper``.
        total_steps: Maximum number of merged steps to collect; passed to ``header``.
        n: Number of steps ``NStepper`` groups together before merging.
        seed: Seed forwarded to ``GymStepper``.

    Returns:
        A list with the (at most ``total_steps``) steps yielded by the pipeline.
    """
    # NOTE(review): `dp` is not imported in the visible part of this file;
    # presumably `import torchdata.datapipes as dp` (or a star import) provides it.
    pipe = dp.map.Mapper(envs)
    pipe = TypeTransformer(pipe,[GymTypeTransform])
    pipe = dp.iter.MapToIterConverter(pipe)
    pipe = dp.iter.InMemoryCacheHolder(pipe)
    # Cycle over the cached envs so stepping continues until `header` cuts it off.
    pipe = pipe.cycle()
    pipe = GymStepper(pipe,seed=seed)
    pipe = NStepper(pipe,n=n)
    # Collapse each n-step group into a single step built from its first and last steps.
    pipe = FirstLastMerger(pipe)
    pipe = pipe.header(total_steps)
    return list(pipe)
# Collect 200 merged steps from 3 CartPole envs using 2-step groups.
steps = first_last_test(['CartPole-v1']*3,200,2,0)
# Show the first 10 rows of the fields that identify each merged transition.
pd.DataFrame(steps)[['state','next_state','env_id','terminated']][:10]
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
"The length of this HeaderIterDataPipe is inferred to be equal to its limit."
| | state | next_state | env_id | terminated |
---|---|---|---|---|
0 | [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | tensor(139621059434512) | tensor(False) |
1 | [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | tensor(139621060390992) | tensor(False) |
2 | [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | tensor(139621060250832) | tensor(False) |
3 | [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | tensor(139621059434512) | tensor(False) |
4 | [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | tensor(139621060390992) | tensor(False) |
5 | [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | tensor(139621060250832) | tensor(False) |
6 | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] | tensor(139621059434512) | tensor(False) |
7 | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] | tensor(139621060390992) | tensor(False) |
8 | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] | tensor(139621060250832) | tensor(False) |
9 | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | [tensor(0.0427), tensor(0.1763), tensor(-0.1007), tensor(-0.4364)] | tensor(139621059434512) | tensor(False) |
First, `NStepper(pipe,n=1)` with `FirstLastMerger` should be identical to a pipeline that never used it.
import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper
# Baseline pipeline: same stages as `first_last_test`, but without
# NStepper/FirstLastMerger, limited to 10 steps.
pipe = dp.map.Mapper(['CartPole-v1']*3)
pipe = TypeTransformer(pipe,[GymTypeTransform])
pipe = dp.iter.MapToIterConverter(pipe)
pipe = dp.iter.InMemoryCacheHolder(pipe)
pipe = pipe.cycle()
pipe = GymStepper(pipe,seed=0)
pipe = pipe.header(10)

no_n_steps = list(pipe)
# Same envs, step budget and seed, but routed through NStepper(n=1) + FirstLastMerger.
steps = first_last_test(['CartPole-v1']*3,10,1,0)
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
"The length of this HeaderIterDataPipe is inferred to be equal to its limit."
If `n=1`, we should expect that, regardless of the number of envs, the n-step and the simple environment pipelines are identical.
# Both pipelines must yield the same number of steps...
test_len(steps,no_n_steps)
# ...and every step must match field-by-field.
for field in ['next_state','state','terminated']:
    for step,no_n_step in zip(steps,no_n_steps):
        test_eq(getattr(step,field),getattr(no_n_step,field))
n_first_last_steps_expected
n_first_last_steps_expected (default_steps:int)
This function doesn't do much for now. `FirstLastMerger` pretty much undoes the change in the number of steps that `nsteps` introduces.
| | Type | Details |
---|---|---|
default_steps | int | The number of steps the episode would run without n_steps |
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
# Collect one step more than a full episode so the last row belongs to the next episode.
steps = first_last_test(['CartPole-v1']*1,expected_n_steps+1,2,0)
# The first episode should end on the second-to-last row (step 18).
# The last row should be the start of a new episode.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([2])) # Main difference, the "step" for the new episode will be 2 instead of 1
Given the above values, we expect a single episode to be 18 steps long
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
# Same check as with n=2, but grouping 4 steps at a time.
steps = first_last_test(['CartPole-v1']*1,expected_n_steps+1,4,0)
# The first episode should end on the second-to-last row (step 18).
# The last row should be the start of a new episode.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([4])) # With n=4 the first merged step of the new episode reports step 4
Given the above values, we expect a single episode to be 18 steps long
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
# With 3 envs, collect a full episode's worth of rows per env plus one extra row.
steps = first_last_test(['CartPole-v1']*3,expected_n_steps*3+1,2,0)
# The second-to-last row should be the final step (18) of a first episode.
# The last row should be the start of a new episode.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([2]))
Given the above values, we expect a single episode to be 18 steps long