File size: 6,281 Bytes
37b3db0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
# --------------------------------------------------------
# A script to run multinode training with submitit.
# --------------------------------------------------------
import argparse
import os
import uuid
from pathlib import Path
import sys
import training.main as main
import submitit
def parse_args():
parser = argparse.ArgumentParser("Submitit for openclip")
parser.add_argument("config_name", type=str, help="name of the config.")
parser.add_argument("weights_pretrained", type=str, help="path of the pretrained weights.")
parser.add_argument("logs_folder_name", type=str, help="name of the folder for saving logs.")
parser.add_argument("--max_job_time", default="0-24:00:00", type=str, help="Number of gpus to request on each node")
parser.add_argument("--ngpus", default=None, type=int, help="Number of gpus to request on each node")
parser.add_argument("--nodes", default=None, type=int, help="Number of nodes to request")
parser.add_argument("--resume", default="", type=str, help="resume a checkpoint.")
parser.add_argument("--timeout", default=4320, type=int, help="Duration of the job")
parser.add_argument("--job_dir", default="", type=str, help="Job dir. Leave empty for automatic.")
parser.add_argument("--partition", default="learnlab", type=str, help="Partition where to submit")
parser.add_argument("--use_volta32", action='store_true', help="Request 32G V100 GPUs")
parser.add_argument('--comment', default="", type=str, help="Comment to pass to scheduler")
args = parser.parse_args()
return args
def get_shared_folder() -> Path:
user = os.getenv("PWD")
if Path(user).is_dir():
p = Path(f"{user}/openclip")
return p
raise RuntimeError("No shared folder available")
def get_init_file():
# Init file must not exist, but it's parent dir must exist.
os.makedirs(str(get_shared_folder()), exist_ok=True)
init_file = get_shared_folder() / f"{uuid.uuid4().hex}_init"
if init_file.exists():
return init_file
class Trainer(object):
def __init__(self, args):
self.args = args
self.args.config.dist_url = get_init_file().as_uri()
def __call__(self):
import sys
import training.main as main
def checkpoint(self):
import os
import submitit
self.args.config.dist_url = get_init_file().as_uri()
checkpoint_file = os.path.join(self.args.config.output_dir, "checkpoints", "")
if os.path.exists(checkpoint_file):
self.args.config.resume = checkpoint_file
print("Requeuing ", self.args)
empty_trainer = type(self)(self.args)
return submitit.helpers.DelayedSubmission(empty_trainer)
def _setup_gpu_args(self):
import submitit
from pathlib import Path
job_env = submitit.JobEnvironment()
if self.args.ngpus >= 1:
self.args.config.local_rank = job_env.local_rank
self.args.config.rank = job_env.global_rank
self.args.config.world_size = job_env.num_tasks
print(f"Process group: {job_env.num_tasks} tasks, rank: {job_env.global_rank}")
def main(args):
if args.job_dir == "":
args.job_dir = get_shared_folder()
assert args.job_dir != ""
if os.path.exists(args.job_dir) and len(args.resume) == 0 and not hasattr(args.config, "eval"):
raise ValueError(f"{args.job_dir} existed, rm -rf {args.job_dir} ?")
args.job_dir = Path(args.job_dir) / "%j"
# Note that the folder will depend on the job_id, to easily track experiments
executor = submitit.AutoExecutor(folder=args.job_dir, slurm_max_num_timeout=30)
num_gpus_per_node = args.ngpus
nodes = args.nodes
timeout_min = args.timeout
partition = args.partition
kwargs = {}
if args.use_volta32:
kwargs['slurm_constraint'] = 'volta32gb'
if args.comment:
kwargs['slurm_comment'] = args.comment
slurm_additional_parameters = {
"account": "# CHANGE ME (add your slurm account id)",
"gpus-per-node": f"A100:{num_gpus_per_node}",
"time": args.max_job_time
tasks_per_node=num_gpus_per_node, # one task per GPU
# Below are cluster dependent parameters
args.dist_url = get_init_file().as_uri()
args.output_dir = args.job_dir
trainer = Trainer(args)
job = executor.submit(trainer)
print("Submitted job_id:", job.job_id, "@", str(args.job_dir).replace("%j", job.job_id))
def submit():
args = parse_args()
from configs import search_config
from copy import deepcopy
config = search_config(args.config_name)
_args = deepcopy(args)
if len(args.resume):
checkpoint_file = os.path.join(config.output_dir, "checkpoints", args.resume)
args.resume = checkpoint_file
config.resume = checkpoint_file
if len(args.weights_pretrained):
config.logs = args.logs_folder_name
config.pretrained = args.weights_pretrained
setattr(_args, "config", config)
if args.ngpus is not None:
_args.ngpus = args.ngpus
elif hasattr(config, "ngpus"):
_args.ngpus = config.ngpus
raise ValueError("must specify ngpus in arg or config.")
if args.nodes is not None:
_args.nodes = args.nodes
elif hasattr(config, "nodes"):
_args.nodes = config.nodes
raise ValueError("must specify ngpus in arg or config.")
_args.job_dir = config.output_dir
if __name__ == "__main__":