rai_toolbox.mushin.MultiRunMetricsWorkflow
- class rai_toolbox.mushin.MultiRunMetricsWorkflow(eval_task_cfg=None, working_dir=None)
Abstract class for workflows that record metrics using Hydra multirun.
This workflow creates subdirectories of multirun experiments using Hydra. These directories contain the Hydra YAML configuration and any saved metrics file (defined by the evaluation task):
├── working_dir
│   ├── <experiment directory name: 0>
│   │   ├── <hydra output subdirectory (default: .hydra)>
│   │   │   ├── config.yaml
│   │   │   ├── hydra.yaml
│   │   │   ├── overrides.yaml
│   │   ├── <metrics_filename>
│   ├── <experiment directory name: 1>
│   │   ...
The evaluation task is expected to return a dictionary that maps
metric-name (str) -> value (number | Sequence[number])
Examples
Let’s create a simple workflow where we perform a multirun over a parameter, epsilon, and evaluate a task function that computes an accuracy and loss based on that epsilon value and a specified scale.

>>> from rai_toolbox.mushin.workflows import MultiRunMetricsWorkflow
>>> from rai_toolbox.mushin import multirun
>>> import torch as tr
>>> class LocalRobustness(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(epsilon: float, scale: float) -> dict:
...         epsilon *= scale
...         val = 100 - epsilon**2
...         result = dict(accuracies=val+2, loss=epsilon**2)
...         tr.save(result, "test_metrics.pt")
...         return result
We’ll run this workflow for six total configurations of three epsilon values and two scale values. This will launch a Hydra multirun job and aggregate the results.

>>> wf = LocalRobustness()
>>> wf.run(epsilon=multirun([1.0, 2.0, 3.0]), scale=multirun([0.1, 1.0]))
[2022-05-02 11:57:59,219][HYDRA] Launching 6 jobs locally
[2022-05-02 11:57:59,220][HYDRA] #0 : +epsilon=1.0 +scale=0.1
[2022-05-02 11:57:59,312][HYDRA] #1 : +epsilon=1.0 +scale=1.0
[2022-05-02 11:57:59,405][HYDRA] #2 : +epsilon=2.0 +scale=0.1
[2022-05-02 11:57:59,498][HYDRA] #3 : +epsilon=2.0 +scale=1.0
[2022-05-02 11:57:59,590][HYDRA] #4 : +epsilon=3.0 +scale=0.1
[2022-05-02 11:57:59,683][HYDRA] #5 : +epsilon=3.0 +scale=1.0
Now that this workflow has run, we can view the results as an xarray-dataset whose coordinates reflect the multirun parameters that were varied, and whose data-variables are our recorded metrics: “accuracies” and “loss”.
>>> ds = wf.to_xarray()
>>> ds
<xarray.Dataset>
Dimensions:     (epsilon: 3, scale: 2)
Coordinates:
  * epsilon     (epsilon) float64 1.0 2.0 3.0
  * scale       (scale) float64 0.1 1.0
Data variables:
    accuracies  (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
    loss        (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0
We can also load this workflow by providing the working directory where it was run.
>>> loaded = LocalRobustness().load_from_dir(wf.working_dir)
>>> loaded.to_xarray()
<xarray.Dataset>
Dimensions:     (epsilon: 3, scale: 2)
Coordinates:
  * epsilon     (epsilon) float64 1.0 2.0 3.0
  * scale       (scale) float64 0.1 1.0
Data variables:
    accuracies  (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
    loss        (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0
- __init__(eval_task_cfg=None, working_dir=None)
Workflows and experiments using Hydra.
- Parameters:
- eval_task_cfg: Mapping | None (default: None)
The workflow configuration object.
- static pre_task(*args, **kwargs)
Called prior to task. This can be useful for doing things like setting random seeds, which must occur prior to instantiating objects for the evaluation task.
Notes
This function is automatically wrapped by zen, which is responsible for parsing the function’s signature and then extracting and instantiating the corresponding fields from a Hydra config object, passing them to the function. This behavior can be modified by self.run(pre_task_fn_wrapper=...).
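For instance, a subclass might override pre_task to seed random number generators before the task’s objects are instantiated. A minimal sketch, assuming a hypothetical seed field is supplied through the workflow’s overrides:

>>> import random
>>> class SeededWorkflow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def pre_task(seed: int):
...         # `seed` (hypothetical config field) is extracted from the Hydra config by `zen`
...         random.seed(seed)
...
...     @staticmethod
...     def task(epsilon: float) -> dict:
...         # runs after `pre_task`, so this draw is reproducible
...         return dict(noisy_eps=epsilon + random.random())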
- static task(*args, **kwargs)
Abstract staticmethod for users to define the task that is configured and launched by the workflow.
- run(*, task_fn_wrapper=<function zen>, working_dir=None, sweeper=None, launcher=None, overrides=None, version_base='1.1', target_job_dirs=None, to_dictconfig=False, config_name='rai_workflow', job_name='rai_workflow', with_log_configuration=True, **workflow_overrides)
Run the experiment.
Individual workflows can explicitly define workflow_overrides to improve readability and understanding of what parameters are expected for a particular workflow.
- Parameters:
- task_fn_wrapper: Callable[[Callable[…, T1]], Callable[[Any], T1]] | None, optional (default=rai_toolbox.mushin.zen)
A wrapper applied to self.task prior to launching the task. The default wrapper is rai_toolbox.mushin.zen. Specify None for no wrapper to be applied.
- working_dir: str (default: None, the Hydra default will be used)
The directory to run the experiment in. This value is used for setting hydra.sweep.dir.
- sweeper: str | None (default: None)
The configuration name of the Hydra Sweeper to use (i.e., the override for hydra/sweeper=sweeper).
- launcher: str | None (default: None)
The configuration name of the Hydra Launcher to use (i.e., the override for hydra/launcher=launcher).
- overrides: List[str] | None (default: None)
Parameter overrides not considered part of the workflow parameter set. This is helpful for filtering out parameters stored in self.workflow_overrides.
- version_base: Optional[str], optional (default="1.1")
Available starting with Hydra 1.2.0.
- If the version_base parameter is not specified, Hydra 1.x will use defaults compatible with version 1.1. In this case a warning is also issued, indicating that an explicit version_base is preferred.
- If the version_base parameter is None, then the defaults are chosen for the current minor Hydra version. For example, for Hydra 1.2 this would imply config_path=None and hydra.job.chdir=False.
- If the version_base parameter is an explicit version string like "1.1", then the defaults appropriate to that version are used.
- to_dictconfig: bool (default: False)
If True, convert a dataclasses.dataclass to an omegaconf.DictConfig. Note that this will remove Hydra’s capability for validation with structured configurations.
- config_name: str (default: "rai_workflow")
Name of the stored configuration in Hydra’s ConfigStore API.
- job_name: str (default: "rai_workflow")
Name of job for logging.
- with_log_configuration: bool (default: True)
If True, enables the configuration of the logging subsystem from the loaded config.
- **workflow_overrides: str | int | float | bool | multirun | hydra_list | dict
These parameters represent the values for configurations to use for the experiment.
Passing param=multirun([1, 2, 3]) will perform a multirun over those three param values, whereas passing param=hydra_list([1, 2, 3]) will pass the entire list as a single input. These values will be appended to the overrides for the Hydra job.
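To make the division of labor concrete, here is an illustrative sketch (reusing LocalRobustness from the examples above; the particular override string and values are hypothetical) that combines a swept parameter, a fixed parameter, and a raw Hydra override:

>>> wf = LocalRobustness()
>>> wf.run(
...     working_dir="my_sweep",              # sets hydra.sweep.dir
...     overrides=["hydra.job.chdir=True"],  # raw Hydra override, kept out of self.workflow_overrides
...     epsilon=multirun([1.0, 2.0, 3.0]),   # swept: one job per value
...     scale=0.5,                           # fixed for every job
... )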
- load_from_dir(working_dir, metrics_filename)
Load workflow job data from a given working directory. The workflow is loaded in-place, and self is returned by this method.
- Parameters:
- working_dir: str | Path
The base working directory of the experiment. It is expected that subdirectories within this working directory will contain individual Hydra job data (YAML configurations) and saved metrics files.
- metrics_filename: str | Sequence[str] | None
The filename(s) or glob-pattern(s) used to load the metrics. If None, the metrics stored in self.metrics are used.
- Returns:
- loaded_workflow: Self
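For illustration (the directory path below is hypothetical), a completed multirun can be reloaded in a fresh session and then analyzed:

>>> wf = LocalRobustness().load_from_dir(
...     "multirun/2022-05-02/11-57-59",  # hypothetical working dir of a prior run
...     metrics_filename="test_metrics.pt",
... )
>>> ds = wf.to_xarray()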
- load_metrics(metrics_filename)
Loads and aggregates metrics across all multirun working dirs, and stores them in self.metrics. self.metric_load_fn is used to load each job’s metric file(s).
- Parameters:
- metrics_filename: str | Sequence[str]
The filename(s) or glob-pattern(s) used to load the metrics. If None, the metrics stored in self.metrics are used.
- Returns:
- metrics: Dict[str, List[Any]]
Examples
Creating a workflow that saves named metrics using torch.save:

>>> from rai_toolbox.mushin.workflows import MultiRunMetricsWorkflow, multirun
>>> import torch as tr
>>>
>>> class TorchWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(a, b):
...         tr.save(dict(a=a, b=b), "metrics.pt")
...
>>> wf = TorchWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
[2022-06-01 12:35:51,650][HYDRA] Launching 3 jobs locally
[2022-06-01 12:35:51,650][HYDRA] #0 : +a=1 +b=False
[2022-06-01 12:35:51,715][HYDRA] #1 : +a=2 +b=False
[2022-06-01 12:35:51,780][HYDRA] #2 : +a=3 +b=False
MultiRunMetricsWorkflow uses torch.load by default to load metrics files (refer to metric_load_fn to change this behavior).

>>> wf.load_metrics("metrics.pt")
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
>>> wf.metrics
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
- static metric_load_fn(file_path)
Loads a metric file and returns a dictionary of metric-name -> metric-value mappings.
The default metric load function is torch.load.
- Parameters:
- file_path: Path
- Returns:
- named_metrics: Mapping[str, Any]
metric-name -> metric-value(s)
Examples
Designing a workflow that uses the pickle module to save and load metrics:

>>> from rai_toolbox.mushin import MultiRunMetricsWorkflow, multirun
>>> from pathlib import Path
>>> import pickle
>>>
>>> class PickledWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def metric_load_fn(file_path: Path):
...         with file_path.open("rb") as f:
...             return pickle.load(f)
...
...     @staticmethod
...     def task(a, b):
...         with open("./metrics.pkl", "wb") as f:
...             pickle.dump(dict(a=a, b=b), f)
>>>
>>> wf = PickledWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
>>> wf.load_metrics("metrics.pkl")
>>> wf.metrics
dict(a=[1, 2, 3], b=[False, False, False])
- to_xarray(include_working_subdirs_as_data_var=False, coord_from_metrics=None, non_multirun_params_as_singleton_dims=False, metrics_filename=None)
Convert workflow data to xarray Dataset.
- Parameters:
- include_working_subdirs_as_data_var: bool, optional (default=False)
If True, then the data-variable "working_subdir" will be included in the xarray. This data variable is used to look up the working sub-dir path (a string) by multirun coordinate.
- coord_from_metrics: str | None (default: None)
If not None, defines the metric key to use as a coordinate in the Dataset. This function assumes that this coordinate represents the leading dimension for all data-variables.
- non_multirun_params_as_singleton_dims: bool, optional (default=False)
If True, then non-multirun entries from workflow_overrides will be included as length-1 dimensions in the xarray. Useful for merging/concatenating with other Datasets.
- metrics_filename: Optional[str]
The filename or glob-pattern used to load the metrics. If None, the metrics stored in self.metrics are used.
- Returns:
- results: xarray.Dataset
A dataset whose dimensions and coordinate-values are determined by the quantities over which the multi-run was performed. The data variables correspond to the named results returned by the jobs.
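As a brief sketch of include_working_subdirs_as_data_var (continuing the LocalRobustness example from above; the selected coordinate values are illustrative):

>>> ds = wf.to_xarray(include_working_subdirs_as_data_var=True)
>>> ds["working_subdir"].sel(epsilon=1.0, scale=0.1).item()  # working sub-dir path for this coordinate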
- property target_dir_multirun_overrides
For a multirun that sweeps over the target directories of a previous multirun, target_dir_multirun_overrides provides the flattened overrides for that previous run.
Examples
>>> class A(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(value: float, scale: float):
...         pass
...
>>> class B(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task():
...         pass
>>> a = A()
>>> a.run(value=multirun([-1.0, 0.0, 1.0]), scale=multirun([11.0, 9.0]))
[2022-05-13 17:19:51,497][HYDRA] Launching 6 jobs locally
[2022-05-13 17:19:51,497][HYDRA] #0 : +value=-1.0 +scale=11.0
[2022-05-13 17:19:51,555][HYDRA] #1 : +value=-1.0 +scale=9.0
[2022-05-13 17:19:51,729][HYDRA] #2 : +value=1.0 +scale=11.0
[2022-05-13 17:19:51,787][HYDRA] #3 : +value=1.0 +scale=9.0
>>> b = B()
>>> b.run(target_job_dirs=a.multirun_working_dirs)
[2022-05-13 17:19:59,900][HYDRA] Launching 6 jobs locally
[2022-05-13 17:19:59,900][HYDRA] #0 : +job_dir=/home/scratch/multirun/0
[2022-05-13 17:19:59,958][HYDRA] #1 : +job_dir=/home/scratch/multirun/1
[2022-05-13 17:20:00,015][HYDRA] #2 : +job_dir=/home/scratch/multirun/2
[2022-05-13 17:20:00,073][HYDRA] #3 : +job_dir=/home/scratch/multirun/3
>>> b.target_dir_multirun_overrides
{'value': [-1.0, -1.0, 1.0, 1.0], 'scale': [11.0, 9.0, 11.0, 9.0]}
Methods
- __init__([eval_task_cfg, working_dir]): Workflows and experiments using Hydra.
- pre_task(*args, **kwargs): Called prior to task.
- task(*args, **kwargs): Abstract staticmethod for users to define the task that is configured and launched by the workflow.
- run(*[, task_fn_wrapper, working_dir, ...]): Run the experiment.
- metric_load_fn(file_path): Loads a metric file and returns a dictionary of metric-name -> metric-value mappings.
- load_from_dir(working_dir, metrics_filename): Load workflow job data from a given working directory.
- load_metrics(metrics_filename): Loads and aggregates metrics across all multirun working dirs, and stores them in self.metrics.
- to_xarray([...]): Convert workflow data to xarray Dataset.
- jobs_post_process(): Method to extract attributes and metrics relevant to the workflow.
- plot(**kwargs): Plot workflow metrics.
- validate([include_pre_task]): Validates that the configuration will execute with the user-defined evaluation task.

Attributes
- target_dir_multirun_overrides: For a multirun that sweeps over the target directories of a previous multirun, provides the flattened overrides for that previous run.