TL;DR
Create a minimal PyTorch Lightning configuration with trainer and module
fields, e.g.,
from pytorch_lightning import Trainer
from hydra_zen import builds, make_config
from rai_toolbox.mushin import HydraDDP

MyLightningModule = ...  # load/create your lightning module

Config = make_config(
    module=builds(MyLightningModule),
    trainer=builds(Trainer, gpus=2, strategy=builds(HydraDDP)),
)
Define a task function:
from hydra_zen import instantiate

def task_fn(cfg):
    obj = instantiate(cfg)
    obj.trainer.fit(obj.module)
Simply launch a PyTorch Lightning job, e.g., launch(Config, task_fn),
or create a command line interface to run your job.
Tip
With HydraDDP(), PyTorch Lightning’s ddp-mode Trainer
becomes compatible with interactive environments such as Jupyter Notebooks!
Run PyTorch Lightning DDP in Hydra
Using Hydra to run PyTorch Lightning’s Distributed Data Parallel (DDP) strategy
often has issues, in part because the strategy launches subprocesses where the
command is derived from values in sys.argv.
The rAI-toolbox comes with a custom strategy, HydraDDP(),
that addresses the challenge of running Hydra and Lightning together using DDP.
In this How-To we will:
Define the requirements for a Hydra configuration.
Build a hydra-zen configuration to execute a PyTorch Lightning multi-GPU training task.
Launch the training task.
Examine the logged files in the Hydra working directory.
First, in order to use HydraDDP(), the Hydra configuration
must contain the following two sub-configurations:
Config
├── trainer: A ``pytorch_lightning.Trainer`` configuration
├── module: A ``pytorch_lightning.LightningModule`` configuration
└── datamodule: [OPTIONAL] A ``pytorch_lightning.LightningDataModule`` configuration
This configuration requirement enables HydraDDP() to use a
toolbox-provided task function (rai_toolbox.mushin.lightning._pl_main.py) that is
launched for each subprocess:
def task(trainer: Trainer, module: LightningModule, pl_testing: bool, pl_predicting: bool, pl_local_rank: int) -> None:
    if pl_testing:
        log.info(f"Rank {pl_local_rank}: Launched subprocess using Training.test")
        trainer.test(module)
    elif pl_predicting:
        log.info(f"Rank {pl_local_rank}: Launched subprocess using Training.predict")
        trainer.predict(module)
    else:
        log.info(f"Rank {pl_local_rank}: Launched subprocess using Training.fit")
        trainer.fit(module)
Note that the configuration flags for pl_testing, pl_predicting, and pl_local_rank are
automatically set by HydraDDP() before execution.
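For reference, here is a minimal sketch (not part of the toolbox) of a configuration that also supplies the optional datamodule entry; MyDataModule is a hypothetical LightningDataModule used purely for illustration:

import pytorch_lightning as pl
from hydra_zen import builds, make_config
from rai_toolbox.mushin import HydraDDP
from rai_toolbox.mushin.testing.lightning import SimpleLightningModule

class MyDataModule(pl.LightningDataModule):
    # hypothetical datamodule, shown only to illustrate the optional entry
    ...

Config = make_config(
    trainer=builds(pl.Trainer, gpus=2, strategy=builds(HydraDDP)),
    module=builds(SimpleLightningModule),
    datamodule=builds(MyDataModule),  # [OPTIONAL]
)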
Next let’s create an example configuration and task function using hydra-zen:
import pytorch_lightning as pl
from hydra_zen import builds, make_config, instantiate, launch
from rai_toolbox.mushin import HydraDDP
from rai_toolbox.mushin.testing.lightning import SimpleLightningModule

TrainerConfig = builds(
    pl.Trainer,
    strategy=builds(HydraDDP),
    populate_full_signature=True,
)

ModuleConfig = builds(SimpleLightningModule, populate_full_signature=True)

Config = make_config(
    trainer=TrainerConfig,
    module=ModuleConfig,
)

def task_function(cfg):
    obj = instantiate(cfg)
    obj.trainer.fit(obj.module)
Next, we launch the training job. For the purpose of this How-To, we will run only for a single epoch and in “fast dev run” mode.
>>> job = launch(Config, task_function,
... overrides=["trainer.gpus=2",
... "trainer.max_epochs=1",
... "trainer.fast_dev_run=True",
... ]
... )
GPU available: True, used: True
...
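The launch call returns Hydra’s JobReturn object describing the completed run. Assuming the default single-run (non-multirun) launch, a quick sketch of inspecting it looks like:

>>> job.working_dir   # path to this run's Hydra working directory
>>> job.return_value  # value returned by task_function (None for this task)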
Lastly, the Hydra working directory will contain these two items:
The Hydra directory, .hydra, storing the YAML configuration files
The file, zen_launch.log, storing any logging outputs from the run
The log file should contain the following information:
[2022-04-21 20:35:40,794][__main__][INFO] - Rank 1: Launched subprocess using Training.fit
[2022-04-21 20:35:42,800][torch.distributed.distributed_c10d][INFO] - Added key: store_based_barrier_key:1 to store for rank: 1
[2022-04-21 20:35:42,801][torch.distributed.distributed_c10d][INFO] - Added key: store_based_barrier_key:1 to store for rank: 0
[2022-04-21 20:35:42,802][torch.distributed.distributed_c10d][INFO] - Rank 0: Completed store-based barrier for key:store_based_barrier_key:1 with 2 nodes.
[2022-04-21 20:35:42,810][torch.distributed.distributed_c10d][INFO] - Rank 1: Completed store-based barrier for key:store_based_barrier_key:1 with 2 nodes.
Here you can see that the first line in the logged output indicates that the subprocess was launched for the second (Rank 1) GPU as expected.
Bonus: Adding Some Bells & Whistles to Our Hydra Application
There are a couple of enhancements that we can add to our Hydra-based application,
the details of which are beyond the scope of this How-To: it is simple to add a command
line interface to our code and to make the HydraDDP() strategy available
as a swappable configuration group. We refer the reader to the linked tutorials for
further explanation and instruction.
The code from this How-To has been modified accordingly and placed in the script
pl_trainer.py:
import hydra
from hydra.core.config_store import ConfigStore
import pytorch_lightning as pl
from hydra_zen import builds, make_config, instantiate
from rai_toolbox.mushin import HydraDDP
from rai_toolbox.mushin.testing.lightning import SimpleLightningModule

TrainerConfig = builds(pl.Trainer, populate_full_signature=True)
ModuleConfig = builds(SimpleLightningModule, populate_full_signature=True)
Config = make_config(trainer=TrainerConfig, module=ModuleConfig)

cs = ConfigStore.instance()
cs.store(
    group="trainer/strategy",
    name="hydra_ddp",
    node=builds(HydraDDP),
)
cs.store(name="pl_app", node=Config)

@hydra.main(config_path=None, config_name="pl_app")
def task_function(cfg):
    obj = instantiate(cfg)
    obj.trainer.fit(obj.module)

if __name__ == "__main__":
    task_function()
We can configure and run this code from the command line:
$ python pl_trainer.py +trainer/strategy=hydra_ddp trainer.gpus=2 trainer.max_epochs=1 trainer.fast_dev_run=True
GPU available: True, used: True
...
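Because pl_trainer.py is an ordinary Hydra application, the standard Hydra command-line features are available as well. For example, assuming a standard Hydra installation, the composed configuration can be printed without launching a run via Hydra’s --cfg flag:

$ python pl_trainer.py +trainer/strategy=hydra_ddp trainer.gpus=2 --cfg job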