import redis |
import json |
from redis.commands.search.field import TagField, VectorField,TextField |
from redis.commands.search.indexDefinition import IndexDefinition, IndexType |
from redis.commands.search.query import Query |
import logging |
import numpy as np |
import redis.commands.search |
from typing import List, Dict, Any, Optional |
import pdfplumber |
import google.generativeai as genai |
from openai import OpenAI |
import os |
import re |
# Emit INFO-level (and above) records through the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Module-wide Redis connection shared by every helper below.
# decode_responses=True makes the client hand back str values instead of bytes.
r = redis.Redis(
    host='redis',
    port=6379,
    db=0,
    password=None,
    decode_responses=True,
)
class functions_doc:
    """Redis-backed document/service bookkeeping plus LLM-based JSON extraction."""

    def __init__(self):
        """Create the OpenAI client and the Gemini model used by ``get_json``.

        API keys are read from the ``OPENAI_API_KEY`` and ``GOOGLE_API_KEY``
        environment variables instead of being embedded in the source.
        """
        # SECURITY: both keys used to be hard-coded string literals here.
        # Anything that was committed must be treated as leaked and rotated.
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

        # NOTE(review): the opening line of this dict was missing from the
        # reviewed text; only the three entries below were visible -- confirm
        # that no other generation option was dropped.
        GENERATION_CONFIG = {
            "temperature": 0.2,
            "top_p": 0.75,
            "max_output_tokens": 6000,
        }

        # NOTE(review): only the four "threshold" entries were visible; the
        # "category" values are assumed to be the four standard Gemini harm
        # categories -- TODO confirm against the original file.
        SAFETY_SETTINGS = [
            {
                "category": "HARM_CATEGORY_HARASSMENT",
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            },
            {
                "category": "HARM_CATEGORY_HATE_SPEECH",
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            },
            {
                "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            },
            {
                "category": "HARM_CATEGORY_DANGEROUS_CONTENT",
                "threshold": "BLOCK_MEDIUM_AND_ABOVE"
            },
        ]

        genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
        self.model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest",
                                           generation_config=GENERATION_CONFIG,
                                           safety_settings=SAFETY_SETTINGS)
@staticmethod
def add_services_to_token(token: str, services: List[Dict]) -> Dict:
    """
    Register new services under a token, skipping names that already exist.

    Each accepted service is appended to the Redis list
    ``token:{token}:docservices`` as JSON of the form
    ``{"servicename": <name>, "documents": []}``. Entries that are not dicts
    or that lack a ``servicename`` key are ignored, as are names already
    stored for the token or repeated earlier in the same ``services`` list.

    Declared as a static method because it takes no ``self``; it can be
    called on the class or on an instance.

    :param token: The unique token of the user.
    :param services: List of service dictionaries to add.
    :return: A dictionary with a message and the list of added service names.
    :raises ValueError: If ``services`` is empty or not a list.
    :raises Exception: If the existing services cannot be read or parsed, if a
        push to Redis fails, or if no new service ended up being added.
    """
    # Reject bad input before spending a Redis round trip on it.
    if not services or not isinstance(services, list):
        raise ValueError("Invalid services format. It must be a list of services.")

    services_key = f"token:{token}:docservices"
    try:
        existing_services = r.lrange(services_key, 0, -1)
        existing_service_names = [json.loads(service)['servicename'] for service in existing_services]
    except Exception as e:
        raise Exception("Failed to fetch or parse existing services: " + str(e)) from e

    added_services = []
    for service in services:
        if not isinstance(service, dict) or 'servicename' not in service:
            continue
        servicename = service.get('servicename')
        if servicename in existing_service_names:
            continue
        service_info = json.dumps({"servicename": servicename, "documents": []})
        try:
            r.rpush(services_key, service_info)
        except Exception as e:
            raise Exception(f"Failed to add service {servicename}: " + str(e)) from e
        added_services.append(servicename)
        # Remember the name so a duplicate later in this same call is skipped.
        existing_service_names.append(servicename)

    if not added_services:
        raise Exception("No new services were added. They may already exist or input was invalid.")
    return {"message": "Services successfully added.", "added_services": added_services}
