DocUA's picture
Deploy main.py
7e7c4ec
raw
history blame
22.2 kB
import os
import re
import gradio as gr
import pandas as pd
import requests
import json
import faiss
import nest_asyncio
import sys
import boto3
from pathlib import Path
from bs4 import BeautifulSoup
from typing import Union, List
import asyncio
from llama_index.core import (
StorageContext,
ServiceContext,
VectorStoreIndex,
Settings,
load_index_from_storage
)
from llama_index.llms.openai import OpenAI
from llama_index.core.llms import ChatMessage
from llama_index.core.schema import IndexNode
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step
from llama_index.core.schema import NodeWithScore
from llama_index.core.prompts import PromptTemplate
from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer
from prompts import CITATION_QA_TEMPLATE, CITATION_REFINE_TEMPLATE
from dotenv import load_dotenv
load_dotenv()
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
Settings.embed_model = embed_model
Settings.context_window = 20000
Settings.chunk_size = 2048
Settings.similarity_top_k = 20
# Параметри S3
BUCKET_NAME = "legal-position"
PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити
LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3
# Ініціалізація клієнта S3
s3_client = boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name="eu-north-1"
)
# # Ініціалізація клієнта S3
# s3_client = boto3.client(
# "s3",
# aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
# aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
# region_name="eu-north-1"
# )
# Створюємо локальну директорію, якщо вона не існує
LOCAL_DIR.mkdir(parents=True, exist_ok=True)
# Функція для завантаження файлу з S3
def download_s3_file(bucket_name, s3_key, local_path):
s3_client.download_file(bucket_name, s3_key, str(local_path))
print(f"Завантажено: {s3_key} -> {local_path}")
# Функція для завантаження всієї папки з S3 у локальну директорію
def download_s3_folder(bucket_name, prefix, local_dir):
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if 'Contents' in response:
for obj in response['Contents']:
s3_key = obj['Key']
# Пропускаємо "папку" (кореневий префікс) у S3
if s3_key.endswith('/'):
continue
# Визначаємо локальний шлях, де буде збережений файл
local_file_path = local_dir / Path(s3_key).relative_to(prefix)
local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно
# Завантажуємо файл
s3_client.download_file(bucket_name, s3_key, str(local_file_path))
print(f"Завантажено: {s3_key} -> {local_file_path}")
# Завантаження всього вмісту папки `Save_Index` з S3 у локальну директорію `Save_Index_Local`
download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR)
# PERSIST_DIR = "/home/docsa/Legal_Position/Save_index"
# Apply nest_asyncio to handle nested async calls
nest_asyncio.apply()
class RetrieverEvent(Event):
nodes: list[NodeWithScore]
state_lp_json = gr.State()
state_nodes = gr.State()
class CitationQueryEngineWorkflow(Workflow):
@step
async def retrieve(self, ctx: Context, ev: StartEvent) -> Union[RetrieverEvent, None]:
query = ev.get("query")
question = ev.get("question")
nodes = ev.get("nodes") # Отримуємо nodes з події
if not query:
return None
await ctx.set("query", query)
await ctx.set("question", question)
if nodes is not None:
# Використовуємо передані nodes
return RetrieverEvent(nodes=nodes)
else:
# Якщо nodes не передані, не виконуємо додатковий пошук
return None
@step
async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent:
query = await ctx.get("query", default=None)
question = await ctx.get("question", default=None)
llm_answer = OpenAI(model="gpt-4o-mini", temperature=0)
synthesizer = get_response_synthesizer(
llm=llm_answer,
text_qa_template=CITATION_QA_TEMPLATE,
refine_template=CITATION_REFINE_TEMPLATE,
response_mode=ResponseMode.COMPACT,
use_async=True,
)
response = await synthesizer.asynthesize(query=query, question=question, nodes=ev.nodes)
return StopEvent(result=response)
def parse_doc_ids(doc_ids):
if doc_ids is None:
return []
if isinstance(doc_ids, list):
return [str(id).strip('[]') for id in doc_ids]
if isinstance(doc_ids, str):
cleaned = doc_ids.strip('[]').replace(' ', '')
if cleaned:
return [id.strip() for id in cleaned.split(',')]
return []
def get_links_html(doc_ids):
parsed_ids = parse_doc_ids(doc_ids)
if not parsed_ids:
return ""
links = [f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})"
for doc_id in parsed_ids]
return ", ".join(links)
def parse_lp_ids(lp_ids):
if lp_ids is None:
return []
if isinstance(lp_ids, (str, int)):
cleaned = str(lp_ids).strip('[]').replace(' ', '')
if cleaned:
return [cleaned]
return []
def get_links_html_lp(lp_ids):
parsed_ids = parse_lp_ids(lp_ids)
if not parsed_ids:
return ""
links = [f"[ПП ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" for lp_id in parsed_ids]
return ", ".join(links)
def initialize_components():
try:
# Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3
persist_path = Path("Save_Index_Local")
# Перевірка існування локальної директорії
if not persist_path.exists():
raise FileNotFoundError(f"Directory not found: {persist_path}")
# Перевірка наявності необхідних файлів і папок
required_files = ['docstore_es_filter.json', 'bm25_retriever_es']
missing_files = [f for f in required_files if not (persist_path / f).exists()]
if missing_files:
raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}")
# Ініціалізація компонентів
global retriever_bm25
# Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json`
docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json"))
# Ініціалізація `BM25Retriever` з папки `bm25_retriever_es`
bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es"))
# Ініціалізація `QueryFusionRetriever` з налаштуваннями
retriever_bm25 = QueryFusionRetriever(
[
bm25_retriever,
],
similarity_top_k=Settings.similarity_top_k,
num_queries=1,
use_async=True,
)
return True
except Exception as e:
print(f"Error initializing components: {str(e)}", file=sys.stderr)
return False
def extract_court_decision_text(url):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
unwanted_texts = [
"Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.",
"З метою упередження перешкоджанню стабільній роботі Реєстру"
]
decision_text = ""
for paragraph in soup.find_all('p'):
text = paragraph.get_text(separator="\n").strip()
if not any(unwanted_text in text for unwanted_text in unwanted_texts):
decision_text += text + "\n"
return decision_text.strip()
def generate_legal_position(court_decision_text, user_question):
# llm_lp = OpenAI(model="gpt-4o-mini", temperature=0)
# llm_lp = OpenAI(model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-100:ASPFc3vF", temperature=0)
llm_lp = OpenAI(model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU", temperature=0)
response_format = {
"type": "json_schema",
"json_schema": {
"name": "lp_schema",
"schema": {
"type": "object",
"properties": {
"title": {"type": "string", "description": "Title of the legal position"},
"text": {"type": "string", "description": "Text of the legal position"},
"proceeding": {"type": "string", "description": "Type of court proceedings"},
"category": {"type": "string", "description": "Category of the legal position"},
},
"required": ["title", "text", "proceeding", "category"],
"additionalProperties": False
},
"strict": True
}
}
system_prompt = """
Дій як кваліфікований юрист. :
"""
prompt = f"""Дотримуйся цих інструкцій.
1. Спочатку вам буде надано текст судового рішення:
<court_decision>
{court_decision_text}
</court_decision>
2. Уважно прочитай та проаналізуй текст наданого судового рішення. Зверни увагу на:
- Юридичну суть рішення
- Основне правове обґрунтування
- Головні юридичні міркування
3. На основі аналізу сформулюй короткий зміст позиції суду, дотримуючись таких вказівок:
- Будь чіткими, точними та обґрунтованими
- Використовуй відповідну юридичну термінологію
- Зберігай стислість, але повністю передай суть судового рішення
- Уникай додаткових пояснень чи коментарів
- Спробуй узагальнювати та уникати специфічної інформації (наприклад, імен або назв) під час подачі результатів
- Використовуйте лише українську мову
4. Створи короткий заголовок, який відображає основну суть судового рішення та зазнач його категорію.
5. Додатково визнач тип судочинства, до якої відноситься дане рішення.
Використовуй лише один із цих типів: 'Адміністративне судочинство', 'Кримінальне судочинство', 'Цивільне судочинство', 'Господарське судочинство'
6. Відформатуй відповідь у форматі JSON:
{{
"title": "Заголовок судового рішення",
"text": "Текст короткого змісту позиції суду",
"proceeding": "Тип судочинства",
"category": "Категорія судового рішення"
}}
"""
messages = [
ChatMessage(role="system", content=system_prompt),
ChatMessage(role="user", content=prompt),
]
response = llm_lp.chat(messages, response_format=response_format)
try:
parsed_response = json.loads(response.message.content)
if "title" in parsed_response and "text" in parsed_response:
return parsed_response
else:
return {
"title": "Error: Missing required fields in response",
"text": response.message.content
}
except json.JSONDecodeError:
return {
"title": "Error parsing response",
"text": response.message.content
}
def create_gradio_interface():
with gr.Blocks() as app:
gr.Markdown("# Аналізатор судових рішень на основі правових позицій Верховного Суду")
with gr.Row():
url_input = gr.Textbox(label="URL судового рішення:")
question_input = gr.Textbox(label="Ваше питання:")
with gr.Row():
generate_position_button = gr.Button("Генерувати короткий зміст позиції суду")
search_with_ai_button = gr.Button("Пошук із ШІ", interactive=False)
search_without_ai_button = gr.Button("Пошук без ШІ")
analyze_button = gr.Button("Аналіз", interactive=False)
position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням")
search_output = gr.Markdown(label="Результат пошуку")
analysis_output = gr.Markdown(label="Результат аналізу")
# Два об'єкти стану для зберігання legal_position_json та nodes
state_lp_json = gr.State()
state_nodes = gr.State()
async def generate_position_action(url):
try:
court_decision_text = extract_court_decision_text(url)
legal_position_json = generate_legal_position(court_decision_text, "")
position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n"
return position_output_content, legal_position_json
except Exception as e:
return f"Error during position generation: {str(e)}", None
async def search_with_ai_action(legal_position_json):
try:
query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"]
nodes = await retriever_bm25.aretrieve(query_text)
sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n"
for index, node in enumerate(nodes, start=1):
source_title = node.node.metadata.get('title')
doc_ids = node.node.metadata.get('doc_id')
lp_ids = node.node.metadata.get('lp_id')
links = get_links_html(doc_ids)
links_lp = get_links_html_lp(lp_ids)
sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n"
return sources_output, nodes
except Exception as e:
return f"Error during search: {str(e)}", None
async def search_without_ai_action(url):
try:
court_decision_text = extract_court_decision_text(url)
nodes = await retriever_bm25.aretrieve(court_decision_text)
search_output_content = f"**Результати пошуку (наявні правові позиції ВСУ):** \n\n"
for index, node in enumerate(nodes, start=1):
source_title = node.node.metadata.get('title', 'Невідомий заголовок')
doc_ids = node.node.metadata.get('doc_id')
links = get_links_html(doc_ids)
search_output_content += f"\n[{index}] *{source_title}* 👉 Score: {node.score} {links}\n"
return search_output_content, nodes
except Exception as e:
return f"Error during search: {str(e)}", None
import re
import re
async def analyze_action(legal_position_json, question, nodes):
try:
workflow = CitationQueryEngineWorkflow(timeout=600)
# Запускаємо workflow і отримуємо об'єкт Response
response = await workflow.run(
query=legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' +
legal_position_json["proceeding"] + ': ' + legal_position_json["category"],
question=question,
nodes=nodes # Передаємо nodes у workflow
)
# Отримуємо текст відповіді з об'єкта Response
response_text = str(response)
# Обробка цитат у тексті відповіді
citations = re.findall(r'\[(\d+)\]', response_text)
unique_citations = sorted(set(citations), key=int)
output = f"**Аналіз Штучного Інтелекту:**\n{response_text}\n\n"
output += "**Цитовані джерела існуючих правових позицій Верховного Суду:**\n"
# Перевіряємо наявність source_nodes в об'єкті Response
source_nodes = getattr(response, 'source_nodes', [])
# Проходимо по унікальних цитатах та зіставляємо з `lp_id` у source_nodes
for citation in unique_citations:
found = False # Змінна для відстеження, чи знайдено джерело для lp_id
for index, source_node_with_score in enumerate(source_nodes, start=1):
source_node = source_node_with_score.node
lp_id = source_node.metadata.get('lp_id') # Отримуємо lp_id із метаданих джерела
# Якщо lp_id збігається з цитатою
if str(lp_id) == citation:
found = True
source_title = source_node.metadata.get('title', 'Невідомий заголовок')
doc_ids = source_node.metadata.get('doc_id')
links = get_links_html(doc_ids)
links_lp = get_links_html_lp(lp_id)
# Використовуємо `index` як номер джерела на початку рядка
output += f"[{index}]: *{source_title}* {links_lp} 👉 Score: {source_node_with_score.score} {links}\n"
break # Вихід із циклу при знайденому відповідному джерелі
if not found:
output += f"[{citation}]: Немає відповідного джерела для lp_id {citation}\n"
return output
except Exception as e:
return f"Error during analysis: {str(e)}"
# Підключаємо функції до кнопок з оновленими входами та виходами
generate_position_button.click(
fn=generate_position_action,
inputs=url_input,
outputs=[position_output, state_lp_json]
)
generate_position_button.click(
fn=lambda: gr.update(interactive=True),
inputs=None,
outputs=search_with_ai_button
)
search_with_ai_button.click(
fn=search_with_ai_action,
inputs=state_lp_json,
outputs=[search_output, state_nodes]
)
search_with_ai_button.click(
fn=lambda: gr.update(interactive=True),
inputs=None,
outputs=analyze_button
)
search_without_ai_button.click(
fn=search_without_ai_action,
inputs=url_input,
outputs=[search_output, state_nodes]
)
search_without_ai_button.click(
fn=lambda: gr.update(interactive=True),
inputs=None,
outputs=analyze_button
)
analyze_button.click(
fn=analyze_action,
inputs=[state_lp_json, question_input, state_nodes],
outputs=analysis_output
)
return app
if __name__ == "__main__":
if initialize_components():
print("Components initialized successfully!")
app = create_gradio_interface()
app.launch(share=True)
else:
print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr)
sys.exit(1)