rai_toolbox.mushin.MultiRunMetricsWorkflow#

class rai_toolbox.mushin.MultiRunMetricsWorkflow(eval_task_cfg=None, working_dir=None)[source]#

Abstract class for workflows that record metrics using Hydra multirun.

This workflow creates subdirectories of multirun experiments using Hydra. These directories contain the Hydra YAML configuration and any metrics file saved by the evaluation task:

├── working_dir
│    ├── <experiment directory name: 0>
│    │    ├── <hydra output subdirectory (default: .hydra)>
│    │    │    ├── config.yaml
│    │    │    ├── hydra.yaml
│    │    │    ├── overrides.yaml
│    │    ├── <metrics_filename>
│    ├── <experiment directory name: 1>
│    │    ...

The evaluation task is expected to return a dictionary that maps metric-name (str) -> value (number | Sequence[number])

Examples

Let’s create a simple workflow where we perform a multirun over a parameter, epsilon, and evaluate a task function that computes an accuracy and loss based on that epsilon value and a specified scale.

>>> from rai_toolbox.mushin.workflows import MultiRunMetricsWorkflow
>>> from rai_toolbox.mushin import multirun
>>> import torch as tr
>>> class LocalRobustness(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(epsilon: float, scale: float) -> dict:
...         epsilon *= scale
...         val = 100 - epsilon**2
...         result = dict(accuracies=val+2, loss=epsilon**2)
...         tr.save(result, "test_metrics.pt")
...         return result

We’ll run this workflow over six configurations in total: three epsilon values crossed with two scale values. This will launch a Hydra multirun job and aggregate the results.

>>> wf = LocalRobustness()
>>> wf.run(epsilon=multirun([1.0, 2.0, 3.0]), scale=multirun([0.1, 1.0]))
[2022-05-02 11:57:59,219][HYDRA] Launching 6 jobs locally
[2022-05-02 11:57:59,220][HYDRA]    #0 : +epsilon=1.0 +scale=0.1
[2022-05-02 11:57:59,312][HYDRA]    #1 : +epsilon=1.0 +scale=1.0
[2022-05-02 11:57:59,405][HYDRA]    #2 : +epsilon=2.0 +scale=0.1
[2022-05-02 11:57:59,498][HYDRA]    #3 : +epsilon=2.0 +scale=1.0
[2022-05-02 11:57:59,590][HYDRA]    #4 : +epsilon=3.0 +scale=0.1
[2022-05-02 11:57:59,683][HYDRA]    #5 : +epsilon=3.0 +scale=1.0

Now that this workflow has run, we can view the results as an xarray-dataset whose coordinates reflect the multirun parameters that were varied, and whose data-variables are our recorded metrics: “accuracies” and “loss”.

>>> ds = wf.to_xarray()
>>> ds
<xarray.Dataset>
Dimensions:     (epsilon: 3, scale: 2)
Coordinates:
  * epsilon     (epsilon) float64 1.0 2.0 3.0
  * scale       (scale) float64 0.1 1.0
Data variables:
    accuracies  (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
    loss        (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0
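
Because ds is an ordinary xarray.Dataset, all of xarray's standard indexing applies. For instance, we can select the losses at scale=1.0 (a quick sketch using xarray's sel; the exact repr can vary across xarray versions):

>>> ds["loss"].sel(scale=1.0)
<xarray.DataArray 'loss' (epsilon: 3)>
array([1., 4., 9.])
Coordinates:
  * epsilon  (epsilon) float64 1.0 2.0 3.0
    scale    float64 1.0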

We can also load this workflow by providing the working directory where it was run.

>>> loaded = LocalRobustness().load_from_dir(wf.working_dir, metrics_filename="test_metrics.pt")
>>> loaded.to_xarray()
<xarray.Dataset>
Dimensions:     (epsilon: 3, scale: 2)
Coordinates:
  * epsilon     (epsilon) float64 1.0 2.0 3.0
  * scale       (scale) float64 0.1 1.0
Data variables:
    accuracies  (epsilon, scale) float64 102.0 101.0 102.0 98.0 101.9 93.0
    loss        (epsilon, scale) float64 0.01 1.0 0.04 4.0 0.09 9.0
__init__(eval_task_cfg=None, working_dir=None)[source]#

Workflows and experiments using Hydra.

Parameters:
eval_task_cfg: Mapping | None (default: None)

The workflow configuration object.

static pre_task(*args, **kwargs)#

Called prior to task

This can be useful for doing things like setting random seeds, which must occur prior to instantiating objects for the evaluation task.

Notes

This function is automatically wrapped by zen, which parses the function’s signature, extracts and instantiates the corresponding fields from the Hydra config object, and passes them to the function. This behavior can be modified via self.run(pre_task_fn_wrapper=...)
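
For instance, a subclass might use pre_task to seed the global random number generator before each job’s task runs. This is a minimal sketch, not part of the API; seed is an illustrative parameter that would be supplied via wf.run(seed=..., ...):

>>> import random
>>> class SeededWorkflow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def pre_task(seed: int):
...         # zen extracts `seed` from the Hydra config and passes it here
...         random.seed(seed)
...
...     @staticmethod
...     def task(epsilon: float) -> dict:
...         # runs after `pre_task`, so the RNG is already seeded
...         return dict(noise=random.random() * epsilon)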

static task(*args, **kwargs)[source]#

Abstract staticmethod for users to define the task that is configured and launched by the workflow

run(*, task_fn_wrapper=<function zen>, working_dir=None, sweeper=None, launcher=None, overrides=None, version_base='1.1', target_job_dirs=None, to_dictconfig=False, config_name='rai_workflow', job_name='rai_workflow', with_log_configuration=True, **workflow_overrides)[source]#

Run the experiment.

Individual workflows can explicitly define workflow_overrides to improve readability and understanding of the parameters expected by a particular workflow.

Parameters:
task_fn_wrapper: Callable[[Callable[…, T1]], Callable[[Any], T1]] | None, optional (default=rai_toolbox.mushin.zen)

A wrapper applied to self.task prior to launching the task. The default wrapper is rai_toolbox.mushin.zen. Specify None for no wrapper to be applied.

working_dir: str | None (default: None; the Hydra default will be used)

The directory to run the experiment in. This value is used for setting hydra.sweep.dir.

sweeper: str | None (default: None)

The configuration name of the Hydra Sweeper to use (i.e., the override for hydra/sweeper=sweeper)

launcher: str | None (default: None)

The configuration name of the Hydra Launcher to use (i.e., the override for hydra/launcher=launcher)

overrides: List[str] | None (default: None)

Parameter overrides that are not considered part of the workflow parameter set. This is helpful for keeping such parameters out of self.workflow_overrides.

version_base: Optional[str], optional (default="1.1")

Available starting with Hydra 1.2.0.

- If version_base is not specified, Hydra 1.x will use defaults compatible with version 1.1, and a warning is issued to indicate that an explicit version_base is preferred.
- If version_base is None, the defaults are chosen for the current minor Hydra version. For example, Hydra 1.2 would imply config_path=None and hydra.job.chdir=False.
- If version_base is an explicit version string such as "1.1", the defaults appropriate to that version are used.

to_dictconfig: bool (default: False)

If True, convert a dataclasses.dataclass to an omegaconf.DictConfig. Note that this removes Hydra’s capability for validation with structured configurations.

config_name: str (default: “rai_workflow”)

Name of the stored configuration in Hydra’s ConfigStore API.

job_name: str (default: “rai_workflow”)

Name of job for logging.

with_log_configuration: bool (default: True)

If True, enables the configuration of the logging subsystem from the loaded config.

**workflow_overrides: str | int | float | bool | multirun | hydra_list | dict

These parameters represent the values for configurations to use for the experiment.

Passing param=multirun([1, 2, 3]) will perform a multirun over those three param values, whereas passing param=hydra_list([1, 2, 3]) will pass the entire list as a single input.

These values will be appended to the overrides for the Hydra job.
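
For illustration, a single call to run can combine workflow overrides with raw Hydra overrides. This is a sketch, not a transcript of a real run; epsilon and layers are hypothetical parameter names for some workflow instance wf:

>>> from rai_toolbox.mushin import multirun, hydra_list
>>> wf.run(
...     epsilon=multirun([0.1, 0.2, 0.3]),  # sweep: one job per value
...     layers=hydra_list([10, 20]),        # the whole list is passed to every job
...     overrides=["+device=cpu"],          # raw Hydra override, kept out of workflow_overrides
... )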

load_from_dir(working_dir, metrics_filename)[source]#

Loads workflow job data from a given working directory. The workflow is loaded in place, and “self” is returned by this method.

Parameters:
working_dir: str | Path

The base working directory of the experiment. It is expected that subdirectories within this working directory will contain individual Hydra jobs data (yaml configurations) and saved metrics files.

metrics_filename: str | Sequence[str] | None

The filename(s) or glob-pattern(s) used to load the metrics. If None, the metrics stored in self.metrics are used.

Returns:
loaded_workflow: Self
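
For example, the LocalRobustness workflow from above could be restored in a fresh session as follows (the working-directory path is illustrative):

>>> loaded = LocalRobustness().load_from_dir(
...     "multirun/2022-05-02/11-57-59", metrics_filename="test_metrics.pt"
... )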
load_metrics(metrics_filename)[source]#

Loads and aggregates the metrics from all multirun working dirs, and stores them in self.metrics.

self.metric_load_fn is used to load each job’s metric file(s).

Parameters:
metrics_filename: str | Sequence[str]

The filename(s) or glob-pattern(s) used to load the metrics. If None, the metrics stored in self.metrics are used.

Returns:
metrics: Dict[str, List[Any]]

Examples

Creating a workflow that saves named metrics using torch.save

>>> from rai_toolbox.mushin.workflows import MultiRunMetricsWorkflow, multirun
>>> import torch as tr
>>> class TorchWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(a, b):
...         tr.save(dict(a=a, b=b), "metrics.pt")
...
>>> wf = TorchWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
[2022-06-01 12:35:51,650][HYDRA] Launching 3 jobs locally
[2022-06-01 12:35:51,650][HYDRA]        #0 : +a=1 +b=False
[2022-06-01 12:35:51,715][HYDRA]        #1 : +a=2 +b=False
[2022-06-01 12:35:51,780][HYDRA]        #2 : +a=3 +b=False

MultiRunMetricsWorkflow uses torch.load by default to load metrics files (refer to metric_load_fn to change this behavior).

>>> wf.load_metrics("metrics.pt")
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
>>> wf.metrics
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
static metric_load_fn(file_path)[source]#

Loads a metric file and returns a dictionary of metric-name -> metric-value mappings.

The default metric load function is torch.load.

Parameters:
file_path: Path
Returns:
named_metrics: Mapping[str, Any]

metric-name -> metric-value(s)

Examples

Designing a workflow that uses the pickle module to save and load metrics

>>> from rai_toolbox.mushin import MultiRunMetricsWorkflow, multirun
>>> from pathlib import Path
>>> import pickle
>>>
>>> class PickledWorkFlow(MultiRunMetricsWorkflow):
...     @staticmethod
...     def metric_load_fn(file_path: Path):
...         with file_path.open("rb") as f:
...             return pickle.load(f)
...
...     @staticmethod
...     def task(a, b):
...         with open("./metrics.pkl", "wb") as f:
...             pickle.dump(dict(a=a, b=b), f)
>>>
>>> wf = PickledWorkFlow()
>>> wf.run(a=multirun([1, 2, 3]), b=False)
>>> wf.load_metrics("metrics.pkl")
>>> wf.metrics
defaultdict(list, {'a': [1, 2, 3], 'b': [False, False, False]})
to_xarray(include_working_subdirs_as_data_var=False, coord_from_metrics=None, non_multirun_params_as_singleton_dims=False, metrics_filename=None)[source]#

Convert workflow data to xarray Dataset.

Parameters:
include_working_subdirs_as_data_var: bool, optional (default=False)

If True, the data variable “working_subdir” will be included in the xarray. This data variable is used to look up the working sub-dir path (a string) by multirun coordinate.

coord_from_metrics: str | None (default: None)

If not None, defines the metric key to use as a coordinate in the Dataset. This function assumes that this coordinate represents the leading dimension of all data-variables.

non_multirun_params_as_singleton_dims: bool, optional (default=False)

If True, non-multirun entries from workflow_overrides will be included as length-1 dimensions in the xarray. This is useful for merging/concatenating with other Datasets.

metrics_filename: Optional[str]

The filename or glob-pattern used to load the metrics. If None, the metrics stored in self.metrics are used.

Returns:
results: xarray.Dataset

A dataset whose dimensions and coordinate-values are determined by the quantities over which the multi-run was performed. The data variables correspond to the named results returned by the jobs.
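
As a sketch of include_working_subdirs_as_data_var, continuing the LocalRobustness example from above, each job’s working sub-directory can be looked up by its multirun coordinates (the path shown is illustrative):

>>> ds = wf.to_xarray(include_working_subdirs_as_data_var=True)
>>> ds["working_subdir"].sel(epsilon=1.0, scale=0.1).item()
'.../multirun/0'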

property target_dir_multirun_overrides#

For a multirun that sweeps over the target directories of a previous multirun, target_dir_multirun_overrides provides the flattened overrides for that previous run.

Examples

>>> class A(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task(value: float, scale: float):
...         pass
...
>>> class B(MultiRunMetricsWorkflow):
...     @staticmethod
...     def task():
...         pass
>>> a = A()
>>> a.run(value=multirun([-1.0, 1.0]), scale=multirun([11.0, 9.0]))
[2022-05-13 17:19:51,497][HYDRA] Launching 4 jobs locally
[2022-05-13 17:19:51,497][HYDRA]        #0 : +value=-1.0 +scale=11.0
[2022-05-13 17:19:51,555][HYDRA]        #1 : +value=-1.0 +scale=9.0
[2022-05-13 17:19:51,729][HYDRA]        #2 : +value=1.0 +scale=11.0
[2022-05-13 17:19:51,787][HYDRA]        #3 : +value=1.0 +scale=9.0
>>> b = B()
>>> b.run(target_job_dirs=a.multirun_working_dirs)
[2022-05-13 17:19:59,900][HYDRA] Launching 4 jobs locally
[2022-05-13 17:19:59,900][HYDRA]        #0 : +job_dir=/home/scratch/multirun/0
[2022-05-13 17:19:59,958][HYDRA]        #1 : +job_dir=/home/scratch/multirun/1
[2022-05-13 17:20:00,015][HYDRA]        #2 : +job_dir=/home/scratch/multirun/2
[2022-05-13 17:20:00,073][HYDRA]        #3 : +job_dir=/home/scratch/multirun/3
>>> b.target_dir_multirun_overrides
{'value': [-1.0, -1.0, 1.0, 1.0],
 'scale': [11.0, 9.0, 11.0, 9.0]}

Methods

__init__([eval_task_cfg, working_dir])

Workflows and experiments using Hydra.

pre_task(*args, **kwargs)

Called prior to task

task(*args, **kwargs)

Abstract staticmethod for users to define the task that is configured and launched by the workflow

run(*[, task_fn_wrapper, working_dir, ...])

Run the experiment.

metric_load_fn(file_path)

Loads a metric file and returns a dictionary of metric-name -> metric-value mappings.

load_from_dir(working_dir, metrics_filename)

Loads workflow job data from a given working directory.

load_metrics(metrics_filename)

Loads and aggregates the metrics from all multirun working dirs, and stores them in self.metrics.

target_dir_multirun_overrides

For a multirun that sweeps over the target directories of a previous multirun, target_dir_multirun_overrides provides the flattened overrides for that previous run.

to_xarray([...])

Convert workflow data to xarray Dataset.

jobs_post_process()

Method to extract attributes and metrics relevant to the workflow.

plot(**kwargs)

Plot workflow metrics.

validate([include_pre_task])

Validates that the configuration will execute with the user-defined evaluation task