def remove_service_by_name(self, token, servicename):
    """
    Remove a service entry from Redis together with all of its documents.

    The token's service list (``token:{token}:docservices``) is scanned; for
    every entry whose ``servicename`` matches, the documents it lists are
    removed through ``remove_documents_from_service`` and the entry itself is
    then deleted from the list.

    :param token: Token identifying the user data.
    :param servicename: Name of the service to be removed.
    :return: ``{"success": True, "message": ...}`` when the scan completes
        (also when no entry matched), or ``{"success": False, "message": ...}``
        carrying the error text if anything raised.
    """
    try:
        user_key = f"token:{token}:docservices"
        pipe = r.pipeline()
        for index, service_data in enumerate(r.lrange(user_key, 0, -1)):
            if not service_data:
                continue
            data = json.loads(service_data)
            if data["servicename"] != servicename:
                continue

            document_names = [doc['documentname'] for doc in data["documents"]]
            logging.debug("Removing documents %s from service %s", document_names, servicename)

            entry_to_remove = service_data
            if document_names:
                self.remove_documents_from_service(token, servicename, document_names)
                # remove_documents_from_service rewrites this list element
                # (LSET) once documents are dropped, so the snapshot taken
                # above may no longer equal what is stored. LREM matches by
                # value, so re-read the element; list positions are still
                # stable because the removals are only queued on the pipeline.
                current = r.lindex(user_key, index)
                if current is not None:
                    entry_to_remove = current
            pipe.lrem(user_key, 0, entry_to_remove)
        pipe.execute()
        return {"success": True, "message": f"Service {servicename} and all associated documents removed."}
    except Exception as e:
        return {"success": False, "message": str(e)}
@staticmethod
def add_and_store_document(token: str, service_name: str, document_name: str, encoded_file: bytes) -> dict:
    """
    Attach a document to one of the token's services and store its payload in Redis.

    The service entry is looked up in ``token:{token}:docservices``. If the
    document name is already listed there, nothing is stored. Otherwise a new
    numeric key is drawn from the counter
    ``token:{token}:{service_name}:binarykey``, the payload is written under
    ``{service_name}_{binarykey}`` and the service entry is updated in place.

    Declared as a static method because it takes no ``self``; it can be
    called on the class or on an instance.

    :param token: The unique token of the user.
    :param service_name: The name of the service to which the document will be added.
    :param document_name: The name of the document to add.
    :param encoded_file: The base64 encoded file to be stored.
    :return: ``{"message": ...}`` describing the outcome, or
        ``{"status": "error", "message": ...}`` if an exception was caught.
    """
    services_key = f"token:{token}:docservices"
    binary_key_key = f"token:{token}:{service_name}:binarykey"
    try:
        for i, service in enumerate(r.lrange(services_key, 0, -1)):
            service_data = json.loads(service)
            if service_data['servicename'] != service_name:
                continue

            documents = service_data.get('documents', [])
            if any(doc['documentname'] == document_name for doc in documents):
                return {"message": "Document already exists in the service."}

            binary_key = r.incr(binary_key_key)
            # NOTE(review): the payload key has no token component while the
            # counter is per token, so two tokens using the same service name
            # look like they would overwrite each other's payloads. Changing
            # the key format needs a coordinated change in the read/delete
            # paths, so it is only flagged here.
            # Store the payload before publishing it in the index, so the
            # index never points at a payload that failed to be written.
            r.set(service_name + "_" + str(binary_key), encoded_file)

            documents.append({'documentname': document_name, 'binarykey': str(binary_key)})
            service_data['documents'] = documents
            r.lset(services_key, i, json.dumps(service_data))
            logging.info("Document stored successfully in Redis.")
            return {"message": "Document successfully added and stored in the service."}

        # The loop returns as soon as the service is found; reaching this
        # point means no entry matched.
        return {"message": "Service not found."}
    except redis.RedisError as e:
        logging.error(f"Failed to store document: {e}")
        return {"status": "error", "message": str(e)}
    except Exception as e:
        # Keep the traceback in the log; the caller only gets a generic message.
        logging.exception(f"An error occurred: {e}")
        return {"status": "error", "message": "An unexpected error occurred"}
def personalize_chunking(self, real_text, pdf_path, start_page, end_page):
    """
    Extract the text of a page range and split it on a custom separator.

    :param real_text: Separator string passed to ``str.split``.
    :param pdf_path: Path of the PDF opened with ``pdfplumber``.
    :param start_page: First page to read (1-based, inclusive).
    :param end_page: Last page to read (1-based, inclusive).
    :return: The pieces of the concatenated page text between occurrences of
        ``real_text``.
    """
    with pdfplumber.open(pdf_path) as pdf:
        page_texts = [
            # Older pdfplumber releases return None for a page without any
            # extractable text; treat that as an empty string so the
            # concatenation cannot fail on it.
            pdf.pages[page_number].extract_text(x_tolerance=2, y_tolerance=4) or ""
            for page_number in range(start_page - 1, end_page)
        ]
    return "".join(page_texts).split(real_text)
