|
import redis |
|
import json |
|
from redis.commands.search.field import TagField, VectorField,TextField |
|
from redis.commands.search.indexDefinition import IndexDefinition, IndexType |
|
from redis.commands.search.query import Query |
|
import logging |
|
import numpy as np |
|
import redis.commands.search |
|
from typing import List, Dict, Any, Optional |
|
import pdfplumber |
|
import google.generativeai as genai |
|
from openai import OpenAI |
|
import os |
|
import re |
|
logging.basicConfig(level=logging.INFO)

# Logger named after this module; records propagate to the root handler configured above.
logger = logging.getLogger(__name__)

# Shared Redis client for this module. Connection settings may be overridden through the
# environment; the defaults are the values that used to be hard-coded here.
# decode_responses=True makes redis-py return str instead of bytes for replies.
r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "redis"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    db=int(os.environ.get("REDIS_DB", "0")),
    password=os.environ.get("REDIS_PASSWORD"),
    decode_responses=True,
)
|
|
|
class functions_doc:
    """Manage per-user document "services" in Redis and extract JSON from text with LLMs.

    Storage layout, as written by the methods below through the module-level client ``r``:

    * ``token:{token}:docservices`` -- a Redis list; each element is a JSON string
      ``{"servicename": <name>, "documents": [{"documentname": ..., "binarykey": ...}, ...]}``.
    * ``token:{token}:{service_name}:binarykey`` -- counter used to allocate ``binarykey`` values.
    * ``{service_name}_{binarykey}`` -- the stored document payload.
    """

    def __init__(self, openai_api_key: Optional[str] = None, gemini_api_key: Optional[str] = None):
        """Create the OpenAI client and the Gemini model used by ``get_json``.

        API keys are no longer hard-coded in source. They are taken from the arguments when
        given, otherwise from the ``OPENAI_API_KEY`` / ``GOOGLE_API_KEY`` environment variables.
        (Any keys previously committed to this file should be treated as leaked and revoked.)

        :param openai_api_key: OpenAI API key; defaults to ``$OPENAI_API_KEY``.
        :param gemini_api_key: Google Generative AI API key; defaults to ``$GOOGLE_API_KEY``.
        """
        self.client = OpenAI(api_key=openai_api_key or os.environ.get("OPENAI_API_KEY"))

        generation_config = {
            "temperature": 0.2,
            "top_p": 0.75,
            "max_output_tokens": 6000,
        }
        # The same medium-and-above block threshold is applied to every harm category.
        safety_settings = [
            {"category": category, "threshold": "BLOCK_MEDIUM_AND_ABOVE"}
            for category in (
                "HARM_CATEGORY_HARASSMENT",
                "HARM_CATEGORY_HATE_SPEECH",
                "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                "HARM_CATEGORY_DANGEROUS_CONTENT",
            )
        ]

        genai.configure(api_key=gemini_api_key or os.environ.get("GOOGLE_API_KEY"))
        self.model = genai.GenerativeModel(
            model_name="gemini-1.5-pro-latest",
            generation_config=generation_config,
            safety_settings=safety_settings,
        )

    @staticmethod
    def add_services_to_token(token: str, services: List[Dict]) -> Dict:
        """
        Add multiple services to a token's list of services, skipping names that already exist.

        Each service is a dictionary with at least a 'servicename' key; entries that are not
        dicts or lack 'servicename' are ignored. Each new service is stored with an empty
        document list. Declared as a staticmethod because it never used ``self``; it can be
        called on the class or on an instance.

        :param token: The unique token of the user.
        :param services: List of service dictionaries to add.
        :return: A dictionary with a message and the list of added service names.
        :raises ValueError: If ``services`` is empty or not a list.
        :raises Exception: If Redis access/parsing fails, or if no new service was added.
        """
        # Validate the input before doing any Redis round-trip.
        if not services or not isinstance(services, list):
            raise ValueError("Invalid services format. It must be a list of services.")

        services_key = f"token:{token}:docservices"
        try:
            existing_services = r.lrange(services_key, 0, -1)
            existing_service_names = [json.loads(service)['servicename'] for service in existing_services]
        except Exception as e:
            raise Exception("Failed to fetch or parse existing services: " + str(e)) from e

        added_services = []
        for service in services:
            if not isinstance(service, dict) or 'servicename' not in service:
                continue
            servicename = service.get('servicename')
            if servicename in existing_service_names:
                continue
            service_info = json.dumps({"servicename": servicename, "documents": []})
            try:
                r.rpush(services_key, service_info)
            except Exception as e:
                raise Exception(f"Failed to add service {servicename}: " + str(e)) from e
            # Track the new name so a duplicate later in the same input list is skipped too.
            existing_service_names.append(servicename)
            added_services.append(servicename)

        if not added_services:
            raise Exception("No new services were added. They may already exist or input was invalid.")

        return {"message": "Services successfully added.", "added_services": added_services}

    def remove_service_by_name(self, token, servicename):
        """
        Remove a service entry from Redis together with all of its stored documents.

        Every document blob of each matching entry is deleted and the entry itself is removed
        from the token's service list. All commands are queued on one pipeline and sent by a
        single ``execute()``. Note: the result reports success even when no service with that
        name exists.

        Parameters:
            token (str): Token to identify the user data.
            servicename (str): Name of the service to be removed.

        Returns:
            dict: ``{"success": bool, "message": str}``.
        """
        try:
            user_key = f"token:{token}:docservices"
            pipe = r.pipeline()
            for service_data in r.lrange(user_key, 0, -1):
                data = json.loads(service_data)
                if data["servicename"] != servicename:
                    continue
                documents = data.get("documents", [])
                logging.info(f"Removing documents {[doc['documentname'] for doc in documents]} of service {servicename}")
                for doc in documents:
                    pipe.delete(f"{servicename}_{doc['binarykey']}")
                # The entry is not rewritten before the pipeline runs, so LREM matches the
                # exact string just read. (Previously the documents were removed via an LSET
                # first, which made this LREM miss and left the service behind.)
                pipe.lrem(user_key, 0, service_data)
            pipe.execute()
            return {"success": True, "message": f"Service {servicename} and all associated documents removed."}
        except Exception as e:
            return {"success": False, "message": str(e)}

    @staticmethod
    def add_and_store_document(token: str, service_name: str, document_name: str, encoded_file: bytes) -> dict:
        """
        Add a document to a service of a user's token and store its payload in Redis.

        If a document with the same name already exists in the service, nothing is stored.
        Declared as a staticmethod because it never used ``self``; it can be called on the
        class or on an instance.

        :param token: The unique token of the user.
        :param service_name: The name of the service to which the document will be added.
        :param document_name: The name of the document to add.
        :param encoded_file: The base64 encoded file to be stored.
        :return: A dictionary with a message indicating the result.
        """
        services_key = f"token:{token}:docservices"
        binary_key_key = f"token:{token}:{service_name}:binarykey"

        try:
            existing_services = r.lrange(services_key, 0, -1)
            for index, service in enumerate(existing_services):
                service_data = json.loads(service)
                if service_data['servicename'] != service_name:
                    continue

                documents = service_data.get('documents', [])
                if any(doc['documentname'] == document_name for doc in documents):
                    return {"message": "Document already exists in the service."}

                # Allocate a new per-service key for the payload.
                binary_key = r.incr(binary_key_key)

                # Store the payload before publishing the reference, so a failure here
                # cannot leave a document entry pointing at a missing blob.
                r.set(service_name + "_" + str(binary_key), encoded_file)

                documents.append({'documentname': document_name, 'binarykey': str(binary_key)})
                service_data['documents'] = documents
                r.lset(services_key, index, json.dumps(service_data))

                logging.info("Document stored successfully in Redis.")
                return {"message": "Document successfully added and stored in the service."}

            return {"message": "Service not found."}

        except redis.RedisError as e:
            logging.error(f"Failed to store document: {e}")
            return {"status": "error", "message": str(e)}

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            return {"status": "error", "message": "An unexpected error occurred"}

    def personalize_chunking(self, real_text, pdf_path, start_page, end_page):
        """Concatenate the text of pages ``start_page``..``end_page`` (1-based, inclusive)
        and split it on ``real_text``.

        Pages are joined with no separator. Pages for which pdfplumber yields no text
        contribute an empty string.
        """
        with pdfplumber.open(pdf_path) as pdf:
            text = "".join(
                pdf.pages[page_number].extract_text(x_tolerance=2, y_tolerance=4) or ""
                for page_number in range(start_page - 1, end_page)
            )
        return text.split(real_text)

    def extract_text_from_pdf(self, pdf_path, start_page, end_page):
        """Return a list with the text of each page ``start_page``..``end_page``
        (1-based, inclusive).

        A page for which pdfplumber yields no text is returned as an empty string.
        """
        with pdfplumber.open(pdf_path) as pdf:
            return [
                pdf.pages[page_number].extract_text(x_tolerance=2, y_tolerance=4) or ""
                for page_number in range(start_page - 1, end_page)
            ]

    def get_document(self, token: str, service_name: str, document_name: str) -> Optional[dict]:
        """
        Retrieve a stored document payload from Redis by token, service name and document name.

        :return: ``{"status": "success", "message": ..., "document": <payload>}`` when found;
            ``{"status": "error", "message": ...}`` when the document is unknown or Redis fails;
            ``None`` on any other unexpected error.
        """
        try:
            binary_key = self.get_binary_key(token=token, service_name=service_name, document_name=document_name)

            # An unknown document has no binary key; don't query a "<service>_None" key.
            stored_file = None if binary_key is None else r.get(service_name + "_" + str(binary_key))

            if stored_file is None:
                logging.info("No document found for the specified key.")
                return {"status": "error", "message": "No document found for the specified key"}

            logging.info("Document retrieved successfully from Redis.")
            return {"status": "success", "message": "Document retrieved successfully from Redis.", "document": stored_file}

        except redis.RedisError as e:
            logging.error(f"Failed to retrieve document: {e}")
            return {"status": "error", "message": f"Failed to retrieve document: {e}"}

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            return None

    def get_documents_from_service(self, token: str, servicename: str) -> dict:
        """
        Retrieve the document entries of a specific service within a token's list of services.

        :param token: The unique token of the user.
        :param servicename: The name of the service from which documents will be retrieved.
        :return: ``{"success": True, "documents": [...]}`` for the first matching service,
            otherwise ``{"message": "Service not found."}``.
        :raises Exception: If the service list cannot be fetched or parsed.
        """
        services_key = f"token:{token}:docservices"
        try:
            for service in r.lrange(services_key, 0, -1):
                service_data = json.loads(service)
                if service_data['servicename'] == servicename:
                    return {"success": True, "documents": service_data.get('documents', [])}
            return {"message": "Service not found."}
        except Exception as e:
            raise Exception("Failed to fetch or parse services: " + str(e)) from e

    def get_binary_key(self, token: str, service_name: str, document_name: str):
        """Return the stored ``binarykey`` of ``document_name`` in the service, or ``None``."""
        result = self.get_documents_from_service(token=token, servicename=service_name)
        for doc in result.get("documents", []):
            if doc['documentname'] == document_name:
                return doc['binarykey']
        return None

    def remove_documents_from_service(self, token: str, service_name: str, document_names: List[str]) -> dict:
        """
        Remove documents from Redis storage and drop their entries from a service.

        Only documents actually listed in the service are touched; their payload keys are
        taken from the entries themselves, so unknown names never produce a bogus key.

        :return: ``{"status": "success" | "error", "message": str}``.
        """
        try:
            services_key = f"token:{token}:docservices"
            existing_services = r.lrange(services_key, 0, -1)
            updated = False

            for index, service in enumerate(existing_services):
                service_data = json.loads(service)
                if service_data['servicename'] != service_name:
                    continue

                documents = service_data.get('documents', [])
                removed = [doc for doc in documents if doc['documentname'] in document_names]
                if not removed:
                    continue

                for doc in removed:
                    redis_key = service_name + "_" + str(doc['binarykey'])
                    # DEL returns the number of keys it removed.
                    if r.delete(redis_key):
                        logging.info(f"Document with key {redis_key} removed successfully from Redis.")

                service_data['documents'] = [doc for doc in documents if doc['documentname'] not in document_names]
                r.lset(services_key, index, json.dumps(service_data))
                updated = True

            if updated:
                return {"status": "success", "message": "Documents removed successfully from both Redis storage and service list."}
            return {"status": "error", "message": "No documents found in the service list or no changes were made."}

        except redis.RedisError as e:
            logging.error(f"Failed to delete documents: {e}")
            return {"status": "error", "message": str(e)}

        except Exception as e:
            logging.error(f"An error occurred: {e}")
            return {"status": "error", "message": "An unexpected error occurred"}

    def get_json(self, schema, context, model, comment):
        """Ask an LLM to extract information from ``context`` following ``schema``.

        :param schema: Text inserted inside the JSON braces of the prompt.
        :param context: Text to extract from.
        :param model: ``"gpt-3.5-turbo"`` (OpenAI) or ``"gemini"``; any other value returns ``None``.
        :param comment: Optional explanation of the schema keys, appended to the prompt.
        :return: The model reply parsed by ``parse_json``.
        """
        prompt = "Your task is to extract information from context."
        var = ""
        if comment:
            var = f"""**expilcation of keys in schema**: {comment}"""
        instruction = f"""
**JSON Format (High Priority)**: Provide the output in a properly formatted JSON structure.
**Respect Schema (High Priority)**: Utilize the schema below to organize the extracted information from the context. If certain information is absent, leave the corresponding field empty.
**Error Handling**: If the context does not contain sufficient information to fulfill the requirements, return the following JSON response: {{"message": "Context lacks the desired information"}}.

```json
{{
{schema}
}}```
{var}
"""
        template = f"""
{prompt}
Consider the following:
{instruction}

CONTEXT:
{context}
"""
        if model == "gpt-3.5-turbo":
            response = self.client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[{"role": "user", "content": template}],
            )
            return self.parse_json(response.choices[0].message.content)
        if model == "gemini":
            response = self.model.generate_content(template)
            return self.parse_json(response.text)
        logging.warning(f"Unsupported model {model!r}; no extraction performed.")
        return None

    def clean_and_load_json(self, s):
        """Best-effort repair of LLM-produced pseudo-JSON, then ``json.loads`` it.

        Strips ``#`` comments that run up to a newline, trailing commas before ``]``/``}``,
        and ``...`` placeholder lines. Raises ``json.JSONDecodeError`` if the result is
        still not valid JSON.
        """
        s = re.sub(r'#.*?\n', '', s)
        # Trailing comma followed by a newline before a closing bracket/brace.
        s = re.sub(r',\s*\n\s*(\]|\})', r'\1', s)
        # Lines consisting only of "..." placeholders.
        s = re.sub(r'\n\s*\.\.\.\n', '', s)
        # Any remaining trailing comma before a closing bracket/brace.
        s = re.sub(r',\s*(\]|\})', r'\1', s)
        return json.loads(s.strip())

    def parse_json(self, s):
        """Parse JSON from an LLM reply, tolerating surrounding prose and minor syntax errors.

        Tries ``json.loads`` on the whole string, then on the span from the first ``{`` to the
        last ``}``, and finally on that span after ``clean_and_load_json`` repairs it.

        :raises ValueError: If no ``{``/``}`` pair exists or the span cannot be parsed.
        """
        try:
            return json.loads(s)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            pass

        start_idx = s.find('{')
        end_idx = s.rfind('}')
        if start_idx == -1 or end_idx == -1:
            raise ValueError("Could not find JSON object in the provided string.")

        candidate = s[start_idx:end_idx + 1]
        try:
            return json.loads(candidate)
        except ValueError:
            # Previously called the misspelled ``self.lean_and_load_json`` (AttributeError).
            return self.clean_and_load_json(candidate)
|
|
|
|
|
|
|
|