import os
import re
import sys
import json
import asyncio
from pathlib import Path
from typing import Union

import gradio as gr
import requests
import nest_asyncio
from bs4 import BeautifulSoup

# Allow nested event loops (Gradio and the async llama_index workflow share one loop)
nest_asyncio.apply()
from llama_index.core import (
    StorageContext,
    Settings,
    load_index_from_storage,
)
from llama_index.llms.openai import OpenAI
from llama_index.core.llms import ChatMessage
from llama_index.core.schema import NodeWithScore
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step
from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer

from prompts import CITATION_QA_TEMPLATE, CITATION_REFINE_TEMPLATE
# Constants and settings
from dotenv import load_dotenv

# Load environment variables from the .env file
load_dotenv()

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set; add it to your .env file")
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
# Initialize the embedding model and global llama_index settings
embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
Settings.embed_model = embed_model
Settings.context_window = 20000
Settings.chunk_size = 1024
Settings.similarity_top_k = 20  # how many candidates each retriever returns

# Directory holding the persisted docstore, BM25 retriever, and FAISS index
PERSIST_DIR = "/home/docsa/Legal_Position/Save_index"
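# Expected layout of PERSIST_DIR (verified by initialize_components() below):
#   docstore.json    - persisted SimpleDocumentStore
#   bm25_retriever/  - persisted BM25Retriever
#   index_faiss/     - persisted FAISS vector store and index metadata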
class RetrieverEvent(Event):
"""Result of running retrieval"""
nodes: list[NodeWithScore]
class CitationQueryEngineWorkflow(Workflow):
    @step
    async def retrieve(self, ctx: Context, ev: StartEvent) -> Union[RetrieverEvent, None]:
        """Retrieve candidate precedents with the fusion (BM25 + FAISS) retriever."""
        query = ev.get("query")
        question = ev.get("question")
        if not query:
            return None
        await ctx.set("query", query)
        await ctx.set("question", question)
        nodes = await retriever_fusion_faiss_bm25.aretrieve(query)
        return RetrieverEvent(nodes=nodes)

    @step
    async def synthesize(self, ctx: Context, ev: RetrieverEvent) -> StopEvent:
        """Synthesize a cited answer from the retrieved nodes."""
        query = await ctx.get("query", default=None)
        question = await ctx.get("question", default=None)
        llm_answer = OpenAI(model="gpt-4o-mini")
        synthesizer = get_response_synthesizer(
            llm=llm_answer,
            text_qa_template=CITATION_QA_TEMPLATE,
            refine_template=CITATION_REFINE_TEMPLATE,
            response_mode=ResponseMode.COMPACT,
            use_async=True,
        )
        # `question` is passed through to the citation templates as an extra variable
        response = await synthesizer.asynthesize(query=query, question=question, nodes=ev.nodes)
        return StopEvent(result=response)
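# Minimal standalone usage sketch (hypothetical query text; assumes
# initialize_components() has already populated retriever_fusion_faiss_bm25):
#
#   workflow = CitationQueryEngineWorkflow(timeout=600)
#   result = asyncio.run(workflow.run(query="<текст правової позиції>", question="<питання>"))
#   print(result.response)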
def initialize_components():
    """Load the persisted docstore, BM25 retriever, and FAISS index; build the fusion retriever."""
    try:
persist_path = Path(PERSIST_DIR)
if not persist_path.exists():
raise FileNotFoundError(f"Directory not found: {persist_path}")
required_files = ['docstore.json', 'bm25_retriever', 'index_faiss']
missing_files = [f for f in required_files if not (persist_path / f).exists()]
if missing_files:
raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}")
global retriever_fusion_faiss_bm25
docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore.json"))
bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever"))
faiss_vector_store = FaissVectorStore.from_persist_dir(str(persist_path / "index_faiss"))
storage_context_faiss = StorageContext.from_defaults(
vector_store=faiss_vector_store,
persist_dir=str(persist_path / "index_faiss")
)
index_faiss = load_index_from_storage(storage_context=storage_context_faiss)
        retriever_fusion_faiss_bm25 = QueryFusionRetriever(
            [
                bm25_retriever,
                index_faiss.as_retriever(similarity_top_k=Settings.similarity_top_k),
            ],
            mode="reciprocal_rerank",
            similarity_top_k=Settings.similarity_top_k,
            num_queries=1,  # disable LLM query generation; fuse results for the original query only
            use_async=True,
        )
return True
except Exception as e:
print(f"Error initializing components: {str(e)}", file=sys.stderr)
return False
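# Sketch of how a compatible persist directory could be built offline (illustrative
# only; assumes `nodes` holds parsed Supreme Court legal positions and that the
# installed llama_index/bm25 versions expose these persistence helpers):
#
#   import faiss
#   from llama_index.core import VectorStoreIndex
#   docstore = SimpleDocumentStore()
#   docstore.add_documents(nodes)
#   docstore.persist(str(Path(PERSIST_DIR) / "docstore.json"))
#   BM25Retriever.from_defaults(nodes=nodes).persist(str(Path(PERSIST_DIR) / "bm25_retriever"))
#   vector_store = FaissVectorStore(faiss_index=faiss.IndexFlatL2(1536))  # 1536 = text-embedding-3-small dim
#   storage_context = StorageContext.from_defaults(vector_store=vector_store)
#   VectorStoreIndex(nodes, storage_context=storage_context).storage_context.persist(
#       persist_dir=str(Path(PERSIST_DIR) / "index_faiss"))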
# End-to-end pipeline: URL -> decision text -> legal position -> cited precedents
async def process_court_decision(url, question, progress=gr.Progress()):
try:
# Extract text from URL
progress(0, desc="Extracting court decision text...")
court_decision_text = extract_court_decision_text(url)
# Generate legal position
progress(0.3, desc="Generating legal position...")
legal_position_json = generate_legal_position(court_decision_text, question)
# Initialize workflow
progress(0.5, desc="Initializing analysis workflow...")
w = CitationQueryEngineWorkflow(timeout=600)
# Run workflow
progress(0.7, desc="Analyzing and finding precedents...")
result = await w.run(query=legal_position_json["Legal_position"], question=question)
        # Process results
        progress(0.9, desc="Processing results...")

        # Prepare output (headers stay in Ukrainian, matching the UI language)
        output = f"**Правова позиція нового судового рішення:**\n{legal_position_json['Title']}: {legal_position_json['Legal_position']}\n\n"
        output += f"**Відповідь ШІ:**\n{result.response}\n\n"
        output += "**Цитовані джерела існуючих правових позицій Верховного Суду:**\n"

        # List every retrieved source node with its title and fusion score
        sources_output = ""
        for index, node in enumerate(result.source_nodes, start=1):
            sources_output += f"[{index}] {node.metadata['title']} 👉 {node.get_score():.4f}\n\n"
# Combine main output and sources output
final_output = output + "\n\n" + sources_output
progress(1.0, desc="Complete!")
return final_output
except Exception as e:
return f"Error processing court decision: {str(e)}"
def extract_court_decision_text(url):
    """Download a court decision page and return its visible text, minus register boilerplate."""
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')
unwanted_texts = [
"Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.",
"З метою упередження перешкоджанню стабільній роботі Реєстру"
]
decision_text = ""
for paragraph in soup.find_all('p'):
text = paragraph.get_text(separator="\n").strip()
if not any(unwanted_text in text for unwanted_text in unwanted_texts):
decision_text += text + "\n"
return decision_text.strip()
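# Example call (hypothetical document id; the register serves decisions as HTML pages):
#   text = extract_court_decision_text("https://reyestr.court.gov.ua/Review/<id>")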
def generate_legal_position(court_decision_text, user_question):
    """Distill the decision into a short legal position (JSON) using the fine-tuned model."""
    llm_lp = OpenAI(model="ft:gpt-4o-mini-2024-07-18:dochome:legal-position-100:9wSVvFmd", temperature=0)
system_prompt = """
You are a qualified lawyer tasked with creating a Legal Position based on a court decision.
Your result will be used to search for precedents in the database of existing legal positions of the Supreme Court of Ukraine.
"""
prompt = f"""To create the legal position:
1. Carefully read and analyze the Court decision.
2. Identify the key legal principle or ruling established in the decision.
3. Summarize this principle concisely, focusing on its legal implications.
4. Ensure your summary is clear, precise, and uses appropriate legal terminology.
Format your legal position following these guidelines:
- Keep it brief, ideally no more than 3-4 sentences.
- Use appropriate legal terminology.
- Do not include any additional explanations or comments.
Text content should be in Ukrainian only!
Return the result as JSON in the format:
{{
"Title": "Brief title of the legal position",
"Legal_position": "Full text of the legal position"
}}
Court decision:
{court_decision_text}
User's question:
{user_question}
"""
messages = [
ChatMessage(role="system", content=system_prompt),
ChatMessage(role="user", content=prompt),
]
response = llm_lp.chat(messages)
try:
return json.loads(response.message.content)
except json.JSONDecodeError:
# If JSON parsing fails, create a structured response
return {
"Title": "Error parsing response",
"Legal_position": response.message.content
}
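# Note: models sometimes wrap JSON in markdown fences; if that happens here, a
# defensive variant (optional sketch) could strip the fences before parsing:
#
#   content = response.message.content.strip()
#   if content.startswith("```"):
#       content = content.strip("`").removeprefix("json").strip()
#   return json.loads(content)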
# Gradio interface
def create_gradio_interface():
with gr.Blocks() as app:
gr.Markdown("# Аналізатор судових рішень на основі правових позицій Верховного Суду")
with gr.Row():
url_input = gr.Textbox(label="URL судового рішення:")
question_input = gr.Textbox(label="Ваше питання:")
analyze_button = gr.Button("Аналізувати")
output = gr.Markdown(label="Результат аналізу")
        analyze_button.click(
            fn=process_court_decision,  # Gradio runs async handlers natively, so no asyncio.run wrapper is needed
            inputs=[url_input, question_input],
            outputs=output,
        )
return app
if __name__ == "__main__":
if initialize_components():
print("Components initialized successfully!")
app = create_gradio_interface()
        app.launch(share=True)
else:
print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr)
sys.exit(1)