File size: 7,347 Bytes
7c73423
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import gradio as gr
import json
import copy
import pandas as pd
from vocab import tokenizer_factory
from character_util import iter_vocab
from utils.log_util import logger
from functools import lru_cache

default_user_input = """\

Replace this text in the input field to see how tokenization works.

Buenos días!

华为发布Mate60手机。

ラグビーワールドカップ2023フランス"""
# default_tokenizer_name_1 = "Meta/llama3"
default_tokenizer_name_1 = "gradientai/Llama-3-8B-Instruct-Gradient-1048k"
default_tokenizer_name_2 = "openai/gpt-4o"


@lru_cache
def _tokenize(

        text: str,

        tokenizer_name: str,

        color_num: int = 5,

        add_special_token: bool = False

):
    logger.info("param=" + json.dumps({"text": text, "tokenizer_type": tokenizer_name}, ensure_ascii=False))
    pos_tokens = []
    tokenizer = tokenizer_factory.get_tokenizer(tokenizer_name)
    if add_special_token:
        encoding = tokenizer.encode(text, add_special_tokens=True)
    else:
        encoding = tokenizer.encode(text, add_special_tokens=False)

    table = []

    for idx, token_id in enumerate(encoding):
        decoded_text = tokenizer.decode([token_id])  # 特殊字符解码后会统一变成 �,对应 "\ufffd"
        pos_tokens.extend([(decoded_text, str(idx % color_num))])

        # token  "Byte":  # 这是 utf-8编码吧?
        token = tokenizer.convert_ids_to_tokens([token_id], skip_special_tokens=False)[0]
        if isinstance(token, bytes):
            try:
                token_str = token.decode("utf-8")
            except:
                token_str = token.decode("utf-8", errors="ignore")
                logger.error(f"{idx}: decode_error: " + json.dumps(  # gpt_35_turbo 经常有token会decode error,这里用来记录一下
                    {"tokenizer_type": tokenizer_name, "token": str(token), "token_str": token_str},
                    ensure_ascii=False))

            token_bytes = token
            # json_dumps = json.dumps(token_str)
        elif isinstance(token, str):
            token_str = token
            token_bytes = bytes(token_str, "utf-8")
            # json_dumps = json.dumps(token_str)
        else:
            logger.error(f"{idx}: wrong type for token {token_id} {type(token)} " + json.dumps(
                {"text": text, "tokenizer_type": tokenizer_name}, ensure_ascii=False))
            token_str = token
            token_bytes = token
            # continue

        # ⭐
        # TODO: gpt3.5_turbo错误: 只有id和text是对的,token和 utf8都是错的。说明 convert_ids_to_tokens 出错了。
        table.append(
            {"TokenID": token_id,
             "Token": token_str,  # utf-8解码后的字符串,为什么有些是 <0xE7>,表示什么?比如llama
             "Text": decoded_text,  #
             # "Bytes": token_bytes,  # bytes类型在gradio前端页面被解码成字符串,比如   b'\xe4\xb8\xad' 仍然显示成 "中"。因此 str(token_bytes)
             "UTF8 Bytes": str(token_bytes),
             # "Unicode": json_dumps  # unicode, 如果是ascii码,就直接显示。如果不是ascii码,就显示unicode
             }
        )

    table_df = pd.DataFrame(table)
    logger.info(f"tokenizer_type={tokenizer_name}, Tokens={table[:4]}")
    return pos_tokens, len(encoding), table_df


def tokenize(

        text: str,

        tokenizer_name: str,

        color_num: int = 5,

        add_special_token: bool = False

):
    """ tokenize wrapper

    As gr.Update would be overwritten after passing to frontend, we apply lru_cache in _tokenize.

    """
    pos_tokens, num_tokens, table_df = _tokenize(text, tokenizer_name, color_num, add_special_token)
    return gr.update(value=pos_tokens, label=f"Tokens: {num_tokens}"), table_df


