Loggers Core

Utilities for handling log messages and displaying them when running over multiple processes.

LoggerBase

 LoggerBase (*args, **kwds)

The LoggerBase class primarily maintains the buffer. It works in combination with the LogCollector datapipe, which adds records to that buffer.

LoggerBase also filters the log records out of the data stream so as not to disrupt the training pipeline.
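
As a rough mental model, the sketch below shows a minimal stand-in for this buffer-and-filter behaviour. It is an illustration only, not fastrl's implementation; SimpleLoggerBase and the Record tuple are made-up names.

from collections import namedtuple
from torchdata.datapipes.iter import IterDataPipe

# Hypothetical record type; the real log record class in fastrl may differ.
Record = namedtuple('Record', ['name', 'value'])

class SimpleLoggerBase(IterDataPipe):
    "Sketch: buffer log records and keep them out of the training data stream."
    def __init__(self, source_datapipe=None):
        self.source_datapipe = source_datapipe
        self.buffer = []

    def connect_source_datapipe(self, pipe):
        self.source_datapipe = pipe
        return self

    def __iter__(self):
        for item in self.source_datapipe:
            if isinstance(item, Record):
                self.buffer.append(item)   # collected for display later
            else:
                yield item                 # normal training data passes through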


LoggerBasePassThrough

 LoggerBasePassThrough (*args, **kwds)

Allows collectors to find LoggerBases early in the pipeline without the risk of accidentally iterating the logger bases at the wrong time or frequency.

This is mainly used so that collectors can easily call find_dps on the pipeline.

logger_base = LoggerBase()
traverse(logger_base)
{140510339508112: (LoggerBase, {})}
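
The sketch below illustrates what a find_dps-style lookup amounts to: walking the nested {id: (datapipe, children)} graph that traverse returns and collecting every datapipe of a requested type. It is a simplified illustration, not fastrl's find_dps; find_datapipes_of_type is a made-up name, and newer torch/torchdata releases expose the graph function as traverse_dps where older ones named it traverse.

from torch.utils.data.graph import traverse_dps  # named `traverse` in older releases

def find_datapipes_of_type(pipe, dp_type):
    "Collect every datapipe in `pipe`'s traversal graph that is an instance of `dp_type`."
    found = []
    def walk(graph):
        for _, (datapipe, children) in graph.items():
            if isinstance(datapipe, dp_type):
                found.append(datapipe)
            walk(children)
    walk(traverse_dps(pipe))
    return found

# e.g. a collector could call find_datapipes_of_type(pipe, LoggerBase)
# to locate logger bases upstream in the pipeline.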

LogCollector

 LogCollector (*args, **kwds)

LogCollector specifically manages finding and attaching itself to LoggerBases found earlier in the pipeline.

Notes:

Users can initialize multiple different logger bases if they want.

Collectors can then be added manually, customized for particular pipes, such as one for collecting rewards (see the sketch below).
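
The sketch referenced above gives the general shape of such a collector: it forwards pipeline items unchanged while appending a named value to the buffer of every logger base it was pointed at. SimpleCollector and value_fn are made-up names; the real fastrl collectors locate their logger bases via the pass-through/find_dps machinery rather than taking them as arguments.

from torchdata.datapipes.iter import IterDataPipe

class SimpleCollector(IterDataPipe):
    "Sketch: pass items through while logging one named value per item."
    def __init__(self, source_datapipe, logger_bases, name, value_fn):
        self.source_datapipe = source_datapipe
        self.logger_bases = logger_bases
        self.name = name
        self.value_fn = value_fn

    def __iter__(self):
        for item in self.source_datapipe:
            for base in self.logger_bases:
                base.buffer.append((self.name, self.value_fn(item)))
            yield item

# A reward-style collector would look like
# SimpleCollector(pipe, bases, 'reward', lambda step: step.reward),
# which is roughly the role RewardCollector plays below.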


ProgressBarLogger

 ProgressBarLogger (*args, **kwds)

The LoggerBase class primarily maintains the buffer. It works in combination with the LogCollector datapipe, which adds records to that buffer.

LoggerBase also filters the log records out of the data stream so as not to disrupt the training pipeline.
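
For reference, the test pipeline further down constructs it by pointing it at the collector datapipes that mark batch and epoch boundaries:

logger_base = ProgressBarLogger(batch_on_pipe=BatchCollector,
                                epoch_on_pipe=EpocherCollector)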


RewardCollector

 RewardCollector (*args, **kwds)

LogCollector specifically manages finding and attaching itself to LoggerBases found earlier in the pipeline.


EpocherCollector

 EpocherCollector (*args, **kwds)

Tracks the number of epochs that the pipeline is currently on.


BatchCollector

 BatchCollector (*args, **kwds)

Tracks the number of batches that the pipeline is currently on.
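
A batch counter of this kind can be pictured as a thin wrapper that counts what flows past it. This is a simplified sketch with made-up names; the real BatchCollector is configured with a batch_on_pipe datapipe type, as in the example below.

def count_batches(batches, log):
    "Sketch: yield batches unchanged while logging a running batch count."
    batch_n = 0
    for batch in batches:
        batch_n += 1
        log('batch', batch_n)   # a real collector writes this into the LoggerBase buffer
        yield batch

# e.g. list(count_batches(iter(range(3)), print)) prints batch 1..3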


EpisodeCollector

 EpisodeCollector (*args, **kwds)

Collects the episode_n field from steps.


RollingTerminatedRewardCollector

 RollingTerminatedRewardCollector (*args, **kwds)

Collects the total_reward field from steps if terminated is true and logs a rolling average of size rolling_length.
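
A minimal sketch of that behaviour, assuming steps expose terminated and total_reward fields as described above; rolling_terminated_reward is a made-up name and the default window size here is arbitrary, not fastrl's.

from collections import deque

def rolling_terminated_reward(steps, rolling_length=100, log=print):
    "Sketch: yield steps unchanged; on terminal steps, log a rolling mean of total_reward."
    window = deque(maxlen=rolling_length)
    for step in steps:
        if step.terminated:
            window.append(step.total_reward)
            log('rolling_reward', sum(window) / len(window))
        yield step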


TestSync

 TestSync (*args, **kwds)

Tests getting values from data loader requests.

import torchdata.datapipes as dp
from torchdata.dataloader2.dataloader2 import DataLoader2
from fastrl.data.dataloader2 import *
import pandas as pd
from fastrl.envs.gym import *
from fastrl.pipes.map.transforms import *
envs = ['CartPole-v1']*10

logger_base = ProgressBarLogger(batch_on_pipe=BatchCollector,epoch_on_pipe=EpocherCollector)

pipe = dp.map.Mapper(envs)
pipe = TypeTransformer(pipe,[GymTypeTransform])
pipe = dp.iter.MapToIterConverter(pipe)
pipe = LoggerBasePassThrough(pipe,[logger_base])
pipe = dp.iter.InMemoryCacheHolder(pipe)
pipe = pipe.cycle()
pipe = GymStepper(pipe,synchronized_reset=True)
pipe = RewardCollector(pipe)
pipe = InputInjester(pipe)
pipe = TestSync(pipe)
pipe = pipe.header(limit=10)


pipe = BatchCollector(pipe,batch_on_pipe=dp.iter.Header)
pipe = EpocherCollector(pipe,epochs=5)
pipe = logger_base.connect_source_datapipe(pipe)
# Turn off the seed so that some envs end before others...
steps = list(pipe)
epoch batch reward
1 10 1.0
2 10 1.0
3 10 1.0
4 10 1.0
4 10 1.0

dl = DataLoader2(
    pipe,
    reading_service=PrototypeMultiProcessingReadingService(
        num_workers = 1,
        protocol_client_type = InputItemIterDataPipeQueueProtocolClient,
        protocol_server_type = InputItemIterDataPipeQueueProtocolServer,
        pipe_type = item_input_pipe_type,
        eventloop = SpawnProcessForDataPipeline
    )
)

# dl = logger_base.connect_source_datapipe(dl)

ActionPublish

 ActionPublish (*args, **kwds)

Publishes an action augmentation to the dataloader.


CacheLoggerBase

 CacheLoggerBase (*args, **kwds)

Short-lived logger base meant to dump logs.