Spaces:
Running
Running
import os | |
import re | |
import gradio as gr | |
import pandas as pd | |
import requests | |
import json | |
import faiss | |
import nest_asyncio | |
import sys | |
import boto3 | |
from pathlib import Path | |
from bs4 import BeautifulSoup | |
from typing import Union, List | |
import asyncio | |
from anthropic import Anthropic | |
from openai import OpenAI | |
import google.generativeai as genai | |
from llama_index.core import ( | |
StorageContext, | |
ServiceContext, | |
VectorStoreIndex, | |
Settings, | |
load_index_from_storage | |
) | |
from llama_index.llms.openai import OpenAI | |
from llama_index.core.llms import ChatMessage | |
from llama_index.core.schema import IndexNode | |
from llama_index.core.storage.docstore import SimpleDocumentStore | |
from llama_index.retrievers.bm25 import BM25Retriever | |
from llama_index.embeddings.openai import OpenAIEmbedding | |
# from llama_index.vector_stores.faiss import FaissVectorStore | |
from llama_index.core.retrievers import QueryFusionRetriever | |
from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step | |
from llama_index.core.schema import NodeWithScore | |
from llama_index.core.prompts import PromptTemplate | |
from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer | |
from prompts import SYSTEM_PROMPT, LEGAL_POSITION_PROMPT, PRECEDENT_ANALYSIS_TEMPLATE | |
from dotenv import load_dotenv | |
load_dotenv() | |
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") | |
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") | |
openai_api_key = os.getenv("OPENAI_API_KEY") | |
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY") | |
genai.configure(api_key=os.environ["GOOGLE_API_KEY"]) | |
embed_model = OpenAIEmbedding(model_name="text-embedding-3-small") | |
Settings.embed_model = embed_model | |
Settings.context_window = 20000 | |
Settings.chunk_size = 2048 | |
Settings.similarity_top_k = 20 | |
# Параметри S3 | |
BUCKET_NAME = "legal-position" | |
PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити | |
LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3 | |
# Ініціалізація клієнта S3 | |
s3_client = boto3.client( | |
"s3", | |
aws_access_key_id=aws_access_key_id, | |
aws_secret_access_key=aws_secret_access_key, | |
region_name="eu-north-1" | |
) | |
# Створюємо локальну директорію, якщо вона не існує | |
LOCAL_DIR.mkdir(parents=True, exist_ok=True) | |
# Функція для завантаження файлу з S3 | |
def download_s3_file(bucket_name, s3_key, local_path): | |
s3_client.download_file(bucket_name, s3_key, str(local_path)) | |
print(f"Завантажено: {s3_key} -> {local_path}") | |
# Функція для завантаження всієї папки з S3 у локальну директорію | |
def download_s3_folder(bucket_name, prefix, local_dir): | |
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) | |
if 'Contents' in response: | |
for obj in response['Contents']: | |
s3_key = obj['Key'] | |
# Пропускаємо "папку" (кореневий префікс) у S3 | |
if s3_key.endswith('/'): | |
continue | |
# Визначаємо локальний шлях, де буде збережений файл | |
local_file_path = local_dir / Path(s3_key).relative_to(prefix) | |
local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно | |
# Завантажуємо файл | |
s3_client.download_file(bucket_name, s3_key, str(local_file_path)) | |
print(f"Завантажено: {s3_key} -> {local_file_path}") | |
# Перевіряємо, чи існує локальна директорія | |
if not LOCAL_DIR.exists(): | |
print(f"Локальна директорія {LOCAL_DIR} відсутня. Починаємо завантаження...") | |
LOCAL_DIR.mkdir(parents=True, exist_ok=True) # Створення директорії | |
download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR) | |
else: | |
print(f"Локальна директорія {LOCAL_DIR} вже існує. Завантаження пропущено.") | |
# Apply nest_asyncio to handle nested async calls | |
nest_asyncio.apply() | |
class RetrieverEvent(Event): | |
nodes: list[NodeWithScore] | |
state_lp_json = gr.State() | |
state_nodes = gr.State() | |
from enum import Enum | |
class ModelProvider(str, Enum): | |
OPENAI = "openai" | |
ANTHROPIC = "anthropic" | |
class ModelName(str, Enum): | |
# OpenAI models | |
GPT4o = "gpt-4o" | |
GPT4o_MINI = "gpt-4o-mini" | |
# Anthropic models | |
CLAUDE3_5_SONNET = "claude-3-5-sonnet-latest" | |
CLAUDE3_5_HAIKU = "claude-3-5-haiku-latest" | |
class LLMAnalyzer: | |
def __init__(self, provider: ModelProvider, model_name: ModelName): | |
self.provider = provider | |
self.model_name = model_name | |
if provider == ModelProvider.OPENAI: | |
self.client = OpenAI(model=model_name) | |
elif provider == ModelProvider.ANTHROPIC: | |
# Додаємо API ключ при ініціалізації | |
self.client = Anthropic(api_key=anthropic_api_key) | |
else: | |
raise ValueError(f"Unsupported provider: {provider}") | |
async def analyze(self, prompt: str, response_schema: dict) -> str: | |
if self.provider == ModelProvider.OPENAI: | |
return await self._analyze_with_openai(prompt, response_schema) | |
else: | |
return await self._analyze_with_anthropic(prompt, response_schema) | |
async def _analyze_with_openai(self, prompt: str, response_schema: dict) -> str: | |
messages = [ | |
ChatMessage(role="system", | |
content="Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду."), | |
ChatMessage(role="user", content=prompt) | |
] | |
# Правильний формат для response_format | |
response_format = { | |
"type": "json_schema", | |
"json_schema": { | |
"name": "relevant_positions_schema", # Додаємо обов'язкове поле name | |
"schema": response_schema | |
} | |
} | |
response = self.client.chat( | |
messages=messages, | |
response_format=response_format, | |
temperature=0 | |
) | |
return response.message.content | |
async def _analyze_with_anthropic(self, prompt: str, response_schema: dict) -> str: | |
response = self.client.messages.create( # Прибрали await | |
model=self.model_name, | |
max_tokens=2000, | |
messages=[ | |
{ | |
"role": "assistant", | |
"content": "Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду." | |
}, | |
{ | |
"role": "user", | |
"content": prompt | |
} | |
] | |
) | |
return response.content[0].text | |
class PrecedentAnalysisWorkflow(Workflow): | |
def __init__(self, provider: ModelProvider = ModelProvider.OPENAI, | |
model_name: ModelName = ModelName.GPT4o_MINI): | |
super().__init__() | |
self.analyzer = LLMAnalyzer(provider, model_name) | |
async def analyze(self, ctx: Context, ev: StartEvent) -> StopEvent: | |
try: | |
# Отримуємо параметри з події з дефолтними значеннями | |
query = ev.get("query", "") | |
question = ev.get("question", "") | |
nodes = ev.get("nodes", []) | |
# Перевірка на пусті значення | |
if not query: | |
return StopEvent(result="Помилка: Не надано текст нового рішення (query)") | |
if not nodes: | |
return StopEvent(result="Помилка: Не надано правові позиції для аналізу (nodes)") | |
# Підготовка контексту | |
context_parts = [] | |
for i, node in enumerate(nodes, 1): | |
node_text = node.node.text if hasattr(node, 'node') else node.text | |
metadata = node.node.metadata if hasattr(node, 'node') else node.metadata | |
lp_id = metadata.get('lp_id', f'unknown_{i}') | |
context_parts.append(f"Source {i} (ID: {lp_id}):\n{node_text}") | |
context_str = "\n\n".join(context_parts) | |
# Схема відповіді | |
response_schema = { | |
"type": "object", | |
"properties": { | |
"relevant_positions": { | |
"type": "array", | |
"items": { | |
"type": "object", | |
"properties": { | |
"lp_id": {"type": "string"}, | |
"source_index": {"type": "string"}, | |
"description": {"type": "string"} | |
}, | |
"required": ["lp_id", "source_index", "description"] | |
} | |
} | |
}, | |
"required": ["relevant_positions"] | |
} | |
# Формування промпту | |
prompt = PRECEDENT_ANALYSIS_TEMPLATE.format( | |
query=query, | |
question=question if question else "Загальний аналіз релевантності", | |
context_str=context_str | |
) | |
# Отримання відповіді від моделі | |
response_content = await self.analyzer.analyze(prompt, response_schema) | |
try: | |
parsed_response = json.loads(response_content) | |
if "relevant_positions" in parsed_response: | |
response_lines = [] | |
for position in parsed_response["relevant_positions"]: | |
position_text = ( | |
f"* [{position['source_index']}] {position['description']} " | |
) | |
response_lines.append(position_text) | |
response_text = "\n".join(response_lines) | |
return StopEvent(result=response_text) | |
else: | |
return StopEvent(result="Не знайдено релевантних правових позицій") | |
except json.JSONDecodeError: | |
return StopEvent(result="Помилка обробки відповіді від AI") | |
except Exception as e: | |
return StopEvent(result=f"Error during analysis: {str(e)}") | |
# Формування промпту та отримання відповіді | |
prompt = PRECEDENT_ANALYSIS_TEMPLATE.format( | |
query=query, | |
question=question if question else "Загальний аналіз релевантності", | |
context_str=context_str | |
) | |
messages = [ | |
ChatMessage(role="system", content="Ти - кваліфікований юрист-аналітик."), | |
ChatMessage(role="user", content=prompt) | |
] | |
response = llm_analyse.chat( | |
messages=messages, | |
response_format=response_format | |
) | |
try: | |
parsed_response = json.loads(response.message.content) | |
if "relevant_positions" in parsed_response: | |
# Форматуємо результат | |
response_lines = [] | |
for position in parsed_response["relevant_positions"]: | |
position_text = ( | |
f"* [{position['source_index']}]: {position['description']} " | |
) | |
response_lines.append(position_text) | |
response_text = "\n".join(response_lines) | |
return StopEvent(result=response_text) | |
else: | |
return StopEvent(result="Помилка: відповідь не містить аналізу правових позицій") | |
except json.JSONDecodeError: | |
return StopEvent(result="Помилка обробки відповіді від AI") | |
def parse_doc_ids(doc_ids): | |
if doc_ids is None: | |
return [] | |
if isinstance(doc_ids, list): | |
return [str(id).strip('[]') for id in doc_ids] | |
if isinstance(doc_ids, str): | |
cleaned = doc_ids.strip('[]').replace(' ', '') | |
if cleaned: | |
return [id.strip() for id in cleaned.split(',')] | |
return [] | |
def get_links_html(doc_ids): | |
parsed_ids = parse_doc_ids(doc_ids) | |
if not parsed_ids: | |
return "" | |
links = [f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})" | |
for doc_id in parsed_ids] | |
return ", ".join(links) | |
def parse_lp_ids(lp_ids): | |
if lp_ids is None: | |
return [] | |
if isinstance(lp_ids, (str, int)): | |
cleaned = str(lp_ids).strip('[]').replace(' ', '') | |
if cleaned: | |
return [cleaned] | |
return [] | |
def get_links_html_lp(lp_ids): | |
parsed_ids = parse_lp_ids(lp_ids) | |
if not parsed_ids: | |
return "" | |
links = [f"[ПП ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" for lp_id in parsed_ids] | |
return ", ".join(links) | |
def initialize_components(): | |
try: | |
# Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3 | |
persist_path = Path("Save_Index_Local") | |
# Перевірка існування локальної директорії | |
if not persist_path.exists(): | |
raise FileNotFoundError(f"Directory not found: {persist_path}") | |
# Перевірка наявності необхідних файлів і папок | |
required_files = ['docstore_es_filter.json', 'bm25_retriever_es'] | |
missing_files = [f for f in required_files if not (persist_path / f).exists()] | |
if missing_files: | |
raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}") | |
# Ініціалізація компонентів | |
global retriever_bm25 | |
# Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json` | |
docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json")) | |
# Ініціалізація `BM25Retriever` з папки `bm25_retriever_es` | |
bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es")) | |
# Ініціалізація `QueryFusionRetriever` з налаштуваннями | |
retriever_bm25 = QueryFusionRetriever( | |
[ | |
bm25_retriever, | |
], | |
similarity_top_k=Settings.similarity_top_k, | |
num_queries=1, | |
use_async=True, | |
) | |
return True | |
except Exception as e: | |
print(f"Error initializing components: {str(e)}", file=sys.stderr) | |
return False | |
def extract_court_decision_text(url): | |
response = requests.get(url) | |
soup = BeautifulSoup(response.content, 'html.parser') | |
unwanted_texts = [ | |
"Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.", | |
"З метою упередження перешкоджанню стабільній роботі Реєстру" | |
] | |
decision_text = "" | |
for paragraph in soup.find_all('p'): | |
text = paragraph.get_text(separator="\n").strip() | |
if not any(unwanted_text in text for unwanted_text in unwanted_texts): | |
decision_text += text + "\n" | |
return decision_text.strip() | |
# Constants for JSON schema | |
LEGAL_POSITION_SCHEMA = { | |
"type": "json_schema", | |
"json_schema": { | |
"name": "lp_schema", | |
"schema": { | |
"type": "object", | |
"properties": { | |
"title": {"type": "string", "description": "Title of the legal position"}, | |
"text": {"type": "string", "description": "Text of the legal position"}, | |
"proceeding": {"type": "string", "description": "Type of court proceedings"}, | |
"category": {"type": "string", "description": "Category of the legal position"}, | |
}, | |
"required": ["title", "text", "proceeding", "category"], | |
"additionalProperties": False | |
}, | |
"strict": True | |
} | |
} | |
# def generate_legal_position(court_decision_text, comment_input): | |
# try: | |
# # Ініціалізація моделі | |
# llm_lp = OpenAI( | |
# # model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU", | |
# model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-1500:Aaiu4WZd", | |
# temperature=0 | |
# ) | |
# | |
# # Формування повідомлень для чату | |
# # Формуємо контент з урахуванням коментаря | |
# content = LEGAL_POSITION_PROMPT.format( | |
# court_decision_text=court_decision_text, | |
# comment=comment_input if comment_input else "Коментар відсутній" | |
# ) | |
# | |
# # Формування повідомлень для чату | |
# messages = [ | |
# ChatMessage(role="system", content=SYSTEM_PROMPT), | |
# ChatMessage(role="user", content=content), | |
# ] | |
# | |
# # Отримання відповіді від моделі | |
# response = llm_lp.chat(messages, response_format=LEGAL_POSITION_SCHEMA) | |
# | |
# # Обробка відповіді | |
# parsed_response = json.loads(response.message.content) | |
# | |
# # Перевірка наявності обов'язкових полів | |
# if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]): | |
# return parsed_response | |
# | |
# return { | |
# "title": "Error: Missing required fields in response", | |
# "text": response.message.content, | |
# "proceeding": "Unknown", | |
# "category": "Error" | |
# } | |
# | |
# except json.JSONDecodeError: | |
# return { | |
# "title": "Error parsing response", | |
# "text": response.message.content, | |
# "proceeding": "Unknown", | |
# "category": "Error" | |
# } | |
# except Exception as e: | |
# return { | |
# "title": "Unexpected error", | |
# "text": str(e), | |
# "proceeding": "Unknown", | |
# "category": "Error" | |
# } | |
def generate_legal_position(court_decision_text, comment_input): | |
if not isinstance(court_decision_text, str) or not court_decision_text.strip(): | |
return { | |
"title": "Invalid input", | |
"text": "Court decision text is required and must be non-empty.", | |
"status": "Error" | |
} | |
try: | |
# Конфігурація моделі | |
generation_config = { | |
"temperature": 0, | |
"max_output_tokens": 8192, | |
"response_mime_type": "application/json", # Виправлено дублювання | |
} | |
# Ініціалізація моделі | |
model = genai.GenerativeModel( | |
model_name="gemini-1.5-flash", | |
generation_config=generation_config, | |
system_instruction=SYSTEM_PROMPT, | |
) | |
content = LEGAL_POSITION_PROMPT.format( | |
court_decision_text=court_decision_text, | |
comment=comment_input if comment_input else "Коментар відсутній" | |
) | |
# Створення сесії чату | |
chat_session = model.start_chat(history=[]) | |
response = chat_session.send_message(content) | |
# Обробка відповіді | |
parsed_response = json.loads(response.text) | |
# Перевірка наявності обов'язкових полів | |
if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]): | |
return parsed_response | |
return { | |
"title": "Error: Missing required fields in response", | |
"text": response.text, | |
"proceeding": "Unknown", | |
"category": "Error" | |
} | |
except json.JSONDecodeError: | |
return { | |
"title": "Error parsing response", | |
"text": response.text, | |
"proceeding": "Unknown", | |
"category": "Error" | |
} | |
except Exception as e: | |
return { | |
"title": "Unexpected error", | |
"text": str(e), | |
"proceeding": "Unknown", | |
"category": "Error" | |
} | |
def create_gradio_interface(): | |
async def generate_position_action(url): | |
try: | |
court_decision_text = extract_court_decision_text(url) | |
legal_position_json = generate_legal_position(court_decision_text, comment_input) | |
position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n" | |
return position_output_content, legal_position_json | |
except Exception as e: | |
return f"Error during position generation: {str(e)}", None | |
async def search_with_ai_action(legal_position_json): | |
try: | |
query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"] | |
nodes = await retriever_bm25.aretrieve(query_text) | |
sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n" | |
for index, node in enumerate(nodes, start=1): | |
source_title = node.node.metadata.get('title') | |
doc_ids = node.node.metadata.get('doc_id') | |
lp_ids = node.node.metadata.get('lp_id') | |
links = get_links_html(doc_ids) | |
links_lp = get_links_html_lp(lp_ids) | |
sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n" | |
return sources_output, nodes | |
except Exception as e: | |
return f"Error during search: {str(e)}", None | |
async def analyze_action(legal_position_json, question, nodes, provider, model_name): | |
try: | |
workflow = PrecedentAnalysisWorkflow( | |
provider=ModelProvider(provider), | |
model_name=ModelName(model_name) | |
) | |
query = ( | |
f"{legal_position_json['title']}: " | |
f"{legal_position_json['text']}: " | |
f"{legal_position_json['proceeding']}: " | |
f"{legal_position_json['category']}" | |
) | |
response_text = await workflow.run( | |
query=query, | |
question=question, | |
nodes=nodes | |
) | |
output = f"**Аналіз ШІ (модель: {model_name}):**\n{response_text}\n\n" | |
output += "**Наявні в базі Правові Позицій Верховного Суду:**\n\n" | |
analysis_lines = response_text.split('\n') | |
for line in analysis_lines: | |
if line.startswith('* ['): | |
index = line[3:line.index(']')] | |
node = nodes[int(index) - 1] | |
source_node = node.node | |
source_title = source_node.metadata.get('title', 'Невідомий заголовок') | |
source_text_lp = node.text | |
doc_ids = source_node.metadata.get('doc_id') | |
lp_id = source_node.metadata.get('lp_id') | |
links = get_links_html(doc_ids) | |
links_lp = get_links_html_lp(lp_id) | |
output += f"[{index}]: *{source_title}* | {source_text_lp} | {links_lp} | {links}\n\n" | |
return output | |
except Exception as e: | |
return f"Error during analysis: {str(e)}" | |
def update_model_choices(provider): | |
if provider == ModelProvider.OPENAI.value: | |
return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("gpt")]) | |
else: | |
return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("claude")]) | |
with gr.Blocks() as app: | |
# Далі ваш код інтерфейсу... | |
gr.Markdown("# Аналізатор релевантних Правових Позицій Верховного Суду для нового судового рішення") | |
with gr.Row(): | |
comment_input = gr.Textbox(label="Коментар до формування короткого змісту судового рішення:") | |
url_input = gr.Textbox(label="URL судового рішення:") | |
question_input = gr.Textbox(label="Уточнююче питання для аналізу:") | |
with gr.Row(): | |
provider_dropdown = gr.Dropdown( | |
choices=[p.value for p in ModelProvider], | |
value=ModelProvider.OPENAI.value, | |
label="Провайдер AI", | |
) | |
model_dropdown = gr.Dropdown( | |
choices=[m.value for m in ModelName if m.value.startswith("gpt")], | |
value=ModelName.GPT4o_MINI.value, | |
label="Модель", | |
) | |
with gr.Row(): | |
generate_position_button = gr.Button("Генерувати короткий зміст позиції суду") | |
search_with_ai_button = gr.Button("Пошук із ШІ", interactive=False) | |
analyze_button = gr.Button("Аналіз", interactive=False) | |
position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням") | |
search_output = gr.Markdown(label="Результат пошуку") | |
analysis_output = gr.Markdown(label="Результат аналізу") | |
state_lp_json = gr.State() | |
state_nodes = gr.State() | |
# Підключення функцій до кнопок | |
generate_position_button.click( | |
fn=generate_position_action, | |
inputs=url_input, | |
outputs=[position_output, state_lp_json] | |
).then( | |
fn=lambda: gr.update(interactive=True), | |
inputs=None, | |
outputs=search_with_ai_button | |
) | |
search_with_ai_button.click( | |
fn=search_with_ai_action, | |
inputs=state_lp_json, | |
outputs=[search_output, state_nodes] | |
).then( | |
fn=lambda: gr.update(interactive=True), | |
inputs=None, | |
outputs=analyze_button | |
) | |
analyze_button.click( | |
fn=analyze_action, | |
inputs=[state_lp_json, question_input, state_nodes, provider_dropdown, model_dropdown], | |
outputs=analysis_output | |
) | |
provider_dropdown.change( | |
fn=update_model_choices, | |
inputs=provider_dropdown, | |
outputs=model_dropdown | |
) | |
return app | |
if __name__ == "__main__": | |
if initialize_components(): | |
print("Components initialized successfully!") | |
app = create_gradio_interface() | |
app.launch(share=True) | |
else: | |
print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr) | |
sys.exit(1) | |