import numpy as np |
import redis |
import requests |
from fastapi import FastAPI, HTTPException, status |
from typing import List |
from datetime import datetime |
from fastapi.concurrency import run_in_threadpool |
from fastapi import Query |
from srs.utils import functions_doc |
from fastapi import FastAPI, WebSocket, WebSocketDisconnect |
import asyncio |
from uuid import uuid4 |
from pydantic import BaseModel, validator |
from fastapi import FastAPI, HTTPException, status, Depends |
from pydantic import BaseModel,conlist |
from typing import List, Optional,Dict, Any |
import redis |
import json |
import logging |
from fastapi.responses import JSONResponse |
import base64 |
import PyPDF2 |
import io |
# Module-wide Redis client; decode_responses=True makes replies come back as str.
r = redis.Redis(
    host='redis',
    port=6379,
    db=0,
    password=None,
    decode_responses=True,
)

# FastAPI application on which every route in this module is registered.
app = FastAPI()
class ServiceRemovalRequest(BaseModel):
    """Request body that identifies one service under a token.

    Consumed by the ``remove_service`` and ``retrieve_documents`` endpoints.
    """

    token: str
    servicename: str
class Document(BaseModel):
    """Request body that identifies one document of a service under a token.

    Consumed by the ``get_document`` and ``get_num_pages`` endpoints.
    """

    token: str
    service_name: str
    document_name: str
class ServicesResponse(BaseModel):
    """Response shape with a text message and a list of strings in ``added_services``.

    NOTE(review): no endpoint in the visible part of this file references this
    model; confirm whether it is still needed.
    """

    message: str
    added_services: List[str]
class RemoveDocumentsRequest(BaseModel):
    """Request body for the ``remove_documents`` endpoint.

    Names several documents of one service under a token.
    """

    token: str
    service_name: str
    document_names: List[str]
class Service(BaseModel):
    """A single service entry, described only by its ``servicename``."""

    servicename: str
class TokenServicesRequest(BaseModel):
    """Request body for the ``add_services`` endpoint: a token plus ``Service`` entries."""

    token: str
    services: List[Service]
class services(BaseModel):
    """Request body carrying only a token; consumed by ``retrieve_service_names``.

    The lowercase class name is kept as-is because other code refers to it.
    """

    token: str
class AddDocumentRequest(BaseModel):
    """Token, service name and document name as three plain string fields.

    NOTE(review): no endpoint in the visible part of this file references this
    model; confirm whether it is still needed.
    """

    token: str
    servicename: str
    documentname: str
class StoreDocumentServicesRequest(BaseModel):
    """Request body for the ``add_and_store_document`` endpoint.

    ``file`` is forwarded unchanged as the ``encoded_file`` argument of
    ``functions_doc.add_and_store_document``.
    """

    token: str
    service_name: str
    document_name: str
    file: bytes
class DocumentChunks(BaseModel):
    """Request body for the ``get_chunks`` endpoint.

    ``method`` defaults to ``"chunk_per_page"``. When it is
    ``"personalize_chunking"`` a non-empty ``split_token`` must be supplied.
    ``start_page`` and ``end_page`` both default to 1.
    """

    token: str
    service_name: str
    document_name: str
    method: str = "chunk_per_page"
    split_token: Optional[str] = ""
    start_page: int = 1
    end_page: int = 1

    @validator('split_token', always=True)
    def check_real_text(cls, v, values):
        """Reject a missing/empty ``split_token`` when personalized chunking is requested."""
        # ``method`` is declared before ``split_token``; .get() yields None if it
        # is absent from ``values``, in which case the check is skipped.
        wants_personalized = values.get('method') == 'personalize_chunking'
        if wants_personalized and not v:
            raise ValueError('split_token is required when method is personalize_chunking')
        return v
class DocumentRespons(BaseModel):
    """Request body for the ``structure_response`` endpoint.

    Carries the same document/chunking fields as ``DocumentChunks`` plus the
    ``model`` name (default ``"gpt-3.5-turbo"``), the target ``schema`` and an
    optional ``comment`` mapping, all of which are handed to ``process_chunks``.
    When ``method`` is ``"personalize_chunking"`` a non-empty ``split_token``
    must be supplied.
    """

    token: str
    service_name: str
    document_name: str
    method: str = "chunk_per_page"
    model: str = "gpt-3.5-turbo"
    # NOTE(review): ``schema`` shadows an attribute of pydantic's BaseModel;
    # confirm the installed pydantic version accepts this field name.
    schema: dict
    comment: Optional[dict] = {}
    split_token: Optional[str] = ""
    start_page: int = 1
    end_page: int = 1

    @validator('split_token', always=True)
    def check_real_text(cls, v, values):
        """Reject a missing/empty ``split_token`` when personalized chunking is requested."""
        # ``method`` is declared before ``split_token``; .get() yields None if it
        # is absent from ``values``, in which case the check is skipped.
        wants_personalized = values.get('method') == 'personalize_chunking'
        if wants_personalized and not v:
            raise ValueError('split_token is required when method is personalize_chunking')
        return v
