Spaces:
Sleeping
Sleeping
File size: 6,972 Bytes
d737cb7 ef16e35 d737cb7 cd325a1 ef16e35 d737cb7 ef16e35 d737cb7 b3d077e d737cb7 ef16e35 cd325a1 ef16e35 d737cb7 ef16e35 d737cb7 ef16e35 cd325a1 ef16e35 d737cb7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
# Importing the required libraries
import cv2
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Depends
from fastapi.middleware.cors import CORSMiddleware
import numpy as np
import os
import datetime
import base64
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.server_api import ServerApi
app = FastAPI(docs_url=None, openapi_url=None, redoc_url=None)
# MongoDB connection settings
MONGODB_URL = os.environ.get("mongoUrl")
DATABASE_NAME = os.environ.get("databaseName")
COLLECTION_NAME = "inkognitoUsers"
# Allowing CORS (Cross-Origin Resource Sharing)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# MongoDB client initialization
async def get_database_client():
client = AsyncIOMotorClient(MONGODB_URL)
database = client[DATABASE_NAME]
yield database
client.close()
# @app.on_event("startup")
# def save_openapi_json():
# openapi_data = app.openapi()
# with open("openapi.json", "w") as file:
# json.dump(openapi_data, file)
# Converting types to binary
def msg_to_bin(msg):
if type(msg) == str:
return ''.join([format(ord(i), "08b") for i in msg])
elif type(msg) == bytes or type(msg) == np.ndarray:
return [format(i, "08b") for i in msg]
elif type(msg) == int or type(msg) == np.uint8:
return format(msg, "08b")
else:
raise TypeError("Input type not supported")
# defining function to hide the secret message into the image
def hide_data(img, secret_msg):
# calculating the maximum bytes for encoding
nBytes = img.shape[0] * img.shape[1] * 3 // 8
print("Maximum Bytes for encoding:", nBytes)
# checking whether the number of bytes for encoding is less
# than the maximum bytes in the image
if len(secret_msg) > nBytes:
raise ValueError("Error encountered insufficient bytes, need bigger image or less data!!")
secret_msg += '#####' # we can utilize any string as the delimiter
dataIndex = 0
# converting the input data to binary format using the msg_to_bin() function
bin_secret_msg = msg_to_bin(secret_msg)
# finding the length of data that requires to be hidden
dataLen = len(bin_secret_msg)
for values in img:
for pixels in values:
# converting RGB values to binary format
r, g, b = msg_to_bin(pixels)
# modifying the LSB only if there is data remaining to store
if dataIndex < dataLen:
# hiding the data into LSB of Red pixel
pixels[0] = int(r[:-1] + bin_secret_msg[dataIndex], 2)
dataIndex += 1
if dataIndex < dataLen:
# hiding the data into LSB of Green pixel
pixels[1] = int(g[:-1] + bin_secret_msg[dataIndex], 2)
dataIndex += 1
if dataIndex < dataLen:
# hiding the data into LSB of Blue pixel
pixels[2] = int(b[:-1] + bin_secret_msg[dataIndex], 2)
dataIndex += 1
# if data is encoded, break out the loop
if dataIndex >= dataLen:
break
return img
def show_data(img):
bin_data = ""
for values in img:
for pixels in values:
# converting the Red, Green, Blue values into binary format
r, g, b = msg_to_bin(pixels)
# data extraction from the LSB of Red pixel
bin_data += r[-1]
# data extraction from the LSB of Green pixel
bin_data += g[-1]
# data extraction from the LSB of Blue pixel
bin_data += b[-1]
# split by 8-Bits
allBytes = [bin_data[i: i + 8] for i in range(0, len(bin_data), 8)]
# converting from bits to characters
decodedData = ""
for bytes in allBytes:
decodedData += chr(int(bytes, 2))
# checking if we have reached the delimiter which is "#####"
if decodedData[-5:] == "#####":
break
# print(decodedData)
# removing the delimiter to display the actual hidden message
return decodedData[:-5]
@app.get("/")
async def root():
return {"message": "Server is alive"}
@app.post("/hide_message")
async def hide_message(image: UploadFile = File(...), secret_msg: str = Form(...)):
try:
# Read image using OpenCV
content = await image.read()
nparr = np.frombuffer(content, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
# Hide the secret message in the image
encoded_image = hide_data(img, secret_msg)
# Convert the image to bytes
_, encoded_image_data = cv2.imencode(".png", encoded_image)
encoded_image_bytes = encoded_image_data.tobytes()
# Encode the image bytes to base64
encoded_image_base64 = base64.b64encode(encoded_image_bytes).decode()
# Return the base64 encoded image
return {"base64_image": encoded_image_base64}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# API endpoint to retrieve the secret message from an image
@app.post("/retrieve_message")
async def retrieve_message(image: UploadFile = File(...)):
try:
# Read image using OpenCV
content = await image.read()
nparr = np.frombuffer(content, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)
# Retrieve the hidden message from the image
decoded_message = show_data(img)
return {"decoded_message": decoded_message}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/save_user_data")
async def save_user_data(name: str = Form(...), email: str = Form(...), database: AsyncIOMotorClient = Depends(get_database_client)):
try:
# Save user data to MongoDB
users_collection = database[COLLECTION_NAME]
user_data = {
"name": name,
"email": email,
"first_login": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}
result = await users_collection.insert_one(user_data)
# Return success response
return {"message": "User data saved successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get(os.environ.get("getUsersEp"))
async def get_all_users(database: AsyncIOMotorClient = Depends(get_database_client)):
try:
# Retrieve all users from MongoDB
users_collection = database[COLLECTION_NAME]
all_users = await users_collection.find({}, {"_id": 0}).to_list(length=None)
# Return the list of all users
return {"users": all_users}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e)) |