# Gradio demo for Spanish long-form question answering over a biomedical corpus.
# This section imports dependencies and eagerly loads every model and the
# dataset at import time; the functions further down read these globals.
from datasets import load_dataset
from transformers import (
    DPRQuestionEncoder,
    DPRQuestionEncoderTokenizer,
    MT5ForConditionalGeneration,
    AutoTokenizer,
    AutoModelForCTC,
    Wav2Vec2Tokenizer,
)
from general_utils import (
    embed_questions,
    transcript,
    remove_chars_to_tts,
    parse_final_answer,
)
from typing import List
import gradio as gr
from article_app import article, description, examples
from haystack.nodes import DensePassageRetriever
from haystack.document_stores import InMemoryDocumentStore
import numpy as np
from sentence_transformers import SentenceTransformer, util, CrossEncoder

# Number of nearest passages requested from the FAISS index per question.
topk = 21
# Corpus rows whose "text" is not longer than this many characters are dropped.
minchars = 200
# Retrieved passages must contain more than this many whitespace-separated words.
min_snippet_length = 20
# Device the generation inputs are moved to before calling the mT5 model.
device = "cpu"
# Substrings that make search_and_answer refuse a question (matched on lower-cased words).
covidterms = ["covid19", "covid", "coronavirus", "covid-19", "sars-cov-2"]

# Speech-to-text models, keyed by the name offered in the UI dropdown.
models = {
    "wav2vec2-iic": {
        "processor": Wav2Vec2Tokenizer.from_pretrained(
            "IIC/wav2vec2-spanish-multilibrispeech"
        ),
        "model": AutoModelForCTC.from_pretrained(
            "IIC/wav2vec2-spanish-multilibrispeech"
        ),
    },
}

# Text-to-speech callable loaded through Gradio; applied to the generated answer
# when the "Text to Speech" checkbox is ticked.
tts_es = gr.Interface.load("huggingface/facebook/tts_transformer-es-css10")

# Default keyword arguments for mt5_lfqa.generate; generate_answer combines
# them with the per-request values coming from the UI.
params_generate = {
    "min_length": 50,
    "max_length": 250,
    "do_sample": False,
    "early_stopping": True,
    "num_beams": 8,
    "temperature": 1.0,
    "top_k": None,
    "top_p": None,
    "no_repeat_ngram_size": 3,
    "num_return_sequences": 1,
}

# Dense retriever; in the code below it is only used to embed questions
# (dpr.embed_queries), so the in-memory document store is just a placeholder.
dpr = DensePassageRetriever(
    document_store=InMemoryDocumentStore(),
    query_embedding_model="IIC/dpr-spanish-question_encoder-allqa-base",
    passage_embedding_model="IIC/dpr-spanish-passage_encoder-allqa-base",
    max_seq_len_query=64,
    max_seq_len_passage=256,
    batch_size=512,
    use_gpu=False,
)

# Answer generator and its tokenizer.
mt5_tokenizer = AutoTokenizer.from_pretrained("IIC/mt5-base-lfqa-es")
mt5_lfqa = MT5ForConditionalGeneration.from_pretrained("IIC/mt5-base-lfqa-es")

# Models used by sort_on_similarity to re-rank retrieved passages.
similarity_model = SentenceTransformer(
    "distiluse-base-multilingual-cased", device="cpu"
)
crossencoder = CrossEncoder("IIC/roberta-base-bne-ranker", device="cpu")

# Corpus searched by query_index; short texts are filtered out up front.
dataset = load_dataset("IIC/spanish_biomedical_crawled_corpus", split="train")
dataset = dataset.filter(lambda example: len(example["text"]) > minchars)
# Attach the pre-built FAISS index so the dataset can be searched by embedding.
dataset.load_faiss_index(
    "embeddings",
    "dpr_index_bio_newdpr.faiss",
)


def query_index(question: str):
    """Retrieve candidate passages for a question from the FAISS index.

    Args:
        question: The question text to embed and search with.

    Returns:
        The "text" of the ``topk`` nearest examples, keeping only passages
        with more than ``min_snippet_length`` whitespace-separated words.
    """
    question_embedding = dpr.embed_queries([question])[0]
    _scores, closest_passages = dataset.get_nearest_examples(
        "embeddings", question_embedding, k=topk
    )
    return [
        context
        for context in closest_passages["text"]
        if len(context.split()) > min_snippet_length
    ]


def sort_on_similarity(question, contexts, include_rank: int = 5):
    """Re-rank passages by bi-encoder cosine similarity times cross-encoder score.

    Args:
        question: The question text.
        contexts: List of candidate passage strings.
        include_rank: Maximum number of passages to return.

    Returns:
        Up to ``include_rank`` passages, highest combined score first. An
        empty ``contexts`` yields an empty list.
    """
    if not contexts:
        # Nothing to rank; avoid calling the encoders on empty input.
        return []

    question_encoded = similarity_model.encode([question])[0]
    ctxs_encoded = similarity_model.encode(contexts)

    # util.cos_sim returns a 1x1 tensor per pair; collapse each to a plain
    # float so both score sets are flat vectors with one entry per passage.
    bi_encoder_scores = np.array(
        [
            float(util.cos_sim(question_encoded, ctx_encoded))
            for ctx_encoded in ctxs_encoded
        ]
    )

    text_pairs = [[question, ctx] for ctx in contexts]
    cross_encoder_scores = np.asarray(
        crossencoder.predict(text_pairs), dtype=float
    ).ravel()

    similarity_scores = bi_encoder_scores * cross_encoder_scores
    # argsort is ascending, so flip it to get best-first ordering.
    similarity_ranking_idx = np.flip(np.argsort(similarity_scores))
    return [contexts[idx] for idx in similarity_ranking_idx][:include_rank]


