File size: 2,519 Bytes
e4da5dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import os
import csv
import json
import shutil

# Directory used as the default target of clean_cache() and as the parent of
# the default output paths of the decoder_* functions.
TMP_DIR = "./__pycache__"

# Conversion pair labels, written as "<format> ⇆ <format>".
# NOTE(review): presumably shown as UI tab titles -- not referenced in this
# part of the file; confirm against the caller.
TAB_CONFIG = [
    "jsonl ⇆ csv",
    "json ⇆ csv",
    "json ⇆ jsonl",
]

# Currently selected conversion direction; updated in place by change_mode().
MODE = {
    "from": "jsonl",
    "to": "csv",
}


def clean_cache(dir_path=TMP_DIR):
    """Reset *dir_path* to an empty directory.

    Any existing directory tree at *dir_path* is deleted, then the directory
    is created again (including missing parents).

    Args:
        dir_path: Directory to wipe and recreate. Defaults to ``TMP_DIR``.

    Raises:
        OSError: If the existing path cannot be removed (for example it is a
            regular file rather than a directory) or the directory cannot be
            created.
    """
    # NOTE(review): the default TMP_DIR is "./__pycache__", which is also the
    # name CPython uses for its bytecode cache -- confirm that wiping it is
    # intended.
    if os.path.exists(dir_path):
        shutil.rmtree(dir_path)

    # exist_ok=True replaces a second exists() check: it is a no-op if the
    # directory reappeared in the meantime instead of racing against it.
    os.makedirs(dir_path, exist_ok=True)


def encoder_json(file_path: str):
    """Load a UTF-8 JSON file and return its content as a list of records.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        A list. A top-level JSON array is returned as a new list of its
        elements; a top-level JSON object is treated as a single record and
        returned as a one-element list.

    Raises:
        json.JSONDecodeError: If the file is not valid JSON.
        TypeError: If the top-level value is not iterable (e.g. a number).
    """
    with open(file_path, "r", encoding="utf-8") as file:
        data = json.load(file)

    # list(dict) would keep only the keys and silently drop every value, so a
    # lone object is wrapped instead of being iterated.
    if isinstance(data, dict):
        return [data]

    return list(data)


def encoder_jsonl(file_path: str):
    """Load a UTF-8 JSON Lines file into a list, one entry per non-blank line.

    Args:
        file_path: Path of the JSONL file to read.

    Returns:
        A list holding the decoded JSON value of every non-blank line, in
        file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    data_list = []
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            record = line.strip()
            # Blank lines (typically a trailing empty line) are not JSON
            # documents; skip them instead of failing in json.loads("").
            if not record:
                continue
            # Decode the JSON document held on this line.
            data_list.append(json.loads(record))

    return data_list


def encoder_csv(file_path: str):
    """Load a UTF-8 CSV file into a list of dicts keyed by the header row.

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        A list with one plain ``dict`` per data row, mapping each column name
        from the first row of the file to that row's string value.
    """
    # newline="" hands line endings to the csv module untouched, so line
    # breaks embedded in quoted fields are preserved as written.
    with open(file_path, "r", newline="", encoding="utf-8") as file:
        return [dict(row) for row in csv.DictReader(file)]


def decoder_json(data_list: list, file_path=f"{TMP_DIR}/output.json"):
    """Write *data_list* to *file_path* as one indented UTF-8 JSON document.

    Args:
        data_list: Records to serialise. When it is empty (falsy) nothing is
            written and any existing file is left untouched.
        file_path: Destination path; defaults to ``output.json`` in TMP_DIR.

    Returns:
        *file_path*, whether or not a file was written.
    """
    if not data_list:
        return file_path

    with open(file_path, "w", encoding="utf-8") as out:
        # Dump the whole list as a single JSON value; non-ASCII characters
        # are kept as-is rather than escaped.
        json.dump(data_list, out, ensure_ascii=False, indent=4)

    return file_path


def decoder_csv(data_list: list, file_path=f"{TMP_DIR}/output.csv"):
    """Write a list of dict records to *file_path* as a UTF-8 CSV file.

    The header is the union of the keys of all records, in first-seen order.
    A record that lacks a column gets an empty cell there, so records with
    differing keys neither raise ``KeyError`` nor lose fields. For records
    that all share the same keys the output is the first record's keys as the
    header followed by one row of values per record.

    Args:
        data_list: Dict records to write. When it is empty (falsy) nothing is
            written and any existing file is left untouched.
        file_path: Destination path; defaults to ``output.csv`` in TMP_DIR.

    Returns:
        *file_path*, whether or not a file was written.
    """
    if data_list:
        # A dict is used as an ordered set: update() keeps the position of
        # keys already seen and appends new ones at the end.
        columns = {}
        for item in data_list:
            columns.update(dict.fromkeys(item))

        with open(file_path, "w", newline="", encoding="utf-8") as file:
            csv_writer = csv.DictWriter(file, fieldnames=list(columns), restval="")
            # Header row first, then one row per record.
            csv_writer.writeheader()
            csv_writer.writerows(data_list)

    return file_path


def decoder_jsonl(data_list: list, file_path=f"{TMP_DIR}/output.jsonl"):
    """Write *data_list* to *file_path* as UTF-8 JSON Lines.

    Args:
        data_list: Values to serialise, one per output line. When it is empty
            (falsy) nothing is written and any existing file is left
            untouched.
        file_path: Destination path; defaults to ``output.jsonl`` in TMP_DIR.

    Returns:
        *file_path*, whether or not a file was written.
    """
    if not data_list:
        return file_path

    with open(file_path, "w", encoding="utf-8") as out:
        # One compact JSON document per line, each terminated by "\n";
        # non-ASCII characters are kept as-is rather than escaped.
        out.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in data_list
        )

    return file_path


def change_mode(input: str):
    """Update the module-level MODE mapping from a direction label.

    The label is split on single spaces into ``<first> <arrow> <third>``.
    When the arrow token is "→" the conversion runs from *first* to *third*;
    for any other arrow token it runs from *third* to *first*.

    Args:
        input: Direction label such as "jsonl → csv". The name shadows the
            builtin but is kept because callers may pass it by keyword.

    Raises:
        IndexError: If the label has fewer than three space-separated tokens.
    """
    tokens = input.split(" ")
    # Choose which token positions supply the source and the target format.
    src_idx, dst_idx = (0, 2) if tokens[1] == "→" else (2, 0)
    MODE["from"] = tokens[src_idx]
    MODE["to"] = tokens[dst_idx]