import os import re import gradio as gr import pandas as pd import requests import json import faiss import nest_asyncio import sys import boto3 from pathlib import Path from bs4 import BeautifulSoup from typing import Union, List import asyncio from anthropic import Anthropic from openai import OpenAI import google.generativeai as genai from llama_index.core import ( StorageContext, ServiceContext, VectorStoreIndex, Settings, load_index_from_storage ) from llama_index.llms.openai import OpenAI from llama_index.core.llms import ChatMessage from llama_index.core.schema import IndexNode from import SimpleDocumentStore from llama_index.retrievers.bm25 import BM25Retriever from llama_index.embeddings.openai import OpenAIEmbedding # from llama_index.vector_stores.faiss import FaissVectorStore from llama_index.core.retrievers import QueryFusionRetriever from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step from llama_index.core.schema import NodeWithScore from llama_index.core.prompts import PromptTemplate from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer from config import embed_model, Settings, openai_api_key, anthropic_api_key, aws_access_key_id, aws_secret_access_key from analysis import ModelProvider, ModelName, PrecedentAnalysisWorkflow from prompts import SYSTEM_PROMPT, LEGAL_POSITION_PROMPT, PRECEDENT_ANALYSIS_TEMPLATE # from dotenv import load_dotenv # # load_dotenv() # # aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") # aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") # openai_api_key = os.getenv("OPENAI_API_KEY") # anthropic_api_key=os.getenv("ANTHROPIC_API_KEY") # genai.configure(api_key=os.environ["GOOGLE_API_KEY"]) # # # embed_model = OpenAIEmbedding(model_name="text-embedding-3-small") # Settings.embed_model = embed_model # Settings.context_window = 20000 # Settings.chunk_size = 2048 # Settings.similarity_top_k = 20 # Параметри S3 BUCKET_NAME = "legal-position" PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3 # Ініціалізація клієнта S3 s3_client = boto3.client( "s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name="eu-north-1" ) # Створюємо локальну директорію, якщо вона не існує LOCAL_DIR.mkdir(parents=True, exist_ok=True) # Функція для завантаження файлу з S3 def download_s3_file(bucket_name, s3_key, local_path): s3_client.download_file(bucket_name, s3_key, str(local_path)) print(f"Завантажено: {s3_key} -> {local_path}") # Функція для завантаження всієї папки з S3 у локальну директорію def download_s3_folder(bucket_name, prefix, local_dir): response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) if 'Contents' in response: for obj in response['Contents']: s3_key = obj['Key'] # Пропускаємо "папку" (кореневий префікс) у S3 if s3_key.endswith('/'): continue # Визначаємо локальний шлях, де буде збережений файл local_file_path = local_dir / Path(s3_key).relative_to(prefix) local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно # Завантажуємо файл s3_client.download_file(bucket_name, s3_key, str(local_file_path)) print(f"Завантажено: {s3_key} -> {local_file_path}") # Перевіряємо, чи існує локальна директорія if not LOCAL_DIR.exists(): print(f"Локальна директорія {LOCAL_DIR} відсутня. Починаємо завантаження...") LOCAL_DIR.mkdir(parents=True, exist_ok=True) # Створення директорії download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR) else: print(f"Локальна директорія {LOCAL_DIR} вже існує. Завантаження пропущено.") # Apply nest_asyncio to handle nested async calls nest_asyncio.apply() class RetrieverEvent(Event): nodes: list[NodeWithScore] state_lp_json = gr.State() state_nodes = gr.State() def parse_doc_ids(doc_ids): if doc_ids is None: return [] if isinstance(doc_ids, list): return [str(id).strip('[]') for id in doc_ids] if isinstance(doc_ids, str): cleaned = doc_ids.strip('[]').replace(' ', '') if cleaned: return [id.strip() for id in cleaned.split(',')] return [] def get_links_html(doc_ids): parsed_ids = parse_doc_ids(doc_ids) if not parsed_ids: return "" links = [f"[Рішення ВС: {doc_id}]({doc_id})" for doc_id in parsed_ids] return ", ".join(links) def parse_lp_ids(lp_ids): if lp_ids is None: return [] if isinstance(lp_ids, (str, int)): cleaned = str(lp_ids).strip('[]').replace(' ', '') if cleaned: return [cleaned] return [] def get_links_html_lp(lp_ids): parsed_ids = parse_lp_ids(lp_ids) if not parsed_ids: return "" links = [f"[ПП ВС: {lp_id}]({lp_id})" for lp_id in parsed_ids] return ", ".join(links) def initialize_components(): try: # Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3 persist_path = Path("Save_Index_Local") # Перевірка існування локальної директорії if not persist_path.exists(): raise FileNotFoundError(f"Directory not found: {persist_path}") # Перевірка наявності необхідних файлів і папок required_files = ['docstore_es_filter.json', 'bm25_retriever_es'] missing_files = [f for f in required_files if not (persist_path / f).exists()] if missing_files: raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}") # Ініціалізація компонентів global retriever_bm25 # Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json` docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json")) # Ініціалізація `BM25Retriever` з папки `bm25_retriever_es` bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es")) # Ініціалізація `QueryFusionRetriever` з налаштуваннями retriever_bm25 = QueryFusionRetriever( [ bm25_retriever, ], similarity_top_k=Settings.similarity_top_k, num_queries=1, use_async=True, ) return True except Exception as e: print(f"Error initializing components: {str(e)}", file=sys.stderr) return False def extract_court_decision_text(url): response = requests.get(url) soup = BeautifulSoup(response.content, 'html.parser') unwanted_texts = [ "Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.", "З метою упередження перешкоджанню стабільній роботі Реєстру" ] decision_text = "" for paragraph in soup.find_all('p'): text = paragraph.get_text(separator="\n").strip() if not any(unwanted_text in text for unwanted_text in unwanted_texts): decision_text += text + "\n" return decision_text.strip() # Constants for JSON schema LEGAL_POSITION_SCHEMA = { "type": "json_schema", "json_schema": { "name": "lp_schema", "schema": { "type": "object", "properties": { "title": {"type": "string", "description": "Title of the legal position"}, "text": {"type": "string", "description": "Text of the legal position"}, "proceeding": {"type": "string", "description": "Type of court proceedings"}, "category": {"type": "string", "description": "Category of the legal position"}, }, "required": ["title", "text", "proceeding", "category"], "additionalProperties": False }, "strict": True } } # def generate_legal_position(court_decision_text, comment_input): # try: # # Ініціалізація моделі # llm_lp = OpenAI( # # model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU", # model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-1500:Aaiu4WZd", # temperature=0 # ) # # # Формування повідомлень для чату # # Формуємо контент з урахуванням коментаря # content = LEGAL_POSITION_PROMPT.format( # court_decision_text=court_decision_text, # comment=comment_input if comment_input else "Коментар відсутній" # ) # # # Формування повідомлень для чату # messages = [ # ChatMessage(role="system", content=SYSTEM_PROMPT), # ChatMessage(role="user", content=content), # ] # # # Отримання відповіді від моделі # response =, response_format=LEGAL_POSITION_SCHEMA) # # # Обробка відповіді # parsed_response = json.loads(response.message.content) # # # Перевірка наявності обов'язкових полів # if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]): # return parsed_response # # return { # "title": "Error: Missing required fields in response", # "text": response.message.content, # "proceeding": "Unknown", # "category": "Error" # } # # except json.JSONDecodeError: # return { # "title": "Error parsing response", # "text": response.message.content, # "proceeding": "Unknown", # "category": "Error" # } # except Exception as e: # return { # "title": "Unexpected error", # "text": str(e), # "proceeding": "Unknown", # "category": "Error" # } def generate_legal_position(court_decision_text, comment_input): if not isinstance(court_decision_text, str) or not court_decision_text.strip(): return { "title": "Invalid input", "text": "Court decision text is required and must be non-empty.", "status": "Error" } try: # Конфігурація моделі generation_config = { "temperature": 0, "max_output_tokens": 8192, "response_mime_type": "application/json", # Виправлено дублювання } # Ініціалізація моделі model = genai.GenerativeModel( model_name="gemini-1.5-flash", generation_config=generation_config, system_instruction=SYSTEM_PROMPT, ) content = LEGAL_POSITION_PROMPT.format( court_decision_text=court_decision_text, comment=comment_input if comment_input else "Коментар відсутній" ) # Створення сесії чату chat_session = model.start_chat(history=[]) response = chat_session.send_message(content) # Обробка відповіді parsed_response = json.loads(response.text) # Перевірка наявності обов'язкових полів if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]): return parsed_response return { "title": "Error: Missing required fields in response", "text": response.text, "proceeding": "Unknown", "category": "Error" } except json.JSONDecodeError: return { "title": "Error parsing response", "text": response.text, "proceeding": "Unknown", "category": "Error" } except Exception as e: return { "title": "Unexpected error", "text": str(e), "proceeding": "Unknown", "category": "Error" } def create_gradio_interface(): async def generate_position_action(url): try: court_decision_text = extract_court_decision_text(url) legal_position_json = generate_legal_position(court_decision_text, comment_input) position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n" return position_output_content, legal_position_json except Exception as e: return f"Error during position generation: {str(e)}", None async def search_with_ai_action(legal_position_json): try: query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"] nodes = await retriever_bm25.aretrieve(query_text) sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n" for index, node in enumerate(nodes, start=1): source_title = node.node.metadata.get('title') doc_ids = node.node.metadata.get('doc_id') lp_ids = node.node.metadata.get('lp_id') links = get_links_html(doc_ids) links_lp = get_links_html_lp(lp_ids) sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n" return sources_output, nodes except Exception as e: return f"Error during search: {str(e)}", None async def analyze_action(legal_position_json, question, nodes, provider, model_name): try: workflow = PrecedentAnalysisWorkflow( provider=ModelProvider(provider), model_name=ModelName(model_name) ) query = ( f"{legal_position_json['title']}: " f"{legal_position_json['text']}: " f"{legal_position_json['proceeding']}: " f"{legal_position_json['category']}" ) response_text = await query=query, question=question, nodes=nodes ) output = f"**Аналіз ШІ (модель: {model_name}):**\n{response_text}\n\n" output += "**Наявні в базі Правові Позицій Верховного Суду:**\n\n" analysis_lines = response_text.split('\n') for line in analysis_lines: if line.startswith('* ['): index = line[3:line.index(']')] node = nodes[int(index) - 1] source_node = node.node source_title = source_node.metadata.get('title', 'Невідомий заголовок') source_text_lp = node.text doc_ids = source_node.metadata.get('doc_id') lp_id = source_node.metadata.get('lp_id') links = get_links_html(doc_ids) links_lp = get_links_html_lp(lp_id) output += f"[{index}]: *{source_title}* | {source_text_lp} | {links_lp} | {links}\n\n" return output except Exception as e: return f"Error during analysis: {str(e)}" def update_model_choices(provider): if provider == ModelProvider.OPENAI.value: return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("gpt")]) else: return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("claude")]) with gr.Blocks() as app: # Далі ваш код інтерфейсу... gr.Markdown("# Аналізатор релевантних Правових Позицій Верховного Суду для нового судового рішення") with gr.Row(): comment_input = gr.Textbox(label="Коментар до формування короткого змісту судового рішення:") url_input = gr.Textbox(label="URL судового рішення:") question_input = gr.Textbox(label="Уточнююче питання для аналізу:") with gr.Row(): provider_dropdown = gr.Dropdown( choices=[p.value for p in ModelProvider], value=ModelProvider.OPENAI.value, label="Провайдер AI для аналізу", ) model_dropdown = gr.Dropdown( choices=[m.value for m in ModelName if m.value.startswith("gpt")], value=ModelName.GPT4o_MINI.value, label="Модель", ) with gr.Row(): generate_position_button = gr.Button("Генерувати короткий зміст позиції суду") search_with_ai_button = gr.Button("Пошук", interactive=False) analyze_button = gr.Button("Аналіз", interactive=False) position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням") search_output = gr.Markdown(label="Результат пошуку") analysis_output = gr.Markdown(label="Результат аналізу") state_lp_json = gr.State() state_nodes = gr.State() # Підключення функцій до кнопок fn=generate_position_action, inputs=url_input, outputs=[position_output, state_lp_json] ).then( fn=lambda: gr.update(interactive=True), inputs=None, outputs=search_with_ai_button ) fn=search_with_ai_action, inputs=state_lp_json, outputs=[search_output, state_nodes] ).then( fn=lambda: gr.update(interactive=True), inputs=None, outputs=analyze_button ) fn=analyze_action, inputs=[state_lp_json, question_input, state_nodes, provider_dropdown, model_dropdown], outputs=analysis_output ) provider_dropdown.change( fn=update_model_choices, inputs=provider_dropdown, outputs=model_dropdown ) return app if __name__ == "__main__": if initialize_components(): print("Components initialized successfully!") app = create_gradio_interface() app.launch(share=True) else: print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr) sys.exit(1)