TorchData DataPipe Notes

Notes on how to insert new DataPipes into an existing pipeline

Inserting into preexisting pipelines

Reference: https://github.com/pytorch/data/issues/750

Original Graph: A -> B

Expected New Graph: A -> C -> B

from fastrl.pipes.core import *
import torchdata.datapipes as dp
class Template(dp.iter.IterDataPipe):
    """Minimal pass-through DataPipe used as the base for the example nodes.

    It stores an upstream iterable and re-yields its items unchanged.
    """

    def __init__(self, source_datapipe=None):
        # The upstream pipe (or any iterable) that this node reads from.
        self.source_datapipe = source_datapipe

    def __iter__(self):
        # Hand back a lazy generator over the upstream items.
        return (item for item in self.source_datapipe)

class A(Template):
    """Example graph node; adds nothing on top of Template."""


class B(Template):
    """Example graph node; adds nothing on top of Template."""


class C(Template):
    """Example graph node; adds nothing on top of Template."""


class D(Template):
    """Example graph node; adds nothing on top of Template."""


class E(Template):
    """Example graph node; adds nothing on top of Template."""

Let’s say that we have a graph: A -> B

# Build the two-stage pipeline in one expression: B reads from A, and A reads
# from the raw range.
pipe = B(A(range(10)))

# Both nodes are pass-through, so iterating the pipeline should give back the
# source values; then show the graph of the pipeline.
test_eq(list(pipe), range(10))
traverse(pipe)
{140342007547536: (B, {140342007547600: (A, {})})}

Let’s say that we want a graph: A -> C -> B

# Insert C between A and B.
# Arguments to replace_dp, in order:
#   1. the traversed graph of the current pipeline,
#   2. the existing A node looked up in that graph,
#   3. a new C whose source is that same A node.
# NOTE(review): traverse/find_dp/replace_dp come from the star import above;
# presumably replace_dp swaps argument 2 for argument 3 in the graph and
# returns the updated graph -- confirm against their definitions.
new_dp = replace_dp(
    traverse(pipe,only_datapipe=True),
    find_dp(traverse(pipe,only_datapipe=True),A),
    C(find_dp(traverse(pipe,only_datapipe=True),A))
)
# Bare expression so the resulting graph is displayed.
new_dp
{140342007951312: (B, {140342007546576: (C, {140342007950928: (A, {})})})}

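We first make C point to A (that is, we wrap A in C), and then we replace A with C in the graph, so that B now reads from C instead of reading from A directly.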

Now, what if we want the graph A -> C -> D -> E -> B?

# Start again from a fresh two-stage pipeline: B wraps A, which wraps the
# raw range.
pipe = B(A(range(10)))

# Confirm the pass-through pipeline yields the source values, then show the
# graph of the pipeline.
test_eq(list(pipe), range(10))
traverse(pipe)
{140342007227536: (B, {140342007227600: (A, {})})}
def sub_graph(pipe):
    """Wrap `pipe` in the chain C -> D -> E and return the outermost pipe (E)."""
    return E(D(C(pipe)))
# Insert the whole C -> D -> E chain between A and B.
# Arguments to replace_dp, in order:
#   1. the traversed graph of the current pipeline,
#   2. the existing A node looked up in that graph,
#   3. the chain built by sub_graph on top of that same A node (its outermost
#      pipe is E).
# NOTE(review): presumably replace_dp swaps argument 2 for argument 3 in the
# graph and returns the updated graph -- confirm against its definition.
new_dp = replace_dp(
    traverse(pipe,only_datapipe=True),
    find_dp(traverse(pipe,only_datapipe=True),A),
    sub_graph(find_dp(traverse(pipe,only_datapipe=True),A))
)
# Bare expression so the resulting graph is displayed.
new_dp
{140342007227536: (B,
  {140342007247376: (E,
    {140342007247312: (D,
      {140342007247120: (C, {140342007227600: (A, {})})})})})}
# Each value in new_dp is a (datapipe, subgraph) pair, as the display above
# shows. Take the first pair's datapipe (the root, B) and iterate it to check
# that the rewired pipeline still yields the source values.
list(tuple(new_dp.values())[0][0])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]