|
import gradio as gr |
|
import gspread |
|
import torch |
|
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline |
|
from oauth2client.service_account import ServiceAccountCredentials |
|
from llama_index.core import VectorStoreIndex, Settings |
|
from llama_index.core.node_parser import SentenceSplitter |
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
|
from llama_index.core.schema import Document |
|
|
|
|
|
# Module-level cache for the vector index: stays None until initialize_index()
# assigns it, and search_google_sheets_vector() reads it on every query.
cached_index = None
|
|
|
def read_google_sheets(
    spreadsheet_id="1e_cNMhwF-QYpyYUpqQh-XCw-OdhWS6EuYsoBUsVtdNg",
    sheet_names=("datatarget", "datacuti", "dataabsen", "datalembur", "pkb"),
    credentials_path="credentials.json",
):
    """Read worksheets from a Google spreadsheet and flatten them into text.

    Each worksheet that is found contributes a header line
    (``=== Data dari <NAME> ===``), one line per row with the cells joined
    by ``" | "``, and a separator entry. A worksheet that does not exist
    contributes a single error line instead.

    Args:
        spreadsheet_id: Key of the spreadsheet to open.
        sheet_names: Worksheet titles to read, in output order.
        credentials_path: Path to the service-account JSON key file.

    Returns:
        The combined text of all worksheets. This function does not raise:
        any failure other than a missing worksheet (authorization, network,
        unreadable key file, ...) is returned as an error string, so callers
        receive text in every case.
    """
    scope = [
        "https://www.googleapis.com/auth/spreadsheets",
        "https://www.googleapis.com/auth/drive",
    ]

    # NOTE(review): the leading "β" in the error strings below looks like a
    # mis-encoded symbol; it is kept as-is because it is runtime output.
    try:
        creds = ServiceAccountCredentials.from_json_keyfile_name(credentials_path, scope)
        client = gspread.authorize(creds)
        spreadsheet = client.open_by_key(spreadsheet_id)

        all_data = []
        for sheet_name in sheet_names:
            # Only the worksheet lookup can report a missing sheet, so keep
            # the inner try limited to that single call.
            try:
                sheet = spreadsheet.worksheet(sheet_name)
            except gspread.exceptions.WorksheetNotFound:
                all_data.append(f"β ERROR: Worksheet {sheet_name} tidak ditemukan.")
                continue

            rows = sheet.get_all_values()
            all_data.append(f"=== Data dari {sheet_name.upper()} ===")
            all_data.extend(" | ".join(row) for row in rows)
            # A bare newline entry leaves blank lines between worksheets
            # once everything is joined with "\n".
            all_data.append("\n")

        return "\n".join(all_data).strip()
    except Exception as e:
        # Deliberate best-effort boundary: report the failure as text.
        return f"β ERROR: {str(e)}"
|
|
|
def initialize_index():
    """Build the vector index from the spreadsheet text and cache it.

    The text returned by ``read_google_sheets()`` is wrapped in a single
    document, split into chunks of size 100 with an overlap of 30, and
    indexed. The result is stored in the module-level ``cached_index``.
    As a side effect, ``Settings.embed_model`` is replaced.
    """
    global cached_index

    sheet_document = Document(text=read_google_sheets())
    splitter = SentenceSplitter(chunk_size=100, chunk_overlap=30)
    chunks = splitter.get_nodes_from_documents([sheet_document])

    # The embedding model is registered globally rather than passed to the
    # index; presumably VectorStoreIndex picks it up from Settings.
    Settings.embed_model = HuggingFaceEmbedding(
        "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
    )

    cached_index = VectorStoreIndex(chunks)
|
|
|
def search_google_sheets_vector(query):
    """Return the text of the top three indexed chunks matching ``query``.

    Builds the index first if ``cached_index`` has not been set yet. The
    chunk texts are joined with newlines; when nothing is retrieved, a
    fixed apology message is returned instead.
    """
    # Lazy initialisation: the index may not have been built yet.
    if cached_index is None:
        initialize_index()

    hits = cached_index.as_retriever(similarity_top_k=3).retrieve(query)
    texts = [hit.text for hit in hits]

    if not texts:
        return "Maaf, saya tidak menemukan informasi yang relevan."
    return "\n".join(texts)
