Spaces:
Running
Running
admin
committed on
Commit
•
e4da5dd
1
Parent(s):
ed3849d
upl utils
Browse files
utils.py
ADDED
@@ -0,0 +1,92 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import os
|
2 |
+
import csv
|
3 |
+
import json
|
4 |
+
import shutil
|
5 |
+
|
6 |
+
# Default scratch directory: clean_cache() recreates it and the decoder_*
# helpers write their default output files into it.
# NOTE(review): "__pycache__" is also the directory name Python uses for its
# bytecode cache — confirm that sharing it is intentional.
TMP_DIR = "./__pycache__"

# Labels of the supported conversion pairs (presumably shown as UI tabs —
# confirm against the caller).
TAB_CONFIG = ["jsonl ⇆ csv", "json ⇆ csv", "json ⇆ jsonl"]

# Current conversion direction; updated in place by change_mode().
MODE = {"from": "jsonl", "to": "csv"}
|
11 |
+
|
12 |
+
|
13 |
+
def clean_cache(dir_path=TMP_DIR):
    """Reset *dir_path* to an empty directory.

    Any existing directory tree at *dir_path* is removed, then the directory
    is created again.

    Args:
        dir_path: Directory to reset; defaults to ``TMP_DIR``.

    Raises:
        OSError: If the existing tree cannot be removed (for example because
            *dir_path* is a regular file) or the directory cannot be created.
    """
    # Attempt the removal directly instead of checking for existence first,
    # so there is no window between the check and the removal.
    try:
        shutil.rmtree(dir_path)
    except FileNotFoundError:
        pass

    # exist_ok tolerates the directory being recreated concurrently.
    os.makedirs(dir_path, exist_ok=True)
|
19 |
+
|
20 |
+
|
21 |
+
def encoder_json(file_path: str):
    """Load a JSON file and return its content as a list of records.

    Args:
        file_path: Path of a UTF-8 encoded JSON file.

    Returns:
        A list built from the parsed document. The elements of a top-level
        JSON array are returned as-is; a top-level JSON object is wrapped in
        a one-element list so it is treated as a single record.

    Raises:
        json.JSONDecodeError: If the file does not contain valid JSON.
        TypeError: If the top-level value is not iterable (e.g. a number).
    """
    with open(file_path, "r", encoding="utf-8") as file:
        payload = json.load(file)

    # list() over a dict would yield only its keys and drop every value.
    if isinstance(payload, dict):
        return [payload]

    return list(payload)
|
26 |
+
|
27 |
+
|
28 |
+
def encoder_jsonl(file_path: str):
    """Read a JSON Lines file into a list of parsed objects.

    Args:
        file_path: Path of a UTF-8 encoded file holding one JSON value per
            line.

    Returns:
        A list with one parsed value per non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(file_path, "r", encoding="utf-8") as file:
        stripped_lines = (line.strip() for line in file)
        # Blank lines (typically a trailing empty line) carry no record and
        # would otherwise make json.loads fail.
        return [json.loads(text) for text in stripped_lines if text]
|
37 |
+
|
38 |
+
|
39 |
+
def encoder_csv(file_path: str):
    """Read a CSV file with a header row into a list of dictionaries.

    Args:
        file_path: Path of a UTF-8 encoded CSV file whose first row holds the
            column names.

    Returns:
        A list with one ``dict`` per data row, mapping column name to the
        cell text.
    """
    # newline="" hands line endings to the csv module untouched, so newlines
    # embedded in quoted fields are preserved exactly.
    with open(file_path, "r", newline="", encoding="utf-8") as file:
        return [dict(row) for row in csv.DictReader(file)]
|
47 |
+
|
48 |
+
|
49 |
+
def decoder_json(data_list: list, file_path=f"{TMP_DIR}/output.json"):
    """Write *data_list* to *file_path* as one indented JSON document.

    Args:
        data_list: Records to serialise. When it is empty nothing is written.
        file_path: Destination file; defaults to ``output.json`` in
            ``TMP_DIR``.

    Returns:
        *file_path*, whether or not a file was written.
    """
    # Nothing to serialise: leave the file system untouched.
    if not data_list:
        return file_path

    with open(file_path, "w", encoding="utf-8") as out:
        # Dump the whole list as a single JSON value, keeping non-ASCII text
        # readable.
        json.dump(data_list, out, ensure_ascii=False, indent=4)

    return file_path
|
56 |
+
|
57 |
+
|
58 |
+
def decoder_csv(data_list: list, file_path=f"{TMP_DIR}/output.csv"):
    """Write a list of dictionaries to *file_path* as CSV with a header row.

    The header is the union of the keys of all records, in first-seen order,
    so records with differing key sets are written without error; a key
    missing from a record produces an empty cell.

    Args:
        data_list: Records (dictionaries) to write. When it is empty nothing
            is written.
        file_path: Destination file; defaults to ``output.csv`` in
            ``TMP_DIR``.

    Returns:
        *file_path*, whether or not a file was written.
    """
    if data_list:
        # dict.fromkeys de-duplicates while keeping first-seen column order.
        header = list(dict.fromkeys(key for item in data_list for key in item))
        with open(file_path, "w", newline="", encoding="utf-8") as file:
            csv_writer = csv.DictWriter(file, fieldnames=header, restval="")
            csv_writer.writeheader()
            csv_writer.writerows(data_list)

    return file_path
|
71 |
+
|
72 |
+
|
73 |
+
def decoder_jsonl(data_list: list, file_path=f"{TMP_DIR}/output.jsonl"):
    """Write *data_list* to *file_path* in JSON Lines form.

    Args:
        data_list: Values to serialise, one per output line. When it is empty
            nothing is written.
        file_path: Destination file; defaults to ``output.jsonl`` in
            ``TMP_DIR``.

    Returns:
        *file_path*, whether or not a file was written.
    """
    # Nothing to serialise: leave the file system untouched.
    if not data_list:
        return file_path

    with open(file_path, "w", encoding="utf-8") as out:
        # One JSON value per line, each terminated by a newline.
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in data_list
        )

    return file_path
|
82 |
+
|
83 |
+
|
84 |
+
def change_mode(input: str):
    """Update the module-level ``MODE`` from a conversion label.

    The label is split on single spaces into three parts: two format names
    with an arrow between them. When the arrow is "→" the conversion goes
    from the first name to the third; for any other arrow it goes from the
    third name to the first.

    Args:
        input: Label such as ``"jsonl → csv"``.
    """
    parts = input.split(" ")
    # Choose which positions hold the source and the target format.
    source_at, target_at = (0, 2) if parts[1] == "→" else (2, 0)
    MODE["from"] = parts[source_at]
    MODE["to"] = parts[target_at]
|