Loggers Core
LoggerBase

LoggerBase (*args, **kwds)

The LoggerBase class simply maintains the buffer. It works in combination with the LogCollector datapipe, which appends records to the buffer.
LoggerBase also filters the log records out of the stream so as not to disrupt the training pipeline.
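Outside of fastrl, this buffer-and-filter behavior can be sketched as a small iterable wrapper. `MiniLoggerBase` below and its dict-with-`'log'`-key record format are illustrative stand-ins, not fastrl's actual API:

```python
from collections import deque

class MiniLoggerBase:
    """Illustrative stand-in for LoggerBase: buffers log records and
    filters them out of the stream so downstream training code never
    sees them. The dict-with-'log'-key record format is hypothetical."""
    def __init__(self, source):
        self.source = source
        self.buffer = deque()

    def __iter__(self):
        for item in self.source:
            if isinstance(item, dict) and 'log' in item:
                self.buffer.append(item)   # capture the log record
            else:
                yield item                 # pass training data through

stream = [1, {'log': 'reward=1.0'}, 2, {'log': 'reward=0.5'}, 3]
base = MiniLoggerBase(stream)
print(list(base))        # [1, 2, 3]
print(len(base.buffer))  # 2
```

The key design point is that log records never reach downstream consumers; they accumulate in the buffer for a logger to drain later.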
LoggerBasePassThrough

LoggerBasePassThrough (*args, **kwds)

Allows collectors to find LoggerBases early in the pipeline without worrying about accidentally iterating the logger bases at the incorrect time/frequency.

This is mainly used so that collectors can easily call find_dps on the pipeline.
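The lookup that collectors rely on can be sketched without torchdata's graph traversal; the `.source` attribute chain, the stub classes, and this `find_dps` signature are simplifications of fastrl's actual helpers:

```python
class LoggerBaseStub:
    """Stand-in for LoggerBase (illustrative only)."""
    def __init__(self, source=None): self.source = source

class StepperStub:
    """Stand-in for any other datapipe in the chain."""
    def __init__(self, source=None): self.source = source

def find_dps(pipe, cls):
    """Walk the chain of `.source` references and collect every
    instance of `cls` - a simplified version of traversing a
    datapipe graph to locate LoggerBases."""
    found = []
    while pipe is not None:
        if isinstance(pipe, cls):
            found.append(pipe)
        pipe = getattr(pipe, 'source', None)
    return found

pipeline = StepperStub(LoggerBaseStub(StepperStub()))
print(len(find_dps(pipeline, LoggerBaseStub)))  # 1
```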
```python
logger_base = LoggerBase()
traverse(logger_base)
```

```
{140510339508112: (LoggerBase, {})}
```
LogCollector

LogCollector (*args, **kwds)

LogCollector specifically manages finding and attaching itself to LoggerBases found earlier in the pipeline.
Notes:

- Users can initialize multiple different logger bases if they want.
- We can then manually add collectors customized for certain pipes, such as for collecting rewards.
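A collector's role can be sketched as: iterate the source, push a record into every attached logger base's buffer, and yield the data unchanged. All names and the record format below are hypothetical stand-ins for fastrl's classes:

```python
from collections import deque

class MiniLoggerBase:
    """Stand-in logger base that just owns a buffer."""
    def __init__(self): self.buffer = deque()

class MiniRewardCollector:
    """Hypothetical collector: yields steps unchanged while pushing a
    reward record into every attached logger base's buffer."""
    def __init__(self, source, logger_bases):
        self.source, self.logger_bases = source, logger_bases

    def __iter__(self):
        for step in self.source:
            record = {'name': 'reward', 'value': step['reward']}
            for lb in self.logger_bases:   # fan out to all bases found
                lb.buffer.append(record)
            yield step

base = MiniLoggerBase()
steps = [{'reward': 1.0}, {'reward': 0.5}]
out = list(MiniRewardCollector(steps, [base]))
print(len(out), len(base.buffer))  # 2 2
```

Because the collector only appends to buffers and passes steps through, several collectors can be stacked in one pipeline without interfering with each other.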
ProgressBarLogger

ProgressBarLogger (*args, **kwds)

The LoggerBase class simply maintains the buffer. It works in combination with the LogCollector datapipe, which appends records to the buffer.

LoggerBase also filters the log records out of the stream so as not to disrupt the training pipeline.
RewardCollector

RewardCollector (*args, **kwds)

LogCollector specifically manages finding and attaching itself to LoggerBases found earlier in the pipeline.
EpocherCollector
EpocherCollector (*args, **kwds)
Tracks the number of epochs that the pipeline is currently on.
BatchCollector
BatchCollector (*args, **kwds)
Tracks the number of batches that the pipeline is currently on.
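The two counters can be sketched as thin wrappers that increment as the pipeline is iterated; the class names and attributes below are illustrative, not fastrl's signatures:

```python
class BatchCounter:
    """Counts batches as they pass through (sketch of a
    BatchCollector-style pipe)."""
    def __init__(self, source):
        self.source, self.batch = source, 0

    def __iter__(self):
        for item in self.source:
            self.batch += 1
            yield item

class EpochCounter:
    """Re-iterates its source `epochs` times, tracking the current
    epoch (sketch of an EpocherCollector-style pipe over a
    re-iterable source)."""
    def __init__(self, source, epochs):
        self.source, self.epochs, self.epoch = source, epochs, 0

    def __iter__(self):
        for self.epoch in range(1, self.epochs + 1):
            yield from self.source

batches = BatchCounter([[1, 2], [3, 4]])
epochs = EpochCounter(batches, epochs=3)
list(epochs)
# BatchCounter keeps counting across epochs: 2 batches x 3 epochs
print(epochs.epoch, batches.batch)  # 3 6
```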
EpisodeCollector
EpisodeCollector (*args, **kwds)
Collects the episode_n field from steps.
RollingTerminatedRewardCollector
RollingTerminatedRewardCollector (*args, **kwds)
Collects the total_reward field from steps if terminated is true and logs a rolling average of size rolling_length.
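That logging rule can be sketched with a fixed-size window; the step format and function name here are illustrative assumptions:

```python
from collections import deque

def rolling_terminated_rewards(steps, rolling_length=3):
    """Sketch of the RollingTerminatedRewardCollector rule: keep
    total_reward only from terminated steps and report the rolling
    mean over the last `rolling_length` of them."""
    window = deque(maxlen=rolling_length)  # old values drop off automatically
    averages = []
    for step in steps:
        if step['terminated']:
            window.append(step['total_reward'])
            averages.append(sum(window) / len(window))
    return averages

steps = [
    {'terminated': False, 'total_reward': 5.0},   # ignored: not terminal
    {'terminated': True,  'total_reward': 10.0},
    {'terminated': True,  'total_reward': 20.0},
]
print(rolling_terminated_rewards(steps))  # [10.0, 15.0]
```

Using `deque(maxlen=...)` keeps the window bounded without any manual eviction logic.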
TestSync
TestSync (*args, **kwds)
Tests getting values from data loader requests.
```python
from torchdata.dataloader2.dataloader2 import DataLoader2
from fastrl.data.dataloader2 import *
import pandas as pd
from fastrl.envs.gym import *
from fastrl.pipes.map.transforms import *
```
```python
envs = ['CartPole-v1']*10

logger_base = ProgressBarLogger(batch_on_pipe=BatchCollector, epoch_on_pipe=EpocherCollector)

pipe = dp.map.Mapper(envs)
pipe = TypeTransformer(pipe, [GymTypeTransform])
pipe = dp.iter.MapToIterConverter(pipe)
pipe = LoggerBasePassThrough(pipe, [logger_base])
pipe = dp.iter.InMemoryCacheHolder(pipe)
pipe = pipe.cycle()
pipe = GymStepper(pipe, synchronized_reset=True)
pipe = RewardCollector(pipe)
pipe = InputInjester(pipe)
pipe = TestSync(pipe)
pipe = pipe.header(limit=10)
pipe = BatchCollector(pipe, batch_on_pipe=dp.iter.Header)
pipe = EpocherCollector(pipe, epochs=5)
pipe = logger_base.connect_source_datapipe(pipe)
pipe  # Turn off the seed so that some envs end before others...

steps = list(pipe)
```
| epoch | batch | reward |
|---|---|---|
| 1 | 10 | 1.0 |
| 2 | 10 | 1.0 |
| 3 | 10 | 1.0 |
| 4 | 10 | 1.0 |
| 4 | 10 | 1.0 |
```python
dl = DataLoader2(
    pipe,
    reading_service=PrototypeMultiProcessingReadingService(
        num_workers = 1,
        protocol_client_type = InputItemIterDataPipeQueueProtocolClient,
        protocol_server_type = InputItemIterDataPipeQueueProtocolServer,
        pipe_type = item_input_pipe_type,
        eventloop = SpawnProcessForDataPipeline
    )
)
# dl = logger_base.connect_source_datapipe(dl)
```
ActionPublish
ActionPublish (*args, **kwds)
Publishes an action augmentation to the dataloader.
CacheLoggerBase
CacheLoggerBase (*args, **kwds)
Short-lived logger base meant to dump logs.