Data Block

High level API to quickly get your data into DataLoaders

Transform Block

Loosely similar to the fastai==2.* TransformBlock, except that this version, just like the fastrl Agent and Learner, is simply a DataPipe construction function with augmentation capabilities.

DataPipeOrDataLoader = Union[DataPipe,DataLoader2]
class TransformBlock(Callable[[Union[Iterable,DataPipe]],DataPipeOrDataLoader]):...

TransformBlock

 TransformBlock (*args, **kwds)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

DataBlock, as defined below, expects single TransformBlock callables or tuples of them. These callables need to have the signature above.

Note that a TransformBlock must take params source and as_dataloader at minimum. Additional params are up to the developer / user.
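
To make the required signature concrete, here is a hypothetical minimal function-style block (the name minimal_transform_block and its body are illustrative only, not part of fastrl). It reuses dp, DataLoader2, and the DataPipeOrDataLoader alias defined above, and shows exactly the source / as_dataloader / return-annotation shape that validate_transform_block checks for later on:

def minimal_transform_block(
    # The raw data that gets pushed into the pipeline
    source:Any,
    # Must exist so that DataBlock can request a DataLoader2 instead of a DataPipe
    as_dataloader:bool=False
) -> DataPipeOrDataLoader:
    "Illustrative sketch only: wrap `source` and optionally return a `DataLoader2`"
    pipe = dp.iter.IterableWrapper(source)
    return DataLoader2(datapipe=pipe) if as_dataloader else pipe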

A complete working example would be:

def add(a,b): return a+b 
def str_cast(a): return str(a)
class TestTransformBlock():

    def __init__(self,
        # Pipeline Parameters
        a:int=1,
        b:str='_test',
        # Additional pipelines to insert, replace, remove
        dp_augmentation_fns:Tuple[DataPipeAugmentationFn]=None
    ) -> None:
        "This class returns a pipeline builder that either will return a DataPipe or a DataLoader"
        store_attr()

    def __call__(
        self,
        # `source` likely will be an iterable that gets pushed into the pipeline when an 
        # experiment is actually being run.
        source:Any,
        # Any parameters needed for the dataloader
        num_workers:int=0,
        # This param must exist: as_dataloader for the datablock to create dataloaders
        as_dataloader:bool=False
    ) -> DataPipeOrDataLoader:
        "This is the function that is actually run by `DataBlock`"
        # This is where the template pipeline gets outlined. Notice that we
        # are feeding source into the pipeline.
        pipe = dp.iter.IterableWrapper(source) # In this example, probably a list of numbers
        pipe = pipe.map(partial(add,b=self.a))          # Add `a` to them
        pipe = pipe.map(str_cast)       # Convert the numbers to str
        pipe = pipe.map(partial(add,b=self.b))          # Concat `b` into the str
        # Once the base pipeline is constructed, we give the user the opportunity to augment the
        # pipeline however they like.
        pipe = apply_dp_augmentation_fns(pipe,ifnone(self.dp_augmentation_fns,()))
        # The transform block must be able to return a `DataLoader2` instance
        if as_dataloader:
            pipe = DataLoader2(
                datapipe=pipe,
                reading_service=PrototypeMultiProcessingReadingService(
                    num_workers = num_workers,
                    protocol_client_type = InputItemIterDataPipeQueueProtocolClient,
                    protocol_server_type = InputItemIterDataPipeQueueProtocolServer,
                    pipe_type = item_input_pipe_type,
                    eventloop = SpawnProcessForDataPipeline
                ) if num_workers>0 else None
            )
        return pipe 

Check that we can return a DataPipe and that an iteration through it is what we expect…

tfm_block = TestTransformBlock()
pipe = tfm_block([1,2,3])
test_eq(type(pipe),dp.iter.Mapper)
test_eq(list(pipe),['2_test', '3_test', '4_test'])

Check that we can return a DataLoader2 and that an iteration through it is what we expect…

tfm_block = TestTransformBlock()
pipe = tfm_block([1,2,3],as_dataloader=True)
test_eq(type(pipe),DataLoader2)
test_eq(list(pipe),['2_test', '3_test', '4_test'])
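
The num_workers argument of TestTransformBlock is forwarded to the reading service, so the same block can, in principle, also build a multiprocessing DataLoader2. The snippet below is a sketch of that path; it assumes the prototype multiprocessing reading service and the fastrl protocol classes referenced inside TestTransformBlock spawn cleanly in your environment:

# Sketch only: exercises the num_workers>0 branch of TestTransformBlock.
# This spawns a worker process, so it depends on the torchdata prototype
# reading service working in the current environment.
tfm_block = TestTransformBlock()
pipe = tfm_block([1,2,3],num_workers=1,as_dataloader=True)
test_eq(type(pipe),DataLoader2)
# With a single worker the ordering should match the single-process case.
test_eq(list(pipe),['2_test', '3_test', '4_test'])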

validate_transform_block

 validate_transform_block (block:__main__.TransformBlock)

InvalidTransformBlock

Exception raised by validate_transform_block when a TransformBlock is missing the required source / as_dataloader arguments or the DataPipeOrDataLoader return annotation.

Check that TestTransformBlock is in fact valid…

validate_transform_block(tfm_block)

And check that invalid TransformBlocks get caught…

def invalid_transform_block():
    def _invalid_transform_block():pass
    return _invalid_transform_block

invalid_tfm_block = invalid_transform_block()
with ExceptionExpected(InvalidTransformBlock):
    try: validate_transform_block(invalid_tfm_block)
    except InvalidTransformBlock as e:
        print(str(e))
        raise
Checked <function invalid_transform_block.<locals>._invalid_transform_block>:
Given kwargs: {}
Given return: <class 'inspect._empty'>
`source:Any` is missing from the arguments
`as_dataloader:bool=False` is missing from the arguments
`DataPipeOrDataLoader` missing from return signature

DataPipeWrapperTransformBlock

 DataPipeWrapperTransformBlock (dp_cls:Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe], **dp_kwargs)

Used by DataBlock to support converting DataPipes to TransformBlocks on the fly.

            Type                                                                                                             Details
dp_cls      typing.Union[torch.utils.data.datapipes.datapipe.IterDataPipe, torch.utils.data.datapipes.datapipe.MapDataPipe]  The DataPipe to wrap into a TransformBlock
dp_kwargs
Returns     None
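
DataPipeWrapperTransformBlock is normally used inside a DataBlock (shown below), but a rough standalone sketch of the idea follows; it assumes the wrapper simply instantiates dp_cls around the incoming source with dp_kwargs when the block is called:

# Hypothetical standalone usage: wrap dp.iter.Cycler so it can act as a
# TransformBlock. Assumes the wrapper applies Cycler(pipe, count=2) to the
# incoming source when the block is called.
cycler_block = DataPipeWrapperTransformBlock(dp.iter.Cycler,count=2)
pipe = cycler_block(dp.iter.IterableWrapper([1,2,3]))
# Expected to cycle the source twice: 1, 2, 3, 1, 2, 3
test_eq(list(pipe),[1, 2, 3, 1, 2, 3])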

DataBlock

 DataBlock (*blocks:Tuple[Union[Tuple[__main__.TransformBlock],__main__.TransformBlock]], debug:bool=False)

DataBlock is a single object for constructing datapipes and dataloaders from blocks. Below are examples of how blocks eventually get converted to dataloaders.

Example 1: Simplest

blocks = ( TestTransformBlock, TestTransformBlock )
    -> ( DataLoader2(TestTransformBlock(as_dataloader=True)), DataLoader2(TestTransformBlock(as_dataloader=True)) )

Example 2: Nested Blocks

blocks = ( (TestTransformBlock, TestTransformBlock2), TestTransformBlock )
    -> ( DataLoader2(TestTransformBlock -> TestTransformBlock2(as_dataloader=True)), DataLoader2(TestTransformBlock) )

In example 2, we can nest the blocks, thus chaining them together. The last block in the chain is the one used to create the required dataloader.

In the example below we want 2 dataloaders, so len(blocks) will be 2. However, for the second dataloader we want to change the output and also cycle through the data twice. We can easily do this by using a tuple instead of a single TestTransformBlock.

block = DataBlock(
    TestTransformBlock(),
    (TestTransformBlock(b='_test2'),DataPipeWrapperTransformBlock(dp.iter.Cycler,count=2)),
    debug=True
)
Interpreting `blocks` input as ['datapipe', 'datapipe_group'], resulting in 2 dataloaders

The resulting datapipes are in the format that we expect…

pipes = block.datapipes([1,2,3])
traverse(pipes[0])
test_eq(type(pipes[0]),dp.iter.Mapper)
test_eq(list(pipes[0]),['2_test', '3_test', '4_test'])
# Second pipe has _test2 as a postfix and cycles the dataset twice
test_eq(type(pipes[1]),dp.iter.Cycler)
test_eq(list(pipes[1]),['2_test2', '3_test2', '4_test2', '2_test2', '3_test2', '4_test2'])

We can easily do the same for the dataloaders…

pipes = block.dataloaders([1,2,3])
test_eq(type(pipes[0]),DataLoader2)
test_eq(list(pipes[0]),['2_test', '3_test', '4_test'])
# Second pipe has _test2 as a postfix and cycles the dataset twice
test_eq(type(pipes[1]),DataLoader2)
test_eq(list(pipes[1]),['2_test2', '3_test2', '4_test2', '2_test2', '3_test2', '4_test2'])
with ExceptionExpected(TypeError):
    traverse(dp.iter.IterableWrapper(pipes))
    print('torchdata dataloaders are not traverseable once started.')

import pickle

# TODO: Kind of what I was afraid of for the transform blocks. In reality,
# I think they should have their inner functions already returned before any
# pickling happens, so this technically shouldn't be happening.
# There are other issues with the dataloader itself though that can only be fixed 
# in torch data.
for k in pipes[0].__dict__:
    try:
        print(k)
        pickle.dumps(pipes[0].__dict__[k])
    except Exception as e:
        print('Got pickle error: ',str(e),' for key ',k)
datapipe
_adapted
_datapipe_iter
Got pickle error:  can't pickle generator objects  for key  _datapipe_iter
_reset_iter
datapipe_adapter_fns
reading_service
reading_service_state
_terminated
valid_iterator_id
_datapipe_before_reading_service_adapt