Spaces:
Build error
Build error
# Copyright (c) Facebook, Inc. and its affiliates. | |
# | |
# This source code is licensed under the MIT license found in the | |
# LICENSE file in the root directory of this source tree. | |
import logging | |
import os | |
import sys | |
import numpy as np | |
import joblib | |
import torch | |
import tqdm | |
logging.basicConfig( | |
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s", | |
datefmt="%Y-%m-%d %H:%M:%S", | |
level=os.environ.get("LOGLEVEL", "INFO").upper(), | |
stream=sys.stdout, | |
) | |
logger = logging.getLogger("dump_km_label") | |
class ApplyKmeans(object): | |
def __init__(self, km_path): | |
self.km_model = joblib.load(km_path) | |
self.C_np = self.km_model.cluster_centers_.transpose() | |
self.Cnorm_np = (self.C_np ** 2).sum(0, keepdims=True) | |
self.C = torch.from_numpy(self.C_np) | |
self.Cnorm = torch.from_numpy(self.Cnorm_np) | |
if torch.cuda.is_available(): | |
self.C = self.C.cuda() | |
self.Cnorm = self.Cnorm.cuda() | |
def __call__(self, x): | |
if isinstance(x, torch.Tensor): | |
dist = ( | |
x.pow(2).sum(1, keepdim=True) | |
- 2 * torch.matmul(x, self.C) | |
+ self.Cnorm | |
) | |
return dist.argmin(dim=1).cpu().numpy() | |
else: | |
dist = ( | |
(x ** 2).sum(1, keepdims=True) | |
- 2 * np.matmul(x, self.C_np) | |
+ self.Cnorm_np | |
) | |
return np.argmin(dist, axis=1) | |
def get_feat_iterator(feat_dir, split, nshard, rank): | |
feat_path = f"{feat_dir}/{split}_{rank}_{nshard}.npy" | |
leng_path = f"{feat_dir}/{split}_{rank}_{nshard}.len" | |
with open(leng_path, "r") as f: | |
lengs = [int(line.rstrip()) for line in f] | |
offsets = [0] + np.cumsum(lengs[:-1]).tolist() | |
def iterate(): | |
feat = np.load(feat_path, mmap_mode="r") | |
assert feat.shape[0] == (offsets[-1] + lengs[-1]) | |
for offset, leng in zip(offsets, lengs): | |
yield feat[offset: offset + leng] | |
return iterate, len(lengs) | |
def dump_label(feat_dir, split, km_path, nshard, rank, lab_dir): | |
apply_kmeans = ApplyKmeans(km_path) | |
generator, num = get_feat_iterator(feat_dir, split, nshard, rank) | |
iterator = generator() | |
lab_path = f"{lab_dir}/{split}_{rank}_{nshard}.km" | |
os.makedirs(lab_dir, exist_ok=True) | |
with open(lab_path, "w") as f: | |
for feat in tqdm.tqdm(iterator, total=num): | |
# feat = torch.from_numpy(feat).cuda() | |
lab = apply_kmeans(feat).tolist() | |
f.write(" ".join(map(str, lab)) + "\n") | |
logger.info("finished successfully") | |
if __name__ == "__main__": | |
import argparse | |
parser = argparse.ArgumentParser() | |
parser.add_argument("feat_dir") | |
parser.add_argument("split") | |
parser.add_argument("km_path") | |
parser.add_argument("nshard", type=int) | |
parser.add_argument("rank", type=int) | |
parser.add_argument("lab_dir") | |
args = parser.parse_args() | |
logging.info(str(args)) | |
dump_label(**vars(args)) | |