def extract_text_from_pdf(self, pdf_path, start_page, end_page):
    """
    Return the extracted text of each page in a page range, one item per page.

    :param pdf_path: Path of the PDF opened with ``pdfplumber``.
    :param start_page: First page to read (1-based, inclusive).
    :param end_page: Last page to read (1-based, inclusive).
    :return: List holding whatever ``extract_text`` yields for each page, in page order.
    """
    first_index = start_page - 1
    with pdfplumber.open(pdf_path) as document:
        return [
            document.pages[index].extract_text(x_tolerance=2, y_tolerance=4)
            for index in range(first_index, end_page)
        ]
def get_document(self, token: str, service_name: str, document_name: str) -> Optional[Dict[str, Any]]:
    """
    Retrieve a stored document payload from Redis.

    The document's binary key is resolved through ``get_binary_key`` and the
    payload is read from ``{service_name}_{binarykey}``.

    :param token: The unique token of the user.
    :param service_name: Name of the service the document belongs to.
    :param document_name: Name of the document to fetch.
    :return: ``{"status": "success", "message": ..., "document": <payload>}``
        when found; ``{"status": "error", "message": ...}`` when the document
        is unknown or Redis fails; ``None`` if any other exception was caught.
    """
    try:
        binary_key = self.get_binary_key(token=token, service_name=service_name, document_name=document_name)
        stored_file = None
        # An unknown document has no binary key; skip the lookup instead of
        # querying Redis for the meaningless key "<service>_None".
        if binary_key is not None:
            stored_file = r.get(service_name + "_" + str(binary_key))
        if stored_file is None:
            logging.info("No document found for the specified key.")
            return {"status": "error", "message": "No document found for the specified key"}
        logging.info("Document retrieved successfully from Redis.")
        return {"status": "success", "message": "Document retrieved successfully from Redis.","document":stored_file}
    except redis.RedisError as e:
        logging.error(f"Failed to retrieve document: {e}")
        return {"status": "error", "message":f"Failed to retrieve document: {e}"}
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        # Kept as None (not an error dict) so existing callers that test for
        # None keep working.
        return None
def get_documents_from_service(self, token: str, servicename: str) -> dict:
    """
    Look up the document entries recorded for one service of a token.

    :param token: The unique token of the user.
    :param servicename: The name of the service whose documents are wanted.
    :return: ``{"success": True, "documents": [...]}`` for the first service
        entry with that name, or ``{"message": "Service not found."}`` when no
        entry matches.
    :raises Exception: If the service list cannot be fetched or parsed.
    """
    services_key = f"token:{token}:docservices"
    try:
        # Parse lazily so scanning stops at the first matching entry.
        parsed_entries = (json.loads(raw) for raw in r.lrange(services_key, 0, -1))
        match = next(
            (entry for entry in parsed_entries if entry['servicename'] == servicename),
            None,
        )
        if match is None:
            return {"message": "Service not found."}
        return {"success": True, "documents": match.get('documents', [])}
    except Exception as e:
        raise Exception("Failed to fetch or parse services: " + str(e))
def get_binary_key(self, token:str, service_name:str, document_name:str):
    """
    Return the ``binarykey`` recorded for a document, or ``None`` if it is not listed.

    :param token: The unique token of the user.
    :param service_name: Name of the service to search.
    :param document_name: Name of the document whose key is wanted.
    :return: The ``binarykey`` of the first document entry with that name, else ``None``.
    """
    lookup = self.get_documents_from_service(token=token, servicename=service_name)
    matching_keys = (
        entry['binarykey']
        for entry in lookup.get("documents", [])
        if entry['documentname'] == document_name
    )
    return next(matching_keys, None)
