import gradio as gr
import gspread
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from oauth2client.service_account import ServiceAccountCredentials
from llama_index.core import VectorStoreIndex, Settings
from llama_index.core.node_parser import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.schema import Document
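
# Overall flow: read HR data from Google Sheets, chunk and embed it with
# LlamaIndex (multilingual MiniLM), retrieve the most relevant rows for each
# question, and let Zephyr-7B answer through a Gradio chat UI.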

# =============== 1. Cache and Google Sheets Index Initialization ===============
cached_index = None

def read_google_sheets():
    try:
        scope = ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"]
        creds = ServiceAccountCredentials.from_json_keyfile_name("credentials.json", scope)
        client = gspread.authorize(creds)
        SPREADSHEET_ID = "1e_cNMhwF-QYpyYUpqQh-XCw-OdhWS6EuYsoBUsVtdNg"
        sheet_names = ["datatarget", "datacuti", "dataabsen", "datalembur", "pkb"]
        all_data = []
        spreadsheet = client.open_by_key(SPREADSHEET_ID)
        for sheet_name in sheet_names:
            try:
                sheet = spreadsheet.worksheet(sheet_name)
                data = sheet.get_all_values()
                all_data.append(f"=== Data dari {sheet_name.upper()} ===")
                all_data.extend([" | ".join(row) for row in data])
                all_data.append("\n")
            except gspread.exceptions.WorksheetNotFound:
                all_data.append(f"❌ ERROR: Worksheet {sheet_name} tidak ditemukan.")
        return "\n".join(all_data).strip()
    except Exception as e:
        return f"❌ ERROR: {str(e)}"
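
# The sheet text is split into small chunks (chunk_size=100, chunk_overlap=30),
# embedded with multilingual MiniLM, and cached in a VectorStoreIndex for retrieval.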
def initialize_index():
    global cached_index
    text_data = read_google_sheets()
    document = Document(text=text_data)
    parser = SentenceSplitter(chunk_size=100, chunk_overlap=30)
    nodes = parser.get_nodes_from_documents([document])
    embedding = HuggingFaceEmbedding("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
    Settings.embed_model = embedding
    cached_index = VectorStoreIndex(nodes)

def search_google_sheets_vector(query):
    if cached_index is None:
        initialize_index()
    retriever = cached_index.as_retriever(similarity_top_k=3)
    retrieved_nodes = retriever.retrieve(query)
    results = [node.text for node in retrieved_nodes]
    return "\n".join(results) if results else "Maaf, saya tidak menemukan informasi yang relevan."
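
# Example usage (assumes credentials.json is present and the spreadsheet is
# shared with the service account; the query is illustrative):
#   initialize_index()
#   print(search_google_sheets_vector("Berapa sisa cuti saya?"))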

# =============== 2. Load the Transformers Model ===============
def load_model():
    model_id = "HuggingFaceH4/zephyr-7b-beta"
    tokenizer = AutoTokenizer.from_pretrained(model_id)
    model = AutoModelForCausalLM.from_pretrained(
        model_id,
        device_map="auto",
        torch_dtype=torch.float16
    )
    pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=512,
        temperature=0.7,
        repetition_penalty=1.2,
        do_sample=True,
    )
    return pipe
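
# Note: zephyr-7b-beta in float16 needs on the order of 14 GB of memory, so
# device_map="auto" may offload layers to CPU on smaller machines.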

# =============== 3. Build the Prompt ===============
def generate_prompt(user_message, context_data):
    prompt = f"""
### SISTEM:
Anda adalah chatbot HRD yang membantu karyawan memahami administrasi perusahaan. Jangan menjawab menggunakan Bahasa Inggris. Gunakan Bahasa Indonesia dengan gaya profesional dan ramah. Jika informasi tidak tersedia dalam dokumen, katakan dengan sopan bahwa Anda tidak tahu. Jawaban harus singkat, jelas, dan sesuai konteks. Jangan memberikan jawaban untuk pertanyaan yang tidak diajukan oleh pengguna. Jangan menyertakan rekomendasi pertanyaan lain.
### DATA:
{context_data}
### PERTANYAAN:
{user_message}
### JAWABAN:
"""
    return prompt.strip()
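
# "### JAWABAN:" is also the marker generate_response() splits on to drop the
# echoed prompt from the model output, so the two must stay in sync.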

# =============== 4. Generate the Response ===============
def generate_response(message, history, pipe):
    context = search_google_sheets_vector(message)  # vector-based retrieval
    full_prompt = generate_prompt(message, context)
    response = pipe(full_prompt)[0]["generated_text"]
    cleaned = response.split("### JAWABAN:")[-1].strip()
    history = history or []
    history.append((message, cleaned))
    return cleaned

# =============== 5. Run Gradio ===============
def main():
    pipe = load_model()
    initialize_index()  # build the index before the chatbot starts

    def chatbot_response(message, history):
        return generate_response(message, history, pipe)

    # gr.ChatInterface passes (message, history) to the handler, matching
    # the signature of chatbot_response.
    gr.ChatInterface(
        fn=chatbot_response,
        title="Chatbot HRD - Transformers",
        theme="compact"
    ).launch(share=True)


if __name__ == "__main__":
    main()