Pipes Core

Utilities for templating pipelines

find_dps

 find_dps (graph:Dict[int,Tuple[Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe],ForwardRef('DataPipeGraph')]],
           dp_type:Type[Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe]],
           include_subclasses:bool=False)

Given the graph of DataPipes generated by the traverse function, return all DataPipe instances of the provided DataPipe type.
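The search can be pictured as a depth-first walk over the nested {id: (pipe, subgraph)} dictionary that traverse returns. Below is a minimal pure-Python sketch, assuming that dictionary shape; find_dps_sketch and the toy classes are hypothetical and are not fastrl's implementation. Exact type matching is the default, and include_subclasses switches to isinstance.

```python
from typing import Any, Dict, List, Tuple

# Assumed graph shape, mirroring traverse's output: {id(pipe): (pipe, upstream_graph)}
Graph = Dict[int, Tuple[Any, "Graph"]]

def find_dps_sketch(graph: Graph, dp_type: type, include_subclasses: bool = False) -> List[Any]:
    """Hypothetical sketch: collect every pipe in `graph` whose type matches `dp_type`."""
    found: List[Any] = []
    for pipe, upstream in graph.values():
        # Exact type match by default; isinstance() when subclasses are allowed
        if include_subclasses:
            matches = isinstance(pipe, dp_type)
        else:
            matches = type(pipe) is dp_type
        if matches:
            found.append(pipe)
        # Visit the upstream pipes after the current one (pre-order)
        found.extend(find_dps_sketch(upstream, dp_type, include_subclasses))
    return found

# Toy pipeline: b reads from a, so b is the root of the graph
class Base: pass
class A(Base): pass
class B(Base): pass

a, b = A(), B()
graph = {id(b): (b, {id(a): (a, {})})}

print([type(p).__name__ for p in find_dps_sketch(graph, A)])  # ['A']
print(find_dps_sketch(graph, Base))                            # []
print([type(p).__name__ for p in find_dps_sketch(graph, Base, include_subclasses=True)])  # ['B', 'A']
```

A pre-order walk like this one returns the outermost pipe first, which is consistent with the ordering of the include_subclasses example later in this section.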


find_dp

 find_dp (graph:Dict[int,Tuple[Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe],ForwardRef('DataPipeGraph')]],
          dp_type:Type[Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe]],
          include_subclasses:bool=False)

Returns a single DataPipe, as opposed to find_dps, which returns a list.

Given the graph of DataPipes generated by the traverse function, return the DataPipe instance of the provided DataPipe type.

Parameter | Type | Default | Details
graph | typing.Dict[int, typing.Tuple[typing.Union[torch.utils.data.datapipes.datapipe.IterDataPipe, torch.utils.data.datapipes.datapipe.MapDataPipe], ForwardRef('DataPipeGraph')]] | | A graph created from the traverse function
dp_type | typing.Type[typing.Union[torch.utils.data.datapipes.datapipe.IterDataPipe, torch.utils.data.datapipes.datapipe.MapDataPipe]] | | The DataPipe type to search for
include_subclasses | bool | False | If True, also match subclasses of dp_type
Returns | typing.Union[torch.utils.data.datapipes.datapipe.IterDataPipe, torch.utils.data.datapipes.datapipe.MapDataPipe] | | The first matching DataPipe

For example, given a pipeline such as:

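import torchdata.datapipes as dp
from torch.utils.data.graph import traverse

class Template(dp.iter.IterDataPipe):
    "Pass-through DataPipe that yields every item of its source"
    def __init__(self,source_datapipe=None): self.source_datapipe = source_datapipe
    def __iter__(self): return (o for o in self.source_datapipe)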

class A(Template):pass
class B(Template):pass
class C(Template):pass
class D(Template):pass
class E(Template):pass
class F(Template):pass

pipe = A(range(10))
pipe = B(pipe)
pipe = C(pipe)
pipe = D(pipe)
pipe = D(pipe)
pipe = E(pipe)
list(pipe)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

We can grab the C instance in the middle of the graph via:

find_dp(traverse(pipe),C)
C

If no pipe of the requested type exists, a LookupError is raised:

with ExceptionExpected(LookupError):
    find_dp(traverse(pipe),F)

If there are multiple instances of the same type, you will be warned to use find_dps instead, and the first instance is returned:

find_dp(traverse(pipe),D)
UserWarning: There are 2 pipes of type <class '__main__.D'>. If this is intended, please use `find_dps` directly. Returning first instance.
D

Searching for IterDataPipe itself returns nothing, because by default only exact type matches are returned:

find_dps(traverse(pipe),dp.iter.IterDataPipe)
[]

However, we can include subclasses in the search:

find_dps(traverse(pipe),dp.iter.IterDataPipe,include_subclasses=True)
[E, D, D, C, B, A]
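The find_dp behaviour shown in these examples (a LookupError when nothing matches, a UserWarning plus the first match when several do) can be sketched on top of the same kind of walk. This is a hypothetical pure-Python version: walk and find_dp_sketch are illustrative names, the LookupError message is an assumption, and the warning text mirrors the one printed above.

```python
import warnings
from typing import Any, Dict, Iterator, Tuple

def walk(graph: Dict[int, Tuple[Any, dict]]) -> Iterator[Any]:
    """Yield every pipe in the graph, each pipe before its upstream pipes."""
    for pipe, upstream in graph.values():
        yield pipe
        yield from walk(upstream)

def find_dp_sketch(graph: Dict[int, Tuple[Any, dict]], dp_type: type,
                   include_subclasses: bool = False) -> Any:
    """Hypothetical sketch: return one pipe of dp_type, or fail loudly."""
    found = [p for p in walk(graph)
             if (isinstance(p, dp_type) if include_subclasses else type(p) is dp_type)]
    if not found:
        # Nothing matched: raise LookupError as in the example above
        raise LookupError(f"Unable to find {dp_type} in the graph")
    if len(found) > 1:
        # Several matches: warn and fall back to the first (outermost) one
        warnings.warn(f"There are {len(found)} pipes of type {dp_type}. If this is intended, "
                      "please use find_dps directly. Returning first instance.")
    return found[0]

class D: pass
class F: pass

inner, outer = D(), D()
graph = {id(outer): (outer, {id(inner): (inner, {})})}

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    first = find_dp_sketch(graph, D)
print(first is outer, len(caught))  # True 1

try:
    find_dp_sketch(graph, F)
except LookupError as err:
    print("LookupError:", err)
```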

DataPipeAugmentationFn

 DataPipeAugmentationFn (*args, **kwds)

A DataPipeAugmentationFn must take in a DataPipe and output either a DataPipe or None. The function should perform some operation on the graph, such as replacing, removing, or inserting DataPipes and DataPipe graphs. Below is an example that replaces a dp.iter.Batcher DataPipe with a dp.iter.Filter:

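from torch.utils.data.graph import DataPipe, traverse
from torchdata.dataloader2.graph import replace_dp

def iseven(i): return i%2==0
def test_replace(pipe:DataPipe) -> DataPipe:
    "Replace the Batcher in pipe with a Filter that keeps the even items of the IterableWrapper"
    graph = replace_dp(
        traverse(pipe),
        find_dp(traverse(pipe),dp.iter.Batcher),
        dp.iter.Filter(find_dp(traverse(pipe),dp.iter.IterableWrapper),filter_fn=iseven)
    )
    # The root of the returned graph is the (rewired) output DataPipe
    return list(graph.values())[0][0]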
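Because a DataPipeAugmentationFn is a callable from a DataPipe to a DataPipe or None, its contract can be written as a typing alias. The sketch below assumes Callable[[Pipe], Optional[Pipe]] as that shape and uses hypothetical toy pipes instead of torchdata ones. It also assumes that returning None means "leave the pipeline unchanged", which the description above does not spell out.

```python
from typing import Callable, Iterable, Iterator, Optional, Tuple

class Pipe:
    """Hypothetical toy pipe that forwards items from its source."""
    def __init__(self, source_datapipe: Iterable[int]):
        self.source_datapipe = source_datapipe
    def __iter__(self) -> Iterator[int]:
        return iter(self.source_datapipe)

class EvenFilter(Pipe):
    """Toy filter that keeps only even items."""
    def __iter__(self) -> Iterator[int]:
        return (x for x in self.source_datapipe if x % 2 == 0)

# Assumed shape of an augmentation function: pipe in, pipe (or None) out
AugmentationFn = Callable[[Pipe], Optional[Pipe]]

def append_even_filter(pipe: Pipe) -> Optional[Pipe]:
    # Insert a new step at the end of the pipeline
    return EvenFilter(pipe)

def leave_unchanged(pipe: Pipe) -> Optional[Pipe]:
    # Returning None is assumed to mean "no modification"
    return None

fns: Tuple[AugmentationFn, ...] = (append_even_filter, leave_unchanged)
pipe = Pipe(range(6))
print(list(append_even_filter(pipe)))  # [0, 2, 4]
print(leave_unchanged(pipe))           # None
```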

apply_dp_augmentation_fns

 apply_dp_augmentation_fns (pipe:Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe],
                            dp_augmentation_fns:Union[Tuple[__main__.DataPipeAugmentationFn],NoneType],
                            debug:bool=False)

Given a pipe, run dp_augmentation_fns over the pipeline.
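One plausible reading of running dp_augmentation_fns over the pipeline is a left-to-right fold: each function receives the current pipe and, if it returns something, that result becomes the new pipe. The following is a minimal sketch under that assumption; apply_fns_sketch is hypothetical, plain lists and ranges stand in for DataPipes, and the debug output format is invented for illustration.

```python
from typing import Any, Callable, Optional, Sequence

def apply_fns_sketch(pipe: Any,
                     fns: Optional[Sequence[Callable[[Any], Optional[Any]]]],
                     debug: bool = False) -> Any:
    """Hypothetical sketch: thread `pipe` through each fn in order."""
    if fns is None:
        # No augmentations requested: return the pipeline untouched
        return pipe
    for fn in fns:
        result = fn(pipe)
        if debug:
            print(f"{fn.__name__}: {'no change' if result is None else 'replaced pipe'}")
        # Assumption: None means "keep the current pipe"
        if result is not None:
            pipe = result
    return pipe

def double(pipe): return [x * 2 for x in pipe]
def skip(pipe): return None

print(apply_fns_sketch(range(4), (double, skip, double)))  # [0, 4, 8, 12]
print(apply_fns_sketch(range(4), None))                    # range(0, 4)
```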

Given the simple pipeline below:

pipe = dp.iter.IterableWrapper(range(10))
pipe = pipe.batch(2)
pipe = pipe.cycle(2)
pipe = pipe.header(limit=10)
traverse(pipe)
{139934820824464: (HeaderIterDataPipe,
  {139934820824400: (CyclerIterDataPipe,
    {139934820824144: (BatcherIterDataPipe,
      {139934820824080: (IterableWrapperIterDataPipe, {})})})})}
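The nested dictionary above maps id(pipe) to a (pipe, upstream_graph) pair, starting from the last step of the pipeline. The sketch below rebuilds that shape for hypothetical toy pipes by following a single source_datapipe attribute. torch's traverse discovers DataPipe references more generally, so treat traverse_sketch only as an illustration of the output format.

```python
from typing import Any, Dict, List, Tuple

class Pipe:
    """Hypothetical toy pipe that remembers its upstream pipe."""
    def __init__(self, source_datapipe: Any = None):
        self.source_datapipe = source_datapipe
    def __repr__(self) -> str:
        return type(self).__name__

class Wrapper(Pipe): pass
class Batcher(Pipe): pass
class Cycler(Pipe): pass
class Header(Pipe): pass

def traverse_sketch(pipe: Pipe) -> Dict[int, Tuple[Pipe, dict]]:
    """Return {id(pipe): (pipe, upstream_graph)}, following source_datapipe links."""
    src = pipe.source_datapipe
    # Only recurse into other pipes; a raw iterable such as range() ends the chain
    upstream = traverse_sketch(src) if isinstance(src, Pipe) else {}
    return {id(pipe): (pipe, upstream)}

def shape(graph: Dict[int, Tuple[Pipe, dict]]) -> List[Tuple[str, list]]:
    """Drop the ids so the nesting is easy to read."""
    return [(repr(p), shape(up)) for p, up in graph.values()]

pipe = Header(Cycler(Batcher(Wrapper(range(10)))))
print(shape(traverse_sketch(pipe)))
# [('Header', [('Cycler', [('Batcher', [('Wrapper', [])])])])]
```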

We run test_replace over the pipeline, which replaces the pipe.batch step with a dp.iter.Filter:

new_dp = apply_dp_augmentation_fns(pipe,(test_replace,))
test_eq(list(new_dp),[0, 2, 4, 6, 8, 0, 2, 4, 6, 8])
traverse(new_dp)
UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
{139934820824464: (HeaderIterDataPipe,
  {139934820824400: (CyclerIterDataPipe,
    {139934820825104: (FilterIterDataPipe,
      {139934820824080: (IterableWrapperIterDataPipe, {})})})})}
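Comparing the two graphs, the Header and Cycler ids are unchanged; only the node under CyclerIterDataPipe differs, and the new Filter reads from the original IterableWrapper. The sketch below shows that kind of rewiring on a linear chain of hypothetical toy pipes. replace_sketch is not torchdata's replace_dp, which operates on the full traverse graph rather than a single chain.

```python
from typing import Any, Iterator, List

class Pipe:
    """Hypothetical toy pipe that forwards items from source_datapipe."""
    def __init__(self, source_datapipe: Any = None):
        self.source_datapipe = source_datapipe
    def __iter__(self) -> Iterator[Any]:
        return iter(self.source_datapipe)

class Batcher(Pipe):
    def __iter__(self) -> Iterator[List[Any]]:
        it = iter(self.source_datapipe)
        # Pair up consecutive items; a trailing odd item is dropped
        return ([x, y] for x, y in zip(it, it))

class EvenFilter(Pipe):
    def __iter__(self) -> Iterator[Any]:
        return (x for x in self.source_datapipe if x % 2 == 0)

def replace_sketch(root: Pipe, old: Pipe, new: Pipe) -> Pipe:
    """Rewire the pipe that reads from `old` so it reads from `new` instead."""
    if root is old:
        return new
    node = root
    # Walk upstream along source_datapipe links until old's consumer is found
    while isinstance(node, Pipe):
        if node.source_datapipe is old:
            node.source_datapipe = new
            break
        node = node.source_datapipe
    return root

source = list(range(6))
batcher = Batcher(source)
root = Pipe(batcher)  # stands in for the downstream Cycler/Header steps
print(list(root))     # [[0, 1], [2, 3], [4, 5]]

root = replace_sketch(root, batcher, EvenFilter(source))
print(list(root))     # [0, 2, 4]
```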