File size: 18,861 Bytes
0b644a6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 |
import numpy as np
import redis
import requests
from fastapi import FastAPI, HTTPException, status
from typing import List
from datetime import datetime
from fastapi.concurrency import run_in_threadpool
from fastapi import Query
from srs.utils import functions_doc
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
from uuid import uuid4
from pydantic import BaseModel, validator
from fastapi import FastAPI, HTTPException, status, Depends
from pydantic import BaseModel,conlist
from typing import List, Optional,Dict, Any
import redis
import json
import logging
from fastapi.responses import JSONResponse
import base64
import PyPDF2
import io
r = redis.Redis(host='redis', port=6379, db=0, password=None,decode_responses=True)
# Instantiate your document functions class
app=FastAPI()
class ServiceRemovalRequest(BaseModel):
token: str
servicename: str
class Document(BaseModel):
token: str
service_name: str
document_name: str
class ServicesResponse(BaseModel):
message: str
added_services: List[str]
class RemoveDocumentsRequest(BaseModel):
token: str
service_name: str
document_names: List[str]
# Define request body model
class Service(BaseModel):
servicename: str
class TokenServicesRequest(BaseModel):
token: str
services: List[Service]
class services(BaseModel):
token: str
class AddDocumentRequest(BaseModel):
token: str
servicename: str
documentname: str
class StoreDocumentServicesRequest(BaseModel):
token: str
service_name: str
document_name: str
file: bytes
class DocumentChunks(BaseModel):
token: str
service_name: str
document_name: str
method: str = "chunk_per_page"
split_token: Optional[str] = ""
start_page: int = 1
end_page: int = 1
@validator('split_token', always=True)
def check_real_text(cls, v, values):
method = values.get('method')
if method == 'personalize_chunking' and not v:
raise ValueError('split_token is required when method is personalize_chunking')
return v
class DocumentRespons(BaseModel):
token: str
service_name: str
document_name: str
method: str = "chunk_per_page"
model : str = "gpt-3.5-turbo"
schema: dict
comment: Optional[dict] = {}
split_token: Optional[str] = ""
start_page: int = 1
end_page: int = 1
@validator('split_token', always=True)
def check_real_text(cls, v, values):
method = values.get('method')
if method == 'personalize_chunking' and not v:
raise ValueError('split_token is required when method is personalize_chunking')
return v
@app.post("/add_services", status_code=status.HTTP_201_CREATED)
async def add_services(request: TokenServicesRequest):
"""
Adds a list of services to a given token.
This endpoint accepts a request with a token and a list of services, attempting to add each service to the specified token.
The service information must include all necessary details like name and description.
Parameters:
- request (TokenServicesRequest): A model containing the authorization token and a list of services to be added.
Returns:
- A list of dictionaries, each representing a successfully added service with its details.
Raises:
- HTTPException: 400 Bad Request if any value error occurs during processing, typically due to invalid input.
- HTTPException: 500 Internal Server Error if any unexpected errors occur during the process.
"""
try:
# Convert services to dicts
services_dicts = [service.dict() for service in request.services]
result = functions_doc.add_services_to_token(request.token, services_dicts)
return result
except ValueError as ve:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(ve))
except Exception as e:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@app.delete("/remove_service/")
async def remove_service(request: ServiceRemovalRequest):
"""
Removes a specified service associated with a token.
This endpoint allows the removal of a service by its name from a list associated with a given token.
Before attempting to remove the service, it verifies that the token exists. If the token does not exist,
a 404 error is returned. If the service name does not exist under the token or cannot be removed,
a 400 error is raised with a specific message.
Parameters:
- request (ServiceRemovalRequest): A model containing the authorization token and the name of the service to be removed.
Returns:
- A dictionary with a success status and a message indicating the outcome of the operation.
Raises:
- HTTPException: 404 Not Found if the token does not exist.
- HTTPException: 400 Bad Request if the service cannot be removed or does not exist under the token.
- HTTPException: 500 Internal Server Error for any other unexpected errors.
"""
try:
# Check if the token exists in Redis
user_key = f"token:{request.token}:docservices"
if not r.exists(user_key):
raise HTTPException(status_code=404, detail="Token not found.")
# If checks pass, proceed to remove the service
manager_doc = functions_doc()
result = manager_doc.remove_service_by_name(token=request.token, servicename=request.servicename)
if result["success"]:
return {"success": True, "message": result["message"]}
else:
raise HTTPException(status_code=400, detail=result["message"])
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/add_and_store_document/", summary="Store a Document in Redis",
description="Stores a document as a base64 encoded string in Redis. The document is tagged with additional metadata and associated with a unique key.")
async def add_and_store_document(request:StoreDocumentServicesRequest):
"""
Stores a file document in Redis as a base64 encoded string with its corresponding tags and document names.
Args:
token (str): The unique identifier for the user.
service_name (str): The service under which the document will be stored.
document_name (str): The name of the document to be stored.
file (UploadFile): The file document to be stored as a base64 encoded string.
Returns:
JSONResponse: A JSON response indicating the status of the operation ("success" or "error") and a message describing the result or the error encountered.
Raises:
HTTPException: An HTTP exception is raised with a status code of 400 or 500 depending on the type of error during the storing process.
"""
try:
# Read file content as bytes
#encoded_file = await request.file.read()
# Store the document
# doc_manager = functions_doc()
response = functions_doc.add_and_store_document(token=request.token, service_name=request.service_name,document_name= request.document_name, encoded_file=request.file)
return JSONResponse(status_code=200, content={"status":"success", "message":response})
except redis.RedisError as e:
logging.error(f"Failed to store document: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": str(e)})
except Exception as e:
logging.error(f"An error occurred: {e}")
return JSONResponse(status_code=500, content={"status": "error", "message": "An unexpected error occurred"})
@app.delete("/remove_documents/", summary="Remove Multiple Documents",
description="Removes multiple documents from both the Redis store and the specified service list under a given token.")
async def remove_documents(request: RemoveDocumentsRequest):
"""
Removes multiple documents from Redis storage and their references from a specified service list under a user's token.
Args:
request (RemoveDocumentsRequest): A Pydantic model that includes the token, the service name, and a list of document names to be removed.
Returns:
dict: A dictionary indicating the status of the operation ("success" or "error") and a message describing the result or error encountered.
Raises:
HTTPException: An HTTP exception is raised with status code 400 or 500, depending on the type of error during the document removal process.
"""
try:
manager_doc = functions_doc()
response = manager_doc.remove_documents_from_service(token = request.token, service_name = request.service_name, document_names = request.document_names)
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/services/", response_model=list)
def retrieve_service_names(request: services) -> list:
"""
Endpoint to retrieve service names for a given token.
:param token: The unique token of the user.
:return: A list of service names associated with the token or an empty list if none are found.
"""
services_key = f"token:{request.token}:docservices"
try:
existing_services = r.lrange(services_key, 0, -1)
service_names = [json.loads(service)['servicename'] for service in existing_services]
return service_names if service_names else []
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch or parse services: {str(e)}")
@app.get("/documents/", response_model=dict)
def retrieve_documents(request:ServiceRemovalRequest) -> dict:
"""
Endpoint to retrieve document names from a specific service for a given token.
:param token: The unique token of the user.
:param servicename: The name of the service from which to retrieve documents.
:return: A dictionary containing a list of documents or a message if the service is not found.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_documents_from_service(token = request.token, servicename = request.servicename)
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/get_document/", response_model= Optional[bytes])
def get_document(request: Document) -> dict:
"""
Retrieves a document stored as base64-encoded bytes from a specified service for a given token.
This endpoint is responsible for fetching a document by name from a specific service associated with a token.
The document is expected to be stored in base64-encoded byte format. The response will include the document
if available or return an appropriate error message if not found or in case of an error.
Parameters:
- request (Document): A model containing the authorization token, service name, and document name.
Returns:
- A JSON response containing the document in base64-encoded format or an error message.
Raises:
- HTTPException: 400 Bad Request if there's an error during the retrieval process.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
return JSONResponse(status_code=200, content=response)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/get_num_pages/", response_model= dict)
def get_num_pages(request: Document) -> dict:
"""
Retrieves the number of pages in a PDF document from a specified service for a given token.
This endpoint fetches a document stored as a base64-encoded string, decodes it, and counts the number of pages using PyPDF2.
Parameters:
- request (Document): A model containing the authorization token, service name, and document name.
Returns:
- A JSON response with the status, message including the number of pages, and the number of pages if successful.
Raises:
- HTTPException: 400 Bad Request if there's an error during the retrieval or processing of the document.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
if response["status"]=="success":
decoded_file = base64.b64decode(response["document"])
# Use BytesIO to create a file-like object in memory from the decoded data
pdf_file_like = io.BytesIO(decoded_file)
# Use PyPDF2 to read the file-like object and count the pages
pdf_reader = PyPDF2.PdfReader(pdf_file_like)
number_of_pages = len(pdf_reader.pages)
return JSONResponse(status_code=200, content={"status": "success", "message": f"Document has {number_of_pages} pages.", "num_pages": number_of_pages})
return JSONResponse(status_code=200, content=response)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/get_chunks/", response_model=dict)
def get_chunks(request: DocumentChunks) -> dict:
"""
Retrieves text chunks from a specified range of pages in a document according to the chunking method.
This endpoint decodes a stored document and processes it to extract text chunks from specified pages.
Users must specify a valid start and end page, and a chunking method. The method can be 'chunk_per_page'
for straightforward chunking by page, or 'personalize_chunking' which may use additional text parameters.
Parameters:
- request (DocumentChunks): A model containing the necessary details to fetch and chunk the document.
Returns:
- A dictionary response with the status, a message including the count of chunks, and the chunks themselves.
Raises:
- HTTPException: 400 Bad Request if there are parameter errors or processing fails.
"""
try:
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
if response["status"] == "success":
decoded_file = base64.b64decode(response["document"])
pdf_file_like = io.BytesIO(decoded_file)
pdf_reader = PyPDF2.PdfReader(pdf_file_like)
number_of_pages = len(pdf_reader.pages)
if request.start_page < 1 or request.end_page > number_of_pages or request.start_page > request.end_page:
raise HTTPException(status_code=400, detail="Invalid start_page or end_page.")
if request.method == "chunk_per_page":
chunks = manager_doc.extract_text_from_pdf(pdf_file_like, request.start_page, request.end_page)
return {"status": "success", "message": f"Document has {len(chunks)} chunk(s).", "chunks": chunks}
elif request.method == "personalize_chunking":
# Assuming you process personalized chunking here:
personalized_chunks = manager_doc.personalize_chunking(request.split_token, pdf_file_like, request.start_page, request.end_page)
return {"status": "success", "message": f"Document has {len(personalized_chunks)} personalized chunk(s).", "chunks": personalized_chunks}
else:
raise HTTPException(status_code=400, detail="Invalid method provided.")
return response
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/structure_response/", response_model=dict)
def structure_response(request: DocumentRespons)-> dict:
"""
Retrieves and processes chunks of a document into structured JSON based on specific criteria.
This endpoint decodes a stored document and processes it to extract and transform text chunks
from specified pages into structured JSON format. Users must specify a valid start and end page,
and a chunking method. The method can be 'chunk_per_page' for straightforward chunking by page,
or 'personalize_chunking' which may use additional text parameters. The model parameter can take
values like "gpt-3.5-turbo" or "gemini" for processing the chunks. The processing method and output
schema are specified by the user.
Parameters:
- request (DocumentRespons): A model containing the details needed to fetch, chunk, and structure the document.
Returns:
- A dictionary response with the status and the structured JSON if successful.
Raises:
- HTTPException: 400 Bad Request for parameter errors or processing issues.
- HTTPException: 500 Internal Server Error for any other unexpected errors.
"""
try:
# Assuming functions_doc() returns an instance with necessary methods
manager_doc = functions_doc()
response = manager_doc.get_document(token=request.token, service_name=request.service_name, document_name=request.document_name)
if response["status"] == "success":
decoded_file = base64.b64decode(response["document"])
pdf_file_like = io.BytesIO(decoded_file)
pdf_reader = PyPDF2.PdfReader(pdf_file_like)
number_of_pages = len(pdf_reader.pages)
if request.start_page < 1 or request.end_page > number_of_pages or request.start_page > request.end_page:
raise HTTPException(status_code=400, detail="Invalid start_page or end_page.")
if request.method == "chunk_per_page":
chunks = manager_doc.extract_text_from_pdf(pdf_file_like, request.start_page, request.end_page)
json_list = process_chunks(chunks, manager_doc, request.schema, request.model,request.comment)
return {"status": "success", "json": json_list}
elif request.method == "personalize_chunking":
personalized_chunks = manager_doc.personalize_chunking(request.split_token, pdf_file_like, request.start_page, request.end_page)
json_list = process_chunks(personalized_chunks, manager_doc, request.schema, request.model,request.comment)
return {"status": "success", "json": json_list}
else:
raise HTTPException(status_code=400, detail="Invalid method provided.")
return response
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def process_chunks(chunks, manager_doc, schema, model,comment):
json_list = []
for chunk in chunks:
try:
response = manager_doc.get_json(schema, chunk, model,comment)
except Exception as e:
response = {}
json_list.append(response)
return json_list
|