import json
import os
import shutil
from glob import glob


def read_json_file(file_path):
    """Load and return JSON data from a file under the ./script/ directory.

    Args:
        file_path: Filename relative to ./script/ (the prefix is added here).

    Returns:
        The parsed JSON object.
    """
    file_path = "./script/" + file_path
    with open(file_path, 'r', encoding='utf-8') as file:
        return json.load(file)


def get_prompt(query: str, history: list):
    """Append the user query to the chat history and wrap it as a request message.

    Note: mutates *history* in place when a list is passed.

    Args:
        query: The user's input text.
        history: Prior chat messages, or None to start a fresh history.

    Returns:
        A dict of the form {"text": history} ready to send to the model.
    """
    use_message = {"role": "user", "content": query}
    if history is None:
        history = []
    history.append(use_message)
    return {"text": history}


def process_response(response_str: str, history: list):
    """Parse one streamed Spark API response frame and merge it into the history.

    Args:
        response_str: Raw JSON frame from the API.
        history: Chat history; mutated in place.

    Returns:
        (response_text, history, status) — status is the frame status from the
        header (defaults to 2, the terminal value, when absent). On an error
        code or an empty frame, response_text is "".
    """
    res_dict: dict = json.loads(response_str)
    code = res_dict.get("header", {}).get("code")
    status = res_dict.get("header", {}).get("status", 2)

    if code != 0:
        print("error code ", code)
        print("you can see this website to know code detail")
        print("https://www.xfyun.cn/doc/spark/%E6%8E%A5%E5%8F%A3%E8%AF%B4%E6%98%8E.html")
        return "", history, status

    res_dict = res_dict.get("payload", {}).get(
        "choices", {}).get("text", [{}])[0]
    res_content = res_dict.get("content", "")
    if len(res_dict) > 0 and len(res_content) > 0:
        # Ignore the unnecessary data
        if "index" in res_dict:
            del res_dict["index"]
        response = res_content
        if status == 0:
            # First frame: start a new assistant message in the history.
            history.append(res_dict)
        else:
            # Later frame: append the delta to the running assistant message.
            history[-1]["content"] += response
            response = history[-1]["content"]
        return response, history, status
    return "", history, status


def init_script(history: list, jsonfile):
    """Load a script definition from ./script/<jsonfile>.

    *history* is accepted for interface compatibility but is not used here.
    """
    return read_json_file(jsonfile)


def create_script(name, characters, summary, details):
    """Serialize a script definition to ./script/<name>.json.

    Args:
        name: Script name; also used as the output filename stem.
        characters: Character descriptions for the script.
        summary: Short summary of the script.
        details: Full script details.
    """
    # exist_ok avoids the check-then-create race of the original code.
    os.makedirs("script", exist_ok=True)
    data = {
        "name": name,
        "characters": characters,
        "summary": summary,
        "details": details,
    }
    json_data = json.dumps(data, ensure_ascii=False)
    print(json_data)
    with open(f"./script/{name}.json", "w", encoding='utf-8') as file:
        file.write(json_data)


def _docs2vec(name: str, split_docs):
    """Embed *split_docs* and persist a FAISS index under data/faiss/<name>/.

    Shared tail of txt2vec/pdf2vec. Heavy dependencies are imported lazily so
    the module imports cleanly without them installed.
    """
    from langchain.embeddings.huggingface import HuggingFaceEmbeddings
    from langchain.vectorstores import FAISS
    import sentence_transformers

    EMBEDDING_MODEL = "model/text2vec_ernie/"
    embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL)
    # NOTE(review): assumes a CUDA device is available — confirm on CPU-only hosts.
    embeddings.client = sentence_transformers.SentenceTransformer(
        embeddings.model_name, device='cuda')
    db = FAISS.from_documents(split_docs, embeddings)
    db.save_local(f"data/faiss/{name}/")


def txt2vec(name: str, file_path: str):
    """Split a text file into overlapping chunks and index them into FAISS."""
    from langchain.document_loaders import TextLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter

    loader = TextLoader(file_path)
    data = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=256, chunk_overlap=128)
    split_docs = text_splitter.split_documents(data)
    _docs2vec(name, split_docs)


def pdf2vec(name: str, file_path: str):
    """Load a PDF, split it into chunks, and index them into FAISS."""
    from langchain.document_loaders import PyPDFLoader

    loader = PyPDFLoader(file_path)
    split_docs = loader.load_and_split()
    _docs2vec(name, split_docs)


def mycopyfile(srcfile, dstpath):
    """Copy *srcfile* into directory *dstpath*, creating the directory if needed.

    Fix: the original concatenated `dstpath + fname`, which produces a wrong
    path when *dstpath* lacks a trailing separator; os.path.join handles both.
    """
    if not os.path.isfile(srcfile):
        print("%s not exist!" % (srcfile))
        return
    fpath, fname = os.path.split(srcfile)  # split directory and filename
    print(fpath)
    print(fname)
    if not os.path.exists(dstpath):
        os.makedirs(dstpath)  # create destination directory
    dst = os.path.join(dstpath, fname)
    shutil.copy(srcfile, dst)
    print("copy %s -> %s" % (srcfile, dst))