@app.post("/add_services", status_code=status.HTTP_201_CREATED)
async def add_services(request: TokenServicesRequest):
    """
    Add the services listed in the request to the given token.

    Each ``Service`` entry is converted to a plain dict and the whole list is
    handed to ``functions_doc.add_services_to_token`` together with the token.

    Parameters:
    - request (TokenServicesRequest): The token and the services to attach to it.

    Returns:
    - Whatever ``functions_doc.add_services_to_token`` returns, unmodified.

    Raises:
    - HTTPException: 400 Bad Request when a ValueError is raised during processing.
    - HTTPException: 500 Internal Server Error for any other exception.
    """
    try:
        payload = []
        for entry in request.services:
            payload.append(entry.dict())
        return functions_doc.add_services_to_token(request.token, payload)
    except ValueError as bad_input:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(bad_input),
        )
    except Exception as unexpected:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(unexpected),
        )
@app.delete("/remove_service/")
async def remove_service(request: ServiceRemovalRequest):
    """
    Remove a named service from the list associated with a token.

    The Redis key ``token:<token>:docservices`` must exist; otherwise the
    request is rejected with 404. The removal itself is delegated to
    ``remove_service_by_name`` on a ``functions_doc`` instance, whose result
    is expected to carry ``success`` and ``message`` entries.

    Parameters:
    - request (ServiceRemovalRequest): The token and the name of the service to remove.

    Returns:
    - ``{"success": True, "message": <message>}`` when the removal succeeded.

    Raises:
    - HTTPException: 404 Not Found if the token key does not exist in Redis.
    - HTTPException: 400 Bad Request if the helper reports that the removal failed.
    - HTTPException: 500 Internal Server Error for any other unexpected error.
    """
    try:
        user_key = f"token:{request.token}:docservices"
        if not r.exists(user_key):
            raise HTTPException(status_code=404, detail="Token not found.")

        manager_doc = functions_doc()
        result = manager_doc.remove_service_by_name(
            token=request.token,
            servicename=request.servicename,
        )
        if not result["success"]:
            raise HTTPException(status_code=400, detail=result["message"])
        return {"success": True, "message": result["message"]}
    except HTTPException:
        # The 404/400 raised above are deliberate responses; without this
        # clause the generic handler below would turn them into 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post(
    "/add_and_store_document/",
    summary="Store a Document in Redis",
    description="Stores a document as a base64 encoded string in Redis. The document is tagged with additional metadata and associated with a unique key.",
)
async def add_and_store_document(request: StoreDocumentServicesRequest):
    """
    Store a document by delegating to ``functions_doc.add_and_store_document``.

    Args:
        request (StoreDocumentServicesRequest): Carries ``token``,
            ``service_name``, ``document_name`` and ``file``; ``file`` is
            passed on as the ``encoded_file`` argument.

    Returns:
        JSONResponse: Status 200 with ``{"status": "success", "message": <helper result>}``
        on success. On failure the error is logged and a status 500 response
        with ``"status": "error"`` is returned: the Redis error text for
        ``redis.RedisError``, a generic message for anything else.
    """
    try:
        stored = functions_doc.add_and_store_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
            encoded_file=request.file,
        )
        success_body = {"status": "success", "message": stored}
        return JSONResponse(status_code=200, content=success_body)
    except redis.RedisError as e:
        logging.error(f"Failed to store document: {e}")
        redis_error_body = {"status": "error", "message": str(e)}
        return JSONResponse(status_code=500, content=redis_error_body)
    except Exception as e:
        logging.error(f"An error occurred: {e}")
        generic_error_body = {"status": "error", "message": "An unexpected error occurred"}
        return JSONResponse(status_code=500, content=generic_error_body)