def tokenize_pair(text, tokenizer_type_1, tokenizer_type_2):
    """

    input_text.change

    """
    pos_tokens_1, table_df_1 = tokenize(text, tokenizer_type_1)
    pos_tokens_2, table_df_2 = tokenize(text, tokenizer_type_2)
    return pos_tokens_1, table_df_1, pos_tokens_2, table_df_2


@lru_cache
def basic_count(tokenizer_name):
    stats = iter_vocab(tokenizer_name)
    return stats['vocab_size'], f'{stats["organization"]}'
    # return tokenizer.vocab_size, f'{stats["中文汉字数"]["中文单字"]}/{stats["中文汉字数"]["中文多字"]}'


# def get_compress_rate(tokenizer_name, all_corpus, unit):
#     tokenizer = tokenizer_factory.get_tokenizer(tokenizer_name)
#     compress_rate_stats = tokenize_corpus(tokenizer, all_corpus)
#     compress_rate = unit_convertor(compress_rate_stats, unit)
#     return compress_rate


@lru_cache
def get_overlap_token_size(tokenizer_name_1, tokenizer_name_2):
    tokenizer1 = tokenizer_factory.get_tokenizer(tokenizer_name_1)
    tokenizer2 = tokenizer_factory.get_tokenizer(tokenizer_name_2)

    vocab_set_1 = tokenizer1.get_vocab().keys()
    vocab_set_2 = tokenizer2.get_vocab().keys()

    token1 = next(iter(vocab_set_1))
    token2 = next(iter(vocab_set_2))
    if type(token1) != type(token2):  # bytes  str
        if isinstance(token1, str):
            vocab_set_1 = set([token.encode("utf-8") for token in vocab_set_1])
        if isinstance(token2, str):
            vocab_set_2 = set([token.encode("utf-8") for token in vocab_set_2])

    overlap_tokens = vocab_set_1 & vocab_set_2
    overlap_token_size = len(overlap_tokens)
    logger.info(
        f"{overlap_token_size} OverlapTokens of {tokenizer_name_1} {tokenizer_name_2}: {list(overlap_tokens)[:10]}")
    return overlap_token_size, overlap_token_size


def on_load(url_params, request: gr.Request):
    """

    onLoad

    """
    text = None
    tokenizer_type_1 = None
    tokenizer_type_2 = None
    try:
        url_params = json.loads(url_params)
    except:
        url_params = {}
    if request:
        logger.info(str(request.headers))
        client_ip = request.client.host
        # local_ip = socket.gethostbyname(socket.gethostbyname(""))
        # headers = request.kwargs['headers']
        # if headers and 'x-forwarded-for' in headers:
        #     x_forwarded_for = headers['x-forwarded-for']
        #     client_ip = x_forwarded_for.split(' ')[0] if x_forwarded_for else ""
        # if "referer" in request.headers:   # not work for huggingface-space
        #     url_params = parse_qs(urlparse(request.headers["referer"]).query)
        #     url_params = {k: v[0] for k, v in url_params.items() if len(v) > 0}
        tokenizer_type_1 = url_params.get("tokenizer1", default_tokenizer_name_1)
        tokenizer_type_2 = url_params.get("tokenizer2", default_tokenizer_name_2)
        text = url_params.get("text", default_user_input)
        logger.info(f"client_ip: {client_ip}; params: {url_params}")
    return text, tokenizer_type_1, tokenizer_type_2


# def compress_rate_unit_change(unit):
#     return gr.update(label=f"Compress Rate: {unit}"), gr.update(label=f"Compress Rate: {unit}"),


def test_coding():
    bytes1 = b'\xe4\xb8\xad'
    print(bytes1)  # b'\xe4\xb8\xad'


if __name__ == "__main__":
    print(get_overlap_token_size("gpt-35-turbo", "gpt-4"))
    # print(basic_count("internlm_chat_7b"))