def create_context(contexts: List):
    """Join passages into one context string, each preceded by a newline."""
    return "\n" + "\n".join(contexts)


def create_model_input(question: str, context: str):
    """Build the prompt string fed to the mT5 model."""
    return f"question: {question} context: {context}"


def generate_answer(model_input, update_params):
    """Generate answers with the mT5 model.

    Args:
        model_input: Prompt string (or list of strings) to tokenize.
        update_params: Generation keyword arguments that override the
            defaults in ``params_generate`` for this call only.

    Returns:
        A list of ``{"generated_text": str}`` dicts, one per decoded sequence.
    """
    encoded = mt5_tokenizer(
        model_input,
        truncation=True,
        padding=True,
        return_tensors="pt",
        max_length=1024,
    )
    # Merge into a fresh dict so per-request values never leak into the
    # module-level defaults shared by all requests.
    generation_kwargs = {**params_generate, **(update_params or {})}
    answers_encoded = mt5_lfqa.generate(
        input_ids=encoded["input_ids"].to(device),
        attention_mask=encoded["attention_mask"].to(device),
        **generation_kwargs,
    )
    answers = mt5_tokenizer.batch_decode(
        answers_encoded,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=True,
    )
    return [{"generated_text": answer} for answer in answers]


def search_and_answer(
    question,
    audio_file,
    audio_array,
    min_length_answer,
    num_beams,
    no_repeat_ngram_size,
    temperature,
    max_answer_length,
    wav2vec2_name,
    do_tts,
):
    """Answer a typed or spoken question; this is the Gradio callback.

    Args:
        question: Typed question; when empty the audio inputs are transcribed.
        audio_file: Uploaded audio, passed through to ``transcript``.
        audio_array: Recorded audio, passed through to ``transcript``.
        min_length_answer: Minimum generation length.
        num_beams: Beam count for generation.
        no_repeat_ngram_size: N-gram size that may not repeat in the answer.
        temperature: Generation temperature.
        max_answer_length: Maximum generation length.
        wav2vec2_name: Key into ``models`` selecting the speech recognizer.
        do_tts: Whether to synthesize the answer as audio.

    Returns:
        A 3-tuple matching the interface outputs: answer HTML, documents
        HTML, and an audio value (synthesized answer or a fixed file path).
    """
    # Cast the count-like values defensively, mirroring the cast that was
    # already applied to num_beams.
    update_params = {
        "min_length": int(min_length_answer),
        "max_length": int(max_answer_length),
        "num_beams": int(num_beams),
        "temperature": temperature,
        "no_repeat_ngram_size": int(no_repeat_ngram_size),
    }

    if not question:
        s2t_model = models[wav2vec2_name]["model"]
        s2t_processor = models[wav2vec2_name]["processor"]
        question = transcript(
            audio_file, audio_array, processor=s2t_processor, model=s2t_model
        )
        print(f"Transcripted question: *** {question} ****")

    mentions_covid = any(
        term in word.lower()
        for word in question.split(" ")
        for term in covidterms
    )
    if mentions_covid:
        # The interface declares three outputs, so the refusal must also
        # return three values (empty documents panel, fixed audio file).
        return (
            "Del COVID no queremos saber ya más nada, lo sentimos, pregúntame sobre otra cosa :P ",
            "",
            "tmptdsnrh_8.flac",
        )

    contexts = query_index(question)
    contexts = sort_on_similarity(question, contexts)
    context = create_context(contexts)
    model_input = create_model_input(question, context)
    answers = generate_answer(model_input, update_params)
    final_answer = answers[0]["generated_text"]

    # Synthesize from the raw generated text, before it is post-processed.
    if do_tts:
        audio_answer = tts_es(remove_chars_to_tts(final_answer))
    else:
        audio_answer = "audio_troll.flac"

    final_answer, documents = parse_final_answer(final_answer, contexts)
    return final_answer, documents, audio_answer


if __name__ == "__main__":
    gr.Interface(
        search_and_answer,
        inputs=[
            gr.inputs.Textbox(
                lines=2,
                label="Question",
                placeholder="Type your question (in spanish) to the system.",
                optional=True,
            ),
            gr.inputs.Audio(
                source="upload",
                type="filepath",
                label="Upload your audio asking a question here.",
                optional=True,
            ),
            gr.inputs.Audio(
                source="microphone",
                type="numpy",
                label="Record your audio asking a question.",
                optional=True,
            ),
            gr.inputs.Slider(
                minimum=10,
                maximum=200,
                default=50,
                label="Minimum size for the answer",
                step=1,
            ),
            gr.inputs.Slider(
                minimum=4, maximum=12, default=8, label="number of beams", step=1
            ),
            gr.inputs.Slider(
                minimum=2, maximum=5, default=3, label="no repeat n-gram size", step=1
            ),
            gr.inputs.Slider(
                minimum=0.8, maximum=2.0, default=1.0, label="temperature", step=0.1
            ),
            gr.inputs.Slider(
                minimum=220,
                maximum=360,
                default=250,
                label="maximum answer length",
                step=1,
            ),
            gr.inputs.Dropdown(
                ["wav2vec2-iic"],
                type="value",
                default=None,
                label="Select the speech recognition model.",
                optional=False,
            ),
            gr.inputs.Checkbox(
                default=False, label="Text to Speech", optional=True
            ),
        ],
        outputs=[
            gr.outputs.HTML(label="Generated Answer."),
            gr.outputs.HTML(label="Documents used."),
            gr.outputs.Audio(label="Answer in audio"),
        ],
        description=description,
        examples=examples,
        theme="grass",
        article=article,
        thumbnail="IIC_logoP.png",
        css="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css",
    ).launch()