import logging import sys import os import re import base64 import nest_asyncio import pandas as pd from pathlib import Path from typing import Any, Dict, List, Optional from PIL import Image import streamlit as st import torch from llama_index.core import Settings, SimpleDirectoryReader, StorageContext, Document from llama_index.core.storage.docstore import SimpleDocumentStore from llama_index.llms.ollama import Ollama from llama_index.embeddings.ollama import OllamaEmbedding from llama_index.core.node_parser import LangchainNodeParser from langchain.text_splitter import RecursiveCharacterTextSplitter from llama_index.core.storage.chat_store import SimpleChatStore from llama_index.core.memory import ChatMemoryBuffer from llama_index.core.query_engine import RetrieverQueryEngine from llama_index.core.chat_engine import CondensePlusContextChatEngine from llama_index.retrievers.bm25 import BM25Retriever from llama_index.core.retrievers import QueryFusionRetriever from llama_index.vector_stores.chroma import ChromaVectorStore from llama_index.core import VectorStoreIndex from llama_index.llms.huggingface import HuggingFaceLLM from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI from llama_index.embeddings.huggingface import HuggingFaceEmbedding import chromadb #Configuração da imagem da aba im = Image.open("pngegg.png") st.set_page_config(page_title = "Chatbot Carômetro", page_icon=im, layout = "wide") #Removido loop e adicionado os.makedirs os.makedirs("bm25_retriever", exist_ok=True) os.makedirs("chat_store", exist_ok=True) os.makedirs("chroma_db", exist_ok=True) os.makedirs("documentos", exist_ok=True) os.makedirs("curadoria", exist_ok=True) os.makedirs("chroma_db_curadoria", exist_ok=True) # Configuração do Streamlit st.sidebar.title("Configuração de LLM") sidebar_option = st.sidebar.radio("Selecione o LLM", ["gpt-3.5-turbo", "NuExtract-1.5"]) # logo_url = 'app\logos\logo-sicoob.jpg' # st.sidebar.image(logo_url) import base64 #Configuração da imagem da sidebar with open("sicoob-logo.png", "rb") as f: data = base64.b64encode(f.read()).decode("utf-8") st.sidebar.markdown( f"""
""", unsafe_allow_html=True, ) #if sidebar_option == "Ollama": # Settings.llm = Ollama(model="llama3.2:latest", request_timeout=500.0, num_gpu=1) # Settings.embed_model = OllamaEmbedding(model_name="nomic-embed-text:latest") if sidebar_option == "gpt-3.5-turbo": from llama_index.llms.openai import OpenAI from llama_index.embeddings.openai import OpenAIEmbedding Settings.llm = OpenAI(model="gpt-3.5-turbo") Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-ada-002") elif sidebar_option == 'NuExtract-1.5': logging.basicConfig(stream=sys.stdout, level=logging.INFO) logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout)) #Embedding do huggingface Settings.embed_model = HuggingFaceEmbedding( model_name="BAAI/bge-small-en-v1.5" ) #Carregamento do modelo local, descomentar o modelo desejado llm = HuggingFaceLLM( context_window=2048, max_new_tokens=2048, generate_kwargs={"do_sample": False}, #query_wrapper_prompt=query_wrapper_prompt, #model_name="Qwen/Qwen2.5-Coder-32B-Instruct", #model_name="Qwen/Qwen2.5-14B-Instruct", # model_name="meta-llama/Llama-3.2-3B", #model_name="HuggingFaceH4/zephyr-7b-beta", # model_name="meta-llama/Meta-Llama-3-8B", model_name="numind/NuExtract-1.5", #model_name="meta-llama/Llama-3.2-3B", tokenizer_name="numind/NuExtract-1.5", device_map="auto", tokenizer_kwargs={"max_length": 512}, # uncomment this if using CUDA to reduce memory usage model_kwargs={"torch_dtype": torch.bfloat16}, ) chat = [ {"role": "user", "content": "Hello, how are you?"}, {"role": "assistant", "content": "I'm doing great. How can I help you today?"}, {"role": "user", "content": "I'd like to show off how chat templating works!"}, ] from transformers import AutoTokenizer tokenizer = AutoTokenizer.from_pretrained("numind/NuExtract-1.5") tokenizer.apply_chat_template(chat, tokenize=False) Settings.chunk_size = 512 Settings.llm = llm else: raise Exception("Opção de LLM inválida!") # Diretórios configurados pelo usuário chat_store_path = os.path.join("chat_store", "chat_store.json") documents_path = os.path.join("documentos") chroma_storage_path = os.path.join("chroma_db") # Diretório para persistência do Chroma chroma_storage_path_curadoria = os.path.join("chroma_db_curadoria") # Diretório para 'curadoria' bm25_persist_path = os.path.join("bm25_retriever") curadoria_path = os.path.join("curadoria") # Classe CSV Customizada (novo código) class CustomPandasCSVReader: """PandasCSVReader modificado para incluir cabeçalhos nos documentos.""" def __init__( self, *args: Any, concat_rows: bool = True, col_joiner: str = ", ", row_joiner: str = "\n", pandas_config: dict = {}, **kwargs: Any ) -> None: self._concat_rows = concat_rows self._col_joiner = col_joiner self._row_joiner = row_joiner self._pandas_config = pandas_config def load_data( self, file: Path, extra_info: Optional[Dict] = None, ) -> List[Document]: df = pd.read_csv(file, **self._pandas_config) text_list = [" ".join(df.columns.astype(str))] text_list += ( df.astype(str) .apply(lambda row: self._col_joiner.join(row.values), axis=1) .tolist() ) metadata = {"filename": file.name, "extension": file.suffix} if extra_info: metadata.update(extra_info) if self._concat_rows: return [Document(text=self._row_joiner.join(text_list), metadata=metadata)] else: return [ Document(text=text, metadata=metadata) for text in text_list ] def clean_documents(documents): """Remove caracteres não desejados diretamente nos textos dos documentos.""" cleaned_documents = [] for doc in documents: cleaned_text = re.sub(r"[^0-9A-Za-zÀ-ÿ ]", "", doc.get_content()) doc.text = cleaned_text cleaned_documents.append(doc) return cleaned_documents from llama_index.readers.google import GoogleDriveReader import json credentials_json = os.getenv('GOOGLE_CREDENTIALS') if credentials_json is None: raise ValueError("The GOOGLE_CREDENTIALS environment variable is not set.") # Write the credentials to a file credentials_path = "credentials.json" with open(credentials_path, 'w') as credentials_file: credentials_file.write(credentials_json) google_drive_reader = GoogleDriveReader(credentials_path=credentials_path) google_drive_reader._creds = google_drive_reader._get_credentials() def are_docs_downloaded(directory_path: str) -> bool: return os.path.isdir(directory_path) and any(os.scandir(directory_path)) def download_original_files_from_folder(greader: GoogleDriveReader, pasta_documentos_drive: str, local_path: str): os.makedirs(local_path, exist_ok=True) files_meta = greader._get_fileids_meta(folder_id=pasta_documentos_drive) if not files_meta: logging.info("Nenhum arquivo encontrado na pasta especificada.") return for fmeta in files_meta: file_id = fmeta[0] file_name = os.path.basename(fmeta[2]) local_file_path = os.path.join(local_path, file_name) if os.path.exists(local_file_path): logging.info(f"Arquivo '{file_name}' já existe localmente, ignorando download.") continue downloaded_file_path = greader._download_file(file_id, local_file_path) if downloaded_file_path: logging.info(f"Arquivo '{file_name}' baixado com sucesso em: {downloaded_file_path}") else: logging.warning(f"Não foi possível baixar '{file_name}'") #DADOS/QA_database/Documentos CSV/documentos pasta_documentos_drive = "1xVzo8s1D0blzR5ZB3m5k4dVWHuRmKUu-" #DADOS/QA_database/Documentos CSV/curadoria pasta_curadoria_drive = "1LRrdOkZy9p0FA3MQAyz-Ssj3ktKTWAwE" # Verifica e baixa arquivos se necessário (novo código) if not are_docs_downloaded(documents_path): logging.info("Baixando arquivos originais do Drive para 'documentos'...") download_original_files_from_folder(google_drive_reader, pasta_documentos_drive, documents_path) else: logging.info("'documentos' já contém arquivos, ignorando download.") if not are_docs_downloaded(curadoria_path): logging.info("Baixando arquivos originais do Drive para 'curadoria'...") download_original_files_from_folder(google_drive_reader, pasta_curadoria_drive, curadoria_path) else: logging.info("'curadoria' já contém arquivos, ignorando download.") # Configuração de leitura de documentos file_extractor = {".csv": CustomPandasCSVReader()} documents = SimpleDirectoryReader( input_dir=documents_path, file_extractor=file_extractor, filename_as_id=True ).load_data() documents = clean_documents(documents) # Configuração do Chroma e BM25 com persistência docstore = SimpleDocumentStore() docstore.add_documents(documents) db = chromadb.PersistentClient(path=chroma_storage_path) chroma_collection = db.get_or_create_collection("dense_vectors") vector_store = ChromaVectorStore(chroma_collection=chroma_collection) # Configuração do StorageContext storage_context = StorageContext.from_defaults( docstore=docstore, vector_store=vector_store ) # Criação/Recarregamento do índice com embeddings if os.path.exists(chroma_storage_path): index = VectorStoreIndex.from_vector_store(vector_store) else: splitter = LangchainNodeParser( RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64) ) index = VectorStoreIndex.from_documents( documents, storage_context=storage_context, transformations=[splitter] ) vector_store.persist() # Criação/Recarregamento do BM25 Retriever if os.path.exists(os.path.join(bm25_persist_path, "params.index.json")): bm25_retriever = BM25Retriever.from_persist_dir(bm25_persist_path) else: bm25_retriever = BM25Retriever.from_defaults( docstore=docstore, similarity_top_k=2, language="portuguese", # Idioma ajustado para seu caso ) os.makedirs(bm25_persist_path, exist_ok=True) bm25_retriever.persist(bm25_persist_path) #Adicionado documentos na pasta curadoria, foi setado para 1200 o chunk pra receber pergunta, contexto e resposta curadoria_documents = SimpleDirectoryReader( input_dir=curadoria_path, file_extractor=file_extractor, filename_as_id=True ).load_data() curadoria_documents = clean_documents(curadoria_documents) curadoria_docstore = SimpleDocumentStore() curadoria_docstore.add_documents(curadoria_documents) db_curadoria = chromadb.PersistentClient(path=chroma_storage_path_curadoria) chroma_collection_curadoria = db_curadoria.get_or_create_collection("dense_vectors_curadoria") vector_store_curadoria = ChromaVectorStore(chroma_collection=chroma_collection_curadoria) # Configuração do StorageContext para 'curadoria' storage_context_curadoria = StorageContext.from_defaults( docstore=curadoria_docstore, vector_store=vector_store_curadoria ) # Criação/Recarregamento do índice com embeddings para 'curadoria' if os.path.exists(chroma_storage_path_curadoria): curadoria_index = VectorStoreIndex.from_vector_store(vector_store_curadoria) else: curadoria_splitter = LangchainNodeParser( RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=100) ) curadoria_index = VectorStoreIndex.from_documents( curadoria_documents, storage_context=storage_context_curadoria, transformations=[curadoria_splitter] ) vector_store_curadoria.persist() curadoria_retriever = curadoria_index.as_retriever(similarity_top_k=2) # Combinação de Retrievers (Embeddings + BM25) vector_retriever = index.as_retriever(similarity_top_k=2) retriever = QueryFusionRetriever( [vector_retriever, bm25_retriever, curadoria_retriever], similarity_top_k=2, num_queries=0, mode="reciprocal_rerank", use_async=True, verbose=True, query_gen_prompt=( "Gere {num_queries} perguntas de busca relacionadas à seguinte pergunta. " "Priorize o significado da pergunta sobre qualquer histórico de conversa. " "Se o histórico não for relevante para a pergunta, ignore-o. " "Não adicione explicações, notas ou introduções. Apenas escreva as perguntas. " "Pergunta: {query}\n\n" "Perguntas:\n" ), ) # Configuração do chat engine nest_asyncio.apply() memory = ChatMemoryBuffer.from_defaults(token_limit=3900) query_engine = RetrieverQueryEngine.from_args(retriever) chat_engine = CondensePlusContextChatEngine.from_defaults( query_engine, memory=memory, context_prompt=( "Você é um assistente virtual capaz de interagir normalmente, além de" " fornecer informações sobre organogramas e listar funcionários." " Aqui estão os documentos relevantes para o contexto:\n" "{context_str}" "\nInstrução: Use o histórico da conversa anterior, ou o contexto acima, para responder." ), verbose=True, ) # Armazenamento do chat chat_store = SimpleChatStore() if os.path.exists(chat_store_path): chat_store = SimpleChatStore.from_persist_path(persist_path=chat_store_path) else: chat_store.persist(persist_path=chat_store_path) # Interface do Chatbot st.title("Chatbot Carômetro") st.write("Este chatbot pode te ajudar a conseguir informações relevantes sobre os carômetros da Sicoob.") if 'chat_history' not in st.session_state: st.session_state.chat_history = [] for message in st.session_state.chat_history: role, text = message.split(":", 1) with st.chat_message(role.strip().lower()): st.write(text.strip()) user_input = st.chat_input("Digite sua pergunta") if user_input: # Exibir a mensagem do usuário e adicionar ao histórico with st.chat_message('user'): st.write(user_input) st.session_state.chat_history.append(f"user: {user_input}") # Placeholder para a mensagem do assistente with st.chat_message('assistant'): message_placeholder = st.empty() assistant_message = '' # Obter a resposta em streaming do chat_engine response = chat_engine.stream_chat(user_input) for token in response.response_gen: assistant_message += token # Atualizar o placeholder da mensagem message_placeholder.markdown(assistant_message + "▌") # Remover o cursor após a conclusão message_placeholder.markdown(assistant_message) st.session_state.chat_history.append(f"assistant: {assistant_message}")