"""FastAPI endpoints for managing per-token document services.

Documents are stored base64-encoded (via ``functions_doc``) and can be
listed, fetched, page-counted, chunked, and turned into structured JSON.
"""

import asyncio
import base64
import io
import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional
from uuid import uuid4

import numpy as np
import PyPDF2
import redis
import requests
from fastapi import (
    Depends,
    FastAPI,
    HTTPException,
    Query,
    WebSocket,
    WebSocketDisconnect,
    status,
)
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, conlist, validator

from srs.utils import functions_doc

# Module-level Redis client; decode_responses=True makes replies come back as str.
r = redis.Redis(host='redis', port=6379, db=0, password=None, decode_responses=True)

# The FastAPI application every route below is registered on.
app = FastAPI()


class ServiceRemovalRequest(BaseModel):
    """Body identifying one service under a token."""

    token: str
    servicename: str


class Document(BaseModel):
    """Body identifying one document inside a service."""

    token: str
    service_name: str
    document_name: str


class ServicesResponse(BaseModel):
    """Message plus the names of the services that were added."""

    message: str
    added_services: List[str]


class RemoveDocumentsRequest(BaseModel):
    """Body listing the documents to remove from a service."""

    token: str
    service_name: str
    document_names: List[str]


class Service(BaseModel):
    """A single service entry, identified by name."""

    servicename: str


class TokenServicesRequest(BaseModel):
    """Body carrying a token and the services to attach to it."""

    token: str
    services: List[Service]


class services(BaseModel):
    """Body carrying only a token."""

    token: str


class AddDocumentRequest(BaseModel):
    """Body naming a document to add to a service."""

    token: str
    servicename: str
    documentname: str


class StoreDocumentServicesRequest(BaseModel):
    """Body carrying a document's location and its content as bytes."""

    token: str
    service_name: str
    document_name: str
    file: bytes


class DocumentChunks(BaseModel):
    """Body selecting a document, a page range and a chunking method."""

    token: str
    service_name: str
    document_name: str
    # `method` must stay declared before `split_token`: the validator below
    # reads it from `values`, which only holds previously validated fields.
    method: str = "chunk_per_page"
    split_token: Optional[str] = ""
    start_page: int = 1
    end_page: int = 1

    @validator('split_token', always=True)
    def check_real_text(cls, v, values):
        """Require a non-empty split_token for 'personalize_chunking'."""
        method = values.get('method')
        if method == 'personalize_chunking' and not v:
            raise ValueError('split_token is required when method is personalize_chunking')
        return v


class DocumentRespons(BaseModel):
    """Body for /structure_response/: chunk selection plus schema and model."""

    token: str
    service_name: str
    document_name: str
    # `method` must stay declared before `split_token` (see validator).
    method: str = "chunk_per_page"
    model: str = "gpt-3.5-turbo"
    # A field literally named `schema` shadows pydantic's BaseModel.schema
    # (pydantic v1 rejects it at class creation, v2 warns). Store it as
    # `schema_` and keep "schema" as the key clients send.
    schema_: dict = Field(..., alias="schema")
    comment: Optional[dict] = {}
    split_token: Optional[str] = ""
    start_page: int = 1
    end_page: int = 1

    @validator('split_token', always=True)
    def check_real_text(cls, v, values):
        """Require a non-empty split_token for 'personalize_chunking'."""
        method = values.get('method')
        if method == 'personalize_chunking' and not v:
            raise ValueError('split_token is required when method is personalize_chunking')
        return v


def _extract_requested_chunks(manager_doc, request, encoded_document):
    """Decode a base64 PDF, validate the requested page range, and chunk it.

    Shared by /get_chunks/ and /structure_response/.

    Args:
        manager_doc: Object providing ``extract_text_from_pdf`` and
            ``personalize_chunking``.
        request: Request model exposing ``start_page``, ``end_page``,
            ``method`` and ``split_token``.
        encoded_document: The document as a base64 string.

    Returns:
        Whatever the selected chunking method of ``manager_doc`` returns.

    Raises:
        HTTPException: 400 if the page range is invalid or the method is
            neither 'chunk_per_page' nor 'personalize_chunking'.
    """
    pdf_file_like = io.BytesIO(base64.b64decode(encoded_document))
    number_of_pages = len(PyPDF2.PdfReader(pdf_file_like).pages)

    if (
        request.start_page < 1
        or request.end_page > number_of_pages
        or request.start_page > request.end_page
    ):
        raise HTTPException(status_code=400, detail="Invalid start_page or end_page.")

    if request.method == "chunk_per_page":
        return manager_doc.extract_text_from_pdf(
            pdf_file_like, request.start_page, request.end_page
        )
    if request.method == "personalize_chunking":
        return manager_doc.personalize_chunking(
            request.split_token, pdf_file_like, request.start_page, request.end_page
        )
    raise HTTPException(status_code=400, detail="Invalid method provided.")


