|
|
|
import cv2 |
|
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Depends |
|
from fastapi.middleware.cors import CORSMiddleware |
|
import numpy as np |
|
import os |
|
import datetime |
|
import base64 |
|
from motor.motor_asyncio import AsyncIOMotorClient |
|
from pymongo.server_api import ServerApi |
|
|
|
# Application object; every route decorator in this module registers on it.
app = FastAPI()

# MongoDB settings. The URL and database name are read from the environment
# and are None when the corresponding variable is not set.
MONGODB_URL = os.getenv("mongoUrl")
DATABASE_NAME = os.getenv("databaseName")
COLLECTION_NAME = "inkognitoUsers"

# CORS policy: any origin, method and header is accepted, with credentials.
# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# very permissive policy - confirm this is intended for production.
_cors_options = {
    "allow_origins": ["*"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
|
|
|
|
|
async def get_database_client():
    """Yield the configured MongoDB database and close the client afterwards.

    Written as an async generator so it can be used with ``Depends``: a new
    ``AsyncIOMotorClient`` is created from ``MONGODB_URL``, the database named
    ``DATABASE_NAME`` is yielded to the caller, and the client is closed once
    the generator is resumed or finalised.

    Yields:
        The object obtained by indexing the client with ``DATABASE_NAME``.
    """
    client = AsyncIOMotorClient(MONGODB_URL)
    try:
        yield client[DATABASE_NAME]
    finally:
        # An exception raised by the consumer is thrown into this generator at
        # the ``yield``; without ``finally`` the close() below would be
        # skipped and the client would leak.
        client.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def msg_to_bin(msg):
    """Convert text, byte sequences or integers to binary-digit strings.

    Every value is formatted with ``"08b"``, i.e. zero-padded to at least
    eight binary digits (values above 255 yield more than eight digits).

    Args:
        msg: One of
            * ``str`` - each character is converted through ``ord``;
            * ``bytes``, ``bytearray`` or ``numpy.ndarray`` - each element is
              converted separately;
            * ``int`` or a NumPy integer scalar.

    Returns:
        For ``str``: one string with the digits of all characters
        concatenated. For byte sequences / arrays: a list with one digit
        string per element. For a single integer: its digit string.

    Raises:
        TypeError: If ``msg`` is none of the supported types.
    """
    # isinstance (rather than exact type equality) also accepts subclasses,
    # bytearray and every NumPy integer width, not only np.uint8.
    if isinstance(msg, str):
        return "".join(format(ord(char), "08b") for char in msg)
    if isinstance(msg, (bytes, bytearray, np.ndarray)):
        return [format(item, "08b") for item in msg]
    if isinstance(msg, (int, np.integer)):
        return format(msg, "08b")
    raise TypeError("Input type not supported")
|
|
|
|
|
def hide_data(img, secret_msg):
    """Embed ``secret_msg`` in the least-significant bits of ``img``.

    The message is followed by the terminator ``"#####"`` and written one bit
    per channel value, most-significant bit of each character first, walking
    the image in row-major order (row, column, channel). Only the lowest bit
    of each touched value changes. ``img`` is modified in place.

    Args:
        img: Decoded image as a ``numpy.ndarray`` of shape
            ``(rows, cols, 3)`` with an integer dtype.
        secret_msg: Text to hide. Every character must have a code point of
            at most 255 so that it occupies exactly eight bits.

    Returns:
        The same ``img`` array, now carrying the payload.

    Raises:
        ValueError: If ``img`` is ``None`` or not 3-channel, if the message
            contains a character above U+00FF, or if message plus terminator
            do not fit into the image.
    """
    delimiter = "#####"

    if img is None or img.ndim != 3 or img.shape[2] != 3:
        raise ValueError("Expected a decoded 3-channel image")

    # Latin-1 maps code points 0-255 to the identical byte value, which is the
    # same 8-bit-per-character layout the decoder reads back with chr().
    try:
        payload = (secret_msg + delimiter).encode("latin-1")
    except UnicodeEncodeError as exc:
        raise ValueError(
            "Secret message may only contain characters in the range U+0000-U+00FF"
        ) from exc

    # One array element (0 or 1) per payload bit, MSB first within each byte.
    bits = np.unpackbits(np.frombuffer(payload, dtype=np.uint8))

    # The terminator counts against capacity too; otherwise it would be
    # silently truncated and the message could never be recovered.
    capacity_bits = img.shape[0] * img.shape[1] * 3
    if bits.size > capacity_bits:
        raise ValueError("Error encountered insufficient bytes, need bigger image or less data!!")

    # ``img.flat`` addresses the array in row-major order and writes through
    # to ``img`` itself, so the caller's array is updated in place.
    lsb_mask = ~img.dtype.type(1)
    carriers = img.flat[:bits.size]
    img.flat[:bits.size] = (carriers & lsb_mask) | bits.astype(img.dtype)

    return img
|
|
|
def show_data(img):
    """Recover a message hidden in the least-significant bits of ``img``.

    The lowest bit of every channel value is collected in row-major order
    (row, column, channel), grouped into bytes (first bit is the most
    significant), and each byte is turned into the character with that code
    point. Decoding stops at the first occurrence of the terminator
    ``"#####"``.

    Args:
        img: Decoded image as a ``numpy.ndarray`` of shape
            ``(rows, cols, 3)`` with an integer dtype.

    Returns:
        The text that precedes the terminator.

    Raises:
        ValueError: If ``img`` is ``None`` or not 3-channel, or if the
            terminator does not occur in the image.
    """
    delimiter = b"#####"

    if img is None or img.ndim != 3 or img.shape[2] != 3:
        raise ValueError("Expected a decoded 3-channel image")

    # Extract every LSB in one vectorised pass instead of formatting each
    # pixel to a string and concatenating in a Python loop.
    lsb = (img.reshape(-1) & 1).astype(np.uint8)

    # Drop a trailing partial byte; it cannot hold a complete character.
    whole_bits = lsb.size - lsb.size % 8
    raw = np.packbits(lsb[:whole_bits]).tobytes()

    end = raw.find(delimiter)
    if end < 0:
        # Previously the last five characters were chopped off and the
        # remaining noise was returned as if it were a message.
        raise ValueError("No hidden message found in image")

    # Latin-1 maps each byte to the code point of the same value (chr(byte)).
    return raw[:end].decode("latin-1")
|
|
|
@app.get("/")
async def root():
    """Liveness endpoint: respond with a fixed message showing the server runs."""
    alive_payload = {"message": "Server is alive"}
    return alive_payload
|
|
|
@app.post("/hide_message")
async def hide_message(image: UploadFile = File(...), secret_msg: str = Form(...)):
    """Hide ``secret_msg`` in the uploaded image and return it as base64 PNG.

    Args:
        image: Uploaded image file; decoded with OpenCV.
        secret_msg: Text to embed via ``hide_data``.

    Returns:
        ``{"base64_image": <str>}`` where the value is the base64 text of the
        PNG-encoded image carrying the message.

    Raises:
        HTTPException: 400 when the upload is empty, cannot be decoded, is
            not a 3-channel image, or ``hide_data`` rejects the input with
            ``ValueError``; 500 for any other failure.
    """
    try:
        content = await image.read()
        if not content:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")

        img = cv2.imdecode(np.frombuffer(content, np.uint8), cv2.IMREAD_UNCHANGED)
        # imdecode signals an undecodable buffer by returning None.
        if img is None:
            raise HTTPException(status_code=400, detail="Uploaded file is not a decodable image")
        # IMREAD_UNCHANGED can yield grayscale or 4-channel arrays, which the
        # per-pixel three-channel embedding cannot handle.
        if img.ndim != 3 or img.shape[2] != 3:
            raise HTTPException(status_code=400, detail="Only 3-channel colour images are supported")

        encoded_image = hide_data(img, secret_msg)

        # PNG is lossless, so the modified least-significant bits survive.
        success, encoded_image_data = cv2.imencode(".png", encoded_image)
        if not success:
            raise RuntimeError("Failed to encode the image as PNG")

        encoded_image_base64 = base64.b64encode(encoded_image_data.tobytes()).decode()
        return {"base64_image": encoded_image_base64}
    except HTTPException:
        # Keep the specific status codes raised above instead of masking
        # them as 500 in the generic handler below.
        raise
    except ValueError as e:
        # Raised by hide_data for input the client can correct.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
|
|
@app.post("/retrieve_message")
async def retrieve_message(image: UploadFile = File(...)):
    """Extract the message hidden in the uploaded image.

    Args:
        image: Uploaded image file; decoded with OpenCV.

    Returns:
        ``{"decoded_message": <str>}`` with the text returned by
        ``show_data``.

    Raises:
        HTTPException: 400 when the upload is empty, cannot be decoded, is
            not a 3-channel image, or ``show_data`` raises ``ValueError``;
            500 for any other failure.
    """
    try:
        content = await image.read()
        if not content:
            raise HTTPException(status_code=400, detail="Uploaded file is empty")

        img = cv2.imdecode(np.frombuffer(content, np.uint8), cv2.IMREAD_UNCHANGED)
        # imdecode signals an undecodable buffer by returning None.
        if img is None:
            raise HTTPException(status_code=400, detail="Uploaded file is not a decodable image")
        # IMREAD_UNCHANGED can yield grayscale or 4-channel arrays, which the
        # per-pixel three-channel extraction cannot handle.
        if img.ndim != 3 or img.shape[2] != 3:
            raise HTTPException(status_code=400, detail="Only 3-channel colour images are supported")

        decoded_message = show_data(img)
        return {"decoded_message": decoded_message}
    except HTTPException:
        # Keep the specific status codes raised above instead of masking
        # them as 500 in the generic handler below.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
@app.post("/save_user_data")
async def save_user_data(name: str = Form(...), email: str = Form(...), database: AsyncIOMotorClient = Depends(get_database_client)):
    """Insert one user record into the ``COLLECTION_NAME`` collection.

    The stored document holds the submitted ``name`` and ``email`` plus a
    ``first_login`` string: the current naive local time formatted as
    ``YYYY-MM-DD HH:MM:SS``.

    Args:
        name: Form field with the user's name.
        email: Form field with the user's e-mail address.
        database: Object yielded by ``get_database_client``.

    Returns:
        A fixed confirmation message.

    Raises:
        HTTPException: 500 carrying the text of any exception raised while
            building or inserting the document.
    """
    try:
        collection = database[COLLECTION_NAME]
        login_stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        document = {"name": name, "email": email, "first_login": login_stamp}
        await collection.insert_one(document)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"message": "User data saved successfully"}
|
|
|
@app.get(os.environ.get("getUsersEp"))
async def get_all_users(database: AsyncIOMotorClient = Depends(get_database_client)):
    """Return every document of the ``COLLECTION_NAME`` collection.

    The route path is read from the ``getUsersEp`` environment variable when
    the module is imported. The ``_id`` field is excluded from each document
    through the query projection.

    Args:
        database: Object yielded by ``get_database_client``.

    Returns:
        ``{"users": [...]}`` with all matching documents.

    Raises:
        HTTPException: 500 carrying the text of any exception raised by the
            query.
    """
    try:
        cursor = database[COLLECTION_NAME].find({}, {"_id": 0})
        # length=None asks for the full result set in a single list.
        all_users = await cursor.to_list(length=None)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"users": all_users}