|
|
|
|
|
def load_model():
    """Load the Zephyr checkpoint and return a text-generation pipeline.

    The tokenizer and the model are loaded from the same checkpoint id; the
    model is loaded in float16 with ``device_map="auto"``. The returned
    pipeline samples with the generation settings listed below.
    """
    checkpoint = "HuggingFaceH4/zephyr-7b-beta"

    # Generation settings forwarded to the pipeline as keyword arguments.
    generation_settings = {
        "max_new_tokens": 512,
        "temperature": 0.7,
        "repetition_penalty": 1.2,
        "do_sample": True,
    }

    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        device_map="auto",
        torch_dtype=torch.float16,
    )

    return pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        **generation_settings,
    )
|
|
|
|
|
def generate_prompt(user_message, context_data):
    """Assemble the prompt sent to the language model.

    The prompt has four sections, one per line pair: fixed system
    instructions (``### SISTEM:``), the retrieved context (``### DATA:``),
    the user's question (``### PERTANYAAN:``), and a trailing
    ``### JAWABAN:`` header after which the model is expected to answer.
    """
    sections = (
        "### SISTEM:",
        "Anda adalah chatbot HRD yang membantu karyawan memahami administrasi perusahaan. Jangan menjawab menggunakan Bahasa Inggris. Gunakan Bahasa Indonesia dengan gaya profesional dan ramah. Jika informasi tidak tersedia dalam dokumen, katakan dengan sopan bahwa Anda tidak tahu. Jawaban harus singkat, jelas, dan sesuai konteks. Jangan memberikan jawaban untuk pertanyaan yang tidak diajukan oleh pengguna. Jangan menyertakan rekomendasi pertanyaan lain.",
        "### DATA:",
        f"{context_data}",
        "### PERTANYAAN:",
        f"{user_message}",
        "### JAWABAN:",
    )
    # One section per line; surrounding whitespace is trimmed from the whole.
    return "\n".join(sections).strip()
|
|
|
|
|
def generate_response(message, history, pipe):
    """Answer ``message`` using retrieved spreadsheet context and ``pipe``.

    Args:
        message: The user's question.
        history: Accepted for interface compatibility only; it is neither
            read nor modified.
        pipe: Callable text-generation pipeline; ``pipe(prompt)`` must
            return a list whose first item has a ``"generated_text"`` key.

    Returns:
        The model's answer with surrounding whitespace removed, or an empty
        string when the model produced no answer text.
    """
    answer_marker = "### JAWABAN:"
    question_marker = "### PERTANYAAN:"

    context = search_google_sheets_vector(message)
    full_prompt = generate_prompt(message, context)
    generated = pipe(full_prompt)[0]["generated_text"]

    # When the output echoes the prompt, drop it by prefix so that only the
    # model's continuation is inspected; otherwise fall back to the text
    # after the last answer header.
    if generated.startswith(full_prompt):
        continuation = generated[len(full_prompt):]
    else:
        continuation = generated.split(answer_marker)[-1]

    # If the model re-emits the prompt headers and invents further
    # question/answer turns, keep the first real answer, not the last
    # segment: take the first non-empty answer segment and cut it before
    # any re-emitted question header.
    for segment in continuation.split(answer_marker):
        answer = segment.split(question_marker)[0].strip()
        if answer:
            return answer
    return ""
|
|
|
|
|
def main():
    """Load the model, build the index, and launch the Gradio interface."""
    pipe = load_model()
    initialize_index()

    # The interface below declares a single text input, so the callback is
    # invoked with the message only; ``history`` must therefore be optional
    # or every request fails with a missing-argument TypeError.
    def chatbot_response(message, history=None):
        return generate_response(message, history, pipe)

    gr.Interface(
        fn=chatbot_response,
        inputs=["text"],
        outputs=["text"],
        title="Chatbot HRD - Transformers",
        theme="compact",
    ).launch(share=True)


# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    main()