class Template(dp.iter.IterDataPipe):
    def __init__(self, source_datapipe=None):
        self.source_datapipe = source_datapipe

    def __iter__(self):
        return (o for o in self.source_datapipe)

class A(Template): pass
class B(Template): pass
class C(Template): pass
class D(Template): pass
class E(Template): pass
class F(Template): pass

pipe = A(range(10))
pipe = B(pipe)
pipe = C(pipe)
pipe = D(pipe)
pipe = D(pipe)
pipe = E(pipe)
list(pipe)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
We can grab the instance C in the middle of the graph via…
find_dp(traverse(pipe),C)
C
If the pipe doesn't exist, a LookupError is raised…
with ExceptionExpected(LookupError):
    find_dp(traverse(pipe), F)
And if there are multiple instances of the same type, you will be warned to use find_dps instead…
find_dp(traverse(pipe),D)
/opt/conda/lib/python3.7/site-packages/ipykernel_launcher.py:15: UserWarning: There are 2 pipes of type <class '__main__.D'>. If this is intended,
please use `find_dps` directly. Returning first instance.
from ipykernel import kernelapp as app
D
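The three lookup behaviours shown above (a single match, no match, several matches) can be imitated without torchdata. The block below is only a rough sketch, not the library's implementation: `walk`, `find_all_sketch` and `find_one_sketch` are hypothetical names, the `Template` class is a plain-Python stand-in, and both the exact-type matching and the output-to-source walking order are assumptions.

```python
import warnings

class Template:
    """Plain-Python stand-in for the Template pipe above (no torchdata needed)."""
    def __init__(self, source_datapipe=None):
        self.source_datapipe = source_datapipe

    def __iter__(self):
        return (o for o in self.source_datapipe)

class A(Template): pass
class B(Template): pass
class C(Template): pass
class D(Template): pass
class E(Template): pass
class F(Template): pass

def walk(pipe):
    """Yield each pipe, starting at the output and moving toward the source."""
    while isinstance(pipe, Template):
        yield pipe
        pipe = pipe.source_datapipe

def find_all_sketch(pipe, pipe_type):
    """Hypothetical helper: every pipe whose type is exactly `pipe_type` (assumed rule)."""
    return [p for p in walk(pipe) if type(p) is pipe_type]

def find_one_sketch(pipe, pipe_type):
    """Hypothetical helper: one match, LookupError if none, warning if several."""
    matches = find_all_sketch(pipe, pipe_type)
    if not matches:
        raise LookupError(f"No pipe of type {pipe_type} found.")
    if len(matches) > 1:
        warnings.warn(
            f"There are {len(matches)} pipes of type {pipe_type}. "
            "Returning first instance."
        )
    return matches[0]

# Same chain as above: A -> B -> C -> D -> D -> E
pipe = E(D(D(C(B(A(range(10)))))))
middle = find_one_sketch(pipe, C)    # exactly one C in the chain
both_d = find_all_sketch(pipe, D)    # two D instances, no warning here
```

Because the match is on exact type in this sketch, searching for the base class `Template` returns an empty list even though every pipe inherits from it.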
If we try searching for all iterpipes we get nothing…
A DataPipeAugmentationFn must take in a DataPipe and return either a DataPipe or None. The function should perform some operation on the graph, such as replacing, removing, or inserting DataPipes and DataGraphs. Below is an example that replaces a dp.iter.Batcher datapipe with a dp.iter.Filter.
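To make the shape of such a function concrete, here is a minimal sketch that uses plain-Python stand-ins rather than the real torchdata classes. `Pipe`, `Batcher`, `Filter`, `is_even` and `replace_batcher_with_filter` are all hypothetical names introduced for illustration. Returning None when nothing was replaced is an assumption about the contract, not something the library confirms.

```python
class Pipe:
    """Hypothetical stand-in for an iterable DataPipe."""
    def __init__(self, source_datapipe):
        self.source_datapipe = source_datapipe

    def __iter__(self):
        return iter(self.source_datapipe)

class Batcher(Pipe):
    """Stand-in for a batching pipe: groups items into lists of batch_size."""
    def __init__(self, source_datapipe, batch_size):
        super().__init__(source_datapipe)
        self.batch_size = batch_size

    def __iter__(self):
        batch = []
        for item in self.source_datapipe:
            batch.append(item)
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        if batch:  # emit the final, possibly shorter, batch
            yield batch

class Filter(Pipe):
    """Stand-in for a filtering pipe: keeps items where filter_fn is true."""
    def __init__(self, source_datapipe, filter_fn):
        super().__init__(source_datapipe)
        self.filter_fn = filter_fn

    def __iter__(self):
        return (o for o in self.source_datapipe if self.filter_fn(o))

def is_even(x):
    return x % 2 == 0

def replace_batcher_with_filter(pipe):
    """Swap every Batcher in the chain for a Filter.

    Returns the (possibly new) output pipe, or None if nothing was replaced
    (assumed meaning of None).
    """
    replaced = False
    # The output pipe itself may be the one to replace.
    if isinstance(pipe, Batcher):
        pipe = Filter(pipe.source_datapipe, is_even)
        replaced = True
    node = pipe
    while isinstance(node, Pipe):
        child = node.source_datapipe
        if isinstance(child, Batcher):
            # Rewire the parent to point at a Filter over the Batcher's source.
            node.source_datapipe = Filter(child.source_datapipe, is_even)
            replaced = True
        node = node.source_datapipe
    return pipe if replaced else None

pipeline = Pipe(Batcher(Pipe(range(10)), batch_size=3))
before = list(pipeline)      # batched items
augmented = replace_batcher_with_filter(pipeline)
after = list(augmented)      # filtered items, no batching
```

The sketch rewires `source_datapipe` links in place, so the parent of the replaced pipe keeps its identity and only the middle of the chain changes. It does not use dp.iter.Header, so it produces no length warning of its own.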
/opt/conda/lib/python3.7/site-packages/torchdata/datapipes/iter/util/header.py:60: UserWarning: The length of this HeaderIterDataPipe is inferred to be equal to its limit.The actual value may be smaller if the actual length of source_datapipe is smaller than the limit.
"The length of this HeaderIterDataPipe is inferred to be equal to its limit."