import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper
from fastrl.pipes.iter.nstep import *
from fastrl.pipes.map.transforms import *
FirstLast
DataPipe for merging multiple StepTypes into a single one by keeping the first and last steps.
FirstLastMerger
FirstLastMerger (*args, **kwds)
Takes multiple steps and converts them into a single step consisting of properties from the first and last steps. Reward is recalculated to factor in the multiple steps.
Below we see an example where we collect 2 steps for each env, then yield them. This is useful for training models on larger chunks of env step output.
def first_last_test(envs,total_steps,n=1,seed=0):
    """Run a gym pipeline through `NStepper` and `FirstLastMerger` and collect its output.

    Args:
        envs: Environment ids handed to `dp.map.Mapper` (the examples on
            this page pass a list such as `['CartPole-v1']*3`).
        total_steps: Upper bound on the number of items taken from the
            pipeline; passed to `.header`.
        n: Forwarded to `NStepper` as `n`.
        seed: Forwarded to `GymStepper` as `seed`.

    Returns:
        A list holding every item the pipeline yielded.
    """
    pipe = dp.map.Mapper(envs)
    pipe = TypeTransformer(pipe,[GymTypeTransform])
    pipe = dp.iter.MapToIterConverter(pipe)
    pipe = dp.iter.InMemoryCacheHolder(pipe)
    # Cycle over the cached envs so stepping can continue until `header`
    # cuts the stream off.
    pipe = pipe.cycle()
    pipe = GymStepper(pipe,seed=seed)
    pipe = NStepper(pipe,n=n)
    pipe = FirstLastMerger(pipe)
    pipe = pipe.header(total_steps)
    return list(pipe)
steps = first_last_test(['CartPole-v1']*3,200,2,0)
pd.DataFrame(steps)[['state','next_state','env_id','terminated']][:10]
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
"The length of this HeaderIterDataPipe is inferred to be equal to its limit."
| | state | next_state | env_id | terminated |
|---|---|---|---|---|
| 0 | [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | tensor(139621059434512) | tensor(False) |
| 1 | [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | tensor(139621060390992) | tensor(False) |
| 2 | [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | tensor(139621060250832) | tensor(False) |
| 3 | [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | tensor(139621059434512) | tensor(False) |
| 4 | [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | tensor(139621060390992) | tensor(False) |
| 5 | [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | tensor(139621060250832) | tensor(False) |
| 6 | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] | tensor(139621059434512) | tensor(False) |
| 7 | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] | tensor(139621060390992) | tensor(False) |
| 8 | [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] | [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] | tensor(139621060250832) | tensor(False) |
| 9 | [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] | [tensor(0.0427), tensor(0.1763), tensor(-0.1007), tensor(-0.4364)] | tensor(139621059434512) | tensor(False) |
First, NStepper(pipe,n=1) with FirstLastMerger should be identical to a pipeline that never used it.
import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper

# Baseline pipeline: the same stages as `first_last_test`, but without
# `NStepper` and `FirstLastMerger`, limited to 10 items.
pipe = dp.map.Mapper(['CartPole-v1']*3)
pipe = TypeTransformer(pipe,[GymTypeTransform])
pipe = dp.iter.MapToIterConverter(pipe)
pipe = dp.iter.InMemoryCacheHolder(pipe)
pipe = pipe.cycle()
pipe = GymStepper(pipe,seed=0)
pipe = pipe.header(10)
no_n_steps = list(pipe)
steps = first_last_test(['CartPole-v1']*3,10,1,0)
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
"The length of this HeaderIterDataPipe is inferred to be equal to its limit."
If n=1 we should expect that regardless of the number of envs, both n-step and simple environment pipelines should be identical.
test_len(steps,no_n_steps)
for field in ['next_state','state','terminated']:
    for i,(step,no_n_step) in enumerate(zip(steps,no_n_steps)):
        test_eq(getattr(step,field),getattr(no_n_step,field))
n_first_last_steps_expected
n_first_last_steps_expected (default_steps:int)
This function doesn't do much for now: FirstLastMerger pretty much undoes the change in the number of steps that n-stepping introduces.
| | Type | Details |
|---|---|---|
| default_steps | int | The number of steps the episode would run without n_steps |
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
steps = first_last_test(['CartPole-v1']*1,expected_n_steps+1,2,0)
# One step more than the expected episode length is requested, so the checks
# below expect steps[-2] to be the final (terminated) step of episode 1 and
# steps[-1] to already belong to episode 2.
# NOTE(review): the previous comment spoke of rows 34/35/36, which does not
# match default_steps=18 used here.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([2])) # Main difference, the "step" for the new episode will be 2 instead of 1
Given the above values, we expect a single episode to be 18 steps long
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
steps = first_last_test(['CartPole-v1']*1,expected_n_steps+1,4,0)
# Same check as with n=2, now with n=4: one step more than the expected
# episode length is requested, so steps[-2] is expected to be the final
# (terminated) step of episode 1 and steps[-1] to belong to episode 2.
# NOTE(review): the previous comment spoke of rows 34/35/36, which does not
# match default_steps=18 used here.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([4]))
Given the above values, we expect a single episode to be 18 steps long
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
steps = first_last_test(['CartPole-v1']*3,expected_n_steps*3+1,2,0)
# Three envs are used here, so expected_n_steps*3+1 steps are requested.
# The checks below expect steps[-2] to be a final (terminated) step of
# episode 1 and steps[-1] to already belong to episode 2.
# NOTE(review): the previous comment spoke of rows 34/35/36, which does not
# match default_steps=18 used here.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([2]))
Given the above values, we expect a single episode to be 18 steps long