|
import numpy as np |
|
import redis |
|
import requests |
|
from fastapi import FastAPI, HTTPException, status |
|
from typing import List |
|
from datetime import datetime |
|
from fastapi.concurrency import run_in_threadpool |
|
from fastapi import Query |
|
from srs.utils import functions_doc |
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect |
|
import asyncio |
|
from uuid import uuid4 |
|
from pydantic import BaseModel, validator |
|
from fastapi import FastAPI, HTTPException, status, Depends |
|
from pydantic import BaseModel,conlist |
|
from typing import List, Optional,Dict, Any |
|
import redis |
|
import json |
|
import logging |
|
from fastapi.responses import JSONResponse |
|
import base64 |
|
import PyPDF2 |
|
import io |
|
r = redis.Redis(host='redis', port=6379, db=0, password=None,decode_responses=True) |
|
|
|
|
|
app=FastAPI() |
|
class ServiceRemovalRequest(BaseModel): |
|
token: str |
|
servicename: str |
|
|
|
class Document(BaseModel): |
|
token: str |
|
service_name: str |
|
document_name: str |
|
|
|
|
|
class ServicesResponse(BaseModel): |
|
message: str |
|
added_services: List[str] |
|
|
|
class RemoveDocumentsRequest(BaseModel): |
|
token: str |
|
service_name: str |
|
document_names: List[str] |
|
|
|
|
|
|
|
class Service(BaseModel): |
|
servicename: str |
|
|
|
class TokenServicesRequest(BaseModel): |
|
token: str |
|
services: List[Service] |
|
class services(BaseModel): |
|
token: str |
|
|
|
class AddDocumentRequest(BaseModel): |
|
token: str |
|
servicename: str |
|
documentname: str |
|
|
|
class StoreDocumentServicesRequest(BaseModel): |
|
token: str |
|
service_name: str |
|
document_name: str |
|
file: bytes |
|
|
|
class DocumentChunks(BaseModel): |
|
token: str |
|
service_name: str |
|
document_name: str |
|
method: str = "chunk_per_page" |
|
split_token: Optional[str] = "" |
|
start_page: int = 1 |
|
end_page: int = 1 |
|
|
|
@validator('split_token', always=True) |
|
def check_real_text(cls, v, values): |
|
method = values.get('method') |
|
if method == 'personalize_chunking' and not v: |
|
raise ValueError('split_token is required when method is personalize_chunking') |
|
return v |
|
|
|
class DocumentRespons(BaseModel): |
|
token: str |
|
service_name: str |
|
document_name: str |
|
method: str = "chunk_per_page" |
|
model : str = "gpt-3.5-turbo" |
|
schema: dict |
|
comment: Optional[dict] = {} |
|
split_token: Optional[str] = "" |
|
start_page: int = 1 |
|
end_page: int = 1 |
|
|
|
@validator('split_token', always=True) |
|
def check_real_text(cls, v, values): |
|
method = values.get('method') |
|
if method == 'personalize_chunking' and not v: |
|
raise ValueError('split_token is required when method is personalize_chunking') |
|
return v |
|
@app.post("/add_services", status_code=status.HTTP_201_CREATED) |
|
async def add_services(request: TokenServicesRequest): |
|
""" |
|
Adds a list of services to a given token. |
|
|
|
This endpoint accepts a request with a token and a list of services, attempting to add each service to the specified token. |
|
The service information must include all necessary details like name and description. |
|
|
|
Parameters: |
|
- request (TokenServicesRequest): A model containing the authorization token and a list of services to be added. |
|
|
|
Returns: |
|
- A list of dictionaries, each representing a successfully added service with its details. |
|
|
|
Raises: |
|
- HTTPException: 400 Bad Request if any value error occurs during processing, typically due to invalid input. |
|
- HTTPException: 500 Internal Server Error if any unexpected errors occur during the process. |
|
""" |
|
try: |
|
|
|
services_dicts = [service.dict() for service in request.services] |
|
result = functions_doc.add_services_to_token(request.token, services_dicts) |
|
return result |
|
except ValueError as ve: |
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(ve)) |
|
except Exception as e: |
|
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) |
|
|
|
|
|
|
|
@app.delete("/remove_service/") |
|
async def remove_service(request: ServiceRemovalRequest): |
|
""" |
|
Removes a specified service associated with a token. |
|
|
|
This endpoint allows the removal of a service by its name from a list associated with a given token. |
|
Before attempting to remove the service, it verifies that the token exists. If the token does not exist, |
|
a 404 error is returned. If the service name does not exist under the token or cannot be removed, |
|
a 400 error is raised with a specific message. |
|
|
|
Parameters: |
|
- request (ServiceRemovalRequest): A model containing the authorization token and the name of the service to be removed. |
|
|
|
Returns: |
|
- A dictionary with a success status and a message indicating the outcome of the operation. |
|
|
|
Raises: |
|
- HTTPException: 404 Not Found if the token does not exist. |
|
- HTTPException: 400 Bad Request if the service cannot be removed or does not exist under the token. |
|
- HTTPException: 500 Internal Server Error for any other unexpected errors. |
|
""" |
|
try: |
|
|
|
user_key = f"token:{request.token}:docservices" |
|
if not r.exists(user_key): |
|
raise HTTPException(status_code=404, detail="Token not found.") |
|
|
|
manager_doc = functions_doc() |
|
result = manager_doc.remove_service_by_name(token=request.token, servicename=request.servicename) |
|
if result["success"]: |
|
return {"success": True, "message": result["message"]} |
|
else: |
|
raise HTTPException(status_code=400, detail=result["message"]) |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@app.post("/add_and_store_document/", summary="Store a Document in Redis", |
|
description="Stores a document as a base64 encoded string in Redis. The document is tagged with additional metadata and associated with a unique key.") |
|
async def add_and_store_document(request:StoreDocumentServicesRequest): |
|
""" |
|
Stores a file document in Redis as a base64 encoded string with its corresponding tags and document names. |
|
|
|
Args: |
|
token (str): The unique identifier for the user. |
|
service_name (str): The service under which the document will be stored. |
|
document_name (str): The name of the document to be stored. |
|
file (UploadFile): The file document to be stored as a base64 encoded string. |
|
|
|
Returns: |
|
JSONResponse: A JSON response indicating the status of the operation ("success" or "error") and a message describing the result or the error encountered. |
|
|
|
Raises: |
|
HTTPException: An HTTP exception is raised with a status code of 400 or 500 depending on the type of error during the storing process. |
|
""" |
|
try: |
|
|
|
|
|
|
|
|
|
|
|
response = functions_doc.add_and_store_document(token=request.token, service_name=request.service_name,document_name= request.document_name, encoded_file=request.file) |
|
return JSONResponse(status_code=200, content={"status":"success", "message":response}) |
|
|
|
except redis.RedisError as e: |
|
logging.error(f"Failed to store document: {e}") |
|
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)}) |
|
|
|
except Exception as e: |
|
logging.error(f"An error occurred: {e}") |
|
return JSONResponse(status_code=500, content={"status": "error", "message": "An unexpected error occurred"}) |
|
|
|
@app.delete("/remove_documents/", summary="Remove Multiple Documents", |
|
description="Removes multiple documents from both the Redis store and the specified service list under a given token.") |
|
async def remove_documents(request: RemoveDocumentsRequest): |
|
""" |
|
Removes multiple documents from Redis storage and their references from a specified service list under a user's token. |
|
|
|
Args: |
|
request (RemoveDocumentsRequest): A Pydantic model that includes the token, the service name, and a list of document names to be removed. |
|
|
|
Returns: |
|
dict: A dictionary indicating the status of the operation ("success" or "error") and a message describing the result or error encountered. |
|
|
|
Raises: |
|
HTTPException: An HTTP exception is raised with status code 400 or 500, depending on the type of error during the document removal process. |
|
""" |
|
try: |
|
manager_doc = functions_doc() |
|
response = manager_doc.remove_documents_from_service(token = request.token, service_name = request.service_name, document_names = request.document_names) |
|
return response |
|
except Exception as e: |
|
raise HTTPException(status_code=400, detail=str(e)) |
|
|
|
@app.get("/services/", response_model=list) |
|
def retrieve_service_names(request: services) -> list: |
|
""" |
|
Endpoint to retrieve service names for a given token. |
|
|
|
:param token: The unique token of the user. |
|
:return: A list of service names associated with the token or an empty list if none are found. |
|
""" |
|
services_key = f"token:{request.token}:docservices" |
|
try: |
|
existing_services = r.lrange(services_key, 0, -1) |
|
service_names = [json.loads(service)['servicename'] for service in existing_services] |
|
return service_names if service_names else [] |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Failed to fetch or parse services: {str(e)}") |
|
|
|
@app.get("/documents/", response_model=dict) |
|
def retrieve_documents(request:ServiceRemovalRequest) -> dict: |
|
""" |
|
Endpoint to retrieve document names from a specific service for a given token. |
|
|
|
:param token: The unique token of the user. |
|
:param servicename: The name of the service from which to retrieve documents. |
|
:return: A dictionary containing a list of documents or a message if the service is not found. |
|
""" |
|
try: |
|
manager_doc = functions_doc() |
|
response = manager_doc.get_documents_from_service(token = request.token, servicename = request.servicename) |
|
return response |
|
except Exception as e: |
|
raise HTTPException(status_code=400, detail=str(e)) |
|
|
|
@app.get("/get_document/", response_model= Optional[bytes]) |
|
def get_document(request: Document) -> dict: |
|
""" |
|
Retrieves a document stored as base64-encoded bytes from a specified service for a given token. |
|
|
|
This endpoint is responsible for fetching a document by name from a specific service associated with a token. |
|
The document is expected to be stored in base64-encoded byte format. The response will include the document |
|
if available or return an appropriate error message if not found or in case of an error. |
|
|
|
Parameters: |
|
- request (Document): A model containing the authorization token, service name, and document name. |
|
|
|
Returns: |
|
- A JSON response containing the document in base64-encoded format or an error message. |
|
|
|
Raises: |
|
- HTTPException: 400 Bad Request if there's an error during the retrieval process. |
|
""" |
|
try: |
|
manager_doc = functions_doc() |
|
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name) |
|
return JSONResponse(status_code=200, content=response) |
|
except Exception as e: |
|
raise HTTPException(status_code=400, detail=str(e)) |
|
@app.get("/get_num_pages/", response_model= dict) |
|
def get_num_pages(request: Document) -> dict: |
|
""" |
|
Retrieves the number of pages in a PDF document from a specified service for a given token. |
|
|
|
This endpoint fetches a document stored as a base64-encoded string, decodes it, and counts the number of pages using PyPDF2. |
|
|
|
Parameters: |
|
- request (Document): A model containing the authorization token, service name, and document name. |
|
|
|
Returns: |
|
- A JSON response with the status, message including the number of pages, and the number of pages if successful. |
|
|
|
Raises: |
|
- HTTPException: 400 Bad Request if there's an error during the retrieval or processing of the document. |
|
""" |
|
try: |
|
manager_doc = functions_doc() |
|
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name) |
|
if response["status"]=="success": |
|
decoded_file = base64.b64decode(response["document"]) |
|
|
|
|
|
pdf_file_like = io.BytesIO(decoded_file) |
|
|
|
|
|
pdf_reader = PyPDF2.PdfReader(pdf_file_like) |
|
number_of_pages = len(pdf_reader.pages) |
|
return JSONResponse(status_code=200, content={"status": "success", "message": f"Document has {number_of_pages} pages.", "num_pages": number_of_pages}) |
|
return JSONResponse(status_code=200, content=response) |
|
except Exception as e: |
|
raise HTTPException(status_code=400, detail=str(e)) |
|
|
|
@app.get("/get_chunks/", response_model=dict) |
|
def get_chunks(request: DocumentChunks) -> dict: |
|
""" |
|
Retrieves text chunks from a specified range of pages in a document according to the chunking method. |
|
|
|
This endpoint decodes a stored document and processes it to extract text chunks from specified pages. |
|
Users must specify a valid start and end page, and a chunking method. The method can be 'chunk_per_page' |
|
for straightforward chunking by page, or 'personalize_chunking' which may use additional text parameters. |
|
|
|
Parameters: |
|
- request (DocumentChunks): A model containing the necessary details to fetch and chunk the document. |
|
|
|
Returns: |
|
- A dictionary response with the status, a message including the count of chunks, and the chunks themselves. |
|
|
|
Raises: |
|
- HTTPException: 400 Bad Request if there are parameter errors or processing fails. |
|
""" |
|
try: |
|
manager_doc = functions_doc() |
|
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name) |
|
if response["status"] == "success": |
|
decoded_file = base64.b64decode(response["document"]) |
|
pdf_file_like = io.BytesIO(decoded_file) |
|
pdf_reader = PyPDF2.PdfReader(pdf_file_like) |
|
number_of_pages = len(pdf_reader.pages) |
|
|
|
if request.start_page < 1 or request.end_page > number_of_pages or request.start_page > request.end_page: |
|
raise HTTPException(status_code=400, detail="Invalid start_page or end_page.") |
|
|
|
if request.method == "chunk_per_page": |
|
chunks = manager_doc.extract_text_from_pdf(pdf_file_like, request.start_page, request.end_page) |
|
return {"status": "success", "message": f"Document has {len(chunks)} chunk(s).", "chunks": chunks} |
|
elif request.method == "personalize_chunking": |
|
|
|
personalized_chunks = manager_doc.personalize_chunking(request.split_token, pdf_file_like, request.start_page, request.end_page) |
|
return {"status": "success", "message": f"Document has {len(personalized_chunks)} personalized chunk(s).", "chunks": personalized_chunks} |
|
else: |
|
raise HTTPException(status_code=400, detail="Invalid method provided.") |
|
return response |
|
except Exception as e: |
|
raise HTTPException(status_code=400, detail=str(e)) |
|
|
|
@app.get("/structure_response/", response_model=dict) |
|
def structure_response(request: DocumentRespons)-> dict: |
|
""" |
|
Retrieves and processes chunks of a document into structured JSON based on specific criteria. |
|
|
|
This endpoint decodes a stored document and processes it to extract and transform text chunks |
|
from specified pages into structured JSON format. Users must specify a valid start and end page, |
|
and a chunking method. The method can be 'chunk_per_page' for straightforward chunking by page, |
|
or 'personalize_chunking' which may use additional text parameters. The model parameter can take |
|
values like "gpt-3.5-turbo" or "gemini" for processing the chunks. The processing method and output |
|
schema are specified by the user. |
|
|
|
Parameters: |
|
- request (DocumentRespons): A model containing the details needed to fetch, chunk, and structure the document. |
|
|
|
Returns: |
|
- A dictionary response with the status and the structured JSON if successful. |
|
|
|
Raises: |
|
- HTTPException: 400 Bad Request for parameter errors or processing issues. |
|
- HTTPException: 500 Internal Server Error for any other unexpected errors. |
|
""" |
|
|
|
try: |
|
|
|
manager_doc = functions_doc() |
|
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name) |
|
if response["status"] == "success": |
|
decoded_file = base64.b64decode(response["document"]) |
|
pdf_file_like = io.BytesIO(decoded_file) |
|
pdf_reader = PyPDF2.PdfReader(pdf_file_like) |
|
number_of_pages = len(pdf_reader.pages) |
|
|
|
if request.start_page < 1 or request.end_page > number_of_pages or request.start_page > request.end_page: |
|
raise HTTPException(status_code=400, detail="Invalid start_page or end_page.") |
|
|
|
if request.method == "chunk_per_page": |
|
chunks = manager_doc.extract_text_from_pdf(pdf_file_like, request.start_page, request.end_page) |
|
json_list = process_chunks(chunks, manager_doc, request.schema, request.model,request.comment) |
|
return {"status": "success", "json": json_list} |
|
elif request.method == "personalize_chunking": |
|
personalized_chunks = manager_doc.personalize_chunking(request.split_token, pdf_file_like, request.start_page, request.end_page) |
|
json_list = process_chunks(personalized_chunks, manager_doc, request.schema, request.model,request.comment) |
|
return {"status": "success", "json": json_list} |
|
else: |
|
raise HTTPException(status_code=400, detail="Invalid method provided.") |
|
return response |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
def process_chunks(chunks, manager_doc, schema, model,comment): |
|
json_list = [] |
|
for chunk in chunks: |
|
try: |
|
response = manager_doc.get_json(schema, chunk, model,comment) |
|
except Exception as e: |
|
response = {} |
|
json_list.append(response) |
|
return json_list |
|
|
|
|
|
|
|
|