API¶

class torchrunx.Launcher(
hostnames: list[str] | Literal['auto', 'slurm'] = 'auto',
workers_per_host: int | list[int] | Literal['cpu', 'gpu'] = 'gpu',
ssh_config_file: str | os.PathLike | None = None,
backend: Literal['nccl', 'gloo', 'mpi', 'ucc'] | None = 'nccl',
timeout: int = 600,
copy_env_vars: tuple[str, ...] = ('PATH', 'LD_LIBRARY', 'LIBRARY_PATH', 'PYTHON*', 'CUDA*', 'TORCH*', 'PYTORCH*', 'NCCL*'),
extra_env_vars: dict[str, str] | None = None,
env_file: str | os.PathLike | None = None,
)[source]¶

For configuring the function launch environment.

hostnames: list[str] | Literal['auto', 'slurm'] = 'auto'¶

Nodes to launch the function on. By default, inferred from SLURM if available, else ["localhost"].

workers_per_host: int | list[int] | Literal['cpu', 'gpu'] = 'gpu'¶

Number of processes to run per node. By default, the number of GPUs per host.

ssh_config_file: str | os.PathLike | None = None¶

SSH configuration file for connecting to nodes. By default, "~/.ssh/config" or "/etc/ssh/ssh_config".

backend: Literal['nccl', 'gloo', 'mpi', 'ucc'] | None = 'nccl'¶

Backend for worker process group. By default, NCCL (GPU backend). Use GLOO for CPU backend. None for no process group.

timeout: int = 600¶

Worker process group timeout (seconds).

copy_env_vars: tuple[str, ...] = ('PATH', 'LD_LIBRARY', 'LIBRARY_PATH', 'PYTHON*', 'CUDA*', 'TORCH*', 'PYTORCH*', 'NCCL*')¶

Environment variables to copy from the launcher process to workers. Supports Unix pattern matching syntax.
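The patterns follow Unix shell-style matching, as in Python's fnmatch module. A minimal sketch (the helper name select_env_vars is hypothetical, not part of torchrunx) of how a pattern list like the default might be applied to the launcher's environment:

```python
import fnmatch

# Hypothetical helper illustrating the shell-style matching used by
# copy_env_vars; the patterns below are a subset of the default tuple.
COPY_PATTERNS = ("PATH", "PYTHON*", "CUDA*", "NCCL*")

def select_env_vars(env: dict[str, str], patterns=COPY_PATTERNS) -> dict[str, str]:
    """Return the subset of `env` whose keys match any pattern."""
    return {
        key: value
        for key, value in env.items()
        if any(fnmatch.fnmatch(key, pattern) for pattern in patterns)
    }

# "CUDA*" matches CUDA_VISIBLE_DEVICES; "HOME" matches no pattern.
env = {"PATH": "/usr/bin", "CUDA_VISIBLE_DEVICES": "0,1", "HOME": "/home/me"}
copied = select_env_vars(env)  # keeps PATH and CUDA_VISIBLE_DEVICES only
```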

extra_env_vars: dict[str, str] | None = None¶

Additional environment variables to load onto workers.

env_file: str | os.PathLike | None = None¶

Path to a .env file containing environment variables to load onto workers.

set_logging_handlers(
handler_factory: Callable[[], list[logging.Handler]] | Literal['auto'] | None,
) Self[source]¶

Provide a handler_factory function to customize processing of agent/worker logs.

Parameters:

handler_factory – Function that constructs and returns logging.Handler objects. See Custom Logging for more details.
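A factory is any zero-argument callable returning logging.Handler objects. A minimal sketch using only the standard library (the formatter string and level are illustrative choices, not torchrunx defaults):

```python
import logging

def handler_factory() -> list[logging.Handler]:
    """Stream agent/worker logs to stderr at INFO level."""
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s: %(message)s"))
    return [handler]

# Usage (not executed here; requires torchrunx):
# launcher = torchrunx.Launcher().set_logging_handlers(handler_factory)
```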

run(
func: Callable[FunctionP, FunctionR],
*args: FunctionP.args,
**kwargs: FunctionP.kwargs,
) LaunchResult[FunctionR][source]¶

Distribute a function onto specified nodes and parallelize across workers.
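A sketch of a typical call, using the parameters documented above (the worker function and argument values are hypothetical; "gloo" is chosen so the sketch does not assume GPUs):

```python
def compute(x: int, y: int) -> int:
    """Worker function; *args/**kwargs passed to run() are forwarded to every worker."""
    return x + y

def launch_example() -> None:
    """Sketch only: requires torchrunx and reachable hosts, so it is not run here."""
    import torchrunx  # imported lazily so the sketch stays importable without it

    launcher = torchrunx.Launcher(
        hostnames=["localhost"],
        workers_per_host=2,
        backend="gloo",  # CPU backend, per the `backend` parameter docs
    )
    result = launcher.run(compute, 2, y=3)
    print(result.rank(0))  # each worker returns 5
```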

Raises:

AgentFailedError – if any agent fails.

WorkerFailedError – if any worker fails.

class torchrunx.LaunchResult(results: dict[str, list[FunctionR]])[source]¶

Container for objects returned from workers after successful launches.

index(hostname: str, local_rank: int) FunctionR[source]¶

Get return value from worker by host and local rank.

rank(i: int) FunctionR[source]¶

Get return value from worker by global rank.
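The two accessors are related by flattening the per-host results. Assuming global ranks are assigned host by host in hostname order, rank(i) indexes into the concatenated per-host lists. A hypothetical model of that mapping (the dict below stands in for the results attribute):

```python
# Hypothetical stand-in for a LaunchResult's per-host results mapping.
results: dict[str, list[str]] = {
    "node1": ["r0", "r1"],  # local ranks 0 and 1 on node1
    "node2": ["r2", "r3"],  # local ranks 0 and 1 on node2
}

def index(hostname: str, local_rank: int) -> str:
    """Return value from the worker at (hostname, local_rank)."""
    return results[hostname][local_rank]

def rank(i: int) -> str:
    """Global rank i, assuming ranks are assigned host by host in order."""
    flat = [value for host_values in results.values() for value in host_values]
    return flat[i]
```

Under this assumption, global rank 2 is node2's local rank 0: rank(2) == index("node2", 0).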

exception torchrunx.AgentFailedError[source]¶

Raised if agent fails (e.g. if signal received).

exception torchrunx.WorkerFailedError[source]¶

Raised if a worker fails (e.g. if signal received or segmentation fault).