Chatbot_Mining / app.py
Nguyen17's picture
update
65f3fa5 verified
import yaml
import torch
import logging
import argparse
import warnings
import pandas as pd
from tqdm.auto import tqdm
from jsonargparse import CLI
from types import SimpleNamespace
from llama_index.core.schema import TextNode
from langchain_huggingface import HuggingFaceEmbeddings
from llama_index.core import Prompt, Settings, VectorStoreIndex
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TextStreamer
import gradio as gr
import os
import shutil
from pathlib import Path
from docx.api import Document
from types import SimpleNamespace
from llama_index.core import SimpleDirectoryReader
from utils.process_tables import extract_and_replace_docx_tables
from langchain._api import LangChainDeprecationWarning
# Configure logging: records at INFO level and above are sent both to the
# file "script.log" and to the console stream.
_log_handlers = [
    logging.FileHandler("script.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger shared by the functions in this file.
logger = logging.getLogger(__name__)
def load_config(file_path='config.yaml'):
    """Read a YAML configuration file into nested namespaces.

    Every top-level value that is a mapping becomes a ``SimpleNamespace``;
    other values are kept unchanged. The result is itself wrapped in a
    ``SimpleNamespace`` so entries are reachable as ``cfg.section.key``.

    Args:
        file_path: Path of the YAML file to read.

    Returns:
        SimpleNamespace holding the configuration.

    Raises:
        Exception: Any error raised while reading or converting the file is
            logged and re-raised unchanged.
    """
    logger.info('Loading config file ...')
    try:
        with open(file_path, 'r') as stream:
            raw = yaml.safe_load(stream)
            sections = {
                key: SimpleNamespace(**value) if isinstance(value, dict) else value
                for key, value in raw.items()
            }
            logger.info('Config file loaded successfully.')
            return SimpleNamespace(**sections)
    except Exception as e:
        logger.error(f'Error loading config file: {e}')
        raise
# Load the default configuration ('config.yaml') at import time: the default
# argument values of the functions defined below read `cfg.dataset.*` when
# those functions are defined, so `cfg` must already exist here.
cfg = load_config()
def process_docx_files(data_dir=Path(cfg.dataset.data_dir),
                       processed_data_dir=Path(cfg.dataset.processed_data_dir),
                       chunk_marker=cfg.dataset.chunk_marker):
    """Process every ``.docx`` file in ``data_dir`` and save the results.

    Each file is passed to ``extract_and_replace_docx_tables`` and the
    returned document is saved as ``processed_<name>`` inside
    ``processed_data_dir``. The second value returned by the helper is not
    used here.

    The output directory is emptied first (if it exists) and then created,
    so it only ever contains the results of the current run.

    Args:
        data_dir: Directory containing the source ``.docx`` files.
        processed_data_dir: Directory that receives the processed files.
        chunk_marker: Marker string forwarded to the table-extraction helper.

    Raises:
        ValueError: If ``processed_data_dir`` and ``data_dir`` are the same
            directory (clearing the output would delete the sources).
        Exception: Any other error is logged and re-raised unchanged.
    """
    try:
        # Accept plain strings as well as Path objects.
        data_dir = Path(data_dir)
        processed_data_dir = Path(processed_data_dir)

        if processed_data_dir.resolve() == data_dir.resolve():
            raise ValueError(
                'processed_data_dir must differ from data_dir: '
                f'{processed_data_dir}'
            )

        # Remove stale output only when there is something to remove, then
        # make sure the directory exists before any file is saved into it.
        if processed_data_dir.exists():
            shutil.rmtree(processed_data_dir)
        processed_data_dir.mkdir(parents=True, exist_ok=True)

        docx_files = [file for file in os.listdir(data_dir) if file.endswith('.docx')]
        logger.info(f'Found {len(docx_files)} DOCX files to process.')

        for fname in docx_files:
            document, _ = extract_and_replace_docx_tables(
                docx_file=data_dir / fname,
                chunk_marker=chunk_marker
            )
            document.save(processed_data_dir / f'processed_{fname}')
            logger.info(f'Processed and saved {fname}')
    except Exception as e:
        logger.error(f'Error processing DOCX files: {e}')
        raise
def load_processed_data(processed_data_dir=Path(cfg.dataset.processed_data_dir)):
    """Load the processed documents from disk with ``SimpleDirectoryReader``.

    Only files whose extension equals ``cfg.dataset.required_exts`` are read.

    Args:
        processed_data_dir: Directory holding the processed files.

    Returns:
        The documents returned by ``SimpleDirectoryReader.load_data()``.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        reader = SimpleDirectoryReader(
            input_dir=processed_data_dir,
            required_exts=[cfg.dataset.required_exts],
        )
        loaded = reader.load_data()
        logger.info('Processed data loaded successfully.')
        return loaded
    except Exception as e:
        logger.error(f'Error loading processed data: {e}')
        raise
def get_chunks(documents, chunk_marker=cfg.dataset.chunk_marker):
    """Split every document's text on ``chunk_marker`` into clean chunks.

    Each piece is stripped of surrounding whitespace; pieces that are empty
    after stripping are dropped. Order follows the documents, then the
    position of the piece inside each document.

    Args:
        documents: Iterable of objects exposing a ``text`` string attribute.
        chunk_marker: Separator string used to split each text.

    Returns:
        list[str]: The non-empty, stripped chunks.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        pieces = []
        for document in documents:
            for raw_piece in document.text.split(chunk_marker):
                cleaned = raw_piece.strip()
                if cleaned:
                    pieces.append(cleaned)
        logger.info(f'Extracted {len(pieces)} chunks from documents.')
        return pieces
    except Exception as e:
        logger.error(f'Error extracting chunks: {e}')
        raise
def main_prepare():
    """Run the document preparation pipeline end to end.

    Processes the DOCX files, loads the processed documents, splits them
    into chunks and stores the chunks in ``processed_chunks.pickle`` as a
    one-column DataFrame (column ``chunk``).

    Raises:
        Exception: Any error from a pipeline step is logged and re-raised.
    """
    logger.info('Starting document processing ...')
    try:
        process_docx_files()
        chunks = get_chunks(load_processed_data())
        logger.info(f'Total number of chunks: {len(chunks)}')

        # Persist the chunks so the retriever can be built from this file.
        pd.DataFrame({'chunk': chunks}).to_pickle('processed_chunks.pickle')
        logger.info('All chunks saved to processed_chunks.pickle')
    except Exception as e:
        logger.error(f'Error in main processing: {e}')
        raise
def load_config(config_path='config.yaml'):
    """Read a YAML configuration file into nested namespaces.

    This definition replaces the ``load_config`` defined earlier in this
    module for every call made after this point.

    Every top-level value that is a mapping becomes a ``SimpleNamespace``;
    other values are kept unchanged. The result is wrapped in a
    ``SimpleNamespace`` so entries are reachable as ``cfg.section.key``.

    Args:
        config_path: Path of the YAML file to read.

    Returns:
        SimpleNamespace holding the configuration.

    Raises:
        FileNotFoundError: If ``config_path`` does not exist.
    """
    print('-> Loading config file ...')
    # Use a context manager so the file handle is closed deterministically.
    with open(config_path) as config_file:
        cfg = yaml.safe_load(config_file)
    for k, v in cfg.items():
        if isinstance(v, dict):
            cfg[k] = SimpleNamespace(**v)
    return SimpleNamespace(**cfg)
def get_prompt_template():
    """Build the question-answering prompt template.

    The template text is Vietnamese and contains two placeholders:
    ``{context_str}`` for the retrieved context and ``{query_str}`` for the
    user's question.

    Returns:
        A ``Prompt`` object wrapping the template text.
    """
    # The pieces are concatenated without any separator; every line break
    # that belongs to the template is written explicitly as "\n".
    pieces = (
        "Bạn là trợ lý ảo hữu ích và thông minh được huấn luyên được để trả lời các câu hỏi từ người dùng giữa trên các thông tin ngữ cảnh liên quan được cung cấp\n",
        "Thông tin ngữ cảnh:\n",
        "---------------------\n",
        "{context_str}",
        "\n---------------------\n",
        "Dựa trên những thông tin ngữ cảnh bên trên, hãy trả lời câu hỏi sau: {query_str}\n",
    )
    return Prompt("".join(pieces))
def reset_settings(cfg):
    """Point the global llama-index ``Settings`` at the project's models.

    Installs a ``HuggingFaceEmbeddings`` instance built from
    ``cfg.architecture.embedding_model`` as the embedding model and sets
    ``Settings.llm`` to ``None``.

    Args:
        cfg: Configuration namespace with an ``architecture`` section.
    """
    Settings.embed_model = HuggingFaceEmbeddings(
        model_name=cfg.architecture.embedding_model,
    )
    Settings.llm = None
def get_retriever(cfg, prompt_template):
    """Build a query engine over the chunks saved in ``processed_chunks.pickle``.

    Each stored chunk becomes a ``TextNode``; the nodes are indexed with
    ``VectorStoreIndex`` and exposed through ``as_query_engine``.

    Args:
        cfg: Configuration namespace; ``cfg.retrieve.top_k`` sets how many
            similar chunks are retrieved per query.
        prompt_template: Template passed as ``text_qa_template``.

    Returns:
        The query engine created by ``VectorStoreIndex.as_query_engine``.
    """
    chunk_frame = pd.read_pickle('processed_chunks.pickle')
    nodes = []
    for chunk_text in chunk_frame['chunk'].values.tolist():
        nodes.append(TextNode(text=chunk_text))

    vector_index = VectorStoreIndex(nodes=nodes)
    return vector_index.as_query_engine(
        text_qa_template=prompt_template,
        similarity_top_k=cfg.retrieve.top_k,
    )
def load_tokenizer(cfg):
    """Load the tokenizer that belongs to the configured language model.

    The access token is read from the ``HUGGING_KEY`` environment variable.
    If the tokenizer has no padding token, its end-of-sequence token is used
    for padding.

    Args:
        cfg: Configuration namespace; ``cfg.architecture.llm_model`` names
            the model.

    Returns:
        The loaded tokenizer.
    """
    hf_token = os.getenv('HUGGING_KEY')
    tok = AutoTokenizer.from_pretrained(cfg.architecture.llm_model, token=hf_token)
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
    return tok
def get_llm(cfg):
    """Load the causal language model in evaluation mode.

    When ``cfg.architecture.llm_quantized`` is truthy the model is loaded
    with a 4-bit NF4 ``BitsAndBytesConfig`` (double quantization, float16
    compute dtype); otherwise no quantization config is passed.

    Args:
        cfg: Configuration namespace providing ``architecture.llm_model``,
            ``architecture.llm_quantized`` and ``environment.device``.

    Returns:
        The model returned by ``AutoModelForCausalLM.from_pretrained`` after
        calling ``.eval()`` on it.
    """
    # NOTE(review): the quantization compute dtype is float16 while the model
    # itself is requested in bfloat16 - confirm this mix is intended.
    quantization = (
        BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.float16,
        )
        if cfg.architecture.llm_quantized
        else None
    )
    model = AutoModelForCausalLM.from_pretrained(
        cfg.architecture.llm_model,
        torch_dtype=torch.bfloat16,
        device_map=cfg.environment.device,
        token=os.getenv('HUGGING_KEY'),
        low_cpu_mem_usage=True,
        quantization_config=quantization,
    )
    model = model.eval()
    return model
def run(text, intensity):
    """Answer one user question; this is the Gradio callback.

    Relies on the module-level globals ``retriever``, ``tokenizer``,
    ``language_model`` and ``config`` that ``main1`` sets before the
    interface is launched. Generation settings are read from ``config``
    (the file passed to ``main1``) so that they come from the same
    configuration that selected the model.

    Args:
        text: The question typed by the user.
        intensity: Value of the second Gradio input (the slider). It is
            accepted because the interface passes two inputs, but it is not
            used by generation.

    Returns:
        str: The generated answer as text, without the prompt and without
        special tokens.
    """
    # NOTE(review): `reset_settings` sets Settings.llm to None, so the query
    # engine's response is presumably the filled-in QA prompt rather than an
    # answer - confirm against the llama-index behaviour.
    prompt = retriever.query(text).response
    prompt = tokenizer.bos_token + '[INST] ' + prompt + ' [/INST]'

    # The streamer echoes generated tokens to stdout while generating.
    streamer = TextStreamer(tokenizer, skip_prompt=True)
    inputs = tokenizer([prompt], return_tensors='pt').to(config.environment.device)
    sample_outputs = language_model.generate(
        **inputs,
        streamer=streamer,
        pad_token_id=tokenizer.pad_token_id,
        max_new_tokens=config.generation.max_new_tokens,
        do_sample=config.generation.do_sample,
        temperature=config.generation.temperature,
    )

    # `generate` returns prompt tokens followed by the new tokens; keep only
    # the new ones and turn them into text so the Textbox shows an answer
    # instead of the string form of a tensor of token ids.
    prompt_length = inputs['input_ids'].shape[1]
    answer_ids = sample_outputs[0][prompt_length:]
    return tokenizer.decode(answer_ids, skip_special_tokens=True).strip()
def vistral_chat():
    """Create the Gradio interface around ``run`` and launch it.

    The interface has two inputs (a textbox and a slider) and a single
    textbox output.
    """
    question_box = gr.Textbox(label="Nhập vào nội dung input", value="Con đường xưa em đi")
    length_slider = gr.Slider(
        label="Độ dài output muốn tạo ra",
        value=20,
        minimum=10,
        maximum=100,
        step=2,
    )
    answer_box = gr.Textbox(label="Output")  # single output component

    demo = gr.Interface(
        fn=run,
        inputs=[question_box, length_slider],
        outputs=answer_box,
    )
    demo.launch()
def main1(config_path):
    """Load all components and start the Gradio chat interface.

    Sets the module-level globals ``config``, ``retriever``, ``tokenizer``
    and ``language_model``, then launches the interface. Errors are logged
    and not re-raised.

    Args:
        config_path: Path of the YAML configuration file.
    """
    # basicConfig does nothing if the root logger already has handlers.
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger(__name__)

    global config
    global retriever
    global tokenizer
    global language_model

    logger.info("Starting the process with config file: %s", config_path)

    # Only the config load is reported as "configuration file not found";
    # a missing file in a later step (e.g. the chunk pickle) must not be
    # mislabelled as a configuration problem.
    try:
        config = load_config(config_path)
    except FileNotFoundError as e:
        logger.error("Configuration file not found: %s", e)
        return
    except Exception as e:
        logger.exception("An error occurred: %s", e)
        return

    try:
        prompt_template = get_prompt_template()
        # Replace the default embedding model and LLM with the project's own.
        reset_settings(config)
        retriever = get_retriever(config, prompt_template)
        tokenizer = load_tokenizer(config)
        language_model = get_llm(config)
        # Start the Gradio web interface.
        vistral_chat()
        logger.info("Process completed successfully.")
    except Exception as e:
        logger.exception("An error occurred: %s", e)
if __name__ == "__main__":
    warnings.simplefilter("ignore", category=LangChainDeprecationWarning)
    # access_token_read = “abc”
    # login(token = access_token_read)

    # Parse the command line first so that `--help` or an invalid argument
    # exits before any document is processed.
    parser = argparse.ArgumentParser(description='Process some configurations.')
    parser.add_argument('--config', type=str, default='config.yaml', help='Path to the configuration file')
    args = parser.parse_args()

    # Document preparation uses the configuration loaded at import time;
    # `--config` only affects the chat stage started by main1.
    main_prepare()
    main1(args.config)