import logging
import sys
import os
import re
import base64
import nest_asyncio
import pandas as pd
from pathlib import Path
from typing import Any, Dict, List, Optional
from PIL import Image
import streamlit as st
import torch
from llama_index.core import Settings, SimpleDirectoryReader, StorageContext, Document
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.core.node_parser import LangchainNodeParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from llama_index.core.storage.chat_store import SimpleChatStore
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.chat_engine import CondensePlusContextChatEngine
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.vector_stores.chroma import ChromaVectorStore
from llama_index.core import VectorStoreIndex
from llama_index.llms.huggingface import HuggingFaceLLM
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
import chromadb
#Configuração da imagem da aba
im = Image.open("pngegg.png")
st.set_page_config(page_title = "Chatbot Carômetro", page_icon=im, layout = "wide")
#Removido loop e adicionado os.makedirs
os.makedirs("bm25_retriever", exist_ok=True)
os.makedirs("chat_store", exist_ok=True)
os.makedirs("chroma_db", exist_ok=True)
os.makedirs("documentos", exist_ok=True)
os.makedirs("curadoria", exist_ok=True)
os.makedirs("chroma_db_curadoria", exist_ok=True)
# Configuração do Streamlit
st.sidebar.title("Configuração de LLM")
sidebar_option = st.sidebar.radio("Selecione o LLM", ["gpt-3.5-turbo", "NuExtract-1.5"])
# logo_url = 'app\logos\logo-sicoob.jpg'
# st.sidebar.image(logo_url)
import base64
#Configuração da imagem da sidebar
with open("sicoob-logo.png", "rb") as f:
data = base64.b64encode(f.read()).decode("utf-8")
st.sidebar.markdown(
f"""
""",
unsafe_allow_html=True,
)
#if sidebar_option == "Ollama":
# Settings.llm = Ollama(model="llama3.2:latest", request_timeout=500.0, num_gpu=1)
# Settings.embed_model = OllamaEmbedding(model_name="nomic-embed-text:latest")
if sidebar_option == "gpt-3.5-turbo":
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
Settings.llm = OpenAI(model="gpt-3.5-turbo")
Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-ada-002")
elif sidebar_option == 'NuExtract-1.5':
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
#Embedding do huggingface
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5"
)
#Carregamento do modelo local, descomentar o modelo desejado
llm = HuggingFaceLLM(
context_window=2048,
max_new_tokens=2048,
generate_kwargs={"do_sample": False},
#query_wrapper_prompt=query_wrapper_prompt,
#model_name="Qwen/Qwen2.5-Coder-32B-Instruct",
#model_name="Qwen/Qwen2.5-14B-Instruct",
# model_name="meta-llama/Llama-3.2-3B",
#model_name="HuggingFaceH4/zephyr-7b-beta",
# model_name="meta-llama/Meta-Llama-3-8B",
model_name="numind/NuExtract-1.5",
#model_name="meta-llama/Llama-3.2-3B",
tokenizer_name="numind/NuExtract-1.5",
device_map="auto",
tokenizer_kwargs={"max_length": 512},
# uncomment this if using CUDA to reduce memory usage
model_kwargs={"torch_dtype": torch.bfloat16},
)
chat = [
{"role": "user", "content": "Hello, how are you?"},
{"role": "assistant", "content": "I'm doing great. How can I help you today?"},
{"role": "user", "content": "I'd like to show off how chat templating works!"},
]
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained("numind/NuExtract-1.5")
tokenizer.apply_chat_template(chat, tokenize=False)
Settings.chunk_size = 512
Settings.llm = llm
else:
raise Exception("Opção de LLM inválida!")
# Diretórios configurados pelo usuário
chat_store_path = os.path.join("chat_store", "chat_store.json")
documents_path = os.path.join("documentos")
chroma_storage_path = os.path.join("chroma_db") # Diretório para persistência do Chroma
chroma_storage_path_curadoria = os.path.join("chroma_db_curadoria") # Diretório para 'curadoria'
bm25_persist_path = os.path.join("bm25_retriever")
curadoria_path = os.path.join("curadoria")
# Classe CSV Customizada (novo código)
class CustomPandasCSVReader:
"""PandasCSVReader modificado para incluir cabeçalhos nos documentos."""
def __init__(
self,
*args: Any,
concat_rows: bool = True,
col_joiner: str = ", ",
row_joiner: str = "\n",
pandas_config: dict = {},
**kwargs: Any
) -> None:
self._concat_rows = concat_rows
self._col_joiner = col_joiner
self._row_joiner = row_joiner
self._pandas_config = pandas_config
def load_data(
self,
file: Path,
extra_info: Optional[Dict] = None,
) -> List[Document]:
df = pd.read_csv(file, **self._pandas_config)
text_list = [" ".join(df.columns.astype(str))]
text_list += (
df.astype(str)
.apply(lambda row: self._col_joiner.join(row.values), axis=1)
.tolist()
)
metadata = {"filename": file.name, "extension": file.suffix}
if extra_info:
metadata.update(extra_info)
if self._concat_rows:
return [Document(text=self._row_joiner.join(text_list), metadata=metadata)]
else:
return [
Document(text=text, metadata=metadata)
for text in text_list
]
def clean_documents(documents):
"""Remove caracteres não desejados diretamente nos textos dos documentos."""
cleaned_documents = []
for doc in documents:
cleaned_text = re.sub(r"[^0-9A-Za-zÀ-ÿ ]", "", doc.get_content())
doc.text = cleaned_text
cleaned_documents.append(doc)
return cleaned_documents
from llama_index.readers.google import GoogleDriveReader
import json
credentials_json = os.getenv('GOOGLE_CREDENTIALS')
if credentials_json is None:
raise ValueError("The GOOGLE_CREDENTIALS environment variable is not set.")
# Write the credentials to a file
credentials_path = "credentials.json"
with open(credentials_path, 'w') as credentials_file:
credentials_file.write(credentials_json)
google_drive_reader = GoogleDriveReader(credentials_path=credentials_path)
google_drive_reader._creds = google_drive_reader._get_credentials()
def are_docs_downloaded(directory_path: str) -> bool:
return os.path.isdir(directory_path) and any(os.scandir(directory_path))
def download_original_files_from_folder(greader: GoogleDriveReader, pasta_documentos_drive: str, local_path: str):
os.makedirs(local_path, exist_ok=True)
files_meta = greader._get_fileids_meta(folder_id=pasta_documentos_drive)
if not files_meta:
logging.info("Nenhum arquivo encontrado na pasta especificada.")
return
for fmeta in files_meta:
file_id = fmeta[0]
file_name = os.path.basename(fmeta[2])
local_file_path = os.path.join(local_path, file_name)
if os.path.exists(local_file_path):
logging.info(f"Arquivo '{file_name}' já existe localmente, ignorando download.")
continue
downloaded_file_path = greader._download_file(file_id, local_file_path)
if downloaded_file_path:
logging.info(f"Arquivo '{file_name}' baixado com sucesso em: {downloaded_file_path}")
else:
logging.warning(f"Não foi possível baixar '{file_name}'")
#DADOS/QA_database/Documentos CSV/documentos
pasta_documentos_drive = "1xVzo8s1D0blzR5ZB3m5k4dVWHuRmKUu-"
#DADOS/QA_database/Documentos CSV/curadoria
pasta_curadoria_drive = "1LRrdOkZy9p0FA3MQAyz-Ssj3ktKTWAwE"
# Verifica e baixa arquivos se necessário (novo código)
if not are_docs_downloaded(documents_path):
logging.info("Baixando arquivos originais do Drive para 'documentos'...")
download_original_files_from_folder(google_drive_reader, pasta_documentos_drive, documents_path)
else:
logging.info("'documentos' já contém arquivos, ignorando download.")
if not are_docs_downloaded(curadoria_path):
logging.info("Baixando arquivos originais do Drive para 'curadoria'...")
download_original_files_from_folder(google_drive_reader, pasta_curadoria_drive, curadoria_path)
else:
logging.info("'curadoria' já contém arquivos, ignorando download.")
# Configuração de leitura de documentos
file_extractor = {".csv": CustomPandasCSVReader()}
documents = SimpleDirectoryReader(
input_dir=documents_path,
file_extractor=file_extractor,
filename_as_id=True
).load_data()
documents = clean_documents(documents)
# Configuração do Chroma e BM25 com persistência
docstore = SimpleDocumentStore()
docstore.add_documents(documents)
db = chromadb.PersistentClient(path=chroma_storage_path)
chroma_collection = db.get_or_create_collection("dense_vectors")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)
# Configuração do StorageContext
storage_context = StorageContext.from_defaults(
docstore=docstore, vector_store=vector_store
)
# Criação/Recarregamento do índice com embeddings
if os.path.exists(chroma_storage_path):
index = VectorStoreIndex.from_vector_store(vector_store)
else:
splitter = LangchainNodeParser(
RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=64)
)
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context,
transformations=[splitter]
)
vector_store.persist()
# Criação/Recarregamento do BM25 Retriever
if os.path.exists(os.path.join(bm25_persist_path, "params.index.json")):
bm25_retriever = BM25Retriever.from_persist_dir(bm25_persist_path)
else:
bm25_retriever = BM25Retriever.from_defaults(
docstore=docstore,
similarity_top_k=2,
language="portuguese", # Idioma ajustado para seu caso
)
os.makedirs(bm25_persist_path, exist_ok=True)
bm25_retriever.persist(bm25_persist_path)
#Adicionado documentos na pasta curadoria, foi setado para 1200 o chunk pra receber pergunta, contexto e resposta
curadoria_documents = SimpleDirectoryReader(
input_dir=curadoria_path,
file_extractor=file_extractor,
filename_as_id=True
).load_data()
curadoria_documents = clean_documents(curadoria_documents)
curadoria_docstore = SimpleDocumentStore()
curadoria_docstore.add_documents(curadoria_documents)
db_curadoria = chromadb.PersistentClient(path=chroma_storage_path_curadoria)
chroma_collection_curadoria = db_curadoria.get_or_create_collection("dense_vectors_curadoria")
vector_store_curadoria = ChromaVectorStore(chroma_collection=chroma_collection_curadoria)
# Configuração do StorageContext para 'curadoria'
storage_context_curadoria = StorageContext.from_defaults(
docstore=curadoria_docstore, vector_store=vector_store_curadoria
)
# Criação/Recarregamento do índice com embeddings para 'curadoria'
if os.path.exists(chroma_storage_path_curadoria):
curadoria_index = VectorStoreIndex.from_vector_store(vector_store_curadoria)
else:
curadoria_splitter = LangchainNodeParser(
RecursiveCharacterTextSplitter(chunk_size=1200, chunk_overlap=100)
)
curadoria_index = VectorStoreIndex.from_documents(
curadoria_documents, storage_context=storage_context_curadoria, transformations=[curadoria_splitter]
)
vector_store_curadoria.persist()
curadoria_retriever = curadoria_index.as_retriever(similarity_top_k=2)
# Combinação de Retrievers (Embeddings + BM25)
vector_retriever = index.as_retriever(similarity_top_k=2)
retriever = QueryFusionRetriever(
[vector_retriever, bm25_retriever, curadoria_retriever],
similarity_top_k=2,
num_queries=0,
mode="reciprocal_rerank",
use_async=True,
verbose=True,
query_gen_prompt=(
"Gere {num_queries} perguntas de busca relacionadas à seguinte pergunta. "
"Priorize o significado da pergunta sobre qualquer histórico de conversa. "
"Se o histórico não for relevante para a pergunta, ignore-o. "
"Não adicione explicações, notas ou introduções. Apenas escreva as perguntas. "
"Pergunta: {query}\n\n"
"Perguntas:\n"
),
)
# Configuração do chat engine
nest_asyncio.apply()
memory = ChatMemoryBuffer.from_defaults(token_limit=3900)
query_engine = RetrieverQueryEngine.from_args(retriever)
chat_engine = CondensePlusContextChatEngine.from_defaults(
query_engine,
memory=memory,
context_prompt=(
"Você é um assistente virtual capaz de interagir normalmente, além de"
" fornecer informações sobre organogramas e listar funcionários."
" Aqui estão os documentos relevantes para o contexto:\n"
"{context_str}"
"\nInstrução: Use o histórico da conversa anterior, ou o contexto acima, para responder."
),
verbose=True,
)
# Armazenamento do chat
chat_store = SimpleChatStore()
if os.path.exists(chat_store_path):
chat_store = SimpleChatStore.from_persist_path(persist_path=chat_store_path)
else:
chat_store.persist(persist_path=chat_store_path)
# Interface do Chatbot
st.title("Chatbot Carômetro")
st.write("Este chatbot pode te ajudar a conseguir informações relevantes sobre os carômetros da Sicoob.")
if 'chat_history' not in st.session_state:
st.session_state.chat_history = []
for message in st.session_state.chat_history:
role, text = message.split(":", 1)
with st.chat_message(role.strip().lower()):
st.write(text.strip())
user_input = st.chat_input("Digite sua pergunta")
if user_input:
# Exibir a mensagem do usuário e adicionar ao histórico
with st.chat_message('user'):
st.write(user_input)
st.session_state.chat_history.append(f"user: {user_input}")
# Placeholder para a mensagem do assistente
with st.chat_message('assistant'):
message_placeholder = st.empty()
assistant_message = ''
# Obter a resposta em streaming do chat_engine
response = chat_engine.stream_chat(user_input)
for token in response.response_gen:
assistant_message += token
# Atualizar o placeholder da mensagem
message_placeholder.markdown(assistant_message + "▌")
# Remover o cursor após a conclusão
message_placeholder.markdown(assistant_message)
st.session_state.chat_history.append(f"assistant: {assistant_message}")