import gradio as gr import gspread import torch from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline from oauth2client.service_account import ServiceAccountCredentials from llama_index.core import VectorStoreIndex, Settings from llama_index.core.node_parser import SentenceSplitter from llama_index.embeddings.huggingface import HuggingFaceEmbedding from llama_index.core.schema import Document # =============== 1. Cache dan Inisialisasi Index Google Sheets =============== cached_index = None cached_data = {} def read_google_sheets_separated(): try: scope = ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"] creds = ServiceAccountCredentials.from_json_keyfile_name("credentials.json", scope) client = gspread.authorize(creds) SPREADSHEET_ID = "1e_cNMhwF-QYpyYUpqQh-XCw-OdhWS6EuYsoBUsVtdNg" sheet_names = ["datatarget", "datacuti", "dataabsen", "datalembur", "pkb"] spreadsheet = client.open_by_key(SPREADSHEET_ID) data_map = {} for sheet_name in sheet_names: try: sheet = spreadsheet.worksheet(sheet_name) data = sheet.get_all_values() headers = data[0] rows = data[1:] entries = [] if sheet_name == "datatarget": for row in rows: if len(row) >= 4: try: jumlah = int(row[3]) status = "KURANG" if jumlah < 0 else "LEBIH" entries.append( f"[SELISIH] Mesin: {row[0]} | Kategori: {row[1]} | Bulan: {row[2]} | Selisih: {abs(jumlah)} pcs ({status})" ) except ValueError: # Tangani jika data tidak valid entries.append( f"[WARNING] Data tidak valid: {' | '.join(row)}" ) elif sheet_name == "datacuti": for row in rows: if len(row) >= 3: entries.append(f"{row[0]} memiliki sisa cuti {row[1]} hari pada tahun {row[2]}") elif sheet_name == "dataabsen": for row in rows: if len(row) >= 3: entries.append(f"Kehadiran {row[0]} adalah {row[1]} hari pada bulan {row[2]}") elif sheet_name == "datalembur": for row in rows: if len(row) >= 3: entries.append(f"{row[0]} telah lembur sebanyak {row[1]} jam pada bulan {row[2]}") elif sheet_name == "pkb": for row in rows: if len(row) >= 4: bab, poin, kategori, isi = row[0], row[1], row[2], row[3] entries.append(f"Bab {bab}, Poin {poin} - Kategori: {kategori}\nIsi: {isi}") data_map[sheet_name] = entries except gspread.exceptions.WorksheetNotFound: data_map[sheet_name] = [f"❌ ERROR: Worksheet {sheet_name} tidak ditemukan."] return data_map except Exception as e: return {"error": str(e)} def detect_intent(message): msg = message.lower() intent_keywords = { "pkb": ["ketentuan", "aturan", "kompensasi", "hak", "berlaku", "diperbolehkan", "pkb", "perusahaan", "pekerja", "tenaga kerja asing", "jam kerja", "kerja lembur", "perjalanan dinas", "pengupahan", "pemutusan hubungan kerja", "jaminan sosial", "kesejahteraan", "fasilitas kerja", "alih tugas", "kewajiban", "disiplin kerja", "larangan", "sanksi", "mogok", "pesangon", "penghargaan masa kerja", "uang pisah"], "cuti": ["cuti", "sisa cuti", "jumlah cuti", "berapa hari cuti", "libur"], "target": ["target", "aktual", "selisih", "produksi", "mesin", "pcs"], "lembur": ["lembur", "jam lembur", "berapa jam", "jam kerja tambahan"], "absensi": ["absensi", "hadir", "tidak hadir", "izin", "masuk", "alpha", "berapa hari masuk", "kehadiran"] } scores = {} for intent, keywords in intent_keywords.items(): scores[intent] = sum(1 for k in keywords if k in msg) best_intent = max(scores, key=scores.get) # Jika tidak ada keyword yang cocok, fallback ke "all" return best_intent if scores[best_intent] > 0 else "all" def initialize_index(): global cached_index, cached_data cached_data = read_google_sheets_separated() all_text = sum(cached_data.values(), []) document = Document(text="\n".join(all_text)) parser = SentenceSplitter(chunk_size=256, chunk_overlap=30) nodes = parser.get_nodes_from_documents([document]) embedding = HuggingFaceEmbedding("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") Settings.embed_model = embedding cached_index = VectorStoreIndex(nodes) def search_google_sheets_vector(query): if cached_data == {}: initialize_index() intent = detect_intent(query) if intent == "pkb": selected_data = cached_data.get("pkb", []) elif intent == "cuti": selected_data = cached_data.get("datacuti", []) elif intent == "target": selected_data = cached_data.get("datatarget", []) elif intent == "absensi": selected_data =cached_data.get("dataabsen", []) elif intent == "lembur": selected_data =cached_data.get("datalembur", []) else: selected_data = sum(cached_data.values(), []) document = Document(text="\n".join(selected_data)) parser = SentenceSplitter(chunk_size=256, chunk_overlap=30) nodes = parser.get_nodes_from_documents([document]) embedding = HuggingFaceEmbedding("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2") Settings.embed_model = embedding temp_index = VectorStoreIndex(nodes) retriever = temp_index.as_retriever(similarity_top_k=3) retriever.similarity_cutoff = 1.0 retrieved_nodes = retriever.retrieve(query) results = [node.text for node in retrieved_nodes] return "\n".join(results) if results else "Maaf, saya tidak menemukan informasi yang relevan." # =============== 2. Load Model Transformers =============== def load_model(): model_id = "NousResearch/Llama-2-7b-chat-hf" tokenizer = AutoTokenizer.from_pretrained(model_id) model = AutoModelForCausalLM.from_pretrained( model_id, device_map="auto", torch_dtype=torch.float16 ) pipe = pipeline( "text-generation", model=model, tokenizer=tokenizer, max_new_tokens=512, temperature=0.7, repetition_penalty=1.2, do_sample=True, ) return pipe # =============== 3. Prompt Generator =============== def generate_prompt(user_message, context_data): prompt = f""" ### SISTEM: Anda adalah chatbot HRD yang membantu karyawan memahami administrasi perusahaan. Jangan menjawab menggunakan Bahasa Inggris. Gunakan Bahasa Indonesia dengan gaya profesional dan ramah. Jika informasi tidak tersedia dalam dokumen, katakan dengan sopan bahwa Anda tidak tahu. Jawaban harus singkat, jelas, dan sesuai konteks. Jangan memberikan jawaban untuk pertanyaan yang tidak diajukan oleh pengguna. Jangan menyertakan rekomendasi pertanyaan lain. ### DATA: {context_data} ### PERTANYAAN: {user_message} ### JAWABAN: """ return prompt.strip() # =============== 4. Generate Response =============== def should_use_history(message): keywords = ["jika", "tadi", "sebelumnya","kalau begitu", "gimana kalau", "lanjutkan", "terus", "bagaimana dengan", "berarti", "jadi", "oke lalu"] return any(kata in message.lower() for kata in keywords) def generate_response(message, history, pipe): if should_use_history(message) and history: previous_message = history[-1][0] combined_message = previous_message + " " + message else: combined_message = message context = search_google_sheets_vector(combined_message) if "❌ ERROR" in context or context.strip() == "" or "tidak ditemukan" in context.lower(): return "Maaf, saya tidak menemukan informasi yang relevan untuk pertanyaan tersebut." full_prompt = generate_prompt(message, context) response = pipe(full_prompt)[0]["generated_text"] cleaned = response.split("### JAWABAN:")[-1].strip() history = history or [] history.append((message, cleaned)) return cleaned # =============== 5. Jalankan Gradio =============== def main(): pipe = load_model() initialize_index() def chatbot_fn(message, history): return generate_response(message, history, pipe) gr.ChatInterface( fn=chatbot_fn, title="Chatbot HRD - Transformers", theme="compact" ).launch(share=True) if __name__ == "__main__": main()