"""Helpers for a small retrieval-augmented generation (RAG) pipeline.

The module extracts text from parsed documents or audio transcriptions,
splits it into chunks, indexes the chunks in a Pinecone index through
LangChain, retrieves chunks for a query and calls an OpenAI chat model.
"""

import unicodedata
from uuid import uuid4

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

# Embedding model shared by the indexing and the retrieval paths; both must
# use the same model for similarity scores to be meaningful.
_EMBEDDING_MODEL = "text-embedding-3-large"

# Punctuation kept by remove_non_standard_ascii() in addition to ASCII
# letters and digits.
_KEPT_PUNCTUATION = " .,!?"

# Characters of a filename that are turned into "_" before the id is cleaned.
_FILENAME_SEPARATORS = str.maketrans({" ": "_", "-": "_", "/": "_", "\\": "_"})

_DEFAULT_TEMPLATE = (
    "En tant qu'IA experte en marketing, réponds avec "
    "un style {style} et une tonalité {tonality} dans ta communcation, "
    "sachant le context suivant: {context}, et l'historique de la "
    "conversation, {messages}, {query}"
)


def remove_non_standard_ascii(input_string: str) -> str:
    """Return *input_string* reduced to ASCII letters, digits and `` .,!?``.

    The string is first NFKD-normalized so that accented letters decompose
    into a base letter plus combining marks; the marks are then dropped by
    the filter (``"é"`` becomes ``"e"``).

    Only ASCII characters are kept. A bare ``str.isdigit()`` test would also
    accept non-ASCII digits (for example Arabic-Indic digits), which NFKD
    does not fold to ``0-9``.
    """
    normalized_string = unicodedata.normalize("NFKD", input_string)
    return "".join(
        char
        for char in normalized_string
        if char.isascii() and (char.isalnum() or char in _KEPT_PUNCTUATION)
    )


def get_text_from_content_for_doc(content):
    """Concatenate the ``"texte"`` field of every page of *content*.

    *content* is iterated and indexed by the iterated keys, so it is expected
    to behave like a mapping ``page -> {"texte": str, ...}``. Pages are joined
    in iteration order with no separator.
    """
    return "".join(content[page]["texte"] for page in content)


def get_text_from_content_for_audio(content):
    """Return the ``"transcription"`` field of an audio *content* mapping."""
    return content["transcription"]


