DataLoader2

Warning: This is intended to be a revision of the torchdata dataloader2 and so the source code might be “ugly”. The preference is that

SpawnProcessForDataPipeline

 SpawnProcessForDataPipeline (multiprocessing_ctx, datapipe,
                              call_locally_fn=None, protocol_type=None,
                              pipe_type=None)

DataPipeToQueuesLoop

 DataPipeToQueuesLoop (source_datapipe, req_queue, res_queue,
                       call_locally_fn=None, protocol_type=None,
                       pipe_type=None)

GetInputItemRequest

 GetInputItemRequest (key, value)

Initialize self. See help(type(self)) for accurate signature.


GetInputItemResponse

 GetInputItemResponse (value)

Initialize self. See help(type(self)) for accurate signature.


InputItemIterDataPipeQueueProtocolServer

 InputItemIterDataPipeQueueProtocolServer (request_queue, response_queue)

ProtocolServer takes charge of getting requests from req_queue and fetching data from source datapipe.


InputItemIterDataPipeQueueProtocolClient

 InputItemIterDataPipeQueueProtocolClient (request_queue, response_queue)

ProtocolClient takes charge of putting requests into req_queue and returning results from res_queue.


AgentLoggerMerger

 AgentLoggerMerger (*args, **kwds)

Inserts values from input_jests into the current pipeline.


PrototypeMultiProcessingReadingService

 PrototypeMultiProcessingReadingService (num_workers:int=0,
                                         multiprocessing_context=None,
                                         protocol_client_type=None,
                                         protocol_server_type=None,
                                         pipe_type=None, eventloop=None)

Helper class that provides a standard way to create an ABC using inheritance.


InputInjester

 InputInjester (*args, **kwds)

Inserts values from input_jests into the current pipeline.


item_input_pipe_type

 item_input_pipe_type ()

Initialize self. See help(type(self)) for accurate signature.


DataPipeBehindQueues

 DataPipeBehindQueues (source_datapipe, protocol, full_stop=False,
                       blocking_request_get=False)

Indefinitely iterates over req_queue and passing values from source_datapipe to res_queue If raise_stop is true, raises exception when StopIteration received from the source_datapipe

reading_service=PrototypeMultiProcessingReadingService(
    num_workers = 1,
    protocol_client_type = InputItemIterDataPipeQueueProtocolClient,
    protocol_server_type = InputItemIterDataPipeQueueProtocolServer,
    pipe_type = item_input_pipe_type,
    eventloop = SpawnProcessForDataPipeline
)