def remove_documents_from_service(self, token: str, service_name: str, document_names: List[str]) -> dict:
    """
    Remove documents from a service: both the stored payloads and their index entries.

    For every entry named ``service_name`` in ``token:{token}:docservices``,
    the documents whose name is in ``document_names`` have their payload key
    (``{service_name}_{binarykey}``) deleted and are dropped from the entry,
    which is then written back in place.

    :param token: The unique token of the user.
    :param service_name: Name of the service holding the documents.
    :param document_names: Names of the documents to remove.
    :return: ``{"status": "success", ...}`` if at least one service entry was
        changed, otherwise ``{"status": "error", ...}`` (also used when an
        exception was caught).
    """
    try:
        services_key = f"token:{token}:docservices"
        names_to_remove = set(document_names)
        updated = False
        for i, service in enumerate(r.lrange(services_key, 0, -1)):
            service_data = json.loads(service)
            if service_data['servicename'] != service_name:
                continue

            documents = service_data.get('documents', [])
            removed_documents = [doc for doc in documents if doc['documentname'] in names_to_remove]
            if not removed_documents:
                continue

            # The entries being dropped already carry their binary keys, so
            # there is no need to re-read the service list once per name.
            for doc in removed_documents:
                redis_key = service_name + "_" + str(doc['binarykey'])
                # DELETE reports how many keys it removed; no separate EXISTS needed.
                if r.delete(redis_key):
                    logging.info(f"Document with key {redis_key} removed successfully from Redis.")

            service_data['documents'] = [
                doc for doc in documents if doc['documentname'] not in names_to_remove
            ]
            r.lset(services_key, i, json.dumps(service_data))
            updated = True

        if updated:
            return {"status": "success", "message": "Documents removed successfully from both Redis storage and service list."}
        return {"status": "error", "message": "No documents found in the service list or no changes were made."}
    except redis.RedisError as e:
        logging.error(f"Failed to delete documents: {e}")
        return {"status": "error", "message": str(e)}
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return {"status": "error", "message": "An unexpected error occurred"}
def get_json(self, schema, context, model,comment):
    """
    Ask an LLM to extract information from ``context`` as JSON following ``schema``.

    :param schema: Text inserted into the prompt as the body of the expected JSON object.
    :param context: Text the information is extracted from.
    :param model: ``"gpt-3.5-turbo"`` (sent through ``self.client``) or
        ``"gemini"`` (sent through ``self.model``).
    :param comment: Optional explanation of the schema keys; omitted from the
        prompt when falsy.
    :return: The model's reply parsed by ``parse_json``, or ``None`` when
        ``model`` is not one of the two supported values.
    """
    # NOTE(review): leading whitespace inside the prompt literals could not be
    # recovered from the reviewed text, so their lines are kept flush-left.
    prompt = "Your task is to extract information from context."
    var = ""
    if comment:
        var = f"""**expilcation of keys in schema**: {comment}"""
    instruction = f"""
**JSON Format (High Priority)**: Provide the output in a properly formatted JSON structure.
**Respect Schema (High Priority)**: Utilize the schema below to organize the extracted information from the context. If certain information is absent, leave the corresponding field empty.
**Error Handling**: If the context does not contain sufficient information to fulfill the requirements, return the following JSON response: {{"message": "Context lacks the desired information"}}.
```json
{{
{schema}
}}```
{var}
"""
    template = f"""
{prompt}
Consider the following:
{instruction}
{context}
"""
    if model=="gpt-3.5-turbo":
        response = self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": template}
            ])
        pred_reponse = response.choices[0].message.content
    elif model=="gemini":
        response = self.model.generate_content(template)
        pred_reponse = response.text
    else:
        # Previously this fell off the end and returned None without a trace;
        # the None result is kept for compatibility but is now logged.
        logging.warning("get_json called with unsupported model %r; returning None", model)
        return None
    return self.parse_json(pred_reponse)
def clean_and_load_json(self, s):
    """
    Strip common non-JSON noise from ``s`` and parse the result.

    Applied in order: remove ``#`` comments up to the end of their line,
    remove a trailing comma that precedes a closing bracket on a later line,
    remove lines consisting only of ``...``, then remove any remaining comma
    directly before ``]`` or ``}``.

    :param s: JSON-like text to clean.
    :return: The object produced by ``json.loads`` on the cleaned text.
    :raises json.JSONDecodeError: If the cleaned text is still not valid JSON.
    """
    cleanup_steps = (
        (r'#.*?\n', ''),
        (r',\s*\n\s*(\]|\})', r'\1'),
        (r'\n\s*\.\.\.\n', ''),
        (r',\s*(\]|\})', r'\1'),
    )
    cleaned = s
    for pattern, replacement in cleanup_steps:
        cleaned = re.sub(pattern, replacement, cleaned)
    return json.loads(cleaned.strip())
def parse_json(self, s):
    """
    Parse JSON out of a model reply that may wrap it in extra text.

    The whole string is tried first. If that fails, the span from the first
    ``{`` to the last ``}`` is parsed, and as a last resort that span is
    passed through ``clean_and_load_json``.

    :param s: Raw reply text.
    :return: The parsed JSON value.
    :raises ValueError: If ``s`` holds no ``{...}`` span, or if the span
        cannot be parsed even after cleaning (``json.JSONDecodeError`` is a
        ``ValueError``).
    """
    try:
        return json.loads(s)
    except (TypeError, ValueError):
        # Not pure JSON; fall through and look for an embedded object.
        pass

    start_idx = s.find('{')
    end_idx = s.rfind('}')
    # Also rejects a '}' that only occurs before the first '{', which would
    # otherwise produce an empty candidate string.
    if start_idx == -1 or end_idx < start_idx:
        raise ValueError("Could not find JSON object in the provided string.")

    candidate = s[start_idx:end_idx + 1]
    try:
        return json.loads(candidate)
    except ValueError:
        # Was a call to the misspelled, non-existent 'lean_and_load_json'.
        return self.clean_and_load_json(candidate)