Multiplexer

MapDataPipe for combining multiple datapipes into a single one

MultiplexerMapDataPipe

 MultiplexerMapDataPipe (*args, **kwds)

Yields one element at a time from each of the input DataPipes in turn (functional name: mux): one element from the first input DataPipe, then one element from the second, and so on. It is documented as ending when the shortest input DataPipe is exhausted, although in the first example below the remaining elements of the longer datapipe are still yielded after the shorter one runs out.

For example, we can have a datapipe a that contains the numbers 0 through 9, and a datapipe b that maps letters to integers.

dp_index_map tells the multiplexer, for each additional datapipe (here b), which keys to query and in what order.

The result interleaves the elements of a with the integers that b maps the letters a, b, c, and d to; once those keys are used up, the remaining elements of a follow.

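import torchdata.datapipes as dp  # assumed source of the `dp` alias; not shown in the original
from fastcore.test import test_eq

a = dp.map.SequenceWrapper(range(10))
b = dp.map.SequenceWrapper({'a': 100, 'b': 200, 'c': 300, 'd': 400})
# Query b with the keys 'a'..'d', in that order, while interleaving with a.
datapipe = a.mux(b, dp_index_map={b: ['a', 'b', 'c', 'd']})
test_eq(list(datapipe), [0, 100, 1, 200, 2, 300, 3, 400, 4, 5, 6, 7, 8, 9])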
list(datapipe)
[0, 100, 1, 200, 2, 300, 3, 400, 4, 5, 6, 7, 8, 9]
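
To make the interleaving order concrete, the following is a minimal pure-Python sketch that reproduces the output above without torchdata or fastrl. The helper name interleave_with_index_map and the _MISSING sentinel are hypothetical and exist only for illustration; the sketch mirrors the observed output and is not a description of how MultiplexerMapDataPipe is actually implemented.

```python
from itertools import zip_longest

_MISSING = object()  # hypothetical sentinel marking an exhausted source


def interleave_with_index_map(primary, secondary, keys):
    """Alternate between items of `primary` and `secondary[k]` for each k in `keys`.

    Illustration only: once `keys` runs out, the remaining items of
    `primary` are emitted on their own, matching the output shown above.
    """
    looked_up = (secondary[k] for k in keys)
    out = []
    for x, y in zip_longest(primary, looked_up, fillvalue=_MISSING):
        if x is not _MISSING:
            out.append(x)
        if y is not _MISSING:
            out.append(y)
    return out


a = range(10)
b = {'a': 100, 'b': 200, 'c': 300, 'd': 400}
result = interleave_with_index_map(a, b, ['a', 'b', 'c', 'd'])
print(result)
```

The sentinel is used instead of None so that None could still appear as a legitimate element of either source.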

In the second example, we take the numbers 0 through 11 and split them evenly into 3 datapipes…

from fastrl.pipes.map.demux import DemultiplexerMapDataPipe
a = dp.map.SequenceWrapper(range(12))

def split_three_way(x):
    # Route each element to output 0, 1, or 2 according to its remainder modulo 3.
    return x % 3

k1, k2, k3 = a.demux(num_instances=3, classifier_fn=split_three_way)
for pipe in (k1,k2,k3): test_eq(len(pipe),4)
len(k1),len(k2),len(k3)
(4, 4, 4)
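
As a rough illustration of what the split produces, here is a pure-Python sketch that buckets the same numbers with the same classifier. The helper split_by_classifier is hypothetical and only imitates the effect of demux on plain lists; it says nothing about how DemultiplexerMapDataPipe is implemented.

```python
def split_by_classifier(items, num_instances, classifier_fn):
    """Place each item into the bucket whose index `classifier_fn` returns."""
    buckets = [[] for _ in range(num_instances)]
    for item in items:
        buckets[classifier_fn(item)].append(item)
    return buckets


def split_three_way(x):
    return x % 3


k1, k2, k3 = split_by_classifier(range(12), 3, split_three_way)
print(k1, k2, k3)
```

Each bucket ends up with 4 elements, which agrees with the lengths reported above.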

We can then recombine them into a single datapipe…

combined_pipes=MultiplexerMapDataPipe(k1,k2,k3)
test_eq(list(combined_pipes),range(12))
list(combined_pipes)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
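
The original order comes back because element i was routed to bucket i % 3 at position i // 3, so taking one element from each bucket in turn yields 3j, 3j + 1, 3j + 2 for each position j. A small pure-Python sketch of that round-robin step, using a hypothetical round_robin helper on plain lists (equal-length inputs are assumed):

```python
def round_robin(*buckets):
    """Take one item from each bucket in turn; assumes equal-length buckets."""
    return [item for group in zip(*buckets) for item in group]


k1, k2, k3 = [0, 3, 6, 9], [1, 4, 7, 10], [2, 5, 8, 11]
combined = round_robin(k1, k2, k3)
print(combined)
```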