DocUA's picture
add genai
040a3df
raw
history blame
28 kB
import os
import re
import gradio as gr
import pandas as pd
import requests
import json
import faiss
import nest_asyncio
import sys
import boto3
from pathlib import Path
from bs4 import BeautifulSoup
from typing import Union, List
import asyncio
from anthropic import Anthropic
from openai import OpenAI
import google.generativeai as genai
from llama_index.core import (
StorageContext,
ServiceContext,
VectorStoreIndex,
Settings,
load_index_from_storage
)
from llama_index.llms.openai import OpenAI
from llama_index.core.llms import ChatMessage
from llama_index.core.schema import IndexNode
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step
from llama_index.core.schema import NodeWithScore
from llama_index.core.prompts import PromptTemplate
from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer
from prompts import SYSTEM_PROMPT, LEGAL_POSITION_PROMPT, PRECEDENT_ANALYSIS_TEMPLATE
from dotenv import load_dotenv
load_dotenv()
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY")
genai.configure(api_key=os.environ["GOOGLE_API_KEY"])
embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
Settings.embed_model = embed_model
Settings.context_window = 20000
Settings.chunk_size = 2048
Settings.similarity_top_k = 20
# Параметри S3
BUCKET_NAME = "legal-position"
PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити
LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3
# Ініціалізація клієнта S3
s3_client = boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name="eu-north-1"
)
# Створюємо локальну директорію, якщо вона не існує
LOCAL_DIR.mkdir(parents=True, exist_ok=True)
# Функція для завантаження файлу з S3
def download_s3_file(bucket_name, s3_key, local_path):
s3_client.download_file(bucket_name, s3_key, str(local_path))
print(f"Завантажено: {s3_key} -> {local_path}")
# Функція для завантаження всієї папки з S3 у локальну директорію
def download_s3_folder(bucket_name, prefix, local_dir):
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if 'Contents' in response:
for obj in response['Contents']:
s3_key = obj['Key']
# Пропускаємо "папку" (кореневий префікс) у S3
if s3_key.endswith('/'):
continue
# Визначаємо локальний шлях, де буде збережений файл
local_file_path = local_dir / Path(s3_key).relative_to(prefix)
local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно
# Завантажуємо файл
s3_client.download_file(bucket_name, s3_key, str(local_file_path))
print(f"Завантажено: {s3_key} -> {local_file_path}")
# Перевіряємо, чи існує локальна директорія
if not LOCAL_DIR.exists():
print(f"Локальна директорія {LOCAL_DIR} відсутня. Починаємо завантаження...")
LOCAL_DIR.mkdir(parents=True, exist_ok=True) # Створення директорії
download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR)
else:
print(f"Локальна директорія {LOCAL_DIR} вже існує. Завантаження пропущено.")
# Apply nest_asyncio to handle nested async calls
nest_asyncio.apply()
class RetrieverEvent(Event):
nodes: list[NodeWithScore]
state_lp_json = gr.State()
state_nodes = gr.State()
from enum import Enum
class ModelProvider(str, Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
class ModelName(str, Enum):
# OpenAI models
GPT4o = "gpt-4o"
GPT4o_MINI = "gpt-4o-mini"
# Anthropic models
CLAUDE3_5_SONNET = "claude-3-5-sonnet-latest"
CLAUDE3_5_HAIKU = "claude-3-5-haiku-latest"
class LLMAnalyzer:
def __init__(self, provider: ModelProvider, model_name: ModelName):
self.provider = provider
self.model_name = model_name
if provider == ModelProvider.OPENAI:
self.client = OpenAI(model=model_name)
elif provider == ModelProvider.ANTHROPIC:
# Додаємо API ключ при ініціалізації
self.client = Anthropic(api_key=anthropic_api_key)
else:
raise ValueError(f"Unsupported provider: {provider}")
async def analyze(self, prompt: str, response_schema: dict) -> str:
if self.provider == ModelProvider.OPENAI:
return await self._analyze_with_openai(prompt, response_schema)
else:
return await self._analyze_with_anthropic(prompt, response_schema)
async def _analyze_with_openai(self, prompt: str, response_schema: dict) -> str:
messages = [
ChatMessage(role="system",
content="Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду."),
ChatMessage(role="user", content=prompt)
]
# Правильний формат для response_format
response_format = {
"type": "json_schema",
"json_schema": {
"name": "relevant_positions_schema", # Додаємо обов'язкове поле name
"schema": response_schema
}
}
response = self.client.chat(
messages=messages,
response_format=response_format,
temperature=0
)
return response.message.content
async def _analyze_with_anthropic(self, prompt: str, response_schema: dict) -> str:
response = self.client.messages.create( # Прибрали await
model=self.model_name,
max_tokens=2000,
messages=[
{
"role": "assistant",
"content": "Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду."
},
{
"role": "user",
"content": prompt
}
]
)
return response.content[0].text
class PrecedentAnalysisWorkflow(Workflow):
def __init__(self, provider: ModelProvider = ModelProvider.OPENAI,
model_name: ModelName = ModelName.GPT4o_MINI):
super().__init__()
self.analyzer = LLMAnalyzer(provider, model_name)
@step
async def analyze(self, ctx: Context, ev: StartEvent) -> StopEvent:
try:
# Отримуємо параметри з події з дефолтними значеннями
query = ev.get("query", "")
question = ev.get("question", "")
nodes = ev.get("nodes", [])
# Перевірка на пусті значення
if not query:
return StopEvent(result="Помилка: Не надано текст нового рішення (query)")
if not nodes:
return StopEvent(result="Помилка: Не надано правові позиції для аналізу (nodes)")
# Підготовка контексту
context_parts = []
for i, node in enumerate(nodes, 1):
node_text = node.node.text if hasattr(node, 'node') else node.text
metadata = node.node.metadata if hasattr(node, 'node') else node.metadata
lp_id = metadata.get('lp_id', f'unknown_{i}')
context_parts.append(f"Source {i} (ID: {lp_id}):\n{node_text}")
context_str = "\n\n".join(context_parts)
# Схема відповіді
response_schema = {
"type": "object",
"properties": {
"relevant_positions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"lp_id": {"type": "string"},
"source_index": {"type": "string"},
"description": {"type": "string"}
},
"required": ["lp_id", "source_index", "description"]
}
}
},
"required": ["relevant_positions"]
}
# Формування промпту
prompt = PRECEDENT_ANALYSIS_TEMPLATE.format(
query=query,
question=question if question else "Загальний аналіз релевантності",
context_str=context_str
)
# Отримання відповіді від моделі
response_content = await self.analyzer.analyze(prompt, response_schema)
try:
parsed_response = json.loads(response_content)
if "relevant_positions" in parsed_response:
response_lines = []
for position in parsed_response["relevant_positions"]:
position_text = (
f"* [{position['source_index']}] {position['description']} "
)
response_lines.append(position_text)
response_text = "\n".join(response_lines)
return StopEvent(result=response_text)
else:
return StopEvent(result="Не знайдено релевантних правових позицій")
except json.JSONDecodeError:
return StopEvent(result="Помилка обробки відповіді від AI")
except Exception as e:
return StopEvent(result=f"Error during analysis: {str(e)}")
# Формування промпту та отримання відповіді
prompt = PRECEDENT_ANALYSIS_TEMPLATE.format(
query=query,
question=question if question else "Загальний аналіз релевантності",
context_str=context_str
)
messages = [
ChatMessage(role="system", content="Ти - кваліфікований юрист-аналітик."),
ChatMessage(role="user", content=prompt)
]
response = llm_analyse.chat(
messages=messages,
response_format=response_format
)
try:
parsed_response = json.loads(response.message.content)
if "relevant_positions" in parsed_response:
# Форматуємо результат
response_lines = []
for position in parsed_response["relevant_positions"]:
position_text = (
f"* [{position['source_index']}]: {position['description']} "
)
response_lines.append(position_text)
response_text = "\n".join(response_lines)
return StopEvent(result=response_text)
else:
return StopEvent(result="Помилка: відповідь не містить аналізу правових позицій")
except json.JSONDecodeError:
return StopEvent(result="Помилка обробки відповіді від AI")
def parse_doc_ids(doc_ids):
if doc_ids is None:
return []
if isinstance(doc_ids, list):
return [str(id).strip('[]') for id in doc_ids]
if isinstance(doc_ids, str):
cleaned = doc_ids.strip('[]').replace(' ', '')
if cleaned:
return [id.strip() for id in cleaned.split(',')]
return []
def get_links_html(doc_ids):
parsed_ids = parse_doc_ids(doc_ids)
if not parsed_ids:
return ""
links = [f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})"
for doc_id in parsed_ids]
return ", ".join(links)
def parse_lp_ids(lp_ids):
if lp_ids is None:
return []
if isinstance(lp_ids, (str, int)):
cleaned = str(lp_ids).strip('[]').replace(' ', '')
if cleaned:
return [cleaned]
return []
def get_links_html_lp(lp_ids):
parsed_ids = parse_lp_ids(lp_ids)
if not parsed_ids:
return ""
links = [f"[ПП ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" for lp_id in parsed_ids]
return ", ".join(links)
def initialize_components():
try:
# Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3
persist_path = Path("Save_Index_Local")
# Перевірка існування локальної директорії
if not persist_path.exists():
raise FileNotFoundError(f"Directory not found: {persist_path}")
# Перевірка наявності необхідних файлів і папок
required_files = ['docstore_es_filter.json', 'bm25_retriever_es']
missing_files = [f for f in required_files if not (persist_path / f).exists()]
if missing_files:
raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}")
# Ініціалізація компонентів
global retriever_bm25
# Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json`
docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json"))
# Ініціалізація `BM25Retriever` з папки `bm25_retriever_es`
bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es"))
# Ініціалізація `QueryFusionRetriever` з налаштуваннями
retriever_bm25 = QueryFusionRetriever(
[
bm25_retriever,
],
similarity_top_k=Settings.similarity_top_k,
num_queries=1,
use_async=True,
)
return True
except Exception as e:
print(f"Error initializing components: {str(e)}", file=sys.stderr)
return False
def extract_court_decision_text(url):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
unwanted_texts = [
"Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.",
"З метою упередження перешкоджанню стабільній роботі Реєстру"
]
decision_text = ""
for paragraph in soup.find_all('p'):
text = paragraph.get_text(separator="\n").strip()
if not any(unwanted_text in text for unwanted_text in unwanted_texts):
decision_text += text + "\n"
return decision_text.strip()
# Constants for JSON schema
LEGAL_POSITION_SCHEMA = {
"type": "json_schema",
"json_schema": {
"name": "lp_schema",
"schema": {
"type": "object",
"properties": {
"title": {"type": "string", "description": "Title of the legal position"},
"text": {"type": "string", "description": "Text of the legal position"},
"proceeding": {"type": "string", "description": "Type of court proceedings"},
"category": {"type": "string", "description": "Category of the legal position"},
},
"required": ["title", "text", "proceeding", "category"],
"additionalProperties": False
},
"strict": True
}
}
# def generate_legal_position(court_decision_text, comment_input):
# try:
# # Ініціалізація моделі
# llm_lp = OpenAI(
# # model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU",
# model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-1500:Aaiu4WZd",
# temperature=0
# )
#
# # Формування повідомлень для чату
# # Формуємо контент з урахуванням коментаря
# content = LEGAL_POSITION_PROMPT.format(
# court_decision_text=court_decision_text,
# comment=comment_input if comment_input else "Коментар відсутній"
# )
#
# # Формування повідомлень для чату
# messages = [
# ChatMessage(role="system", content=SYSTEM_PROMPT),
# ChatMessage(role="user", content=content),
# ]
#
# # Отримання відповіді від моделі
# response = llm_lp.chat(messages, response_format=LEGAL_POSITION_SCHEMA)
#
# # Обробка відповіді
# parsed_response = json.loads(response.message.content)
#
# # Перевірка наявності обов'язкових полів
# if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]):
# return parsed_response
#
# return {
# "title": "Error: Missing required fields in response",
# "text": response.message.content,
# "proceeding": "Unknown",
# "category": "Error"
# }
#
# except json.JSONDecodeError:
# return {
# "title": "Error parsing response",
# "text": response.message.content,
# "proceeding": "Unknown",
# "category": "Error"
# }
# except Exception as e:
# return {
# "title": "Unexpected error",
# "text": str(e),
# "proceeding": "Unknown",
# "category": "Error"
# }
def generate_legal_position(court_decision_text, comment_input):
if not isinstance(court_decision_text, str) or not court_decision_text.strip():
return {
"title": "Invalid input",
"text": "Court decision text is required and must be non-empty.",
"status": "Error"
}
try:
# Конфігурація моделі
generation_config = {
"temperature": 0,
"max_output_tokens": 8192,
"response_mime_type": "application/json", # Виправлено дублювання
}
# Ініціалізація моделі
model = genai.GenerativeModel(
model_name="gemini-1.5-flash",
generation_config=generation_config,
system_instruction=SYSTEM_PROMPT,
)
content = LEGAL_POSITION_PROMPT.format(
court_decision_text=court_decision_text,
comment=comment_input if comment_input else "Коментар відсутній"
)
# Створення сесії чату
chat_session = model.start_chat(history=[])
response = chat_session.send_message(content)
# Обробка відповіді
parsed_response = json.loads(response.text)
# Перевірка наявності обов'язкових полів
if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]):
return parsed_response
return {
"title": "Error: Missing required fields in response",
"text": response.text,
"proceeding": "Unknown",
"category": "Error"
}
except json.JSONDecodeError:
return {
"title": "Error parsing response",
"text": response.text,
"proceeding": "Unknown",
"category": "Error"
}
except Exception as e:
return {
"title": "Unexpected error",
"text": str(e),
"proceeding": "Unknown",
"category": "Error"
}
def create_gradio_interface():
async def generate_position_action(url):
try:
court_decision_text = extract_court_decision_text(url)
legal_position_json = generate_legal_position(court_decision_text, comment_input)
position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n"
return position_output_content, legal_position_json
except Exception as e:
return f"Error during position generation: {str(e)}", None
async def search_with_ai_action(legal_position_json):
try:
query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"]
nodes = await retriever_bm25.aretrieve(query_text)
sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n"
for index, node in enumerate(nodes, start=1):
source_title = node.node.metadata.get('title')
doc_ids = node.node.metadata.get('doc_id')
lp_ids = node.node.metadata.get('lp_id')
links = get_links_html(doc_ids)
links_lp = get_links_html_lp(lp_ids)
sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n"
return sources_output, nodes
except Exception as e:
return f"Error during search: {str(e)}", None
async def analyze_action(legal_position_json, question, nodes, provider, model_name):
try:
workflow = PrecedentAnalysisWorkflow(
provider=ModelProvider(provider),
model_name=ModelName(model_name)
)
query = (
f"{legal_position_json['title']}: "
f"{legal_position_json['text']}: "
f"{legal_position_json['proceeding']}: "
f"{legal_position_json['category']}"
)
response_text = await workflow.run(
query=query,
question=question,
nodes=nodes
)
output = f"**Аналіз ШІ (модель: {model_name}):**\n{response_text}\n\n"
output += "**Наявні в базі Правові Позицій Верховного Суду:**\n\n"
analysis_lines = response_text.split('\n')
for line in analysis_lines:
if line.startswith('* ['):
index = line[3:line.index(']')]
node = nodes[int(index) - 1]
source_node = node.node
source_title = source_node.metadata.get('title', 'Невідомий заголовок')
source_text_lp = node.text
doc_ids = source_node.metadata.get('doc_id')
lp_id = source_node.metadata.get('lp_id')
links = get_links_html(doc_ids)
links_lp = get_links_html_lp(lp_id)
output += f"[{index}]: *{source_title}* | {source_text_lp} | {links_lp} | {links}\n\n"
return output
except Exception as e:
return f"Error during analysis: {str(e)}"
def update_model_choices(provider):
if provider == ModelProvider.OPENAI.value:
return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("gpt")])
else:
return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("claude")])
with gr.Blocks() as app:
# Далі ваш код інтерфейсу...
gr.Markdown("# Аналізатор релевантних Правових Позицій Верховного Суду для нового судового рішення")
with gr.Row():
comment_input = gr.Textbox(label="Коментар до формування короткого змісту судового рішення:")
url_input = gr.Textbox(label="URL судового рішення:")
question_input = gr.Textbox(label="Уточнююче питання для аналізу:")
with gr.Row():
provider_dropdown = gr.Dropdown(
choices=[p.value for p in ModelProvider],
value=ModelProvider.OPENAI.value,
label="Провайдер AI",
)
model_dropdown = gr.Dropdown(
choices=[m.value for m in ModelName if m.value.startswith("gpt")],
value=ModelName.GPT4o_MINI.value,
label="Модель",
)
with gr.Row():
generate_position_button = gr.Button("Генерувати короткий зміст позиції суду")
search_with_ai_button = gr.Button("Пошук із ШІ", interactive=False)
analyze_button = gr.Button("Аналіз", interactive=False)
position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням")
search_output = gr.Markdown(label="Результат пошуку")
analysis_output = gr.Markdown(label="Результат аналізу")
state_lp_json = gr.State()
state_nodes = gr.State()
# Підключення функцій до кнопок
generate_position_button.click(
fn=generate_position_action,
inputs=url_input,
outputs=[position_output, state_lp_json]
).then(
fn=lambda: gr.update(interactive=True),
inputs=None,
outputs=search_with_ai_button
)
search_with_ai_button.click(
fn=search_with_ai_action,
inputs=state_lp_json,
outputs=[search_output, state_nodes]
).then(
fn=lambda: gr.update(interactive=True),
inputs=None,
outputs=analyze_button
)
analyze_button.click(
fn=analyze_action,
inputs=[state_lp_json, question_input, state_nodes, provider_dropdown, model_dropdown],
outputs=analysis_output
)
provider_dropdown.change(
fn=update_model_choices,
inputs=provider_dropdown,
outputs=model_dropdown
)
return app
if __name__ == "__main__":
if initialize_components():
print("Components initialized successfully!")
app = create_gradio_interface()
app.launch(share=True)
else:
print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr)
sys.exit(1)