# -*- coding: utf-8 -*-

"""
@Author     : Jiangjie Chen
@Time       : 2020/6/8 22:17
@Contact    : [email protected]
@Description:
"""

import re
import ujson as json  # used by the commented-out slice_dataset_json below
import tensorflow as tf


def args_to_shell(args):
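	"""Render an argparse.Namespace as a shell argument string:
	True booleans become bare flags, lists are space-joined, and
	everything else becomes `--key value`.
	"""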
	args_dict = vars(args)
	shell_args = ''
	for k, v in args_dict.items():
		if isinstance(v, bool):
			if v: shell_args += f'--{k} '
		else:
			if isinstance(v, list):
				v = ' '.join([str(x) for x in v])
			shell_args += f'--{k} {v} '
	return shell_args
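
# A minimal usage sketch for args_to_shell; the Namespace fields below are
# illustrative, not part of this module:
#
#   from argparse import Namespace
#   args = Namespace(lr=0.001, gpus=[0, 1], fp16=True, debug=False)
#   args_to_shell(args)  # -> '--lr 0.001 --gpus 0 1 --fp16 '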


def _is_proc_file(fname):
	# Match the '._<num>_proc' suffix appended by rename_fname_by_proc.
	# The dot must be escaped, and a raw string avoids the invalid '\d' escape.
	return re.search(r'\._\d+_proc$', fname) is not None


def _restore_fname_from_proc(fname):
	if _is_proc_file(fname):
		# Strip the '._<num>_proc' suffix to recover the original filename.
		return re.sub(r'\._\d+_proc$', '', fname)
	return fname


def rename_fname_by_proc(fname: str, proc_num: int):
	if not _is_proc_file(fname):
		fname = fname + f'._{proc_num}_proc'
	return fname
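
# Round-trip example (the filename is illustrative):
#   rename_fname_by_proc('data.txt', 3)          -> 'data.txt._3_proc'
#   _restore_fname_from_proc('data.txt._3_proc') -> 'data.txt'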


def chunks(lst, n):
	"""Yield successive n-sized chunks from lst."""
	for i in range(0, len(lst), n):
		yield lst[i : i + n]
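
# e.g. list(chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]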


# def slice_dataset_json(in_fname, slice_num):
# 	with tf.io.gfile.GFile(in_fname) as fin:
# 		data = json.load(fin)
# 		sliced_data = chunks(data, slice_num)
# 		datasets = []
# 		for i in range(len(list(sliced_data))):
# 			proc_fname = rename_fname_by_proc(in_fname, i)
# 			with tf.io.gfile.GFile(proc_fname, 'w') as f:
# 				js = []
# 				for line in sliced_data[i]:
# 					js.append(line)
# 				json.dump(js, f)
# 			datasets.append(proc_fname)
# 	return datasets


def slice_filenames(in_fname, slice_num):
	sliced_f = []
	for i in range(slice_num):
		sliced_f.append(rename_fname_by_proc(in_fname, i))
	return sliced_f
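
# e.g. slice_filenames('data.txt', 2) -> ['data.txt._0_proc', 'data.txt._1_proc']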


def slice_dataset(in_fname, slice_num):
	'''
	Split a line-based dataset file into (roughly) `slice_num` shard files.

	:param in_fname: path of the input file, one example per line
	:param slice_num: number of shards to produce
	:return: sliced dataset filenames
	'''
	with tf.io.gfile.GFile(in_fname) as fin:
		data = fin.readlines()
		# Guard against inputs with fewer lines than slice_num, where the
		# chunk size would otherwise be 0 and chunks() would raise.
		chunk_size = max(1, len(data) // slice_num)
		_sliced_data = list(chunks(data, chunk_size))
		if len(_sliced_data) == slice_num + 1:  # fold loose ends into the last shard
			sliced_data = _sliced_data[:slice_num]
			sliced_data[-1] += _sliced_data[-1]
		else:
			sliced_data = _sliced_data
		datasets = []
		for i, chunk in enumerate(sliced_data):
			proc_fname = rename_fname_by_proc(in_fname, i)
			with tf.io.gfile.GFile(proc_fname, 'w') as f:
				for line in chunk:
					f.write(line)
			datasets.append(proc_fname)
	return datasets


def union_multiproc_files(files, overwrite=False):
	real_out_fname = None
	for i, file in enumerate(files):
		if not _is_proc_file(file):
			raise ValueError(f'Not a sliced proc file: {file}')
		# All shards must restore to the same output filename.
		_out_fname = _restore_fname_from_proc(file)
		if i > 0 and _out_fname != real_out_fname:
			raise ValueError(f'{file} does not restore to {real_out_fname}')
		real_out_fname = _out_fname

	if real_out_fname is None:
		raise ValueError('No proc files given to unite.')

	if tf.io.gfile.exists(real_out_fname) and not overwrite:
		print(f'Skip {real_out_fname}, as it already exists.')
	else:
		with tf.io.gfile.GFile(real_out_fname, 'w') as fo:
			for file in files:
				with tf.io.gfile.GFile(file) as f:
					for line in f.readlines():
						fo.write(line)
		print(f'{files} united into {real_out_fname}.')
	return real_out_fname
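
# A hedged sketch of the intended slice -> process -> merge workflow; the
# worker function process_one_shard is hypothetical and stands in for any
# per-shard processing that rewrites the shard file in place:
#
#   from multiprocessing import Pool
#   shards = slice_dataset('data.txt', 4)
#   with Pool(4) as pool:
#       pool.map(process_one_shard, shards)  # hypothetical worker
#   union_multiproc_files(shards, overwrite=True)
#   clean_multiproc_files(shards)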


def clean_multiproc_files(files):
	for file in files:
		if _is_proc_file(file):
			if tf.io.gfile.exists(file):
				print(f'Removing {file}...')
				tf.io.gfile.remove(file)
			else:
				print(f'Cannot remove {file}: it does not exist.')


if __name__ == '__main__':
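	# Smoke test: slice this very file into two shards, merge them back,
	# and remove the intermediate shard files.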
	test_file = 'cjjpy.py'
	sliced_files = slice_dataset(test_file, 2)
	file = union_multiproc_files(sliced_files)
	clean_multiproc_files(sliced_files)