Spaces:
Running
Running
File size: 6,312 Bytes
2319518 f67d239 2319518 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 |
import datetime
import re
import socket
import sys
import traceback
from typing import Literal, Optional
import jieba
import json5
from jieba import analyse
from agent.log import logger
def get_local_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
except Exception:
IP = '127.0.0.1'
finally:
s.close()
return IP
def print_traceback():
logger.error(''.join(traceback.format_exception(*sys.exc_info())))
def has_chinese_chars(data) -> bool:
text = f'{data}'
return len(re.findall(r'[\u4e00-\u9fff]+', text)) > 0
def get_current_date_str(
lang: Literal['en', 'zh'] = 'en',
hours_from_utc: Optional[int] = None,
) -> str:
if hours_from_utc is None:
cur_time = datetime.datetime.now()
else:
cur_time = datetime.datetime.utcnow() + datetime.timedelta(
hours=hours_from_utc)
if lang == 'en':
date_str = 'Current date: ' + cur_time.strftime('%A, %B %d, %Y')
elif lang == 'zh':
cur_time = cur_time.timetuple()
date_str = f'当前时间:{cur_time.tm_year}年{cur_time.tm_mon}月{cur_time.tm_mday}日,星期'
date_str += ['一', '二', '三', '四', '五', '六', '日'][cur_time.tm_wday]
date_str += '。'
else:
raise NotImplementedError
return date_str
def save_text_to_file(path, text):
try:
with open(path, 'w', encoding='utf-8') as fp:
fp.write(text)
return 'SUCCESS'
except Exception as ex:
print_traceback()
return ex
ignore_words = [
'', ' ', '\t', '\n', '\\', 'is', 'are', 'am', 'what', 'how', '的', '吗', '是',
'了', '啊', '呢', '怎么', '如何', '什么', '?', '?', '!', '!', '“', '”', '‘', '’',
"'", "'", '"', '"', ':', ':', '讲了', '描述', '讲', '说说', '讲讲', '介绍', '总结下',
'总结一下', '文档', '文章', '文稿', '稿子', '论文', 'PDF', 'pdf', '这个', '这篇', '这', '我',
'帮我', '那个', '下', '翻译'
]
def get_split_word(text):
text = text.lower()
_wordlist = jieba.lcut(text.strip())
wordlist = []
for x in _wordlist:
if x in ignore_words:
continue
wordlist.append(x)
return wordlist
def get_keyword_by_llm(text, keyword_agent):
try:
res = keyword_agent.run(text)
res = json5.loads(res)
except Exception:
print('keyword is not json format!')
return get_split_word(text)
# json format
_wordlist = []
try:
if 'keywords_zh' in res and isinstance(res['keywords_zh'], list):
_wordlist.extend([kw.lower() for kw in res['keywords_zh']])
if 'keywords_en' in res and isinstance(res['keywords_en'], list):
_wordlist.extend([kw.lower() for kw in res['keywords_en']])
wordlist = []
for x in _wordlist:
if x in ignore_words:
continue
wordlist.append(x)
wordlist.extend(get_split_word(text))
return wordlist
except Exception:
return get_split_word(text)
def get_key_word(text):
text = text.lower()
_wordlist = analyse.extract_tags(text)
wordlist = []
for x in _wordlist:
if x in ignore_words:
continue
wordlist.append(x)
print('wordlist: ', wordlist)
return wordlist
def get_last_one_line_context(text):
lines = text.split('\n')
n = len(lines)
res = ''
for i in range(n - 1, -1, -1):
if lines[i].strip():
res = lines[i]
break
return res
def extract_urls(text):
pattern = re.compile(r'https?://\S+')
urls = re.findall(pattern, text)
return urls
def extract_obs(text):
k = text.rfind('\nObservation:')
j = text.rfind('\nThought:')
obs = text[k + len('\nObservation:'):j]
return obs.strip()
def extract_code(text):
# Match triple backtick blocks first
triple_match = re.search(r'```[^\n]*\n(.+?)```', text, re.DOTALL)
if triple_match:
text = triple_match.group(1)
else:
try:
text = json5.loads(text)['code']
except Exception:
print_traceback()
# If no code blocks found, return original text
return text
def parse_latest_plugin_call(text):
plugin_name, plugin_args = '', ''
i = text.rfind('\nAction:')
j = text.rfind('\nAction Input:')
k = text.rfind('\nObservation:')
if 0 <= i < j: # If the text has `Action` and `Action input`,
if k < j: # but does not contain `Observation`,
# then it is likely that `Observation` is ommited by the LLM,
# because the output text may have discarded the stop word.
text = text.rstrip() + '\nObservation:' # Add it back.
k = text.rfind('\nObservation:')
plugin_name = text[i + len('\nAction:'):j].strip()
plugin_args = text[j + len('\nAction Input:'):k].strip()
text = text[:k]
return plugin_name, plugin_args, text
# TODO: Say no to these ugly if statements.
def format_answer(text):
action, action_input, output = parse_latest_plugin_call(text)
if 'code_interpreter' in text:
rsp = ''
code = extract_code(action_input)
rsp += ('\n```py\n' + code + '\n```\n')
obs = extract_obs(text)
if '![fig' in obs:
rsp += obs
return rsp
elif 'image_gen' in text:
# get url of FA
# img_urls = URLExtract().find_urls(text.split("Final Answer:")[-1].strip())
obs = text.split('Observation:')[-1].split('\nThought:')[0].strip()
img_urls = []
if obs:
logger.info(repr(obs))
try:
obs = json5.loads(obs)
img_urls.append(obs['image_url'])
except Exception:
print_traceback()
img_urls = []
if not img_urls:
img_urls = extract_urls(text.split('Final Answer:')[-1].strip())
logger.info(img_urls)
rsp = ''
for x in img_urls:
rsp += '\n![picture](' + x.strip() + ')'
return rsp
else:
return text.split('Final Answer:')[-1].strip()
|