Data Block

DataLoaders

Transform Block

Loosely similar to the fastai==2.* `TransformBlock`, only this time, just like the fastrl `Agent` and `Learner`, it is simply a DataPipe construction function with augmentation capabilities:

```python
DataPipeOrDataLoader = Union[DataPipe,DataLoader2]

class TransformBlock(Callable[[Union[Iterable,DataPipe]],DataPipeOrDataLoader]): ...
```
TransformBlock
TransformBlock (*args, **kwds)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as:

```python
class Mapping(Generic[KT, VT]):
    def __getitem__(self, key: KT) -> VT:
        ...
        # Etc.
```

This class can then be used as follows:

```python
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
```
`DataBlock`, as defined below, expects single `TransformBlock` callables or tuples of them. These functions need to have the above signature.
Note that a `TransformBlock` must take the params `source` and `as_dataloader` at minimum. Additional params are up to the developer / user.
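For reference, a bare-bones block that satisfies this contract could look like the following sketch. `MinimalTransformBlock` is hypothetical and not part of fastrl; it only illustrates the required `source` / `as_dataloader` signature:

```python
from typing import Any

import torchdata.datapipes as dp
from torchdata.dataloader2 import DataLoader2

class MinimalTransformBlock():
    "Hypothetical smallest-possible TransformBlock."
    def __call__(self,
        source:Any,               # Iterable fed into the pipeline at run time
        as_dataloader:bool=False  # Must exist so `DataBlock` can request a dataloader
    ) -> DataPipeOrDataLoader:
        pipe = dp.iter.IterableWrapper(source)
        # Single-process DataLoader2 for brevity; reading services are shown below
        return DataLoader2(pipe) if as_dataloader else pipe
```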
A full working example would be:
```python
# Imports assumed by the examples on this page. The fastrl/torchdata prototype
# helpers used below (DataPipeAugmentationFn, apply_dp_augmentation_fns,
# PrototypeMultiProcessingReadingService, InputItemIterDataPipeQueueProtocolClient,
# InputItemIterDataPipeQueueProtocolServer, item_input_pipe_type,
# SpawnProcessForDataPipeline) are assumed to already be in scope from fastrl's exports.
import pickle
from functools import partial
from typing import Any,Tuple

import torchdata.datapipes as dp
from torch.utils.data.graph import traverse
from torchdata.dataloader2 import DataLoader2
from fastcore.all import store_attr,ifnone
from fastcore.test import test_eq,ExceptionExpected

def add(a,b): return a+b
def str_cast(a): return str(a)

class TestTransformBlock():
    def __init__(self,
        # Pipeline Parameters
        a:int=1,
        b:str='_test',
        # Additional pipelines to insert, replace, remove
        dp_augmentation_fns:Tuple[DataPipeAugmentationFn]=None
    ) -> None:
        "This class returns a pipeline builder that either will return a DataPipe or a DataLoader"
        store_attr()

    def __call__(self,
        # `source` likely will be an iterable that gets pushed into the pipeline when an
        # experiment is actually being run.
        source:Any,
        # Any parameters needed for the dataloader
        num_workers:int=0,
        # This param must exist: `as_dataloader` for the datablock to create dataloaders
        as_dataloader:bool=False
    ) -> DataPipeOrDataLoader:
        "This is the function that is actually run by `DataBlock`"
        # This is where the template pipeline gets outlined. Notice that we
        # are feeding `source` into the pipeline.
        pipe = dp.iter.IterableWrapper(source)  # In this example, probably a list of numbers
        pipe = pipe.map(partial(add,b=self.a))  # Add `a` to them
        pipe = pipe.map(str_cast)               # Convert the numbers to str
        pipe = pipe.map(partial(add,b=self.b))  # Concat `b` onto the str
        # Once the base pipeline is constructed, we give the user the opportunity to augment the
        # pipeline however they like.
        pipe = apply_dp_augmentation_fns(pipe,ifnone(self.dp_augmentation_fns,()))
        # The transform block must be able to return a `DataLoader2` instance
        if as_dataloader:
            pipe = DataLoader2(
                datapipe=pipe,
                reading_service=PrototypeMultiProcessingReadingService(
                    num_workers = num_workers,
                    protocol_client_type = InputItemIterDataPipeQueueProtocolClient,
                    protocol_server_type = InputItemIterDataPipeQueueProtocolServer,
                    pipe_type = item_input_pipe_type,
                    eventloop = SpawnProcessForDataPipeline
                ) if num_workers>0 else None
            )
        return pipe
```
Check that we can return a DataPipe and that an iteration through it is what we expect…

```python
tfm_block = TestTransformBlock()
pipe = tfm_block([1,2,3])
test_eq(type(pipe),dp.iter.Mapper)
test_eq(list(pipe),['2_test', '3_test', '4_test'])
```
Check that we can return a DataLoader2 and that an iteration through it is what we expect…

```python
tfm_block = TestTransformBlock()
pipe = tfm_block([1,2,3],as_dataloader=True)
test_eq(type(pipe),DataLoader2)
test_eq(list(pipe),['2_test', '3_test', '4_test'])
```
validate_transform_block
validate_transform_block (block:__main__.TransformBlock)
InvalidTransformBlock
Common base class for all non-exit exceptions.
Check that TestTransformBlock is in fact valid…

```python
validate_transform_block(tfm_block)
```
And check that invalid TransformBlocks get caught…

```python
def invalid_transform_block():
    def _invalid_transform_block(): pass
    return _invalid_transform_block

invalid_tfm_block = invalid_transform_block()
with ExceptionExpected(InvalidTransformBlock):
    try: validate_transform_block(invalid_tfm_block)
    except InvalidTransformBlock as e:
        print(str(e))
        raise
```
```
Checked <function invalid_transform_block.<locals>._invalid_transform_block>:
Given kwargs: {}
Given return: <class 'inspect._empty'>
`source:Any` is missing from the arguments
`as_dataloader:bool=False` is missing from the arguments
`DataPipeOrDataLoader` missing from return signature
```
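The check itself is essentially signature inspection. Here is a minimal sketch of the kind of test implied by the output above, using `inspect` (illustrative only, not fastrl's actual implementation):

```python
import inspect

def sketch_validate_transform_block(block):
    "Hypothetical re-creation of the signature checks shown above."
    # Instances are validated via their __call__; plain functions directly
    fn = block if inspect.isfunction(block) else block.__call__
    sig = inspect.signature(fn)
    errors = []
    if 'source' not in sig.parameters:
        errors.append('`source:Any` is missing from the arguments')
    if 'as_dataloader' not in sig.parameters:
        errors.append('`as_dataloader:bool=False` is missing from the arguments')
    if sig.return_annotation is inspect.Signature.empty:
        errors.append('`DataPipeOrDataLoader` missing from return signature')
    if errors:
        raise InvalidTransformBlock(f'Checked {fn}:\n'+'\n'.join(errors))
```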
DataPipeWrapperTransformBlock

DataPipeWrapperTransformBlock (dp_cls:Union[torch.utils.data.datapipes.datapipe.IterDataPipe,torch.utils.data.datapipes.datapipe.MapDataPipe], **dp_kwargs)

Used by DataBlock to support converting DataPipes to TransformBlocks on the fly.

| | Type | Details |
|---|---|---|
| dp_cls | Union[IterDataPipe, MapDataPipe] | The DataPipe to wrap into a TransformBlock |
| dp_kwargs | | |
| Returns | None | |
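Conceptually, such a wrapper is just a callable with the `TransformBlock` signature that instantiates the given DataPipe class over the incoming pipe. A hypothetical sketch (assuming `dp_kwargs` are simply forwarded to `dp_cls`):

```python
class SketchDataPipeWrapperTransformBlock():
    "Hypothetical sketch of wrapping a DataPipe class as a TransformBlock."
    def __init__(self, dp_cls, **dp_kwargs):
        self.dp_cls,self.dp_kwargs = dp_cls,dp_kwargs

    def __call__(self, source:Any, as_dataloader:bool=False) -> DataPipeOrDataLoader:
        # Instantiate the wrapped DataPipe over whatever pipe/iterable precedes it
        pipe = self.dp_cls(source, **self.dp_kwargs)
        return DataLoader2(pipe) if as_dataloader else pipe
```

Under these assumptions, `SketchDataPipeWrapperTransformBlock(dp.iter.Cycler,count=2)` would behave like the `DataPipeWrapperTransformBlock(dp.iter.Cycler,count=2)` used below.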
DataBlock

DataBlock (*blocks:Tuple[Union[Tuple[__main__.TransformBlock],__main__.TransformBlock]], debug:bool=False)

DataBlock is a single object for constructing datapipes and dataloaders from blocks. Below are examples of how blocks eventually get converted to dataloaders.

Example 1: Simplest

    blocks = (TestTransformBlock, TestTransformBlock)
    -> (DataLoader2(TestTransformBlock(as_dataloader=True)),
        DataLoader2(TestTransformBlock(as_dataloader=True)))

Example 2: Nested Blocks

    blocks = ((TestTransformBlock, TestTransformBlock2), TestTransformBlock)
    -> (DataLoader2(TestTransformBlock -> TestTransformBlock2(as_dataloader=True)),
        DataLoader2(TestTransformBlock))

In example 2, we can nest the blocks, thus chaining them together. The last block in a chain is the one used to create the required dataloader.
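Put differently, chaining a block group amounts to feeding each block's output pipe in as the next block's `source`, and only asking the final block for a dataloader. A rough sketch of that reduction (illustrative; `DataBlock` does this internally):

```python
def sketch_chain_blocks(blocks, source, as_dataloader=True):
    "Hypothetical reduction over a tuple of TransformBlocks, mirroring Example 2."
    pipe = source
    for blk in blocks[:-1]:
        pipe = blk(pipe)  # Intermediate blocks return DataPipes...
    # ...and only the last block is asked to produce the DataLoader2
    return blocks[-1](pipe, as_dataloader=as_dataloader)
```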
In the below example we want 2 dataloaders, so `len(blocks)` will be 2. However, for the second dataloader we want to change the output and also cycle twice. We can easily do this by using a tuple instead of a single `TestTransformBlock`.
```python
block = DataBlock(
    TestTransformBlock(),
    (TestTransformBlock(b='_test2'),DataPipeWrapperTransformBlock(dp.iter.Cycler,count=2)),
    debug=True
)
```

```
Interpreting `blocks` input as ['datapipe', 'datapipe_group'], resulting in 2 dataloaders
```
The resulting datapipes are in the format that we expect…

```python
pipes = block.datapipes([1,2,3])
traverse(pipes[0])
test_eq(type(pipes[0]),dp.iter.Mapper)
test_eq(list(pipes[0]),['2_test', '3_test', '4_test'])
# Second pipe has _test2 as a postfix and cycles the dataset twice
test_eq(type(pipes[1]),dp.iter.Cycler)
test_eq(list(pipes[1]),['2_test2', '3_test2', '4_test2', '2_test2', '3_test2', '4_test2'])
```
We can easily do the same for the dataloaders…

```python
pipes = block.dataloaders([1,2,3])
test_eq(type(pipes[0]),DataLoader2)
test_eq(list(pipes[0]),['2_test', '3_test', '4_test'])
# Second pipe has _test2 as a postfix and cycles the dataset twice
test_eq(type(pipes[1]),DataLoader2)
test_eq(list(pipes[1]),['2_test2', '3_test2', '4_test2', '2_test2', '3_test2', '4_test2'])
with ExceptionExpected(TypeError):
    traverse(dp.iter.IterableWrapper(pipes))
print('torchdata dataloaders are not traversable once started.')
```
```python
# TODO: Kind of what I was afraid of for the transform blocks. In reality,
# I think they should have their inner functions already returned before any
# pickling happens, so this technically shouldn't be happening.
# There are other issues with the dataloader itself though that can only be fixed
# in torchdata.
for k in pipes[0].__dict__:
    try:
        print(k)
        pickle.dumps(pipes[0].__dict__[k])
    except Exception as e:
        print('Got pickle error: ',str(e),' for key ',k)
```
```
datapipe
_adapted
_datapipe_iter
Got pickle error:  can't pickle generator objects  for key  _datapipe_iter
_reset_iter
datapipe_adapter_fns
reading_service
reading_service_state
_terminated
valid_iterator_id
_datapipe_before_reading_service_adapt
```
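The only key that fails is the live iterator, and that failure is generic to Python rather than specific to torchdata: generator objects can never be pickled. A self-contained check (hypothetical snippet, not from the notebook):

```python
import pickle

def gen(): yield 1

try:
    pickle.dumps(gen())
except TypeError as e:
    print(e)  # e.g. "cannot pickle 'generator' object"
```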