from fastrl.pipes.core import *
import torchdata.datapipes as dp
TorchData DataPipe Notes
Notes on coding DataPipe insertion behavior
Inserting into preexisting pipelines
Reference: https://github.com/pytorch/data/issues/750
Original Graph: A -> B
Expected New Graph: A -> C -> B
class Template(dp.iter.IterDataPipe):
    """Pass-through DataPipe that yields every item of `source_datapipe` unchanged.

    Subclassed below only to give distinct, recognisable node types when
    inspecting pipeline graphs.
    """

    def __init__(self, source_datapipe=None):
        # Upstream iterable/DataPipe to read from; with the default `None`,
        # calling `__iter__` raises TypeError.
        self.source_datapipe = source_datapipe

    def __iter__(self):
        # NOTE(review): deliberately a plain function returning a generator
        # expression, not a generator function. torch's DataPipe metaclass
        # hooks `__iter__` and may treat the two forms differently, so
        # confirm before switching to `yield from`.
        return (item for item in self.source_datapipe)
class A(Template):
    """Marker pass-through DataPipe labelled ``A`` in the example graphs."""


class B(Template):
    """Marker pass-through DataPipe labelled ``B`` in the example graphs."""


class C(Template):
    """Marker pass-through DataPipe labelled ``C`` in the example graphs."""


class D(Template):
    """Marker pass-through DataPipe labelled ``D`` in the example graphs."""


class E(Template):
    """Marker pass-through DataPipe labelled ``E`` in the example graphs."""
Let’s say that we have a graph: A -> B
# Build the two-node pipeline A -> B over range(10).
pipe = A(range(10))
pipe = B(pipe)
pipe
# A and B are pass-throughs, so the values come out unchanged.
test_eq(list(pipe), range(10))
traverse(pipe)
{140342007547536: (B, {140342007547600: (A, {})})}
Let’s say that we want a graph: A -> C -> B
# Build the datapipe-only graph of `pipe` once and locate its `A` node.
dp_graph = traverse(pipe, only_datapipe=True)
a_dp = find_dp(dp_graph, A)
# Wrap A in a new C, then replace A with that C in the graph,
# giving A -> C -> B.
new_dp = replace_dp(dp_graph, a_dp, C(a_dp))
new_dp
{140342007951312: (B, {140342007546576: (C, {140342007950928: (A, {})})})}
We construct C on top of A (so C reads from A), and then replace A with C, which gives the graph A -> C -> B.
Now, what if we want the graph A -> C -> D -> E -> B?
# Start again from a freshly built A -> B pipeline over range(10).
pipe = A(range(10))
pipe = B(pipe)
pipe
# A and B are pass-throughs, so the values come out unchanged.
test_eq(list(pipe), range(10))
traverse(pipe)
{140342007227536: (B, {140342007227600: (A, {})})}
def sub_graph(pipe):
    """Wrap `pipe` in the chain C -> D -> E and return the outermost (E) datapipe.

    Data flows pipe -> C -> D -> E. Used below as the replacement for `A`,
    so the whole graph becomes A -> C -> D -> E -> B.
    """
    pipe = C(pipe)
    pipe = D(pipe)
    pipe = E(pipe)
    return pipe
# Build the datapipe-only graph of `pipe` once and locate its `A` node.
dp_graph = traverse(pipe, only_datapipe=True)
a_dp = find_dp(dp_graph, A)
# Wrap A in the C -> D -> E chain, then replace A with the outermost
# datapipe of that chain, giving A -> C -> D -> E -> B.
new_dp = replace_dp(dp_graph, a_dp, sub_graph(a_dp))
new_dp
{140342007227536: (B,
{140342007247376: (E,
{140342007247312: (D,
{140342007247120: (C, {140342007227600: (A, {})})})})})}
# Take the first (root) entry of the graph; each entry is (datapipe, children).
graph_entries = list(new_dp.values())
root_dp = graph_entries[0][0]
list(root_dp)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]