@app.post("/add_services", status_code=status.HTTP_201_CREATED)
async def add_services(request: TokenServicesRequest):
    """
    Adds a list of services to a given token.

    Each service in the request is converted to a plain dict and the list is
    handed to ``functions_doc.add_services_to_token`` together with the token.

    Parameters:
    - request (TokenServicesRequest): The token and the services to add.

    Returns:
    - The value returned by ``functions_doc.add_services_to_token``.

    Raises:
    - HTTPException: 400 Bad Request if a ValueError is raised while processing.
    - HTTPException: 500 Internal Server Error for any other exception.
    """
    try:
        services_dicts = [service.dict() for service in request.services]
        # NOTE(review): called on `functions_doc` itself, whereas most other
        # endpoints instantiate it first - confirm this is a static/class method.
        result = functions_doc.add_services_to_token(request.token, services_dicts)
        return result
    except ValueError as ve:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(ve))
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))


@app.delete("/remove_service/")
async def remove_service(request: ServiceRemovalRequest):
    """
    Removes a specified service associated with a token.

    The Redis key ``token:<token>:docservices`` must exist; otherwise a 404 is
    returned. The removal itself is delegated to
    ``functions_doc().remove_service_by_name``.

    Parameters:
    - request (ServiceRemovalRequest): The token and the name of the service to remove.

    Returns:
    - A dictionary with ``success: True`` and the message reported by the removal.

    Raises:
    - HTTPException: 404 Not Found if the token key does not exist.
    - HTTPException: 400 Bad Request if the removal reports failure.
    - HTTPException: 500 Internal Server Error for any other unexpected error.
    """
    try:
        user_key = f"token:{request.token}:docservices"
        if not r.exists(user_key):
            raise HTTPException(status_code=404, detail="Token not found.")

        manager_doc = functions_doc()
        result = manager_doc.remove_service_by_name(
            token=request.token, servicename=request.servicename
        )
        if result["success"]:
            return {"success": True, "message": result["message"]}
        raise HTTPException(status_code=400, detail=result["message"])
    except HTTPException:
        # Let the deliberate 404/400 above through instead of masking them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.post("/add_and_store_document/", summary="Store a Document in Redis", description="Stores a document as a base64 encoded string in Redis. The document is tagged with additional metadata and associated with a unique key.")
async def add_and_store_document(request: StoreDocumentServicesRequest):
    """
    Stores a document under a token and service.

    Args:
        request (StoreDocumentServicesRequest): The token, service name,
            document name and the document content (``file``, as bytes).

    Returns:
        JSONResponse: 200 with ``status: "success"`` and the storage
        function's return value as ``message``; 500 with ``status: "error"``
        if a Redis error or any other exception occurs.
    """
    try:
        # NOTE(review): called on `functions_doc` itself rather than on an
        # instance - confirm this is a static/class method.
        response = functions_doc.add_and_store_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
            encoded_file=request.file,
        )
        return JSONResponse(status_code=200, content={"status": "success", "message": response})
    except redis.RedisError as e:
        logging.error(f"Failed to store document: {e}")
        return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        return JSONResponse(status_code=500, content={"status": "error", "message": "An unexpected error occurred"})


