AGDC-APIS / main.py
Nechba's picture
first commit
d5ec16b
raw
history blame
18.9 kB
import numpy as np
import redis
import requests
from fastapi import FastAPI, HTTPException, status
from typing import List
from datetime import datetime
from fastapi.concurrency import run_in_threadpool
from fastapi import Query
from srs.utils import functions_doc
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
from uuid import uuid4
from pydantic import BaseModel, validator
from fastapi import FastAPI, HTTPException, status, Depends
from pydantic import BaseModel,conlist
from typing import List, Optional,Dict, Any
import redis
import json
import logging
from fastapi.responses import JSONResponse
import base64
import PyPDF2
import io
r = redis.Redis(host='redis', port=6379, db=0, password=None,decode_responses=True)
# Instantiate your document functions class
app=FastAPI()
class ServiceRemovalRequest(BaseModel):
token: str
servicename: str
class Document(BaseModel):
token: str
service_name: str
document_name: str
class ServicesResponse(BaseModel):
message: str
added_services: List[str]
class RemoveDocumentsRequest(BaseModel):
token: str
service_name: str
document_names: List[str]
# Define request body model
class Service(BaseModel):
servicename: str
class TokenServicesRequest(BaseModel):
token: str
services: List[Service]
class services(BaseModel):
token: str
class AddDocumentRequest(BaseModel):
token: str
servicename: str
documentname: str
class StoreDocumentServicesRequest(BaseModel):
token: str
service_name: str
document_name: str
file: bytes
class DocumentChunks(BaseModel):
token: str
service_name: str
document_name: str
method: str = "chunk_per_page"
split_token: Optional[str] = ""
start_page: int = 1
end_page: int = 1
@validator('split_token', always=True)
def check_real_text(cls, v, values):
method = values.get('method')
if method == 'personalize_chunking' and not v:
raise ValueError('split_token is required when method is personalize_chunking')
return v
class DocumentRespons(BaseModel):
token: str
service_name: str
document_name: str
method: str = "chunk_per_page"
model : str = "gpt-3.5-turbo"
schema: dict
comment: Optional[dict] = {}
split_token: Optional[str] = ""
start_page: int = 1
end_page: int = 1
@validator('split_token', always=True)
def check_real_text(cls, v, values):
method = values.get('method')
if method == 'personalize_chunking' and not v:
raise ValueError('split_token is required when method is personalize_chunking')
return v
@app.post("/add_services", status_code=status.HTTP_201_CREATED)
async def add_services(request: TokenServicesRequest):
"""
Adds a list of services to a given token.
This endpoint accepts a request with a token and a list of services, attempting to add each service to the specified token.
The service information must include all necessary details like name and description.
Parameters:
- request (TokenServicesRequest): A model containing the authorization token and a list of services to be added.
Returns:
- A list of dictionaries, each representing a successfully added service with its details.
Raises:
- HTTPException: 400 Bad Request if any value error occurs during processing, typically due to invalid input.
- HTTPException: 500 Internal Server Error if any unexpected errors occur during the process.
"""
try:
# Convert services to dicts
services_dicts = [service.dict() for service in request.services]
result = functions_doc.add_services_to_token(request.token, services_dicts)
return result
except ValueError as ve:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(ve))
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.delete("/remove_service/")
async def remove_service(request: ServiceRemovalRequest):
"""
Removes a specified service associated with a token.
This endpoint allows the removal of a service by its name from a list associated with a given token.
Before attempting to remove the service, it verifies that the token exists. If the token does not exist,
a 404 error is returned. If the service name does not exist under the token or cannot be removed,
a 400 error is raised with a specific message.
Parameters:
- request (ServiceRemovalRequest): A model containing the authorization token and the name of the service to be removed.
Returns:
- A dictionary with a success status and a message indicating the outcome of the operation.
Raises:
- HTTPException: 404 Not Found if the token does not exist.
- HTTPException: 400 Bad Request if the service cannot be removed or does not exist under the token.
- HTTPException: 500 Internal Server Error for any other unexpected errors.
"""
try:
# Check if the token exists in Redis
user_key = f"token:{request.token}:docservices"
if not r.exists(user_key):
raise HTTPException(status_code=404, detail="Token not found.")
# If checks pass, proceed to remove the service
manager_doc = functions_doc()
result = manager_doc.remove_service_by_name(token=request.token, servicename=request.servicename)
if result["success"]:
return {"success": True, "message": result["message"]}
else:
raise HTTPException(status_code=400, detail=result["message"])
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/add_and_store_document/", summary="Store a Document in Redis",
description="Stores a document as a base64 encoded string in Redis. The document is tagged with additional metadata and associated with a unique key.")
async def add_and_store_document(request:StoreDocumentServicesRequest):
"""
Stores a file document in Redis as a base64 encoded string with its corresponding tags and document names.
Args:
token (str): The unique identifier for the user.
service_name (str): The service under which the document will be stored.
document_name (str): The name of the document to be stored.
file (UploadFile): The file document to be stored as a base64 encoded string.
Returns:
JSONResponse: A JSON response indicating the status of the operation ("success" or "error") and a message describing the result or the error encountered.
Raises:
HTTPException: An HTTP exception is raised with a status code of 400 or 500 depending on the type of error during the storing process.
"""
try:
# Read file content as bytes
#encoded_file = await request.file.read()
# Store the document
# doc_manager = functions_doc()
response = functions_doc.add_and_store_document(token=request.token, service_name=request.service_name,document_name= request.document_name, encoded_file=request.file)
return JSONResponse(status_code=200, content={"status":"success", "message":response})
except redis.RedisError as e:
logging.error(f"Failed to store document: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
except Exception as e:
logging.error(f"An error occurred: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": "An unexpected error occurred"})
@app.delete("/remove_documents/", summary="Remove Multiple Documents",
description="Removes multiple documents from both the Redis store and the specified service list under a given token.")
async def remove_documents(request: RemoveDocumentsRequest):
"""
Removes multiple documents from Redis storage and their references from a specified service list under a user's token.
Args:
request (RemoveDocumentsRequest): A Pydantic model that includes the token, the service name, and a list of document names to be removed.
Returns:
dict: A dictionary indicating the status of the operation ("success" or "error") and a message describing the result or error encountered.
Raises:
HTTPException: An HTTP exception is raised with status code 400 or 500, depending on the type of error during the document removal process.
"""
try:
manager_doc = functions_doc()
response = manager_doc.remove_documents_from_service(token = request.token, service_name = request.service_name, document_names = request.document_names)
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/services/", response_model=list)
def retrieve_service_names(request: services) -> list:
"""
Endpoint to retrieve service names for a given token.
:param token: The unique token of the user.
:return: A list of service names associated with the token or an empty list if none are found.
"""
services_key = f"token:{request.token}:docservices"
try:
existing_services = r.lrange(services_key, 0, -1)
service_names = [json.loads(service)['servicename'] for service in existing_services]
return service_names if service_names else []
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch or parse services: {str(e)}")
@app.get("/documents/", response_model=dict)
def retrieve_documents(request:ServiceRemovalRequest) -> dict:
"""
Endpoint to retrieve document names from a specific service for a given token.
:param token: The unique token of the user.
:param servicename: The name of the service from which to retrieve documents.
:return: A dictionary containing a list of documents or a message if the service is not found.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_documents_from_service(token = request.token, servicename = request.servicename)
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/get_document/", response_model= Optional[bytes])
def get_document(request: Document) -> dict:
"""
Retrieves a document stored as base64-encoded bytes from a specified service for a given token.
This endpoint is responsible for fetching a document by name from a specific service associated with a token.
The document is expected to be stored in base64-encoded byte format. The response will include the document
if available or return an appropriate error message if not found or in case of an error.
Parameters:
- request (Document): A model containing the authorization token, service name, and document name.
Returns:
- A JSON response containing the document in base64-encoded format or an error message.
Raises:
- HTTPException: 400 Bad Request if there's an error during the retrieval process.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
return JSONResponse(status_code=200, content=response)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/get_num_pages/", response_model= dict)
def get_num_pages(request: Document) -> dict:
"""
Retrieves the number of pages in a PDF document from a specified service for a given token.
This endpoint fetches a document stored as a base64-encoded string, decodes it, and counts the number of pages using PyPDF2.
Parameters:
- request (Document): A model containing the authorization token, service name, and document name.
Returns:
- A JSON response with the status, message including the number of pages, and the number of pages if successful.
Raises:
- HTTPException: 400 Bad Request if there's an error during the retrieval or processing of the document.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
if response["status"]=="success":
decoded_file = base64.b64decode(response["document"])
# Use BytesIO to create a file-like object in memory from the decoded data
pdf_file_like = io.BytesIO(decoded_file)
# Use PyPDF2 to read the file-like object and count the pages
pdf_reader = PyPDF2.PdfReader(pdf_file_like)
number_of_pages = len(pdf_reader.pages)
return JSONResponse(status_code=200, content={"status": "success", "message": f"Document has {number_of_pages} pages.", "num_pages": number_of_pages})
return JSONResponse(status_code=200, content=response)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/get_chunks/", response_model=dict)
def get_chunks(request: DocumentChunks) -> dict:
"""
Retrieves text chunks from a specified range of pages in a document according to the chunking method.
This endpoint decodes a stored document and processes it to extract text chunks from specified pages.
Users must specify a valid start and end page, and a chunking method. The method can be 'chunk_per_page'
for straightforward chunking by page, or 'personalize_chunking' which may use additional text parameters.
Parameters:
- request (DocumentChunks): A model containing the necessary details to fetch and chunk the document.
Returns:
- A dictionary response with the status, a message including the count of chunks, and the chunks themselves.
Raises:
- HTTPException: 400 Bad Request if there are parameter errors or processing fails.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
if response["status"] == "success":
decoded_file = base64.b64decode(response["document"])
pdf_file_like = io.BytesIO(decoded_file)
pdf_reader = PyPDF2.PdfReader(pdf_file_like)
number_of_pages = len(pdf_reader.pages)
if request.start_page < 1 or request.end_page > number_of_pages or request.start_page > request.end_page:
raise HTTPException(status_code=400, detail="Invalid start_page or end_page.")
if request.method == "chunk_per_page":
chunks = manager_doc.extract_text_from_pdf(pdf_file_like, request.start_page, request.end_page)
return {"status": "success", "message": f"Document has {len(chunks)} chunk(s).", "chunks": chunks}
elif request.method == "personalize_chunking":
# Assuming you process personalized chunking here:
personalized_chunks = manager_doc.personalize_chunking(request.split_token, pdf_file_like, request.start_page, request.end_page)
return {"status": "success", "message": f"Document has {len(personalized_chunks)} personalized chunk(s).", "chunks": personalized_chunks}
else:
raise HTTPException(status_code=400, detail="Invalid method provided.")
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/structure_response/", response_model=dict)
def structure_response(request: DocumentRespons)-> dict:
"""
Retrieves and processes chunks of a document into structured JSON based on specific criteria.
This endpoint decodes a stored document and processes it to extract and transform text chunks
from specified pages into structured JSON format. Users must specify a valid start and end page,
and a chunking method. The method can be 'chunk_per_page' for straightforward chunking by page,
or 'personalize_chunking' which may use additional text parameters. The model parameter can take
values like "gpt-3.5-turbo" or "gemini" for processing the chunks. The processing method and output
schema are specified by the user.
Parameters:
- request (DocumentRespons): A model containing the details needed to fetch, chunk, and structure the document.
Returns:
- A dictionary response with the status and the structured JSON if successful.
Raises:
- HTTPException: 400 Bad Request for parameter errors or processing issues.
- HTTPException: 500 Internal Server Error for any other unexpected errors.
"""
try:
# Assuming functions_doc() returns an instance with necessary methods
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
if response["status"] == "success":
decoded_file = base64.b64decode(response["document"])
pdf_file_like = io.BytesIO(decoded_file)
pdf_reader = PyPDF2.PdfReader(pdf_file_like)
number_of_pages = len(pdf_reader.pages)
if request.start_page < 1 or request.end_page > number_of_pages or request.start_page > request.end_page:
raise HTTPException(status_code=400, detail="Invalid start_page or end_page.")
if request.method == "chunk_per_page":
chunks = manager_doc.extract_text_from_pdf(pdf_file_like, request.start_page, request.end_page)
json_list = process_chunks(chunks, manager_doc, request.schema, request.model,request.comment)
return {"status": "success", "json": json_list}
elif request.method == "personalize_chunking":
personalized_chunks = manager_doc.personalize_chunking(request.split_token, pdf_file_like, request.start_page, request.end_page)
json_list = process_chunks(personalized_chunks, manager_doc, request.schema, request.model,request.comment)
return {"status": "success", "json": json_list}
else:
raise HTTPException(status_code=400, detail="Invalid method provided.")
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def process_chunks(chunks, manager_doc, schema, model,comment):
json_list = []
for chunk in chunks:
try:
response = manager_doc.get_json(schema, chunk, model,comment)
except Exception as e:
response = {}
json_list.append(response)
return json_list