NStep

DataPipe for producing grouped steps env-wise.

NStepper

 NStepper (*args, **kwds)

Accepts a source_datapipe or iterable whose next() produces a StepType of max size n that will contain steps from a single environment with a subset of fields from SimpleStep, namely terminated and env_id.


NStepFlattener

 NStepFlattener (*args, **kwds)

Handles unwrapping StepTypes in tuples better than dp.iter.UnBatcher and dp.iter.Flattener

Below we see an example where we collect 2 steps for each env, then yield them. This is useful for training models of larger chunks of env step output.

import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper
def n_step_test(envs,total_steps,n=1,seed=0):
    pipe = dp.map.Mapper(envs)
    pipe = TypeTransformer(pipe,[GymTypeTransform])
    pipe = dp.iter.MapToIterConverter(pipe)
    pipe = dp.iter.InMemoryCacheHolder(pipe)
    pipe = pipe.cycle()
    pipe = GymStepper(pipe,seed=seed)
    pipe = NStepper(pipe,n=n)
    pipe = NStepFlattener(pipe)
    pipe = pipe.header(total_steps)
    return list(pipe)

steps = n_step_test(['CartPole-v1']*3,200,2,0)
pd.DataFrame(steps)[['state','next_state','env_id','terminated']][:10]
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
  "The length of this HeaderIterDataPipe is inferred to be equal to its limit."
state next_state env_id terminated
0 [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] tensor(140096952787408) tensor(False)
1 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(140096952787408) tensor(False)
2 [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] tensor(140096952804368) tensor(False)
3 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(140096952804368) tensor(False)
4 [tensor(0.0137), tensor(-0.0230), tensor(-0.0459), tensor(-0.0483)] [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] tensor(140096952805136) tensor(False)
5 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(140096952805136) tensor(False)
6 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(140096952787408) tensor(False)
7 [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] tensor(140096952787408) tensor(False)
8 [tensor(0.0132), tensor(0.1727), tensor(-0.0469), tensor(-0.3552)] [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] tensor(140096952804368) tensor(False)
9 [tensor(0.0167), tensor(0.3685), tensor(-0.0540), tensor(-0.6622)] [tensor(0.0241), tensor(0.5643), tensor(-0.0672), tensor(-0.9714)] tensor(140096952804368) tensor(False)

NStepper Tests

There are a couple properties that we expect from n-step output: - tuples should be n size at max, however can be smaller. - done n-steps unravel into multiple tuples yielded individually.

- In other words if `n=3`, meaning we want to yield 3 blocks of steps per env, then if we have
  [step5,step6,step7] where step7 is `done` we will get individual tuples in the order:
  
      1. [step5,step6,step7]
      2. [step6,step7]
      3. [step7]

First, NStepper(pipe,n=1) when falttened should be identical to a pipelines that never used it.

import pandas as pd
from fastrl.envs.gym import GymTypeTransform,GymStepper
pipe = dp.map.Mapper(['CartPole-v1']*3)
pipe = TypeTransformer(pipe,[GymTypeTransform])
pipe = dp.iter.MapToIterConverter(pipe)
pipe = dp.iter.InMemoryCacheHolder(pipe)
pipe = pipe.cycle()
pipe = GymStepper(pipe,seed=0)
pipe = pipe.header(10)

no_n_steps = list(pipe)
steps = n_step_test(['CartPole-v1']*3,10,1,0)

If n=1 we should expect that regardless of the number of envs, both n-step and simple environment pipelines should be identical.

test_len(steps,no_n_steps)
for field in ['next_state','state','terminated']:
    for i,(step,no_n_step) in enumerate(zip(steps,no_n_steps)): 
        test_eq(getattr(step,field),getattr(no_n_step,field))

We should expect n=1 -> 3 to have the same basic shape…

steps1 = n_step_test(['CartPole-v1']*1,30,1,0)
steps2 = n_step_test(['CartPole-v1']*1,30,2,0)
steps3 = n_step_test(['CartPole-v1']*1,30,3,0)
for o in itertools.chain(steps1,steps2,steps3):
    test_eq(len(o),12)
    test_eq(isinstance(o,SimpleStep),True)

n_steps_expected

 n_steps_expected (default_steps:int, n:int)

Produces the expected number of steps, assuming a fully deterministic episode based on default_steps and n

Given n=2, given 1 envs, knowing that CartPole-v1 when seed=0 will always run 18 steps, the total steps will be:

\[ 18 * n - \sum_{0}^{n - 1}(i) \]

Type Details
default_steps int The number of steps the episode would run without n_steps
n int The n-step value that we are planning ot use
expected_n_steps = n_steps_expected(default_steps=18,n=2)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
steps = n_step_test(['CartPole-v1']*1,expected_n_steps+1,2,0)
# The first episode should have ended on row 34, beign 35 steps long. The 36th row should be a new episode
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([1]))
Given the above values, we expect a single episode to be  35  steps long
expected_n_steps = n_steps_expected(default_steps=18,n=4)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
steps = n_step_test(['CartPole-v1']*1,expected_n_steps+1,4,0)
# The first episode should have ended on row 34, beign 35 steps long. The 36th row should be a new episode
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([1]))
Given the above values, we expect a single episode to be  66  steps long
expected_n_steps = n_steps_expected(default_steps=18,n=2)
print('Given the above values, we expect a single episode to be ',expected_n_steps,' steps long')
steps = n_step_test(['CartPole-v1']*3,expected_n_steps*3+1,2,0)
# The first episode should have ended on row 34, beign 35 steps long. The 36th row should be a new episode
test_eq(steps[-2].terminated,tensor([True]))
test_eq(steps[-2].episode_n,tensor([1]))
test_eq(steps[-2].step_n,tensor([18]))
test_eq(steps[-1].terminated,tensor([False]))
test_eq(steps[-1].episode_n,tensor([2]))
test_eq(steps[-1].step_n,tensor([1]))
Given the above values, we expect a single episode to be  35  steps long