Spaces:
Running
Running
import os | |
import time | |
import json | |
import asyncio | |
import gradio as gr | |
# set the env | |
from dotenv import load_dotenv | |
load_dotenv() | |
# get the root path of the project | |
current_file_path = os.path.dirname(os.path.abspath(__file__)) | |
root_path = os.path.abspath(current_file_path) | |
from textwrap import dedent | |
from langchain_openai import ChatOpenAI | |
from langchain_openai import OpenAIEmbeddings | |
from langchain_community.vectorstores import FAISS | |
from langchain_core.prompts import ChatPromptTemplate | |
class OurLLM: | |
def __init__(self, model="gpt-4o"): | |
''' | |
params: | |
model: str, | |
模型名称 ["GLM-4-Flash", "GLM-4V-Flash", | |
"gpt-4o-mini", "gpt-4o", "o1-mini", | |
"gemini-1.5-flash-002", "gemini-1.5-pro-002", | |
"Qwen/Qwen2.5-7B-Instruct", "Qwen/Qwen2.5-Coder-7B-Instruct"] | |
''' | |
self.model_name = model | |
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') | |
OPENAI_API_KEY_DF = os.getenv('OPENAI_API_KEY_DF', OPENAI_API_KEY) | |
OPENAI_API_KEY_AZ = os.getenv('OPENAI_API_KEY_AZ', OPENAI_API_KEY) | |
OPENAI_API_KEY_CD = os.getenv('OPENAI_API_KEY_CD') | |
OPENAI_API_KEY_O1 = os.getenv('OPENAI_API_KEY_O1') | |
OPENAI_API_KEY_GLM = os.getenv('OPENAI_API_KEY_GLM') | |
OPENAI_API_KEY_SC = os.getenv('OPENAI_API_KEY_SC') | |
OPENAI_BASE_URL = os.getenv('OPENAI_BASE_URL') | |
OPENAI_BASE_URL_GLM = os.getenv('OPENAI_BASE_URL_GLM') | |
OPENAI_BASE_URL_SC = os.getenv('OPENAI_BASE_URL_SC') | |
# 创建 API Key 映射 | |
apiKeyMap = { | |
'gemini': {"base_url": OPENAI_BASE_URL, "api_key": OPENAI_API_KEY_DF}, | |
'gpt': {"base_url": OPENAI_BASE_URL, "api_key": OPENAI_API_KEY_AZ}, | |
'o1': {"base_url": OPENAI_BASE_URL, "api_key": OPENAI_API_KEY_O1}, | |
'claude': {"base_url": OPENAI_BASE_URL, "api_key": OPENAI_API_KEY_CD}, | |
'glm': {"base_url": OPENAI_BASE_URL_GLM, "api_key": OPENAI_API_KEY_GLM}, | |
'qwen': {"base_url": OPENAI_BASE_URL_SC, "api_key": OPENAI_API_KEY_SC}, | |
} | |
for name, info in apiKeyMap.items(): | |
if name in model.lower(): | |
self.base_url = info["base_url"] | |
self.api_key = info["api_key"] | |
break | |
assert self.base_url is not None, f"Base URL not found for model: {model}" | |
assert self.api_key is not None, f"API key not found for model: {model}" | |
chat_prompt = ChatPromptTemplate.from_messages( | |
[ | |
("system", "{system_prompt}"), | |
("human", "{input}"), | |
# ("ai", "{chat_history}"), | |
] | |
) | |
self.chat_prompt = chat_prompt | |
self.llm = self.get_llm(model) | |
def clean_json(self, s): | |
return s.replace("```json", "").replace("```", "").strip() | |
def get_system_prompt(self, mode="assistant"): | |
prompt_map = { | |
"assistant": dedent(""" | |
你是一个智能助手,擅长用简洁的中文回答用户的问题。 | |
请确保你的回答准确、清晰、有条理,并且符合中文的语言习惯。 | |
重要提示: | |
1. 回答要简洁明了,避免冗长 | |
2. 使用适当的专业术语 | |
3. 保持客观中立的语气 | |
4. 如果不确定,要明确指出 | |
"""), | |
# search | |
"keyword_expand": dedent(""" | |
你是一个搜索关键词扩展专家,擅长将用户的搜索意图转化为多个相关的搜索词或短语。 | |
用户会输入一段描述他们搜索需求的文本,请你生成与之相关的关键词列表。 | |
你需要返回一个可以直接被 json 库解析的响应,包含以下内容: | |
{ | |
"keywords": [关键词列表], | |
} | |
重要提示: | |
1. 关键词应该包含同义词、近义词、上位词、下位词 | |
2. 短语要体现不同的表达方式和组合 | |
3. 描述句子要涵盖不同的应用场景和用途 | |
4. 所有内容必须与原始搜索意图高度相关 | |
5. 扩展搜索意图到相关的应用场景和工具,例如: | |
- 如果搜索"PDF转MD",应包含PDF内容提取、PDF解析工具、PDF数据处理等 | |
- 如果搜索"图片压缩",应包含批量压缩工具、图片格式转换等 | |
- 如果搜索"代码格式化",应包含代码美化工具、语法检查器、代码风格统一等 | |
- 如果搜索"文本翻译",应包含机器翻译API、多语言翻译工具、离线翻译软件等 | |
- 如果搜索"数据可视化",应包含图表生成工具、数据分析库、交互式图表等 | |
- 如果搜索"网络爬虫",应包含数据采集框架、反爬虫绕过、数据解析工具等 | |
- 如果搜索"API测试",应包含接口测试工具、性能监控、自动化测试框架等 | |
6. 所有内容主要使用英文表达,并对部分关键词添加额外的中文表示 | |
7. 返回内容不要使用任何 markdown 格式 以及任何特殊字符 | |
"""), | |
"zh2en": dedent(""" | |
你是一个专业的中译英翻译专家,尤其擅长学术论文的翻译工作。 | |
请将用户提供的中文内容翻译成地道、专业的英文。 | |
重要提示: | |
1. 使用学术论文常用的表达方式和术语 | |
2. 保持专业、正式的语气 | |
3. 确保译文的准确性和流畅性 | |
4. 对专业术语进行准确翻译 | |
5. 遵循英文学术写作的语法规范 | |
6. 保持原文的逻辑结构 | |
7. 适当使用学术论文常见的过渡词和连接词 | |
8. 如遇到模糊的表达,选择最符合学术上下文的翻译 | |
9. 避免使用口语化或非正式的表达 | |
10. 注意时态和语态的准确使用 | |
"""), | |
"github_score": dedent(""" | |
你是一个语义匹配评分专家,擅长根据用户需求和仓库描述进行语义匹配度评分。 | |
用户会输入两部分内容: | |
1. 用户的具体需求描述 | |
2. 多个仓库的描述列表(以1,2,3等数字开头) | |
请你仔细分析用户需求,并对每个仓库进行评分。 | |
确保返回一个可以直接被 json 库解析的响应,包含以下内容: | |
{ | |
"indices": [仓库编号列表,按分数从高到低], | |
"scores": [编号对应的匹配度评分列表,0-100的整数,表示匹配程度] | |
} | |
重要提示: | |
1. 评分范围为0-100的整数,高于60分表示具有明显相关性 | |
2. 评分要客观反映仓库与需求的契合度 | |
3. 只返回评分大于 60 的仓库 | |
4. 返回内容不要使用任何 markdown 格式 以及任何特殊字符 | |
""") | |
} | |
return prompt_map[mode] | |
def get_llm(self, model="gpt-4o-mini"): | |
''' | |
params: | |
model: str, 模型名称 ["gpt-4o-mini", "gpt-4o", "o1-mini", "gemini-1.5-flash-002"] | |
''' | |
llm = ChatOpenAI( | |
model=model, | |
base_url=self.base_url, | |
api_key=self.api_key, | |
) | |
print(f"Init model {model} successfully!") | |
return llm | |
def ask_question(self, question, system_prompt=None): | |
# 1. 获取系统提示 | |
if system_prompt is None: | |
system_prompt = self.get_system_prompt() | |
# 2. 生成聊天提示 | |
prompt = self.chat_prompt.format(input=question, system_prompt=system_prompt) | |
config = { | |
"configurable": {"response_format": {"type": "json_object"}} | |
} | |
# 3. 调用 LLM 进行回答 | |
for _ in range(10): | |
try: | |
response = self.llm.invoke(prompt, config=config) | |
response.content = self.clean_json(response.content) | |
return response | |
except Exception as e: | |
print(e) | |
time.sleep(10) | |
continue | |
print(f"Failed to call llm for prompt: {prompt[0:10]}") | |
return None | |
async def ask_questions_parallel(self, questions, system_prompt=None): | |
# 1. 获取系统提示 | |
if system_prompt is None: | |
system_prompt = self.get_system_prompt() | |
# 2. 定义异步函数 | |
async def call_llm(prompt): | |
for _ in range(10): | |
try: | |
response = await self.llm.ainvoke(prompt) | |
response.content = self.clean_json(response.content) | |
return response | |
except Exception as e: | |
print(e) | |
await asyncio.sleep(10) | |
continue | |
print(f"Failed to call llm for prompt: {prompt[0:10]}") | |
return None | |
# 3. 构建 prompt | |
prompts = [self.chat_prompt.format(input=question, system_prompt=system_prompt) for question in questions] | |
# 4. 异步调用 | |
tasks = [call_llm(prompt) for prompt in prompts] | |
results = await asyncio.gather(*tasks) | |
return results | |
class RepoSearch: | |
def __init__(self): | |
db_path = os.path.join(root_path, "database", "init") | |
embeddings = OpenAIEmbeddings(api_key=os.getenv("OPENAI_API_KEY"), | |
base_url=os.getenv("OPENAI_BASE_URL"), | |
model="text-embedding-3-small") | |
assert os.path.exists(db_path), f"Database not found: {db_path}" | |
self.vector_db = FAISS.load_local(db_path, embeddings, | |
index_name="init", | |
allow_dangerous_deserialization=True) | |
def search(self, query, k=10): | |
''' | |
name + description + html_url + topics | |
''' | |
results = self.vector_db.similarity_search(query + " technology", k=k) | |
simple_str = "" | |
simple_list = [] | |
for i, doc in enumerate(results): | |
content = json.loads(doc.page_content) | |
metadata = doc.metadata | |
if content["description"] is None: | |
content["description"] = "" | |
# desc = content["description"] if len(content["description"]) < 300 else content["description"][:300] + "..." | |
simple_str += f"\t**{i+1}. {content['name']}** || {content['description']}\n" # 用于大模型匹配 | |
simple_list.append({ | |
"name": content["name"], | |
"description": content["description"], | |
**metadata, # 解包所有 metadata 字段 | |
}) | |
return simple_str, simple_list | |
def main(): | |
search = RepoSearch() | |
llm = OurLLM(model="gpt-4o") | |
def respond( | |
prompt: str, | |
history, | |
is_llm_filter: bool = False, | |
is_keyword_expand: bool = False, | |
match_num: int = 40 | |
): | |
# 1. 初始化历史记录 | |
if not history: | |
history = [{"role": "system", "content": "You are a friendly chatbot"}] | |
history.append({"role": "user", "content": prompt}) | |
response = {"role": "assistant", "content": ""} | |
yield history | |
# 2. 扩展用户问题关键词 | |
if is_keyword_expand: | |
response["content"] = "开始扩展关键词..." | |
yield history + [response] | |
query = llm.ask_question(prompt, system_prompt=llm.get_system_prompt("keyword_expand")).content | |
prompt = ", ".join(json.loads(query)["keywords"]) | |
# 3. 语义向量匹配 | |
response["content"] = "开始语义向量匹配..." | |
yield history + [response] | |
match_str, simple_list = search.search(prompt, match_num) | |
# 4. 通过 LLM 评分得到最匹配的仓库索引 | |
if not is_llm_filter: | |
simple_strs = [f"\t**{i+1}. {repo['name']}** [✨ {repo['star_count'] // 1000}k] || **Description:** {repo['description']} || **Url:** {repo['html_url']} \n" for i, repo in enumerate(simple_list)] | |
response["content"] = "".join(simple_strs) | |
yield history + [response] | |
else: | |
response["content"] = "开始通过 LLM 评分得到最匹配的仓库..." | |
yield history + [response] | |
query = ' ## 用户需要的仓库内容:' + prompt + '\n ## 搜索结果列表:' + match_str | |
out = llm.ask_question(query, system_prompt=llm.get_system_prompt("github_score")).content | |
matched_index = json.loads(out)["indices"] | |
# 5. 通过索引得到最匹配的仓库 | |
result = [simple_list[idx-1] for idx in matched_index] | |
simple_strs = [f"\t**{i+1}. {repo['name']}** [✨ {repo['star_count'] // 1000}k] || **Description:** {repo['description']} || **Url:** {repo['html_url']} \n" for i, repo in enumerate(result)] | |
response["content"] = "".join(simple_strs) | |
yield history + [response] | |
with gr.Blocks() as demo: | |
gr.Markdown("## Github semantic search (基于语义的 github 仓库搜索) 🌐") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
# 添加控制参数 | |
llm_filter = gr.Checkbox( | |
label="使用LLM过滤结果", | |
value=False, | |
info="是否使用 LLM 对搜索结果进行二次过滤" | |
) | |
keyword_expand = gr.Checkbox( | |
label="扩展关键词搜索", | |
value=False, | |
info="是否使用 LLM 扩展搜索关键词" | |
) | |
match_number = gr.Slider( | |
minimum=10, | |
maximum=100, | |
value=40, | |
step=10, | |
label="语义匹配数量", | |
info="进行语义匹配后返回的仓库数量,若使用 LLM 过滤,建议适当增加数量" | |
) | |
with gr.Column(scale=3): | |
chatbot = gr.Chatbot( | |
label="Agent", | |
type="messages", | |
avatar_images=(None, "https://img1.baidu.com/it/u=2193901176,1740242983&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500"), | |
height="65vh" | |
) | |
prompt = gr.Textbox(max_lines=2, label="Chat Message") | |
# 更新submit调用,包含新的参数 | |
prompt.submit( | |
respond, | |
[prompt, chatbot, llm_filter, keyword_expand, match_number], | |
[chatbot] | |
) | |
prompt.submit(lambda: "", None, [prompt]) | |
demo.launch(share=False) | |
if __name__ == "__main__": | |
main() |