@app.delete(
    "/remove_documents/",
    summary="Remove Multiple Documents",
    description="Removes multiple documents from both the Redis store and the specified service list under a given token.",
)
async def remove_documents(request: RemoveDocumentsRequest):
    """
    Remove several documents of one service by delegating to
    ``remove_documents_from_service`` on a ``functions_doc`` instance.

    Args:
        request (RemoveDocumentsRequest): The token, the service name and the
            names of the documents to remove.

    Returns:
        Whatever ``remove_documents_from_service`` returns, unmodified.

    Raises:
        HTTPException: 400 Bad Request, carrying the error text, for any
        exception raised while removing the documents.
    """
    try:
        doc_manager = functions_doc()
        return doc_manager.remove_documents_from_service(
            token=request.token,
            service_name=request.service_name,
            document_names=request.document_names,
        )
    except Exception as failure:
        raise HTTPException(status_code=400, detail=str(failure))
@app.get("/services/", response_model=list)
def retrieve_service_names(request: services) -> list:
    """
    Return the service names stored for a token.

    Reads every entry of the Redis list ``token:<token>:docservices``, parses
    each one as JSON and collects its ``servicename`` value.

    :param request: Body carrying the user's token.
    :return: The list of service names; empty when the Redis list has no entries.
    :raises HTTPException: 500 if reading from Redis or parsing an entry fails.
    """
    services_key = f"token:{request.token}:docservices"
    try:
        names = []
        for raw_entry in r.lrange(services_key, 0, -1):
            names.append(json.loads(raw_entry)['servicename'])
        return names
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to fetch or parse services: {str(e)}")
@app.get("/documents/", response_model=dict)
def retrieve_documents(request: ServiceRemovalRequest) -> dict:
    """
    Return the documents of one service by delegating to
    ``get_documents_from_service`` on a ``functions_doc`` instance.

    :param request: Body carrying the user's token and the service name.
    :return: Whatever ``get_documents_from_service`` returns, unmodified.
    :raises HTTPException: 400, carrying the error text, for any exception.
    """
    try:
        doc_manager = functions_doc()
        return doc_manager.get_documents_from_service(
            token=request.token,
            servicename=request.servicename,
        )
    except Exception as failure:
        raise HTTPException(status_code=400, detail=str(failure))
@app.get("/get_document/", response_model= Optional[bytes])
def get_document(request: Document) -> dict:
    """
    Fetch one stored document by token, service name and document name.

    The lookup is delegated to ``get_document`` on a ``functions_doc``
    instance and its result is sent back as the JSON body of a 200 response.

    Parameters:
    - request (Document): The token, service name and document name.

    Returns:
    - JSONResponse with status 200 whose content is the helper's result.

    Raises:
    - HTTPException: 400 Bad Request, carrying the error text, for any exception.
    """
    try:
        doc_manager = functions_doc()
        lookup = doc_manager.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        return JSONResponse(status_code=200, content=lookup)
    except Exception as failure:
        raise HTTPException(status_code=400, detail=str(failure))
@app.get("/get_num_pages/", response_model= dict)
def get_num_pages(request: Document) -> dict:
    """
    Report how many pages a stored PDF document has.

    The document is fetched through ``get_document`` on a ``functions_doc``
    instance. When the lookup reports ``"success"``, its ``"document"`` value
    is base64-decoded and the pages are counted with PyPDF2.

    Parameters:
    - request (Document): The token, service name and document name.

    Returns:
    - JSONResponse (status 200) with ``status``, ``message`` and ``num_pages``
      on success; otherwise a status 200 JSONResponse carrying the lookup
      result unchanged.

    Raises:
    - HTTPException: 400 Bad Request, carrying the error text, for any exception.
    """
    try:
        doc_manager = functions_doc()
        lookup = doc_manager.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        # Anything but a successful lookup is passed straight back to the caller.
        if lookup["status"] != "success":
            return JSONResponse(status_code=200, content=lookup)

        pdf_bytes = base64.b64decode(lookup["document"])
        reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
        number_of_pages = len(reader.pages)
        body = {
            "status": "success",
            "message": f"Document has {number_of_pages} pages.",
            "num_pages": number_of_pages,
        }
        return JSONResponse(status_code=200, content=body)
    except Exception as failure:
        raise HTTPException(status_code=400, detail=str(failure))
