import os import re import gradio as gr import pandas as pd import requests import json import faiss import nest_asyncio import sys import boto3 from pathlib import Path from bs4 import BeautifulSoup from typing import Union, List import asyncio from llama_index.core import ( StorageContext, ServiceContext, VectorStoreIndex, Settings, load_index_from_storage ) from llama_index.llms.openai import OpenAI from llama_index.core.llms import ChatMessage from llama_index.core.schema import IndexNode from llama_index.core.storage.docstore import SimpleDocumentStore from llama_index.retrievers.bm25 import BM25Retriever from llama_index.embeddings.openai import OpenAIEmbedding # from llama_index.vector_stores.faiss import FaissVectorStore from llama_index.core.retrievers import QueryFusionRetriever from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step from llama_index.core.schema import NodeWithScore from llama_index.core.prompts import PromptTemplate from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer from prompts import CITATION_QA_TEMPLATE, CITATION_REFINE_TEMPLATE from dotenv import load_dotenv load_dotenv() aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") openai_api_key = os.getenv("OPENAI_API_KEY") embed_model = OpenAIEmbedding(model_name="text-embedding-3-small") Settings.embed_model = embed_model Settings.context_window = 20000 Settings.chunk_size = 2048 Settings.similarity_top_k = 20 # Параметри S3 BUCKET_NAME = "legal-position" PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3 # Ініціалізація клієнта S3 s3_client = boto3.client( "s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name="eu-north-1" ) # # Ініціалізація клієнта S3 # s3_client = boto3.client( # "s3", # aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), # aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), # region_name="eu-north-1" # ) # Створюємо локальну директорію, якщо вона не існує LOCAL_DIR.mkdir(parents=True, exist_ok=True) # Функція для завантаження файлу з S3 def download_s3_file(bucket_name, s3_key, local_path): s3_client.download_file(bucket_name, s3_key, str(local_path)) print(f"Завантажено: {s3_key} -> {local_path}") # Функція для завантаження всієї папки з S3 у локальну директорію def download_s3_folder(bucket_name, prefix, local_dir): response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) if 'Contents' in response: for obj in response['Contents']: s3_key = obj['Key'] # Пропускаємо "папку" (кореневий префікс) у S3 if s3_key.endswith('/'): continue # Визначаємо локальний шлях, де буде збережений файл local_file_path = local_dir / Path(s3_key).relative_to(prefix) local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно # Завантажуємо файл s3_client.download_file(bucket_name, s3_key, str(local_file_path)) print(f"Завантажено: {s3_key} -> {local_file_path}") # Завантаження всього вмісту папки `Save_Index` з S3 у локальну директорію `Save_Index_Local` download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR) # PERSIST_DIR = "/home/docsa/Legal_Position/Save_index" # Apply nest_asyncio to handle nested async calls nest_asyncio.apply() class RetrieverEvent(Event): nodes: list[NodeWithScore] state_lp_json = gr.State() state_nodes = gr.State() class CitationQueryEngineWorkflow(Workflow): @step async def retrieve(self, ctx: Context, ev: StartEvent) -> Union[RetrieverEvent, None]: query = ev.get("query") question = ev.get("question") nodes = ev.get("nodes") # Отримуємо nodes з події if not query: return None await ctx.set("query", query) await ctx.set("question", question) if nodes is not None: # Використовуємо передані nodes return RetrieverEvent(nodes=nodes) else: # Якщо nodes не передані, не виконуємо додатковий пошук return None @step async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent: query = await ctx.get("query", default=None) question = await ctx.get("question", default=None) llm_answer = OpenAI(model="gpt-4o-mini", temperature=0) synthesizer = get_response_synthesizer( llm=llm_answer, text_qa_template=CITATION_QA_TEMPLATE, refine_template=CITATION_REFINE_TEMPLATE, response_mode=ResponseMode.COMPACT, use_async=True, ) response = await synthesizer.asynthesize(query=query, question=question, nodes=ev.nodes) return StopEvent(result=response) def parse_doc_ids(doc_ids): if doc_ids is None: return [] if isinstance(doc_ids, list): return [str(id).strip('[]') for id in doc_ids] if isinstance(doc_ids, str): cleaned = doc_ids.strip('[]').replace(' ', '') if cleaned: return [id.strip() for id in cleaned.split(',')] return [] def get_links_html(doc_ids): parsed_ids = parse_doc_ids(doc_ids) if not parsed_ids: return "" links = [f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})" for doc_id in parsed_ids] return ", ".join(links) def parse_lp_ids(lp_ids): if lp_ids is None: return [] if isinstance(lp_ids, (str, int)): cleaned = str(lp_ids).strip('[]').replace(' ', '') if cleaned: return [cleaned] return [] def get_links_html_lp(lp_ids): parsed_ids = parse_lp_ids(lp_ids) if not parsed_ids: return "" links = [f"[ПП ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" for lp_id in parsed_ids] return ", ".join(links) def initialize_components(): try: # Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3 persist_path = Path("Save_Index_Local") # Перевірка існування локальної директорії if not persist_path.exists(): raise FileNotFoundError(f"Directory not found: {persist_path}") # Перевірка наявності необхідних файлів і папок required_files = ['docstore_es_filter.json', 'bm25_retriever_es'] missing_files = [f for f in required_files if not (persist_path / f).exists()] if missing_files: raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}") # Ініціалізація компонентів global retriever_bm25 # Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json` docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json")) # Ініціалізація `BM25Retriever` з папки `bm25_retriever_es` bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es")) # Ініціалізація `QueryFusionRetriever` з налаштуваннями retriever_bm25 = QueryFusionRetriever( [ bm25_retriever, ], similarity_top_k=Settings.similarity_top_k, num_queries=1, use_async=True, ) return True except Exception as e: print(f"Error initializing components: {str(e)}", file=sys.stderr) return False def extract_court_decision_text(url): response = requests.get(url) soup = BeautifulSoup(response.content, 'html.parser') unwanted_texts = [ "Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.", "З метою упередження перешкоджанню стабільній роботі Реєстру" ] decision_text = "" for paragraph in soup.find_all('p'): text = paragraph.get_text(separator="\n").strip() if not any(unwanted_text in text for unwanted_text in unwanted_texts): decision_text += text + "\n" return decision_text.strip() def generate_legal_position(court_decision_text, user_question): # llm_lp = OpenAI(model="gpt-4o-mini", temperature=0) # llm_lp = OpenAI(model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-100:ASPFc3vF", temperature=0) llm_lp = OpenAI(model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU", temperature=0) response_format = { "type": "json_schema", "json_schema": { "name": "lp_schema", "schema": { "type": "object", "properties": { "title": {"type": "string", "description": "Title of the legal position"}, "text": {"type": "string", "description": "Text of the legal position"}, "proceeding": {"type": "string", "description": "Type of court proceedings"}, "category": {"type": "string", "description": "Category of the legal position"}, }, "required": ["title", "text", "proceeding", "category"], "additionalProperties": False }, "strict": True } } system_prompt = """ Дій як кваліфікований юрист. : """ prompt = f"""Дотримуйся цих інструкцій. 1. Спочатку вам буде надано текст судового рішення: {court_decision_text} 2. Уважно прочитай та проаналізуй текст наданого судового рішення. Зверни увагу на: - Юридичну суть рішення - Основне правове обґрунтування - Головні юридичні міркування 3. На основі аналізу сформулюй короткий зміст позиції суду, дотримуючись таких вказівок: - Будь чіткими, точними та обґрунтованими - Використовуй відповідну юридичну термінологію - Зберігай стислість, але повністю передай суть судового рішення - Уникай додаткових пояснень чи коментарів - Спробуй узагальнювати та уникати специфічної інформації (наприклад, імен або назв) під час подачі результатів - Використовуйте лише українську мову 4. Створи короткий заголовок, який відображає основну суть судового рішення та зазнач його категорію. 5. Додатково визнач тип судочинства, до якої відноситься дане рішення. Використовуй лише один із цих типів: 'Адміністративне судочинство', 'Кримінальне судочинство', 'Цивільне судочинство', 'Господарське судочинство' 6. Відформатуй відповідь у форматі JSON: {{ "title": "Заголовок судового рішення", "text": "Текст короткого змісту позиції суду", "proceeding": "Тип судочинства", "category": "Категорія судового рішення" }} """ messages = [ ChatMessage(role="system", content=system_prompt), ChatMessage(role="user", content=prompt), ] response = llm_lp.chat(messages, response_format=response_format) try: parsed_response = json.loads(response.message.content) if "title" in parsed_response and "text" in parsed_response: return parsed_response else: return { "title": "Error: Missing required fields in response", "text": response.message.content } except json.JSONDecodeError: return { "title": "Error parsing response", "text": response.message.content } def create_gradio_interface(): with gr.Blocks() as app: gr.Markdown("# Аналізатор судових рішень на основі правових позицій Верховного Суду") with gr.Row(): url_input = gr.Textbox(label="URL судового рішення:") question_input = gr.Textbox(label="Ваше питання:") with gr.Row(): generate_position_button = gr.Button("Генерувати короткий зміст позиції суду") search_with_ai_button = gr.Button("Пошук із ШІ", interactive=False) search_without_ai_button = gr.Button("Пошук без ШІ") analyze_button = gr.Button("Аналіз", interactive=False) position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням") search_output = gr.Markdown(label="Результат пошуку") analysis_output = gr.Markdown(label="Результат аналізу") # Два об'єкти стану для зберігання legal_position_json та nodes state_lp_json = gr.State() state_nodes = gr.State() async def generate_position_action(url): try: court_decision_text = extract_court_decision_text(url) legal_position_json = generate_legal_position(court_decision_text, "") position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n" return position_output_content, legal_position_json except Exception as e: return f"Error during position generation: {str(e)}", None async def search_with_ai_action(legal_position_json): try: query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"] nodes = await retriever_bm25.aretrieve(query_text) sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n" for index, node in enumerate(nodes, start=1): source_title = node.node.metadata.get('title') doc_ids = node.node.metadata.get('doc_id') lp_ids = node.node.metadata.get('lp_id') links = get_links_html(doc_ids) links_lp = get_links_html_lp(lp_ids) sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n" return sources_output, nodes except Exception as e: return f"Error during search: {str(e)}", None async def search_without_ai_action(url): try: court_decision_text = extract_court_decision_text(url) nodes = await retriever_bm25.aretrieve(court_decision_text) search_output_content = f"**Результати пошуку (наявні правові позиції ВСУ):** \n\n" for index, node in enumerate(nodes, start=1): source_title = node.node.metadata.get('title', 'Невідомий заголовок') doc_ids = node.node.metadata.get('doc_id') links = get_links_html(doc_ids) search_output_content += f"\n[{index}] *{source_title}* 👉 Score: {node.score} {links}\n" return search_output_content, nodes except Exception as e: return f"Error during search: {str(e)}", None import re import re async def analyze_action(legal_position_json, question, nodes): try: workflow = CitationQueryEngineWorkflow(timeout=600) # Запускаємо workflow і отримуємо об'єкт Response response = await workflow.run( query=legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"], question=question, nodes=nodes # Передаємо nodes у workflow ) # Отримуємо текст відповіді з об'єкта Response response_text = str(response) # Обробка цитат у тексті відповіді citations = re.findall(r'\[(\d+)\]', response_text) unique_citations = sorted(set(citations), key=int) output = f"**Аналіз Штучного Інтелекту:**\n{response_text}\n\n" output += "**Цитовані джерела існуючих правових позицій Верховного Суду:**\n" # Перевіряємо наявність source_nodes в об'єкті Response source_nodes = getattr(response, 'source_nodes', []) # Проходимо по унікальних цитатах та зіставляємо з `lp_id` у source_nodes for citation in unique_citations: found = False # Змінна для відстеження, чи знайдено джерело для lp_id for index, source_node_with_score in enumerate(source_nodes, start=1): source_node = source_node_with_score.node lp_id = source_node.metadata.get('lp_id') # Отримуємо lp_id із метаданих джерела # Якщо lp_id збігається з цитатою if str(lp_id) == citation: found = True source_title = source_node.metadata.get('title', 'Невідомий заголовок') doc_ids = source_node.metadata.get('doc_id') links = get_links_html(doc_ids) links_lp = get_links_html_lp(lp_id) # Використовуємо `index` як номер джерела на початку рядка output += f"[{index}]: *{source_title}* {links_lp} 👉 Score: {source_node_with_score.score} {links}\n" break # Вихід із циклу при знайденому відповідному джерелі if not found: output += f"[{citation}]: Немає відповідного джерела для lp_id {citation}\n" return output except Exception as e: return f"Error during analysis: {str(e)}" # Підключаємо функції до кнопок з оновленими входами та виходами generate_position_button.click( fn=generate_position_action, inputs=url_input, outputs=[position_output, state_lp_json] ) generate_position_button.click( fn=lambda: gr.update(interactive=True), inputs=None, outputs=search_with_ai_button ) search_with_ai_button.click( fn=search_with_ai_action, inputs=state_lp_json, outputs=[search_output, state_nodes] ) search_with_ai_button.click( fn=lambda: gr.update(interactive=True), inputs=None, outputs=analyze_button ) search_without_ai_button.click( fn=search_without_ai_action, inputs=url_input, outputs=[search_output, state_nodes] ) search_without_ai_button.click( fn=lambda: gr.update(interactive=True), inputs=None, outputs=analyze_button ) analyze_button.click( fn=analyze_action, inputs=[state_lp_json, question_input, state_nodes], outputs=analysis_output ) return app if __name__ == "__main__": if initialize_components(): print("Components initialized successfully!") app = create_gradio_interface() app.launch(share=True) else: print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr) sys.exit(1)