def get_text_chunks(text, chunk_size=500, chunk_overlap=100):
    """Split *text* into overlapping chunks.

    Args:
        text: The text to split.
        chunk_size: Maximum length of a chunk, in characters.
        chunk_overlap: Number of characters shared by consecutive chunks.

    Returns:
        The list of chunks produced by ``RecursiveCharacterTextSplitter``.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,  # lengths are measured in characters
    )
    return text_splitter.split_text(text)


def get_vectorstore(text_chunks, filename, file_type, namespace, index, enterprise_name):
    """Embed *text_chunks* and add them to a Pinecone index.

    Every chunk becomes a ``Document`` whose metadata carries the original
    filename, the file type, the cleaned filename id and the enterprise name.
    Vector ids have the form ``"<filename_id>_<chunk position>"``.

    Args:
        text_chunks: Chunks of text to index.
        filename: Original file name; its part before the first ``"."`` is
            used to build the id.
        file_type: Stored as-is in the metadata.
        namespace: Pinecone namespace to write to.
        index: Pinecone index handed to ``PineconeVectorStore``.
        enterprise_name: Stored in the metadata under ``"entreprise_name"``.

    Returns:
        ``{"filename_id": <cleaned id>}`` on success, ``False`` when there is
        nothing to index or when any step raises (the error is printed).
    """
    if not text_chunks:
        # Nothing to index: report failure without calling the embedding API.
        print("get_vectorstore: no text chunks to index")
        return False
    try:
        embedding = OpenAIEmbeddings(model=_EMBEDDING_MODEL)
        vector_store = PineconeVectorStore(
            index=index, embedding=embedding, namespace=namespace
        )
        # NOTE(review): everything after the first "." is dropped, so
        # "a.b.pdf" and "a.c.pdf" share the same id - confirm this is intended.
        file_stem = filename.split(".")[0].translate(_FILENAME_SEPARATORS).strip()
        # remove_non_standard_ascii() does not keep "_", so the separators
        # replaced above end up removed from the id.
        clean_filename = remove_non_standard_ascii(file_stem)
        metadata = {
            "filename": filename,
            "file_type": file_type,
            "filename_id": clean_filename,
            "entreprise_name": enterprise_name,
        }
        documents = [
            # Each document gets its own metadata dict.
            Document(page_content=chunk, metadata=dict(metadata))
            for chunk in text_chunks
        ]
        doc_ids = [f"{clean_filename}_{position}" for position in range(len(documents))]
        vector_store.add_documents(documents=documents, ids=doc_ids)
        return {"filename_id": clean_filename}
    except Exception as e:
        # Best-effort: callers test the return value instead of catching.
        print(e)
        return False


def get_retreive_answer(enterprise_id, prompt, index, common_id):
    """Retrieve the chunks most similar to *prompt*.

    The enterprise namespace is searched with a score threshold of 0.6. When
    *common_id* is truthy, that namespace is searched as well with a
    threshold of 0.1 and its results are appended after the enterprise ones.
    Each search returns at most 3 documents.

    Returns:
        A list of documents, or ``False`` when any step raises (the error is
        printed).
    """
    try:
        embedding = OpenAIEmbeddings(model=_EMBEDDING_MODEL)

        def build_retriever(namespace, score_threshold):
            store = PineconeVectorStore(
                index=index, embedding=embedding, namespace=namespace
            )
            return store.as_retriever(
                search_type="similarity_score_threshold",
                search_kwargs={"k": 3, "score_threshold": score_threshold},
            )

        retriever = build_retriever(enterprise_id, 0.6)
        if not common_id:
            return retriever.invoke(prompt)
        retriever_commun = build_retriever(common_id, 0.1)
        return retriever.invoke(prompt) + retriever_commun.invoke(prompt)
    except Exception as e:
        print(e)
        return False


def generate_response_via_langchain(
    query: str,
    stream: bool = False,
    model: str = "gpt-4o",
    context: str = "",
    messages=None,
    style: str = "formel",
    tonality: str = "neutre",
    template: str = "",
):
    """Run *query* through a prompt template and an OpenAI chat model.

    Args:
        query: The user request.
        stream: When true, return the result of ``astream`` instead of
            invoking the chain.
        model: Name of the chat model.
        context: Text substituted for ``{context}`` in the template.
        messages: Conversation history substituted for ``{messages}``;
            defaults to an empty list.
        style: Substituted for ``{style}``.
        tonality: Substituted for ``{tonality}``.
        template: Prompt template; the built-in French marketing template is
            used when empty.

    Returns:
        The model answer as a string, or, when *stream* is true, the
        asynchronous iterator returned by ``astream`` (consume it with
        ``async for``).
    """
    if messages is None:
        # A fresh list per call instead of a shared mutable default.
        messages = []
    if not template:
        template = _DEFAULT_TEMPLATE

    prompt = PromptTemplate.from_template(template)
    llm = ChatOpenAI(model=model, temperature=0)
    llm_chain = prompt | llm | StrOutputParser()

    chain_inputs = {
        "query": query,
        "context": context,
        "messages": messages,
        "style": style,
        "tonality": tonality,
    }
    if stream:
        return llm_chain.astream(chain_inputs)
    return llm_chain.invoke(chain_inputs)


def setup_rag(
    file_type,
    content,
    filename=None,
    namespace=None,
    index=None,
    enterprise_name=None,
):
    """Extract the text of *content*, chunk it and index it.

    Args:
        file_type: ``"pdf"`` or ``"audio"``; selects the text extractor.
        content: Parsed content in the shape expected by the extractor.
        filename: Original file name, forwarded to ``get_vectorstore``.
        namespace: Pinecone namespace, forwarded to ``get_vectorstore``.
        index: Pinecone index, forwarded to ``get_vectorstore``.
        enterprise_name: Forwarded to ``get_vectorstore``.

    Returns:
        The value returned by ``get_vectorstore``.

    Raises:
        ValueError: If *file_type* is not supported.
        TypeError: If *filename*, *index* or *enterprise_name* is missing;
            ``get_vectorstore`` cannot be called without them.
    """
    if file_type == "pdf":
        text = get_text_from_content_for_doc(content)
    elif file_type == "audio":
        text = get_text_from_content_for_audio(content)
    else:
        raise ValueError(f"Unsupported file type: {file_type!r}")

    if filename is None or index is None or enterprise_name is None:
        raise TypeError(
            "setup_rag() requires 'filename', 'index' and 'enterprise_name' "
            "to index the extracted text"
        )

    chunks = get_text_chunks(text)
    return get_vectorstore(
        chunks, filename, file_type, namespace, index, enterprise_name
    )


def prompt_reformatting(prompt: str, context, query: str, style="formel", tonality="neutre"):
    """Fill the placeholders of *prompt* from retrieved documents.

    Args:
        prompt: Template with ``{context}``, ``{messages}``, ``{query}``,
            ``{style}`` and ``{tonality}`` placeholders.
        context: Retrieved documents, each exposing ``metadata["filename"]``.
            Any falsy value (``""``, an empty list, or the ``False`` returned
            by a failed retrieval) means no relevant context was found.
        query: The user request.
        style: Substituted for ``{style}``.
        tonality: Substituted for ``{tonality}``.

    Returns:
        The formatted prompt; ``{messages}`` is always replaced by an empty
        string.
    """
    if not context:
        context_text = "Pas de contexte pertinent"
    else:
        # NOTE(review): only the source filenames are injected, not the chunk
        # contents - confirm this is intended.
        # dict.fromkeys de-duplicates while keeping first-seen order.
        docs_names = dict.fromkeys(chunk.metadata["filename"] for chunk in context)
        context_text = ", ".join(docs_names)
    return prompt.format(
        context=context_text,
        messages="",
        query=query,
        style=style,
        tonality=tonality,
    )