import os
import sys
import shutil
import subprocess
import warnings
from shlex import split
from abc import ABC, abstractmethod

from deepspeed.accelerator import get_accelerator
from deepspeed.utils import logger, get_numactl_cmd
from deepspeed.launcher.constants import PDSH_MAX_FAN_OUT, MVAPICH_TMP_HOSTFILE


class MultiNodeRunner(ABC):
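    """Shared interface for multi-node launcher backends.

    Each subclass wraps one launcher (pdsh, Open MPI, MPICH, Intel MPI,
    Slurm, or MVAPICH) and turns the parsed deepspeed CLI arguments into
    the command line that starts training on every node.
    """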

    def __init__(self, args, world_info_base64):
        self.args = args
        self.validate_args()
        self.user_arguments = self.parse_user_args()
        self.user_script = args.user_script
        self.world_info_base64 = world_info_base64
        self.exports = {}

    @abstractmethod
    def backend_exists(self):
        """Return whether the corresponding backend exists"""

    @abstractmethod
    def get_cmd(self, environment, active_resources):
        """Return the command to execute on this node"""

    def add_export(self, key, var):
        self.exports[key.strip()] = var.strip()

    def parse_user_args(self):
        return self.args.user_args

    @property
    def name(self):
        """Return the name of the backend"""
        return self.__class__.__name__

    def validate_args(self):
        """Validate self.args"""


class PDSHRunner(MultiNodeRunner):
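    """Launch workers with pdsh, fanning out SSH sessions to each node."""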

    def __init__(self, args, world_info_base64):
        super().__init__(args, world_info_base64)

    def backend_exists(self):
        return shutil.which('pdsh')

    @property
    def name(self):
        return "pdsh"

    def parse_user_args(self):
        # Quote each user argument that is not a flag so it survives the extra
        # shell layer that pdsh introduces.
        return list(map(lambda x: x if x.startswith("-") else f"'{x}'", self.args.user_args))

    def get_cmd(self, environment, active_resources):
        environment['PDSH_RCMD_TYPE'] = 'ssh'

        active_workers = ",".join(active_resources.keys())
        logger.info("Running on the following workers: %s" % active_workers)

        # pdsh flags: -S returns the largest remote exit code, -f caps the
        # fan out, and -w selects the hosts to launch on.
        pdsh_cmd_args = ['pdsh', '-S', '-f', str(PDSH_MAX_FAN_OUT), '-w', active_workers] + split(
            self.args.launcher_args)

        exports = ""
        for key, val in self.exports.items():
            exports += "export {}={}; ".format(key, val)

        # pdsh replaces %n with the rank of each remote node, which
        # deepspeed.launcher.launch consumes as its --node_rank.
        deepspeed_launch = [
            exports, f"cd {os.path.abspath('.')};", sys.executable, "-u", "-m", "deepspeed.launcher.launch",
            f'--world_info={self.world_info_base64}', "--node_rank=%n", f"--master_addr={self.args.master_addr}",
            f"--master_port={self.args.master_port}"
        ]
        if self.args.no_python:
            deepspeed_launch.append("--no_python")
        if self.args.module:
            deepspeed_launch.append("--module")
        if self.args.no_local_rank:
            deepspeed_launch.append("--no_local_rank")
        if self.args.save_pid:
            deepspeed_launch += ["--save_pid", f"{os.getpid()}"]
        if self.args.elastic_training:
            deepspeed_launch.append("--enable_elastic_training")
            deepspeed_launch.append(f"--max_elastic_nodes={self.args.max_elastic_nodes}")
            deepspeed_launch.append(f"--min_elastic_nodes={self.args.min_elastic_nodes}")
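        # The returned command ends up shaped roughly like this (hypothetical
        # hosts, port, and script, shown for illustration only):
        #   pdsh -S -f <fanout> -w worker-0,worker-1 export NCCL_DEBUG=INFO; \
        #     cd /job; python -u -m deepspeed.launcher.launch \
        #     --world_info=<base64> --node_rank=%n --master_addr=worker-0 \
        #     --master_port=29500 'train.py' <user args>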
        # Pattern for pkill: the interpreter/launch-module portion of the
        # command with its spaces escaped; strip only the trailing backslash.
        cmd_to_search = [i + "\\" for i in deepspeed_launch[2:6]]

        kill_command = pdsh_cmd_args + ["pkill -f ", " ".join(cmd_to_search)[:-1]]
        return pdsh_cmd_args + deepspeed_launch + [self.user_script] + self.user_arguments, kill_command


class OpenMPIRunner(MultiNodeRunner):
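    """Launch with Open MPI's mpirun, using the provided hostfile."""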

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool
        # Restrict UCX to TCP transports for predictable connectivity.
        self.add_export('UCX_TLS', 'tcp')

    def backend_exists(self):
        # ompi_info ships with Open MPI and reports installation details.
        return shutil.which('ompi_info')

    @property
    def name(self):
        return "openmpi"

    def validate_args(self):
        super().validate_args()
        # Host selection and process counts come from the hostfile, so the
        # corresponding CLI flags are rejected.
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker include/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

    def get_cmd(self, environment, active_resources):
        total_process_count = sum(self.resource_pool.values())

        # Disable the openib BTL and pin TCP traffic to eth0.
        mpirun_cmd = [
            'mpirun',
            '-n',
            f'{total_process_count}',
            '-hostfile',
            f'{self.args.hostfile}',
            '--mca',
            'btl',
            '^openib',
            '--mca',
            'btl_tcp_if_include',
            'eth0',
        ] + split(self.args.launcher_args)

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-x', "{}={}".format(k, v)]

        python_exec = []
        if not self.args.no_python:
            python_exec = [sys.executable, "-u"]
            if self.args.module:
                python_exec.append("-m")

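        # Illustrative final command, assuming two 8-slot nodes in the
        # hostfile and a user script train.py (hypothetical values):
        #   mpirun -n 16 -hostfile /job/hostfile --mca btl ^openib \
        #     --mca btl_tcp_if_include eth0 -x UCX_TLS=tcp python -u train.py ...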
        return mpirun_cmd + export_cmd + python_exec + [self.user_script] + self.user_arguments


class MPICHRunner(MultiNodeRunner):
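    """Launch with MPICH's Hydra mpirun, building one explicit block per rank."""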

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

    def backend_exists(self):
        # mpirun is assumed to come from an MPICH installation here.
        return shutil.which('mpirun')

    @property
    def name(self):
        return "mpich"

    def validate_args(self):
        super().validate_args()
        # Worker selection and process counts are derived from the resource
        # pool, so the corresponding CLI flags are rejected.
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker include/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

    def get_cmd(self, environment, active_resources):
        devices_per_node = self.resource_pool.values()
        total_process_count = sum(devices_per_node)
        process_per_node = list(devices_per_node)[0]
        if not all(n == process_per_node for n in devices_per_node):
            raise ValueError("MPICH requires same number of devices per node")

        mpirun_cmd = [
            'mpirun',
        ] + split(self.args.launcher_args)
        export_cmd = []

        # Hydra's -genv takes the variable name and value as separate arguments.
        for k, v in self.exports.items():
            export_cmd += ['-genv', k, v]

        export_cmd += ['-genv', 'MASTER_ADDR', str(self.args.master_addr)]
        export_cmd += ['-genv', 'MASTER_PORT', str(self.args.master_port)]
        export_cmd += ['-genv', 'WORLD_SIZE', str(total_process_count)]
        export_cmd += ['-genv', 'LOCAL_SIZE', str(process_per_node)]

        hosts = list(self.resource_pool.keys())

        # Build one ':'-separated mpirun block per rank, pinning the block to
        # its host and exporting that rank's RANK/LOCAL_RANK.
        per_host_cmd = []
        host_id = 0
        host_count = 0
        for i in range(total_process_count):
            local_rank = i % process_per_node
            python_exec = []
            if not self.args.no_python:
                python_exec += [sys.executable, "-u"]
                if self.args.module:
                    python_exec.append("-m")
            env_mapping = ['-env', 'RANK', str(i)]
            env_mapping += ['-env', 'LOCAL_RANK', str(local_rank)]
            if i == 0:
                per_host_cmd = ['-n', '1', '-host', hosts[host_id]
                                ] + env_mapping + python_exec + [self.user_script] + self.user_arguments
            else:
                per_host_cmd = per_host_cmd + [':', '-n', '1', '-host', hosts[host_id]
                                               ] + env_mapping + python_exec + [self.user_script] + self.user_arguments
            host_count += 1
            if host_count == process_per_node:
                host_id += 1
                host_count = 0
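
        # Illustrative shape of the result for two hosts with two ranks each
        # (hypothetical names):
        #   mpirun -genv MASTER_ADDR host0 ... \
        #     -n 1 -host host0 -env RANK 0 -env LOCAL_RANK 0 python -u train.py : \
        #     -n 1 -host host0 -env RANK 1 -env LOCAL_RANK 1 python -u train.py : ...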

        return mpirun_cmd + export_cmd + per_host_cmd


class IMPIRunner(MultiNodeRunner):
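    """Launch with Intel MPI's mpirun, with optional numactl core binding."""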

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

    def backend_exists(self):
        # mpirun is assumed to come from an Intel MPI installation here.
        return shutil.which('mpirun')

    @property
    def name(self):
        return "impi"

    def validate_args(self):
        super().validate_args()
        # Worker selection and process counts are derived from the resource
        # pool, so the corresponding CLI flags are rejected.
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker include/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

    def get_cmd(self, environment, active_resources):
        devices_per_node = self.resource_pool.values()
        total_process_count = sum(devices_per_node)
        process_per_node = list(devices_per_node)[0]
        if not all(n == process_per_node for n in devices_per_node):
            raise ValueError("Intel MPI requires same number of devices per node")

        mpirun_cmd = [
            'mpirun',
            '-ppn',
            f'{process_per_node}',
        ] + split(self.args.launcher_args)
        export_cmd = []

        for k, v in self.exports.items():
            export_cmd += ['-genv', f'{k}', f'{v}']

        if self.args.bind_cores_to_rank:
            # Match OMP_NUM_THREADS to the number of cores numactl gives each rank.
            cores_per_rank, _ = get_numactl_cmd(self.args.bind_core_list, process_per_node, 0)
            export_cmd += ['-genv', 'OMP_NUM_THREADS', str(cores_per_rank)]

        export_cmd += ['-genv', 'MASTER_ADDR', str(self.args.master_addr)]
        export_cmd += ['-genv', 'MASTER_PORT', str(self.args.master_port)]
        export_cmd += ['-genv', 'WORLD_SIZE', str(total_process_count)]
        export_cmd += ['-genv', 'LOCAL_SIZE', str(process_per_node)]

        # -hosts takes a single comma-separated list of worker hostnames.
        export_cmd += ['-hosts', ",".join(self.resource_pool.keys())]

        per_host_cmd = []

        for i in range(total_process_count):
            local_rank = i % process_per_node
            python_exec = []
            if self.args.bind_cores_to_rank:
                # Prefix the rank's command with its numactl binding.
                _, numactl_cmd = get_numactl_cmd(self.args.bind_core_list, process_per_node, local_rank)
                python_exec += numactl_cmd

            if not self.args.no_python:
                python_exec += [sys.executable, "-u"]
                if self.args.module:
                    python_exec.append("-m")
            env_mapping = ['-env', 'RANK', str(i)]
            env_mapping += ['-env', 'LOCAL_RANK', str(local_rank)]
            if i == 0:
                per_host_cmd = ['-n', '1'] + env_mapping + python_exec + [self.user_script] + self.user_arguments
            else:
                per_host_cmd = per_host_cmd + [':', '-n', '1'] + env_mapping + python_exec + [self.user_script
                                                                                              ] + self.user_arguments
        logger.info("IMPI launch command: %s", mpirun_cmd + export_cmd + per_host_cmd)
        return mpirun_cmd + export_cmd + per_host_cmd


class SlurmRunner(MultiNodeRunner):
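    """Launch through Slurm's srun, forwarding exports via --export."""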

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

    def backend_exists(self):
        # sinfo is part of every Slurm installation.
        return shutil.which('sinfo')

    @property
    def name(self):
        return 'slurm'

    def get_cmd(self, environment, active_resources):
        assert not getattr(self.args, 'detect_nvlink_pairs',
                           False), "slurm backend does not support remapping visible devices"
        total_process_count = sum(self.resource_pool.values())
        srun_cmd = [
            'srun',
            '-n',
            f'{total_process_count}',
        ] + split(self.args.launcher_args)

        if getattr(self.args, 'slurm_comment', ''):
            srun_cmd += ['--comment', self.args.slurm_comment]

        # srun selects specific hosts with --nodelist and skips hosts with --exclude.
        if self.args.include != "":
            srun_cmd.append('--nodelist')
            srun_cmd.append(f'{self.args.include}')
        if self.args.exclude != "":
            srun_cmd.append('--exclude')
            srun_cmd.append(f'{self.args.exclude}')
        if self.args.num_nodes > 0:
            srun_cmd.append('--nodes')
            srun_cmd.append(f'{self.args.num_nodes}')
        if self.args.num_gpus > 0:
            srun_cmd.append('--gpus')
            srun_cmd.append(f'{self.args.num_gpus}')

        # Forward the caller's environment plus any explicit exports.
        exports = '--export=ALL'
        for key, val in self.exports.items():
            exports += f",{key}={val}"

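        # Illustrative final command (hypothetical values):
        #   srun -n 16 --export=ALL,NCCL_DEBUG=INFO python -u train.py <user args>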
        python_exec = [sys.executable, "-u"]
        command = srun_cmd + [exports] + python_exec + [self.user_script] + self.user_arguments
        return command


class MVAPICHRunner(MultiNodeRunner):
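    """Launch with MVAPICH2-GDR's mpirun, pre-setting MV2_* tuning flags."""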

    def __init__(self, args, world_info_base64, resource_pool):
        super().__init__(args, world_info_base64)
        self.resource_pool = resource_pool

        # Disable CMA-based shared-memory transfers; the kernel module is not
        # available on all systems.
        self.add_export('MV2_SMP_USE_CMA', '0')

        # Emit more verbose backtraces in the logs on failure.
        self.add_export('MV2_DEBUG_SHOW_BACKTRACE', '1')

        # Enable CUDA-aware communication on CUDA devices.
        if get_accelerator().device_name() == 'cuda':
            self.add_export('MV2_USE_CUDA', '1')

        # Enable deep learning framework support.
        self.add_export('MV2_SUPPORT_DL', '1')

        # Disable MVAPICH's own CPU affinity so application threads are not pinned.
        self.add_export('MV2_ENABLE_AFFINITY', '0')

        # Performance tuning flags for allgather.
        self.add_export('MV2_INTER_ALLGATHER_TUNING', '5')
        self.add_export('MV2_CUDA_USE_NAIVE', '0')

    def backend_exists(self):
        # mpiname ships with MVAPICH; use it to verify an MVAPICH2-GDR build.
        mpiname_exists = shutil.which('mpiname')
        exists = False
        if not mpiname_exists:
            warnings.warn("mpiname does not exist; mvapich is not installed properly")
        else:
            results = subprocess.check_output('mpiname', shell=True)
            mpiname_results = results.decode('utf-8').strip()
            if "MVAPICH2-GDR" in mpiname_results:
                exists = True
            else:
                warnings.warn(f"Expected MVAPICH2-GDR in mpiname output but received {mpiname_results}")
        return exists

    @property
    def name(self):
        return "mvapich"

    def validate_args(self):
        super().validate_args()
        # Worker selection and process counts are derived from the resource
        # pool, so the corresponding CLI flags are rejected.
        if self.args.include != "" or self.args.exclude != "":
            raise ValueError(f"{self.name} backend does not support worker include/exclusion")
        if self.args.num_nodes != -1 or self.args.num_gpus != -1:
            raise ValueError(f"{self.name} backend does not support limiting num nodes/gpus")

    def get_cmd(self, environment, active_resources):
        devices_per_node = self.resource_pool.values()
        total_process_count = sum(devices_per_node)
        process_per_node = list(devices_per_node)[0]
        if not all(n == process_per_node for n in devices_per_node):
            raise ValueError("mvapich requires same number of devices per node")

        # Write the worker list to a temporary hostfile for mpirun.
        with open(MVAPICH_TMP_HOSTFILE, 'w') as fd:
            for host in self.resource_pool.keys():
                fd.write(f'{host}\n')

        mpirun_cmd = [
            'mpirun',
            '-np',
            f'{total_process_count}',
            '-ppn',
            f'{process_per_node}',
            '--hostfile',
            f'{MVAPICH_TMP_HOSTFILE}',
        ] + split(self.args.launcher_args)

        export_cmd = []
        for k, v in self.exports.items():
            export_cmd += ['-env', "{}={}".format(k, v)]

        python_exec = []
        if not self.args.no_python:
            python_exec = [sys.executable, "-u"]
            if self.args.module:
                python_exec.append("-m")

        return mpirun_cmd + export_cmd + python_exec + [self.user_script] + self.user_arguments