File size: 6,972 Bytes
d737cb7
 
ef16e35
d737cb7
 
cd325a1
ef16e35
d737cb7
ef16e35
 
d737cb7
b3d077e
d737cb7
ef16e35
cd325a1
 
ef16e35
 
d737cb7
 
 
 
 
 
 
 
 
ef16e35
 
 
 
 
 
 
d737cb7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ef16e35
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cd325a1
ef16e35
 
 
 
 
 
 
 
d737cb7
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# Importing the required libraries
import cv2
from fastapi import FastAPI, File, UploadFile, HTTPException, Form, Depends
from fastapi.middleware.cors import CORSMiddleware
import numpy as np
import os
import datetime
import base64
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.server_api import ServerApi

app = FastAPI(docs_url=None, openapi_url=None, redoc_url=None)

# MongoDB connection settings
MONGODB_URL = os.environ.get("mongoUrl")
DATABASE_NAME = os.environ.get("databaseName")
COLLECTION_NAME = "inkognitoUsers"

# Allowing CORS (Cross-Origin Resource Sharing)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# MongoDB client initialization
async def get_database_client():
    client = AsyncIOMotorClient(MONGODB_URL)
    database = client[DATABASE_NAME]
    yield database
    client.close()

# @app.on_event("startup")
# def save_openapi_json():
#     openapi_data = app.openapi()
#     with open("openapi.json", "w") as file:
#         json.dump(openapi_data, file)

# Converting types to binary  
def msg_to_bin(msg):  
    if type(msg) == str:  
        return ''.join([format(ord(i), "08b") for i in msg])  
    elif type(msg) == bytes or type(msg) == np.ndarray:  
        return [format(i, "08b") for i in msg]  
    elif type(msg) == int or type(msg) == np.uint8:  
        return format(msg, "08b")  
    else:  
        raise TypeError("Input type not supported")  

# defining function to hide the secret message into the image  
def hide_data(img, secret_msg):  
    # calculating the maximum bytes for encoding  
    nBytes = img.shape[0] * img.shape[1] * 3 // 8  
    print("Maximum Bytes for encoding:", nBytes)  
    # checking whether the number of bytes for encoding is less  
    # than the maximum bytes in the image  
    if len(secret_msg) > nBytes:  
        raise ValueError("Error encountered insufficient bytes, need bigger image or less data!!")  
    secret_msg += '#####'       # we can utilize any string as the delimiter  
    dataIndex = 0  
    # converting the input data to binary format using the msg_to_bin() function  
    bin_secret_msg = msg_to_bin(secret_msg)  
  
    # finding the length of data that requires to be hidden  
    dataLen = len(bin_secret_msg)  
    for values in img:  
        for pixels in values:  
            # converting RGB values to binary format  
            r, g, b = msg_to_bin(pixels)  
            # modifying the LSB only if there is data remaining to store  
            if dataIndex < dataLen:  
                # hiding the data into LSB of Red pixel  
                pixels[0] = int(r[:-1] + bin_secret_msg[dataIndex], 2)  
                dataIndex += 1  
            if dataIndex < dataLen:  
                # hiding the data into LSB of Green pixel  
                pixels[1] = int(g[:-1] + bin_secret_msg[dataIndex], 2)  
                dataIndex += 1  
            if dataIndex < dataLen:  
                # hiding the data into LSB of Blue pixel  
                pixels[2] = int(b[:-1] + bin_secret_msg[dataIndex], 2)  
                dataIndex += 1  
            # if data is encoded, break out the loop  
            if dataIndex >= dataLen:  
                break  
      
    return img  
  
def show_data(img):  
    bin_data = ""  
    for values in img:  
        for pixels in values:  
            # converting the Red, Green, Blue values into binary format  
            r, g, b = msg_to_bin(pixels)  
            # data extraction from the LSB of Red pixel  
            bin_data += r[-1]  
            # data extraction from the LSB of Green pixel  
            bin_data += g[-1]  
            # data extraction from the LSB of Blue pixel  
            bin_data += b[-1]  
    # split by 8-Bits  
    allBytes = [bin_data[i: i + 8] for i in range(0, len(bin_data), 8)]  
    # converting from bits to characters  
    decodedData = ""  
    for bytes in allBytes:  
        decodedData += chr(int(bytes, 2))  
        # checking if we have reached the delimiter which is "#####"  
        if decodedData[-5:] == "#####":  
            break  
    # print(decodedData)  
    # removing the delimiter to display the actual hidden message  
    return decodedData[:-5]  

@app.get("/")
async def root():
    return {"message": "Server is alive"}

@app.post("/hide_message")
async def hide_message(image: UploadFile = File(...), secret_msg: str = Form(...)):
    try:
        # Read image using OpenCV
        content = await image.read()
        nparr = np.frombuffer(content, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)

        # Hide the secret message in the image
        encoded_image = hide_data(img, secret_msg)

        # Convert the image to bytes
        _, encoded_image_data = cv2.imencode(".png", encoded_image)
        encoded_image_bytes = encoded_image_data.tobytes()

        # Encode the image bytes to base64
        encoded_image_base64 = base64.b64encode(encoded_image_bytes).decode()

        # Return the base64 encoded image
        return {"base64_image": encoded_image_base64}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


# API endpoint to retrieve the secret message from an image
@app.post("/retrieve_message")
async def retrieve_message(image: UploadFile = File(...)):
    try:
        # Read image using OpenCV
        content = await image.read()
        nparr = np.frombuffer(content, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_UNCHANGED)

        # Retrieve the hidden message from the image
        decoded_message = show_data(img)

        return {"decoded_message": decoded_message}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    
@app.post("/save_user_data")
async def save_user_data(name: str = Form(...), email: str = Form(...), database: AsyncIOMotorClient = Depends(get_database_client)):
    try:
        # Save user data to MongoDB
        users_collection = database[COLLECTION_NAME]
        user_data = {
            "name": name,
            "email": email,
            "first_login": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        result = await users_collection.insert_one(user_data)

        # Return success response
        return {"message": "User data saved successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get(os.environ.get("getUsersEp"))
async def get_all_users(database: AsyncIOMotorClient = Depends(get_database_client)):
    try:
        # Retrieve all users from MongoDB
        users_collection = database[COLLECTION_NAME]
        all_users = await users_collection.find({}, {"_id": 0}).to_list(length=None)

        # Return the list of all users
        return {"users": all_users}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))