@app.delete("/remove_documents/", summary="Remove Multiple Documents", description="Removes multiple documents from both the Redis store and the specified service list under a given token.")
async def remove_documents(request: RemoveDocumentsRequest):
    """
    Removes multiple documents from a service under a user's token.

    Args:
        request (RemoveDocumentsRequest): The token, the service name, and the
            list of document names to remove.

    Returns:
        The value returned by ``functions_doc().remove_documents_from_service``.

    Raises:
        HTTPException: 400 if the removal raises any exception.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.remove_documents_from_service(
            token=request.token,
            service_name=request.service_name,
            document_names=request.document_names,
        )
        return response
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/services/", response_model=list)
def retrieve_service_names(request: services) -> list:
    """
    Endpoint to retrieve service names for a given token.

    Reads every entry of the Redis list ``token:<token>:docservices``, parses
    each as JSON and collects its ``servicename``.

    :param request: Body carrying the user's token.
    :return: A list of service names, empty if the list has no entries.
    :raises HTTPException: 500 if reading or parsing fails.
    """
    services_key = f"token:{request.token}:docservices"
    try:
        existing_services = r.lrange(services_key, 0, -1)
        return [json.loads(service)['servicename'] for service in existing_services]
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch or parse services: {str(e)}")


@app.get("/documents/", response_model=dict)
def retrieve_documents(request: ServiceRemovalRequest) -> dict:
    """
    Endpoint to retrieve document names from a specific service for a given token.

    :param request: Body carrying the token and the service name.
    :return: The value returned by ``functions_doc().get_documents_from_service``.
    :raises HTTPException: 400 if the lookup raises any exception.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.get_documents_from_service(
            token=request.token, servicename=request.servicename
        )
        return response
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/get_document/", response_model=Optional[bytes])
def get_document(request: Document) -> dict:
    """
    Retrieves a stored document from a specified service for a given token.

    Parameters:
    - request (Document): The token, service name, and document name.

    Returns:
    - A 200 JSON response whose content is the value returned by
      ``functions_doc().get_document``.

    Raises:
    - HTTPException: 400 Bad Request if the retrieval raises any exception.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        return JSONResponse(status_code=200, content=response)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/get_num_pages/", response_model=dict)
def get_num_pages(request: Document) -> dict:
    """
    Retrieves the number of pages in a stored PDF document.

    The document is fetched as a base64 string, decoded, and its pages are
    counted with PyPDF2.

    Parameters:
    - request (Document): The token, service name, and document name.

    Returns:
    - On success, a 200 JSON response with ``status``, ``message`` and
      ``num_pages``. If the fetch does not report success, the fetch result
      is returned as-is in a 200 JSON response.

    Raises:
    - HTTPException: 400 Bad Request if retrieval or PDF processing raises.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        if response["status"] == "success":
            # Wrap the decoded bytes in a file-like object so PyPDF2 can read them.
            pdf_file_like = io.BytesIO(base64.b64decode(response["document"]))
            number_of_pages = len(PyPDF2.PdfReader(pdf_file_like).pages)
            return JSONResponse(status_code=200, content={"status": "success", "message": f"Document has {number_of_pages} pages.", "num_pages": number_of_pages})
        return JSONResponse(status_code=200, content=response)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/get_chunks/", response_model=dict)
def get_chunks(request: DocumentChunks) -> dict:
    """
    Retrieves text chunks from a page range of a stored document.

    The method is either 'chunk_per_page' or 'personalize_chunking' (the
    latter also uses ``split_token``).

    Parameters:
    - request (DocumentChunks): Document location, page range and chunking method.

    Returns:
    - On success, a dictionary with ``status``, a ``message`` stating the
      number of chunks, and the ``chunks``. If the fetch does not report
      success, the fetch result is returned as-is.

    Raises:
    - HTTPException: 400 Bad Request for an invalid page range, an unknown
      method, or any processing failure.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        if response["status"] != "success":
            return response

        chunks = _extract_requested_chunks(manager_doc, request, response["document"])
        label = "personalized chunk(s)" if request.method == "personalize_chunking" else "chunk(s)"
        return {"status": "success", "message": f"Document has {len(chunks)} {label}.", "chunks": chunks}
    except HTTPException:
        # Keep the original status code and detail of validation errors.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@app.get("/structure_response/", response_model=dict)
def structure_response(request: DocumentRespons) -> dict:
    """
    Chunks a stored document and converts each chunk into structured JSON.

    Chunks are produced exactly as in /get_chunks/ and then passed, one by
    one, to ``manager_doc.get_json`` with the requested schema, model and
    comment.

    Parameters:
    - request (DocumentRespons): Document location, page range, chunking
      method, output schema, model name and optional comment.

    Returns:
    - On success, a dictionary with ``status`` and ``json`` (one entry per
      chunk). If the fetch does not report success, the fetch result is
      returned as-is.

    Raises:
    - HTTPException: 400 Bad Request for an invalid page range or unknown method.
    - HTTPException: 500 Internal Server Error for any other unexpected error.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        if response["status"] != "success":
            return response

        chunks = _extract_requested_chunks(manager_doc, request, response["document"])
        json_list = process_chunks(
            chunks, manager_doc, request.schema_, request.model, request.comment
        )
        return {"status": "success", "json": json_list}
    except HTTPException:
        # Parameter errors must stay 400 rather than being rewrapped as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


def process_chunks(chunks, manager_doc, schema, model, comment):
    """Convert each chunk to JSON via ``manager_doc.get_json``.

    Best-effort: a chunk whose conversion raises is logged and represented by
    an empty dict, so the result always has one entry per chunk.

    Args:
        chunks: Iterable of chunks to convert.
        manager_doc: Object providing ``get_json(schema, chunk, model, comment)``.
        schema: Output schema forwarded to ``get_json``.
        model: Model name forwarded to ``get_json``.
        comment: Extra comment data forwarded to ``get_json``.

    Returns:
        list: One ``get_json`` result (or ``{}`` on failure) per chunk.
    """
    json_list = []
    for chunk in chunks:
        try:
            response = manager_doc.get_json(schema, chunk, model, comment)
        except Exception:
            # Record the failure instead of dropping it silently.
            logging.exception("Failed to convert chunk to JSON")
            response = {}
        json_list.append(response)
    return json_list