from fastrl.pipes.core import *
import torchdata.datapipes as dp
TorchData DataPipe Notes
Notes on coding insertion behavior
Inserting into pre-existing pipelines
Reference: https://github.com/pytorch/data/issues/750
Original Graph: A -> B
Expected New Graph: A -> C -> B
class Template(dp.iter.IterDataPipe):
    """Minimal IterDataPipe that yields every item of its source unchanged."""

    def __init__(self, source_datapipe=None):
        # Upstream datapipe, or any plain iterable such as a `range`.
        self.source_datapipe = source_datapipe

    def __iter__(self):
        source = self.source_datapipe
        # Return a generator expression (not `yield from`) so this stays a
        # plain function returning an iterator, exactly like the original.
        return (item for item in source)
# Trivial pass-through stages. They differ only by class, which is what lets
# the examples locate a specific node in a graph (e.g. `find_dp(graph, A)`).
class A(Template): pass
class B(Template): pass
class C(Template): pass
class D(Template): pass
class E(Template): pass

# Let’s say that we have a graph: A -> B
# Build A -> B over range(10) and check that the values pass through unchanged.
pipe = A(range(10))
pipe = B(pipe)
test_eq(list(pipe), range(10))
traverse(pipe)
# Output (NOTE: the integer keys appear to be object ids and differ between runs):
# {140342007547536: (B, {140342007547600: (A, {})})}
Let’s say that we want a graph: A -> C -> B
# Insert C between A and B: wrap A in C, then replace A with C in the graph.
# Traverse once and reuse the result instead of re-traversing for each argument.
pipe_graph = traverse(pipe, only_datapipe=True)
a_pipe = find_dp(pipe_graph, A)
new_dp = replace_dp(pipe_graph, a_pipe, C(a_pipe))
new_dp
# Output (NOTE: the integer keys appear to be object ids and differ between runs):
# {140342007951312: (B, {140342007546576: (C, {140342007950928: (A, {})})})}
We construct C so that it points to A, and then replace A with C in the graph.
Now, what if we want to insert several stages at once, giving the graph A -> C -> D -> E -> B?
# Rebuild a fresh A -> B pipeline and check that the values pass through unchanged.
pipe = A(range(10))
pipe = B(pipe)
test_eq(list(pipe), range(10))
traverse(pipe)
# Output (NOTE: the integer keys appear to be object ids and differ between runs):
# {140342007227536: (B, {140342007227600: (A, {})})}
def sub_graph(pipe):
    """Wrap `pipe` in the chain C -> D -> E and return the outermost stage, E."""
    return E(D(C(pipe)))


# Insert the whole C -> D -> E chain between A and B with a single replacement.
# Traverse once and reuse the result instead of re-traversing for each argument.
pipe_graph = traverse(pipe, only_datapipe=True)
a_pipe = find_dp(pipe_graph, A)
new_dp = replace_dp(pipe_graph, a_pipe, sub_graph(a_pipe))
new_dp
# Output (NOTE: the integer keys appear to be object ids and differ between runs):
# {140342007227536: (B,
#   {140342007247376: (E,
#     {140342007247312: (D,
#       {140342007247120: (C, {140342007227600: (A, {})})})})})}
# The graph's single top-level value is a (datapipe, children) tuple; its
# datapipe is the root (B). Iterate it to confirm data still flows end to end.
root_pipe = next(iter(new_dp.values()))[0]
items = list(root_pipe)
test_eq(items, range(10))
items
# Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]