# -*- coding: utf-8 -*-
"""
@Author : Jiangjie Chen
@Time : 2020/6/8 22:17
@Contact : [email protected]
@Description: Pseudo multi-processing helpers: slice a dataset file into per-process shards, then union and clean the shard files.
"""
import re
import os
import ujson as json
import tensorflow as tf


def args_to_shell(args):
    """Convert a parsed argparse.Namespace back into a shell argument string."""
    args_dict = vars(args)
    shell_args = ''
    for k, v in args_dict.items():
        if isinstance(v, bool):
            if v:
                shell_args += f'--{k} '
        else:
            if isinstance(v, list):
                v = ' '.join([str(x) for x in v])
            shell_args += f'--{k} {v} '
    return shell_args
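
# Example of the expected output (illustrative sketch, assuming `args` comes from argparse):
#   >>> import argparse
#   >>> args_to_shell(argparse.Namespace(debug=True, lr=0.1, tags=[1, 2]))
#   '--debug --lr 0.1 --tags 1 2 '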


def _is_proc_file(fname):
    # Raw string and escaped dot so only names ending in '._<digits>_proc' match.
    return re.search(r'\._\d+_proc$', fname) is not None


def _restore_fname_from_proc(fname):
    if _is_proc_file(fname):
        return '.'.join(fname.split('.')[:-1])
    else:
        return fname


def rename_fname_by_proc(fname: str, proc_num: int):
    if not _is_proc_file(fname):
        fname = fname + f'._{proc_num}_proc'
    return fname
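
# Round trip of the '._<i>_proc' naming convention (illustrative):
#   >>> rename_fname_by_proc('train.json', 3)
#   'train.json._3_proc'
#   >>> _is_proc_file('train.json._3_proc')
#   True
#   >>> _restore_fname_from_proc('train.json._3_proc')
#   'train.json'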


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
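
# Illustrative behaviour: the final chunk is shorter when len(lst) % n != 0.
#   >>> list(chunks([1, 2, 3, 4, 5], 2))
#   [[1, 2], [3, 4], [5]]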


# def slice_dataset_json(in_fname, slice_num):
#     with tf.io.gfile.GFile(in_fname) as fin:
#         data = json.load(fin)
#     sliced_data = chunks(data, slice_num)
#     datasets = []
#     for i in range(len(list(sliced_data))):
#         proc_fname = rename_fname_by_proc(in_fname, i)
#         with tf.io.gfile.GFile(proc_fname, 'w') as f:
#             js = []
#             for line in sliced_data[i]:
#                 js.append(line)
#             json.dump(js, f)
#         datasets.append(proc_fname)
#     return datasets


def slice_filenames(in_fname, slice_num):
    sliced_f = []
    for i in range(slice_num):
        sliced_f.append(rename_fname_by_proc(in_fname, i))
    return sliced_f
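
# Illustrative: slice_filenames only builds the per-process names; it does not
# touch the filesystem.
#   >>> slice_filenames('train.json', 2)
#   ['train.json._0_proc', 'train.json._1_proc']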


def slice_dataset(in_fname, slice_num):
    """
    :param in_fname: path of the line-based dataset file to slice
    :param slice_num: number of slices (one per pseudo process)
    :return: sliced dataset filenames
    """
    with tf.io.gfile.GFile(in_fname) as fin:
        data = fin.readlines()
    _sliced_data = list(chunks(data, max(1, len(data) // slice_num)))
    if len(_sliced_data) > slice_num:  # merge loose ends into the last slice
        sliced_data = _sliced_data[:slice_num]
        for extra in _sliced_data[slice_num:]:
            sliced_data[-1] += extra
    else:
        sliced_data = _sliced_data
    datasets = []
    for i in range(len(sliced_data)):
        proc_fname = rename_fname_by_proc(in_fname, i)
        with tf.io.gfile.GFile(proc_fname, 'w') as f:
            for line in sliced_data[i]:
                f.write(line)
        datasets.append(proc_fname)
    return datasets
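
# Worked example of the slicing arithmetic (illustrative): with 10 lines and
# slice_num=3, the chunk size is 10 // 3 = 3, so chunks() yields pieces of
# 3/3/3/1 lines; the trailing 1-line piece is merged into the last shard,
# leaving exactly 3 files of 3, 3 and 4 lines named
# '<in_fname>._0_proc' .. '<in_fname>._2_proc'.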


def union_multiproc_files(files, overwrite=False):
    """Concatenate '._<i>_proc' shard files back into the original file name."""
    real_out_fname = None
    for i, file in enumerate(files):
        if not _is_proc_file(file):
            raise FileNotFoundError(file)
        else:
            _out_fname = _restore_fname_from_proc(file)
            if i > 0 and _out_fname != real_out_fname:
                raise ValueError(file, real_out_fname)
            real_out_fname = _out_fname
    if real_out_fname is None:
        raise FileNotFoundError(real_out_fname)
    if tf.io.gfile.exists(real_out_fname) and not overwrite:
        print(f'Skip {real_out_fname}, as it already exists.')
    else:
        with tf.io.gfile.GFile(real_out_fname, 'w') as fo:
            for file in files:
                if _is_proc_file(file):
                    with tf.io.gfile.GFile(file) as f:
                        data = f.readlines()
                        for line in data:
                            fo.write(line)
        print(f'{files} united into {real_out_fname}.')
    return real_out_fname
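
# Sketch of the intended pseudo-multiprocessing pipeline (an assumed usage,
# mirroring the self-test under __main__ below): each worker process writes its
# results to one shard, and the parent merges and removes the shards afterwards.
#   >>> out_shards = slice_filenames('out.json', 4)   # ['out.json._0_proc', ...]
#   >>> # ... worker i writes its results to out_shards[i] ...
#   >>> union_multiproc_files(out_shards)             # concatenates into 'out.json'
#   >>> clean_multiproc_files(out_shards)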


def clean_multiproc_files(files):
    for file in files:
        if _is_proc_file(file):
            if tf.io.gfile.exists(file):
                print(f'Removing {file}...')
                tf.io.gfile.remove(file)
            else:
                print(f'Tried removing {file}, but it does not exist.')


if __name__ == '__main__':
    test_file = 'cjjpy.py'
    sliced_files = slice_dataset(test_file, 2)
    file = union_multiproc_files(sliced_files)
    clean_multiproc_files(sliced_files)