Spaces:
Running
Running
import logging | |
import sys | |
import os | |
import re | |
import base64 | |
import nest_asyncio | |
import pandas as pd | |
from pathlib import Path | |
from typing import Any, Dict, List, Optional | |
from PIL import Image | |
import streamlit as st | |
import torch | |
from llama_index.core import Settings, SimpleDirectoryReader, StorageContext, Document | |
from llama_index.core.storage.docstore import SimpleDocumentStore | |
from llama_index.llms.ollama import Ollama | |
from llama_index.embeddings.ollama import OllamaEmbedding | |
from llama_index.core.node_parser import LangchainNodeParser | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from llama_index.core.storage.chat_store import SimpleChatStore | |
from llama_index.core.memory import ChatMemoryBuffer | |
from llama_index.core.query_engine import RetrieverQueryEngine | |
from llama_index.core.chat_engine import CondensePlusContextChatEngine | |
from llama_index.retrievers.bm25 import BM25Retriever | |
from llama_index.core.retrievers import QueryFusionRetriever | |
from llama_index.vector_stores.chroma import ChromaVectorStore | |
from llama_index.core import VectorStoreIndex | |
from llama_index.llms.huggingface import HuggingFaceLLM | |
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI | |
from llama_index.embeddings.huggingface import HuggingFaceEmbedding | |
import chromadb | |
#Configuração da imagem da aba | |
im = Image.open("pngegg.png") | |
st.set_page_config(page_title = "Chatbot Carômetro", page_icon=im, layout = "wide") | |
#Removido loop e adicionado os.makedirs | |
os.makedirs("bm25_retriever", exist_ok=True) | |
os.makedirs("chat_store", exist_ok=True) | |
os.makedirs("chroma_db", exist_ok=True) | |
os.makedirs("documentos", exist_ok=True) | |
os.makedirs("curadoria", exist_ok=True) | |
os.makedirs("chroma_db_curadoria", exist_ok=True) | |
# Configuração do Streamlit | |
st.sidebar.title("Configuração de LLM") | |
sidebar_option = st.sidebar.radio("Selecione o LLM", ["gpt-3.5-turbo", "NuExtract-1.5"]) | |
# logo_url = 'app\logos\logo-sicoob.jpg' | |
# st.sidebar.image(logo_url) | |
import base64 | |
#Configuração da imagem da sidebar | |
with open("sicoob-logo.png", "rb") as f: | |
data = base64.b64encode(f.read()).decode("utf-8") | |
st.sidebar.markdown( | |
f""" | |
<div style="display:table;margin-top:-80%;margin-left:0%;"> | |
<img src="data:image/png;base64,{data}" width="250" height="70"> | |
</div> | |
""", | |
unsafe_allow_html=True, | |
) | |
#if sidebar_option == "Ollama": | |
# Settings.llm = Ollama(model="llama3.2:latest", request_timeout=500.0, num_gpu=1) | |
# Settings.embed_model = OllamaEmbedding(model_name="nomic-embed-text:latest") | |
if sidebar_option == "gpt-3.5-turbo": | |
from llama_index.llms.openai import OpenAI | |
from llama_index.embeddings.openai import OpenAIEmbedding | |
Settings.llm = OpenAI(model="gpt-3.5-turbo") | |
Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-ada-002") | |
elif sidebar_option == 'NuExtract-1.5': | |
logging.basicConfig(stream=sys.stdout, level=logging.INFO) | |
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout)) | |
#Embedding do huggingface | |
Settings.embed_model = HuggingFaceEmbedding( | |
model_name="BAAI/bge-small-en-v1.5" | |
) | |
#Carregamento do modelo local, descomentar o modelo desejado | |
llm = HuggingFaceLLM( | |
context_window=2048, | |
max_new_tokens=2048, | |
generate_kwargs={"do_sample": False}, | |
#query_wrapper_prompt=query_wrapper_prompt, | |
#model_name="Qwen/Qwen2.5-Coder-32B-Instruct", | |
#model_name="Qwen/Qwen2.5-14B-Instruct", | |
# model_name="meta-llama/Llama-3.2-3B", | |
#model_name="HuggingFaceH4/zephyr-7b-beta", | |
# model_name="meta-llama/Meta-Llama-3-8B", | |
model_name="numind/NuExtract-1.5", | |
#model_name="meta-llama/Llama-3.2-3B", | |
tokenizer_name="numind/NuExtract-1.5", | |
device_map="auto", | |
tokenizer_kwargs={"max_length": 512}, | |
# uncomment this if using CUDA to reduce memory usage | |
model_kwargs={"torch_dtype": torch.bfloat16}, | |
) | |
chat = [ | |
{"role": "user", "content": "Hello, how are you?"}, | |
{"role": "assistant", "content": "I'm doing great. How can I help you today?"}, | |
{"role": "user", "content": "I'd like to show off how chat templating works!"}, | |
] | |
from transformers import AutoTokenizer | |
tokenizer = AutoTokenizer.from_pretrained("numind/NuExtract-1.5") | |
tokenizer.apply_chat_template(chat, tokenize=False) | |
Settings.chunk_size = 512 | |
Settings.llm = llm | |
else: | |
raise Exception("Opção de LLM inválida!") | |
# Diretórios configurados pelo usuário | |
chat_store_path = os.path.join("chat_store", "chat_store.json") | |
documents_path = os.path.join("documentos") | |
chroma_storage_path = os.path.join("chroma_db") # Diretório para persistência do Chroma | |
chroma_storage_path_curadoria = os.path.join("chroma_db_curadoria") # Diretório para 'curadoria' | |
bm25_persist_path = os.path.join("bm25_retriever") | |
curadoria_path = os.path.join("curadoria") | |
# Classe CSV Customizada (novo código) | |
class CustomPandasCSVReader: | |
"""PandasCSVReader modificado para incluir cabeçalhos nos documentos.""" | |
def __init__( | |
self, | |
*args: Any, | |
concat_rows: bool = True, | |
col_joiner: str = ", ", | |
row_joiner: str = "\n", | |
pandas_config: dict = {}, | |
**kwargs: Any | |
) -> None: | |
self._concat_rows = concat_rows | |
self._col_joiner = col_joiner | |
self._row_joiner = row_joiner | |
self._pandas_config = pandas_config | |
def load_data( | |
self, | |
file: Path, | |
extra_info: Optional[Dict] = None, | |
) -> List[Document]: | |
df = pd.read_csv(file, **self._pandas_config) | |
text_list = [" ".join(df.columns.astype(str))] | |
text_list += ( | |
df.astype(str) | |
.apply(lambda row: self._col_joiner.join(row.values), axis=1) | |
.tolist() | |
) | |
metadata = {"filename": file.name, "extension": file.suffix} | |
if extra_info: | |
metadata.update(extra_info) | |
if self._concat_rows: | |
return [Document(text=self._row_joiner.join(text_list), metadata=metadata)] | |
else: | |
return [ | |
Document(text=text, metadata=metadata) | |
for text in text_list | |
] | |
def clean_documents(documents): | |
"""Remove caracteres não desejados diretamente nos textos dos documentos.""" | |
cleaned_documents = [] | |
for doc in documents: | |
cleaned_text = re.sub(r"[^0-9A-Za-zÀ-ÿ ]", "", doc.get_content()) | |
doc.text = cleaned_text | |
cleaned_documents.append(doc) | |
return cleaned_documents | |
from llama_index.readers.google import GoogleDriveReader | |
import json | |
credentials_json = os.getenv('GOOGLE_CREDENTIALS') | |
if credentials_json is None: | |
raise ValueError("The GOOGLE_CREDENTIALS environment variable is not set.") | |
# Write the credentials to a file | |
credentials_path = "credentials.json" | |
with open(credentials_path, 'w') as credentials_file: | |
credentials_file.write(credentials_json) | |
google_drive_reader = GoogleDriveReader(credentials_path=credentials_path) | |
google_drive_reader._creds = google_drive_reader._get_credentials() | |
def are_docs_downloaded(directory_path: str) -> bool: | |
return os.path.isdir(directory_path) and any(os.scandir(directory_path)) | |
def download_original_files_from_folder(greader: GoogleDriveReader, pasta_documentos_drive: str, local_path: str): | |
os.makedirs(local_path, exist_ok=True) | |
files_meta = greader._get_fileids_meta(folder_id=pasta_documentos_drive) | |
if not files_meta: | |
logging.info("Nenhum arquivo encontrado na pasta especificada.") | |
return | |
for fmeta in files_meta: | |
file_id = fmeta[0] | |
file_name = os.path.basename(fmeta[2]) | |
local_file_path = os.path.join(local_path, file_name) | |
if os.path.exists(local_file_path): | |
logging.info(f"Arquivo '{file_name}' já existe localmente, ignorando download.") | |
continue | |
downloaded_file_path = greader._download_file(file_id, local_file_path) | |
if downloaded_file_path: | |
logging.info(f"Arquivo '{file_name}' baixado com sucesso em: {downloaded_file_path}") | |
else: | |
logging.warning(f"Não foi possível baixar '{file_name}'") | |
#DADOS/QA_database/Documentos CSV/documentos | |
pasta_documentos_drive = "1xVzo8s1D0blzR5ZB3m5k4dVWHuRmKUu-" | |
#DADOS/QA_database/Documentos CSV/curadoria | |
pasta_curadoria_drive = "1LRrdOkZy9p0FA3MQAyz-Ssj3ktKTWAwE" | |
# Verifica e baixa arquivos se necessário (novo código) | |
if not are_docs_downloaded(documents_path): | |
logging.info("Baixando arquivos originais do Drive para 'documentos'...") | |
download_original_files_from_folder(google_drive_reader, pasta_documentos_drive, documents_path) | |
else: | |
logging.info("'documentos' já contém arquivos, ignorando download.") | |
if not are_docs_downloaded(curadoria_path): | |
logging.info("Baixando arquivos originais do Drive para 'curadoria'...") | |
download_original_files_from_folder(google_drive_reader, pasta_curadoria_drive, curadoria_path) | |
else: | |
logging.info("'curadoria' já contém arquivos, ignorando download.") | |
# Configuração de leitura de documentos | |
file_extractor = {".csv": CustomPandasCSVReader()} | |
documents = SimpleDirectoryReader( | |
input_dir=documents_path, | |
file_extractor=file_extractor, | |
filename_as_id=True | |
).load_data() | |
documents = clean_documents(documents) | |
# Configuração do Chroma e BM25 com persistência | |
docstore = SimpleDocumentStore() | |
docstore.add_documents(documents) | |
db = chromadb.PersistentClient(path=chroma_storage_path) | |
chroma_collection = db.get_or_create_collection("dense_vectors") | |
vector_store = ChromaVectorStore(chroma_collection=chroma_collection) | |
# Configuração do StorageContext | |
storage_context = StorageContext.from_defaults( | |
docstore=docstore, vector_store=vector_store | |
) | |
# Criação/Recarregamento do índice com embeddings | |
if os.path.exists(chroma_storage_path): | |
index = VectorStoreIndex.from_vector_store(vector_store) | |
else: | |
splitter = LangchainNodeParser( | |
RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64) | |
) | |
index = VectorStoreIndex.from_documents( | |
documents, | |
storage_context=storage_context, | |
transformations=[splitter] | |
) | |
vector_store.persist() | |
# Criação/Recarregamento do BM25 Retriever | |
if os.path.exists(os.path.join(bm25_persist_path, "params.index.json")): | |
bm25_retriever = BM25Retriever.from_persist_dir(bm25_persist_path) | |
else: | |
bm25_retriever = BM25Retriever.from_defaults( | |
docstore=docstore, | |
similarity_top_k=2, | |
language="portuguese", # Idioma ajustado para seu caso | |
) | |
os.makedirs(bm25_persist_path, exist_ok=True) | |
bm25_retriever.persist(bm25_persist_path) | |
#Adicionado documentos na pasta curadoria, foi setado para 1200 o chunk pra receber pergunta, contexto e resposta | |
curadoria_documents = SimpleDirectoryReader( | |
input_dir=curadoria_path, | |
file_extractor=file_extractor, | |
filename_as_id=True | |
).load_data() | |
curadoria_documents = clean_documents(curadoria_documents) | |
curadoria_docstore = SimpleDocumentStore() | |
curadoria_docstore.add_documents(curadoria_documents) | |
db_curadoria = chromadb.PersistentClient(path=chroma_storage_path_curadoria) | |
chroma_collection_curadoria = db_curadoria.get_or_create_collection("dense_vectors_curadoria") | |
vector_store_curadoria = ChromaVectorStore(chroma_collection=chroma_collection_curadoria) | |
# Configuração do StorageContext para 'curadoria' | |
storage_context_curadoria = StorageContext.from_defaults( | |
docstore=curadoria_docstore, vector_store=vector_store_curadoria | |
) | |
# Criação/Recarregamento do índice com embeddings para 'curadoria' | |
if os.path.exists(chroma_storage_path_curadoria): | |
curadoria_index = VectorStoreIndex.from_vector_store(vector_store_curadoria) | |
else: | |
curadoria_splitter = LangchainNodeParser( | |
RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=100) | |
) | |
curadoria_index = VectorStoreIndex.from_documents( | |
curadoria_documents, storage_context=storage_context_curadoria, transformations=[curadoria_splitter] | |
) | |
vector_store_curadoria.persist() | |
curadoria_retriever = curadoria_index.as_retriever(similarity_top_k=2) | |
# Combinação de Retrievers (Embeddings + BM25) | |
vector_retriever = index.as_retriever(similarity_top_k=2) | |
retriever = QueryFusionRetriever( | |
[vector_retriever, bm25_retriever, curadoria_retriever], | |
similarity_top_k=2, | |
num_queries=0, | |
mode="reciprocal_rerank", | |
use_async=True, | |
verbose=True, | |
query_gen_prompt=( | |
"Gere {num_queries} perguntas de busca relacionadas à seguinte pergunta. " | |
"Priorize o significado da pergunta sobre qualquer histórico de conversa. " | |
"Se o histórico não for relevante para a pergunta, ignore-o. " | |
"Não adicione explicações, notas ou introduções. Apenas escreva as perguntas. " | |
"Pergunta: {query}\n\n" | |
"Perguntas:\n" | |
), | |
) | |
# Configuração do chat engine | |
nest_asyncio.apply() | |
memory = ChatMemoryBuffer.from_defaults(token_limit=3900) | |
query_engine = RetrieverQueryEngine.from_args(retriever) | |
chat_engine = CondensePlusContextChatEngine.from_defaults( | |
query_engine, | |
memory=memory, | |
context_prompt=( | |
"Você é um assistente virtual capaz de interagir normalmente, além de" | |
" fornecer informações sobre organogramas e listar funcionários." | |
" Aqui estão os documentos relevantes para o contexto:\n" | |
"{context_str}" | |
"\nInstrução: Use o histórico da conversa anterior, ou o contexto acima, para responder." | |
), | |
verbose=True, | |
) | |
# Armazenamento do chat | |
chat_store = SimpleChatStore() | |
if os.path.exists(chat_store_path): | |
chat_store = SimpleChatStore.from_persist_path(persist_path=chat_store_path) | |
else: | |
chat_store.persist(persist_path=chat_store_path) | |
# Interface do Chatbot | |
st.title("Chatbot Carômetro") | |
st.write("Este chatbot pode te ajudar a conseguir informações relevantes sobre os carômetros da Sicoob.") | |
if 'chat_history' not in st.session_state: | |
st.session_state.chat_history = [] | |
for message in st.session_state.chat_history: | |
role, text = message.split(":", 1) | |
with st.chat_message(role.strip().lower()): | |
st.write(text.strip()) | |
user_input = st.chat_input("Digite sua pergunta") | |
if user_input: | |
# Exibir a mensagem do usuário e adicionar ao histórico | |
with st.chat_message('user'): | |
st.write(user_input) | |
st.session_state.chat_history.append(f"user: {user_input}") | |
# Placeholder para a mensagem do assistente | |
with st.chat_message('assistant'): | |
message_placeholder = st.empty() | |
assistant_message = '' | |
# Obter a resposta em streaming do chat_engine | |
response = chat_engine.stream_chat(user_input) | |
for token in response.response_gen: | |
assistant_message += token | |
# Atualizar o placeholder da mensagem | |
message_placeholder.markdown(assistant_message + "▌") | |
# Remover o cursor após a conclusão | |
message_placeholder.markdown(assistant_message) | |
st.session_state.chat_history.append(f"assistant: {assistant_message}") |