import gradio as gr import spaces import subprocess import os import shutil import string import random import glob from pypdf import PdfReader from sentence_transformers import SentenceTransformer model_name = os.environ.get("MODEL", "Snowflake/snowflake-arctic-embed-m") chunk_size = int(os.environ.get("CHUNK_SIZE", 128)) default_max_characters = int(os.environ.get("DEFAULT_MAX_CHARACTERS", 258)) model = SentenceTransformer(model_name) # model.to(device="cuda") @spaces.GPU def embed(queries, chunks) -> dict[str, list[tuple[str, float]]]: query_embeddings = model.encode(queries, prompt_name="query") document_embeddings = model.encode(chunks) scores = query_embeddings @ document_embeddings.T results = {} for query, query_scores in zip(queries, scores): chunk_idxs = [i for i in range(len(chunks))] # Obtenha uma estrutura como {query: [(chunk_idx, score), (chunk_idx, score), ...]} results[query] = list(zip(chunk_idxs, query_scores)) return results def extract_text_from_pdf(reader): full_text = "" for idx, page in enumerate(reader.pages): text = page.extract_text() if len(text) > 0: full_text += f"---- Página {idx} ----\n" + page.extract_text() + "\n\n" return full_text.strip() def convert(filename) -> str: plain_text_filetypes = [ ".txt", ".csv", ".tsv", ".md", ".yaml", ".toml", ".json", ".json5", ".jsonc", ] # Já é um arquivo de texto simples que não se beneficiaria com o pandoc, então retorne o conteúdo if any(filename.endswith(ft) for ft in plain_text_filetypes): with open(filename, "r") as f: return f.read() if filename.endswith(".pdf"): return extract_text_from_pdf(PdfReader(filename)) raise ValueError(f"Tipo de arquivo não suportado: {filename}") def chunk_to_length(text, max_length=512): chunks = [] while len(text) > max_length: chunks.append(text[:max_length]) text = text[max_length:] chunks.append(text) return chunks @spaces.GPU def predict(query, max_characters) -> str: #Incorpore a consulta query_embedding = model.encode(query, prompt_name="query") # Inicialize uma lista para armazenar todos os pedaços e suas semelhanças em todos os documentos all_chunks = [] # Iterar por todos os documentos for filename, doc in docs.items(): # Calcular produto escalar entre consultas e incorporações de documentos similarities = doc["embeddings"] @ query_embedding.T # Adicione pedaços e semelhanças à lista all_chunks all_chunks.extend([(filename, chunk, sim) for chunk, sim in zip(doc["chunks"], similarities)]) # Classifica todos os pedaços por similaridade all_chunks.sort(key=lambda x: x[2], reverse=True) # Inicialize um dicionário para armazenar partes relevantes para cada documento relevant_chunks = {} # Adicione os pedaços mais relevantes até que max_characters seja alcançado total_chars = 0 for filename, chunk, _ in all_chunks: if total_chars + len(chunk) <= max_characters: if filename not in relevant_chunks: relevant_chunks[filename] = [] relevant_chunks[filename].append(chunk) total_chars += len(chunk) else: break return {"relevant_chunks": relevant_chunks} docs = {} for filename in glob.glob("src/*"): if filename.endswith("add_your_files_here"): continue converted_doc = convert(filename) chunks = chunk_to_length(converted_doc, chunk_size) embeddings = model.encode(chunks) docs[filename] = { "chunks": chunks, "embeddings": embeddings, } gr.Interface( predict, inputs=[ gr.Textbox(label="Consulta feita sobre os documentos"), gr.Number(label="Máximo de caracteres de saída", value=default_max_characters), ], outputs=[gr.Dict(label="Pedaços relevantes")], title="Demonstração do modelo de ferramenta da comunidade ", description='''"Para usar o no HuggingChat com seus próprios documentos , comece clonando este espaço, adicione seus documentos à pasta `src` e então crie uma ferramenta comunitária com este espaço!" ,''' ).launch()