FirstLast

DataPipe for merging multiple StepTypes into a single one by keeping the first and last steps.

FirstLastMerger

 FirstLastMerger (*args, **kwds)

Takes multiple steps and converts them into a single step consisting of properties from the first and last steps. Reward is recalculated to factor in the multiple steps.

Below we see an example where we collect 2 steps for each env, then yield them. This is useful for training models on larger chunks of env step output.

import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper
from fastrl.pipes.iter.nstep import *
from fastrl.pipes.map.transforms import *
def first_last_test(envs,total_steps,n=1,seed=0):
    """Run `envs` through an n-step + `FirstLastMerger` pipeline.

    Builds the pipeline over `envs`, steps it with `GymStepper(seed=seed)`,
    groups the output with `NStepper(n=n)`, merges each group with
    `FirstLastMerger`, and returns the first `total_steps` items as a list.
    """
    # Map-style source over the envs, with the gym type transform applied,
    # converted to an iterable datapipe.
    env_source = dp.iter.MapToIterConverter(
        TypeTransformer(dp.map.Mapper(envs),[GymTypeTransform])
    )
    # Hold the converted items in memory and keep cycling over them.
    env_loop = dp.iter.InMemoryCacheHolder(env_source).cycle()
    # Step the envs, collect n-step groups, then merge each group into one step.
    merged = FirstLastMerger(NStepper(GymStepper(env_loop,seed=seed),n=n))
    # Cap the output at `total_steps` items and materialize it.
    return list(merged.header(total_steps))

steps = first_last_test(['CartPole-v1']*3,200,2,0)
pd.DataFrame(steps)[['state','next_state','env_id','terminated']][:10]
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
  "The length of this HeaderIterDataPipe is inferred to be equal to its limit."
state next_state env_id terminated
0 [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(139621059434512) tensor(False)
1 [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(139621060390992) tensor(False)
2 [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(139621060250832) tensor(False)
3 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] tensor(139621059434512) tensor(False)
4 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] tensor(139621060390992) tensor(False)
5 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] tensor(139621060250832) tensor(False)
6 [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] tensor(139621059434512) tensor(False)
7 [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] tensor(139621060390992) tensor(False)
8 [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] [tensor(0.0353), tensor(0.3702), tensor(-0.0866), tensor(-0.7006)] tensor(139621060250832) tensor(False)
9 [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] [tensor(0.0427), tensor(0.1763), tensor(-0.1007), tensor(-0.4364)] tensor(139621059434512) tensor(False)

First, NStepper(pipe,n=1) with FirstLastMerger should be identical to a pipeline that never used it.

import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper
# Baseline pipeline: the same stages as `first_last_test`, but without
# `NStepper` or `FirstLastMerger`.
pipe = dp.map.Mapper(['CartPole-v1']*3)
pipe = TypeTransformer(pipe,[GymTypeTransform])
pipe = dp.iter.MapToIterConverter(pipe)
pipe = dp.iter.InMemoryCacheHolder(pipe)
pipe = pipe.cycle()
pipe = GymStepper(pipe,seed=0)
pipe = pipe.header(10)

# Collect 10 steps from each pipeline (the n-step variant uses n=1, seed=0)
# so they can be compared.
no_n_steps = list(pipe)
steps = first_last_test(['CartPole-v1']*3,10,1,0)
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
  "The length of this HeaderIterDataPipe is inferred to be equal to its limit."

If n=1 we should expect that regardless of the number of envs, both n-step and simple environment pipelines should be identical.

# With n=1 the merged pipeline is expected to match the plain pipeline.
test_len(steps,no_n_steps)
for field in ['next_state','state','terminated']:
    # Compare the two runs row by row on this field; no index is needed.
    for step,no_n_step in zip(steps,no_n_steps):
        test_eq(getattr(step,field),getattr(no_n_step,field))

n_first_last_steps_expected

 n_first_last_steps_expected (default_steps:int)

This function doesn't do much for now. FirstLastMerger pretty much undoes the change in the number of steps that n-stepping introduces.

Type Details
default_steps int The number of steps the episode would run without n_steps
# Expected episode length, given an episode that runs 18 steps without n_steps.
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
# One env, n=2, seed=0; collect one row more than the expected episode length.
steps = first_last_test(['CartPole-v1']*1,expected_n_steps+1,2,0)
# The second-to-last row should be the terminal step of the first episode;
# the last row should belong to a new episode.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([2])) # Main difference, the "step" for the new episode will be 2 instead of 1
Given the above values, we expect a single episode to be  18  steps long
# Expected episode length, given an episode that runs 18 steps without n_steps.
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
# One env, n=4, seed=0; collect one row more than the expected episode length.
steps = first_last_test(['CartPole-v1']*1,expected_n_steps+1,4,0)
# The second-to-last row should be the terminal step of the first episode;
# the last row should belong to a new episode.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([4]))
Given the above values, we expect a single episode to be  18  steps long
# Expected episode length, given an episode that runs 18 steps without n_steps.
expected_n_steps = n_first_last_steps_expected(default_steps=18)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
# Three envs, n=2, seed=0; collect three times the expected episode length
# plus one extra row.
steps = first_last_test(['CartPole-v1']*3,expected_n_steps*3+1,2,0)
# The second-to-last row should be a terminal step of episode 1;
# the last row should belong to episode 2.
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([2]))
Given the above values, we expect a single episode to be  18  steps long