import redis import json from redis.commands.search.field import TagField, VectorField,TextField from redis.commands.search.indexDefinition import IndexDefinition, IndexType from redis.commands.search.query import Query import logging import numpy as np import redis.commands.search from typing import List, Dict, Any, Optional import pdfplumber import google.generativeai as genai from openai import OpenAI import os import re logging.basicConfig(level=logging.INFO) logger = logging.getLogger() r = redis.Redis(host='redis', port=6379, db=0, password=None,decode_responses=True) class functions_doc: def __init__(self): self.client = OpenAI(api_key="sk-proj-YrJ4mMndLNB84kUXZ6WiT3BlbkFJJaAQjjupx7nImW0iAYcX") GENERATION_CONFIG = { "temperature": 0.2, "top_p": 0.75, "max_output_tokens": 6000, } SAFETY_SETTINGS= [ { "category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE" }, { "category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_MEDIUM_AND_ABOVE" }, { "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_MEDIUM_AND_ABOVE" }, { "category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_MEDIUM_AND_ABOVE" }, ] # Define the default schema # Set up the model genai.configure(api_key="AIzaSyDksULr84HdEiR5ls42xo2_Wja3De3abVw") self.model = genai.GenerativeModel(model_name="gemini-1.5-pro-latest", generation_config=GENERATION_CONFIG, safety_settings=SAFETY_SETTINGS) def add_services_to_token(token: str, services: List[Dict]) -> Dict: """ Add multiple services to a token's list of services, checking if a service with the same name already exists. Each service is a dictionary with 'servicename' and 'modelname' (which could be default or empty). :param token: The unique token of the user. :param services: List of service dictionaries to add. :return: A dictionary with a message and the list of added services. """ services_key = f"token:{token}:docservices" try: existing_services = r.lrange(services_key, 0, -1) existing_service_names = [json.loads(service)['servicename'] for service in existing_services] except Exception as e: raise Exception("Failed to fetch or parse existing services: " + str(e)) if not services or not isinstance(services, list): raise ValueError("Invalid services format. It must be a list of services.") added_services = [] for service in services: if not isinstance(service, dict) or 'servicename' not in service: continue servicename = service.get('servicename') if servicename in existing_service_names: continue service_info = json.dumps({"servicename": servicename, "documents": []}) try: r.rpush(services_key, service_info) added_services.append(servicename) except Exception as e: raise Exception(f"Failed to add service {servicename}: " + str(e)) if not added_services: raise Exception("No new services were added. They may already exist or input was invalid.") return {"message": "Services successfully added.", "added_services": added_services} def remove_service_by_name(self, token, servicename): """ Remove a service entry from Redis based on the servicename, and also remove all associated documents. Parameters: token (str): Token to identify the user data. servicename (str): Name of the service to be removed. Returns: dict: Status and message of the operation. """ try: # Define the user key user_key = f"token:{token}:docservices" # Start a Redis pipeline pipe = r.pipeline() # Retrieve the length of the list list_length = r.llen(user_key) for i in range(list_length): # Retrieve each JSON string from the list service_data = r.lindex(user_key, i) if service_data: # Convert JSON string to dictionary data = json.loads(service_data) if data["servicename"] == servicename: # Remove all associated documents documents = data["documents"] document_names = [doc['documentname'] for doc in documents] print(document_names) self.remove_documents_from_service( token, servicename, document_names) # Delete each document found # Remove the JSON string from the list pipe.lrem(user_key, 0, service_data) # Execute the pipeline pipe.execute() return {"success": True, "message": f"Service {servicename} and all associated documents removed."} except Exception as e: return {"success": False, "message": str(e)} def add_and_store_document(token: str, service_name: str, document_name: str, encoded_file: bytes) -> dict: """ Adds a document to a specific service within a user's token and immediately stores the document in Redis. If the document name already exists in the service, it is not appended or stored again. :param token: The unique token of the user. :param service_name: The name of the service to which the document will be added. :param document_name: The name of the document to add. :param encoded_file: The base64 encoded file to be stored. :return: A dictionary with a message indicating the result. """ services_key = f"token:{token}:docservices" binary_key_key = f"token:{token}:{service_name}:binarykey" try: existing_services = r.lrange(services_key, 0, -1) service_found = False for i, service in enumerate(existing_services): service_data = json.loads(service) if service_data['servicename'] == service_name: service_found = True documents = service_data.get('documents', []) if any(doc['documentname'] == document_name for doc in documents): return {"message": "Document already exists in the service."} # Auto-increment binary key binary_key = r.incr(binary_key_key) # Append new document info dictionary documents.append({'documentname': document_name, 'binarykey': str(binary_key)}) service_data['documents'] = documents updated_service = json.dumps(service_data) r.lset(services_key, i, updated_service) # Store the document in Redis r.set(service_name + "_" + str(binary_key), encoded_file) logging.info("Document stored successfully in Redis.") return {"message": "Document successfully added and stored in the service."} if not service_found: return {"message": "Service not found."} except redis.RedisError as e: logging.error(f"Failed to store document: {e}") return {"status": "error", "message": str(e)} except Exception as e: logging.error(f"An error occurred: {e}") return {"status": "error", "message": "An unexpected error occurred"} def personalize_chunking(self, real_text, pdf_path, start_page, end_page): text = "" with pdfplumber.open(pdf_path) as pdf: # Only iterate over the desired page range for page_number in range(start_page - 1, end_page): page = pdf.pages[page_number] # Extract text from the page with specific tolerances text+= page.extract_text(x_tolerance=2, y_tolerance=4) return text.split(real_text) def extract_text_from_pdf(self, pdf_path, start_page, end_page): chunks = [] with pdfplumber.open(pdf_path) as pdf: # Only iterate over the desired page range for page_number in range(start_page - 1, end_page): page = pdf.pages[page_number] # Extract text from the page with specific tolerances text = page.extract_text(x_tolerance=2, y_tolerance=4) chunks.append(text) return chunks def get_document(self, token: str, service_name: str, document_name: str) -> Optional[bytes]: """ Retrieve a stored PDF file from Redis based on the token, service_name, and document_name. Each document is assumed to be stored with a unique key constructed from these parameters. """ try: # Generate a binary key based on inputs binary_key = self.get_binary_key(token=token, service_name=service_name, document_name=document_name) # Retrieve the document from Redis stored_file = r.get(service_name + "_" + str(binary_key)) if stored_file is None: # Log and handle the case where no file is found logging.info("No document found for the specified key.") return {"status": "error", "message": "No document found for the specified key"} else: # Log success logging.info("Document retrieved successfully from Redis.") return {"status": "success", "message": "Document retrieved successfully from Redis.","document":stored_file} except redis.RedisError as e: # Log the Redis error logging.error(f"Failed to retrieve document: {e}") return {"status": "error", "message":f"Failed to retrieve document: {e}"} except Exception as e: # Handle other possible exceptions logging.error(f"An error occurred: {e}") return None def get_documents_from_service(self, token: str, servicename: str) -> dict: """ Retrieve document names from a specific service within a specific token's list of services. :param token: The unique token of the user. :param servicename: The name of the service from which documents will be retrieved. :return: A dictionary with a list of documents or a message indicating the result. """ services_key = f"token:{token}:docservices" try: existing_services = r.lrange(services_key, 0, -1) for service in existing_services: service_data = json.loads(service) if service_data['servicename'] == servicename: documents = service_data.get('documents', []) return {"success": True, "documents": documents} return {"message": "Service not found."} except Exception as e: raise Exception("Failed to fetch or parse services: " + str(e)) def get_binary_key(self, token:str, service_name:str, document_name:str): result = self.get_documents_from_service(token=token, servicename=service_name) docs = result.get("documents",[]) for doc in docs: if doc['documentname']==document_name: return doc['binarykey'] return None # def remove_documents_from_service(self, token: str, service_name: str, document_names: list) -> dict: # """ # Removes multiple PDF documents from Redis and their references from a specific service within a specific token's list of services. # """ # try: # services_key = f"token:{token}:docservices" # existing_services = r.lrange(services_key, 0, -1) # updated = False # for i, service in enumerate(existing_services): # service_data = json.loads(service) # if service_data['servicename'] == service_name: # documents = service_data.get('documents', []) # new_documents = [doc for doc in documents if doc['documentname'] not in document_names] # if len(documents) != len(new_documents): # # Update the service data if any documents are removed # service_data['documents'] = new_documents # updated_service = json.dumps(service_data) # r.lset(services_key, i, updated_service) # updated = True # # Remove documents from direct Redis storage # for document_name in document_names: # binary_key = self.get_binary_key(token = token, service_name = service_name, document_name =document_name) # redis_key = service_name + "_" + str(binary_key) # if r.exists(redis_key): # r.delete(redis_key) # logging.info(f"Document with key {redis_key} removed successfully from Redis.") # if updated: # return {"status": "success", "message": "Documents removed successfully from both Redis storage and service list."} # else: # return {"status": "error", "message": "No documents found in the service list or no changes were made."} # except redis.RedisError as e: # logging.error(f"Failed to delete documents: {e}") # return {"status": "error", "message": str(e)} # except Exception as e: # logging.error(f"An error occurred: {e}") # return {"status": "error", "message": "An unexpected error occurred"} def remove_documents_from_service(self, token: str, service_name: str, document_names: List[str]) -> dict: """ Removes multiple PDF documents from Redis and their references from a specific service within a specific token's list of services. """ try: services_key = f"token:{token}:docservices" existing_services = r.lrange(services_key, 0, -1) updated = False for i, service in enumerate(existing_services): service_data = json.loads(service) if service_data['servicename'] == service_name: documents = service_data.get('documents', []) new_documents = [doc for doc in documents if doc['documentname'] not in document_names] if len(documents) != len(new_documents): # Remove documents from direct Redis storage for document_name in document_names: binary_key = self.get_binary_key(token=token, service_name=service_name, document_name=document_name) redis_key = service_name + "_" + str(binary_key) print("Redis key",redis_key) if r.exists(redis_key): r.delete(redis_key) logging.info(f"Document with key {redis_key} removed successfully from Redis.") # Update the service data if any documents are removed service_data['documents'] = new_documents updated_service = json.dumps(service_data) r.lset(services_key, i, updated_service) updated = True if updated: return {"status": "success", "message": "Documents removed successfully from both Redis storage and service list."} else: return {"status": "error", "message": "No documents found in the service list or no changes were made."} except redis.RedisError as e: logging.error(f"Failed to delete documents: {e}") return {"status": "error", "message": str(e)} except Exception as e: logging.error(f"An error occurred: {e}") return {"status": "error", "message": "An unexpected error occurred"} def get_json(self, schema, context, model,comment): prompt = "Your task is to extract information from context." var = "" if comment: var = f"""**expilcation of keys in schema**: {comment}""" instruction = f""" **JSON Format (High Priority)**: Provide the output in a properly formatted JSON structure. **Respect Schema (High Priority)**: Utilize the schema below to organize the extracted information from the context. If certain information is absent, leave the corresponding field empty. **Error Handling**: If the context does not contain sufficient information to fulfill the requirements, return the following JSON response: {{"message": "Context lacks the desired information"}}. ```json {{ {schema} }}``` {var} """ template = f""" {prompt} Consider the following: {instruction} CONTEXT: {context} """ if model=="gpt-3.5-turbo": response = self.client.chat.completions.create( model="gpt-3.5-turbo", messages=[{"role": "user", "content": template} ]) pred_reponse = response.choices[0].message.content return self.parse_json(pred_reponse) elif model=="gemini": response = self.model.generate_content(template) pred_reponse = response.text return self.parse_json(pred_reponse) def clean_and_load_json(self, s): # Remove comments s = re.sub(r'#.*?\n', '', s) # Remove trailing commas before closing brackets in lists and dictionaries s = re.sub(r',\s*\n\s*(\]|\})', r'\1', s) # Remove patterns like '\n ...\n' s = re.sub(r'\n\s*\.\.\.\n', '', s) # Remove comma before } or ] s = re.sub(r',\s*(\]|\})', r'\1', s) # Remove unnecessary whitespace s = s.strip() #print(s) # Load the cleaned JSON string return json.loads(s) def parse_json(self, s): try: json_str = json.loads(s) except: # Find the index of the first occurrence of '{\n' start_idx = s.find('{') # Find the index of the last occurrence of '\n}' end_idx = s.rfind('}') # If either index is not found, raise an error if start_idx == -1 or end_idx == -1: raise ValueError("Could not find JSON object in the provided string.") # Extract the JSON substring from start_idx to end_idx (inclusive) + 3 to include the closing '\n}' json_str = s[start_idx:end_idx+1] try: json_str = json.loads(json_str) except: json_str = self.lean_and_load_json(json_str) return json_str