@app.get("/get_chunks/", response_model=dict)
def get_chunks(request: DocumentChunks) -> dict:
    """
    Extract text chunks from a page range of a stored PDF document.

    The document is fetched through ``get_document`` on a ``functions_doc``
    instance, base64-decoded and opened with PyPDF2 to learn its page count.
    The requested page range is validated against that count, then the chunks
    are produced by ``extract_text_from_pdf`` (``method == "chunk_per_page"``)
    or ``personalize_chunking`` (``method == "personalize_chunking"``, which
    additionally receives ``split_token``).

    Parameters:
    - request (DocumentChunks): Document identification, chunking method,
      optional split token and the page range.

    Returns:
    - ``{"status": "success", "message": ..., "chunks": ...}`` when the
      document was found; otherwise the lookup result unchanged.

    Raises:
    - HTTPException: 400 Bad Request for an invalid page range, an unknown
      method, or any other failure during processing.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        if response["status"] != "success":
            return response

        decoded_file = base64.b64decode(response["document"])
        pdf_file_like = io.BytesIO(decoded_file)
        number_of_pages = len(PyPDF2.PdfReader(pdf_file_like).pages)

        if (
            request.start_page < 1
            or request.end_page > number_of_pages
            or request.start_page > request.end_page
        ):
            raise HTTPException(status_code=400, detail="Invalid start_page or end_page.")

        if request.method == "chunk_per_page":
            chunks = manager_doc.extract_text_from_pdf(
                pdf_file_like, request.start_page, request.end_page
            )
            label = "chunk(s)"
        elif request.method == "personalize_chunking":
            chunks = manager_doc.personalize_chunking(
                request.split_token, pdf_file_like, request.start_page, request.end_page
            )
            label = "personalized chunk(s)"
        else:
            raise HTTPException(status_code=400, detail="Invalid method provided.")

        return {
            "status": "success",
            "message": f"Document has {len(chunks)} {label}.",
            "chunks": chunks,
        }
    except HTTPException:
        # Let the deliberate 400s above through untouched; re-wrapping them
        # with str(e) would replace their clean detail message.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
@app.get("/structure_response/", response_model=dict)
def structure_response(request: DocumentRespons)-> dict:
    """
    Chunk a page range of a stored PDF and convert each chunk to structured JSON.

    The document is fetched through ``get_document`` on a ``functions_doc``
    instance, base64-decoded and opened with PyPDF2 to learn its page count.
    After the page range is validated, chunks are produced by
    ``extract_text_from_pdf`` (``method == "chunk_per_page"``) or
    ``personalize_chunking`` (``method == "personalize_chunking"``). Every
    chunk is then passed to ``process_chunks`` together with the request's
    ``schema``, ``model`` and ``comment``.

    Parameters:
    - request (DocumentRespons): Document identification, chunking options,
      page range, model name, target schema and optional comment.

    Returns:
    - ``{"status": "success", "json": <list of per-chunk results>}`` when the
      document was found; otherwise the lookup result unchanged.

    Raises:
    - HTTPException: 400 Bad Request for an invalid page range or unknown method.
    - HTTPException: 500 Internal Server Error for any other unexpected error.
    """
    try:
        manager_doc = functions_doc()
        response = manager_doc.get_document(
            token=request.token,
            service_name=request.service_name,
            document_name=request.document_name,
        )
        if response["status"] != "success":
            return response

        decoded_file = base64.b64decode(response["document"])
        pdf_file_like = io.BytesIO(decoded_file)
        number_of_pages = len(PyPDF2.PdfReader(pdf_file_like).pages)

        if (
            request.start_page < 1
            or request.end_page > number_of_pages
            or request.start_page > request.end_page
        ):
            raise HTTPException(status_code=400, detail="Invalid start_page or end_page.")

        if request.method == "chunk_per_page":
            chunks = manager_doc.extract_text_from_pdf(
                pdf_file_like, request.start_page, request.end_page
            )
        elif request.method == "personalize_chunking":
            chunks = manager_doc.personalize_chunking(
                request.split_token, pdf_file_like, request.start_page, request.end_page
            )
        else:
            raise HTTPException(status_code=400, detail="Invalid method provided.")

        json_list = process_chunks(
            chunks, manager_doc, request.schema, request.model, request.comment
        )
        return {"status": "success", "json": json_list}
    except HTTPException:
        # The 400s raised above are client errors; without this clause the
        # generic handler below would report them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
def process_chunks(chunks, manager_doc, schema, model,comment):
    """
    Convert each chunk to JSON via ``manager_doc.get_json``, best-effort.

    Parameters:
    - chunks: Iterable of chunks to convert, processed in order.
    - manager_doc: Object exposing ``get_json(schema, chunk, model, comment)``.
    - schema, model, comment: Passed through unchanged to ``get_json``.

    Returns:
    - A list with one entry per chunk, in input order: the ``get_json`` result,
      or an empty dict for a chunk whose conversion raised.
    """
    json_list = []
    for index, chunk in enumerate(chunks):
        try:
            response = manager_doc.get_json(schema, chunk, model, comment)
        except Exception:
            # Keep the best-effort contract (one failed chunk must not abort
            # the batch) but record the failure instead of hiding it.
            logging.exception("Failed to convert chunk %d to JSON", index)
            response = {}
        json_list.append(response)
    return json_list