import sqlite3
from io import BytesIO
import base64
def create_connection(db_file):
""" Create a database connection to the SQLite database """
conn = None
try:
conn = sqlite3.connect(db_file)
return conn
except sqlite3.Error as e:
print(e)
return conn
def create_table(conn):
""" Create the results tables if they do not exist """
create_glaucoma_table_sql = """ CREATE TABLE IF NOT EXISTS results (
id integer PRIMARY KEY,
image blob,
cup_area real,
disk_area real,
rim_area real,
rim_to_disc_line_ratio real,
ddls_stage integer
); """
create_cataract_table_sql = """ CREATE TABLE IF NOT EXISTS cataract_results (
id integer PRIMARY KEY,
image blob,
red_quantity real,
green_quantity real,
blue_quantity real,
stage text
); """
try:
cursor = conn.cursor()
cursor.execute(create_glaucoma_table_sql)
cursor.execute(create_cataract_table_sql)
except sqlite3.Error as e:
print(e)
def save_prediction_to_db(image, cup_area, disk_area, rim_area, rim_to_disc_line_ratio, ddls_stage):
database = "glaucoma_results.db"
conn = create_connection(database)
if conn:
create_table(conn)
sql = ''' INSERT INTO results(image, cup_area, disk_area, rim_area, rim_to_disc_line_ratio, ddls_stage)
VALUES(?,?,?,?,?,?) '''
cur = conn.cursor()
# Convert the image to bytes
buffered = BytesIO()
image.save(buffered, format="PNG")
img_bytes = buffered.getvalue()
cur.execute(sql, (img_bytes, cup_area, disk_area, rim_area, rim_to_disc_line_ratio, ddls_stage))
conn.commit()
conn.close()
def fetch_all_data(conn, table_name):
""" Fetch all data from the given table """
cursor = conn.cursor()
cursor.execute(f"SELECT * FROM {table_name}")
rows = cursor.fetchall()
return rows
def get_db_data(db_path_glaucoma, db_path_cataract):
conn_glaucoma = create_connection(db_path_glaucoma)
conn_cataract = create_connection(db_path_cataract)
if conn_glaucoma and conn_cataract:
create_table(conn_glaucoma)
create_table(conn_cataract)
glaucoma_data = fetch_all_data(conn_glaucoma, "results")
cataract_data = fetch_all_data(conn_cataract, "cataract_results")
conn_glaucoma.close()
conn_cataract.close()
return glaucoma_data, cataract_data
else:
return [], []
def format_db_data(glaucoma_data, cataract_data):
""" Format the database data for display """
formatted_data = ""
if not glaucoma_data and not cataract_data:
return "No data available in the database."
if glaucoma_data:
headers = ["ID", "Image", "Cup Area", "Disk Area", "Rim Area", "Rim to Disc Line Ratio", "DDLS Stage"]
formatted_data += "
Glaucoma Data
" + "".join([f"{header} | " for header in headers]) + "
"
for row in glaucoma_data:
image_html = "No image"
if row[1] is not None:
image = base64.b64encode(row[1]).decode('utf-8')
image_html = f""
formatted_data += "" + "".join([f"{image_html if i == 1 else row[i]} | " for i in range(len(row))]) + "
"
formatted_data += "
"
if cataract_data:
headers = ["ID", "Image", "Red Quantity", "Green Quantity", "Blue Quantity", "Stage"]
formatted_data += "Cataract Data
" + "".join([f"{header} | " for header in headers]) + "
"
for row in cataract_data:
image_html = "No image"
if row[1] is not None:
image = base64.b64encode(row[1]).decode('utf-8')
image_html = f""
formatted_data += "" + "".join([f"{image_html if i == 1 else row[i]} | " for i in range(len(row))]) + "
"
formatted_data += "
"
return formatted_data