CV_Accelerator / utils.py
samkeet's picture
First Commit
3d90a2e verified
raw
history blame
17.7 kB
# Importing necessary libraries
import cv2
import utils
import inspect
import numpy as np
import pandas as pd
import streamlit as st
import albumentations as A
# Please retain all the imported libraries, even if they appear to be unused, as they are necessary for the input part of the code (dynamic_input function)
# Sample class labels: 80 names in a single string, separated by ", "
sample_class_labels = (
    "person, bicycle, car, motorcycle, airplane, bus, train, truck, boat, traffic light, "
    "fire hydrant, stop sign, parking meter, bench, bird, cat, dog, horse, sheep, cow, "
    "elephant, bear, zebra, giraffe, backpack, umbrella, handbag, tie, suitcase, frisbee, "
    "skis, snowboard, sports ball, kite, baseball bat, baseball glove, skateboard, surfboard, tennis racket, bottle, "
    "wine glass, cup, fork, knife, spoon, bowl, banana, apple, sandwich, orange, "
    "broccoli, carrot, hot dog, pizza, donut, cake, chair, couch, potted plant, bed, "
    "dining table, toilet, tv, laptop, mouse, remote, keyboard, cell phone, microwave, oven, "
    "toaster, sink, refrigerator, book, clock, vase, scissors, teddy bear, hair drier, toothbrush"
)
# Model info: per-model metrics keyed by model name.
# Each row lists the values in the order: size, mAPval, speed_cpu, speed_gpu, params, flops.
models_info = {
    model_name: dict(
        zip(
            ("size", "mAPval", "speed_cpu", "speed_gpu", "params", "flops"),
            metrics,
        )
    )
    for model_name, metrics in (
        ("YOLOv8n", (640, 37.3, 80.4, 0.99, 3.2, 8.7)),
        ("YOLOv8s", (640, 44.9, 128.4, 1.20, 11.2, 28.6)),
        ("YOLOv8m", (640, 50.2, 234.7, 1.83, 25.9, 78.9)),
        ("YOLOv8l", (640, 52.9, 375.2, 2.39, 43.7, 165.2)),
        ("YOLOv8x", (640, 53.9, 479.1, 3.53, 68.2, 257.8)),
    )
}
# Export formats info: display name -> format argument string and the list of
# option names associated with that format.
export_formats = {
    display_name: {"format_argument": format_argument, "arguments": list(option_names)}
    for display_name, format_argument, option_names in (
        ("TorchScript", "torchscript", ("optimize",)),
        ("ONNX", "onnx", ("half", "dynamic", "simplify", "opset")),
        ("OpenVINO", "openvino", ("half", "int8")),
        ("TensorRT", "engine", ("half", "dynamic", "simplify", "workspace")),
        ("CoreML", "coreml", ("half", "int8", "nms")),
        ("TF SavedModel", "saved_model", ("keras", "int8")),
        ("TF GraphDef", "pb", ()),
        ("TF Lite", "tflite", ("half", "int8")),
        ("TF Edge TPU", "edgetpu", ()),
        ("TF.js", "tfjs", ("half", "int8")),
        ("PaddlePaddle", "paddle", ()),
        ("ncnn", "ncnn", ("half",)),
    )
}
# Function to add top padding
def top_padding(padding_aount=0):
    """Write `padding_aount` empty strings to the page via `st.write`.

    The parameter keeps its original (misspelled) name so that existing
    keyword callers continue to work.
    """
    for _ in range(padding_aount):
        st.write("")
# Function to load data from an Excel file
# NOTE(review): the result is cached with st.cache_resource; Streamlit's docs
# suggest st.cache_data for DataFrames - confirm which semantics are wanted.
@st.cache_resource(show_spinner=False)
def load_data_from_excel(filename, sheet_name):
    """Read one sheet of an Excel workbook with pandas and return the result.

    Args:
        filename: Passed to `pd.read_excel` as the workbook to read.
        sheet_name: Passed to `pd.read_excel` as `sheet_name`.
    """
    sheet = pd.read_excel(filename, sheet_name=sheet_name)
    return sheet
# Function to reset the validation trigger in the session state
def reset_validation_trigger():
    """Set the "validation_triggered" session-state flag to False."""
    flag_name = "validation_triggered"
    st.session_state[flag_name] = False
# Function to clear the Streamlit session state on the first load of the page
def clear_session_state_on_first_load(key):
    """Wipe the session state once, using `key` as the "already done" marker.

    If `key` is already present in the session state nothing happens.
    Otherwise every other entry is deleted and `key` is stored as True.
    """
    # The marker is present: the clean-up already ran
    if key in st.session_state:
        return

    # Snapshot the keys first so entries can be deleted while looping
    stale_keys = [name for name in list(st.session_state.keys()) if name != key]
    for name in stale_keys:
        del st.session_state[name]

    # Mark this function as executed
    st.session_state[key] = True
# Function to display a file uploader based on the cached state
def display_file_uploader(uploaded_files_type, label_value, key_value, is_cached):
    """Render a multi-file uploader accepting jpg, png and txt files.

    Args:
        uploaded_files_type: Session-state key under which the uploader's
            return value is stored (only when `is_cached` is falsy).
        label_value: Label shown on the uploader.
        key_value: Widget key of the active uploader.
        is_cached: When truthy, a disabled uploader is shown and the session
            state is left untouched.
    """
    accepted_extensions = ["jpg", "png", "txt"]

    if is_cached:
        # Disabled uploader: no key, no callback, nothing stored
        st.file_uploader(
            label_value,
            type=accepted_extensions,
            accept_multiple_files=True,
            disabled=True,
        )
        return

    uploads = st.file_uploader(
        label=label_value,
        type=accepted_extensions,
        accept_multiple_files=True,
        on_change=utils.reset_validation_trigger,
        key=key_value,
    )
    st.session_state[uploaded_files_type] = uploads
# Function to map each technique to its corresponding image types
@st.cache_resource(show_spinner=False)
def technique_image_input_mapping(available_techniques, details_df):
    """Build a dict of technique name -> list of image-type strings.

    For each technique, the rows of `details_df` whose "Name" equals the
    technique are selected, each "Image Types" cell is converted to a string
    and split on commas, and the list from the first matching row is used.
    An IndexError is raised when a technique has no matching row.
    """

    def _split_cell(cell):
        # str() first, so non-string cells are split like text as well
        return [piece.strip() for piece in str(cell).split(",")]

    mapping = {}
    for technique in available_techniques:
        matching_rows = details_df[details_df["Name"] == technique]
        parsed_cells = [_split_cell(cell) for cell in matching_rows["Image Types"]]
        # Only the first matching row is used
        mapping[technique] = parsed_cells[0]
    return mapping
# Function to check if the image type matches the allowed types
def is_image_type_allowed(input_image_type, num_channels, allowed_image_types):
    """Return True if the image satisfies at least one allowed-type entry.

    An entry matches when:
      * it is "Any", "nan", or equal to `input_image_type`; or
      * it contains "channel", starts with a channel count and is followed by
        a dtype word (e.g. "3-channel uint8 images only"), and both the count
        and the dtype equal `num_channels` / `input_image_type`.

    Args:
        input_image_type: Type string of the input image (e.g. "uint8").
        num_channels: Number of channels of the input image.
        allowed_image_types: Iterable of allowed-type strings.

    Returns:
        bool: True on the first matching entry, False if none match
        (including when `allowed_image_types` is empty).
    """
    for allowed_type in allowed_image_types:
        # Handle general types like 'Any' or 'nan', and exact matches
        if allowed_type in ("Any", "nan", input_image_type):
            return True

        # Handle specific multi-channel types (e.g., '3-channel uint8 images only')
        if "channel" not in allowed_type:
            continue

        parts = allowed_type.split()
        if len(parts) < 2:
            # No dtype word after the channel spec: nothing to compare against
            continue

        # Take every leading ASCII digit so that multi-digit counts such as
        # "12-channel" are read as 12 rather than 1
        channel_spec = parts[0]
        digits = channel_spec[: len(channel_spec) - len(channel_spec.lstrip("0123456789"))]
        if not digits:
            # Entries such as "multi-channel ..." carry no numeric count
            continue

        if num_channels == int(digits) and input_image_type == parts[1]:
            return True

    # None of the allowed types matched
    return False
# Function to create a dynamic input widget based on the specified data type
def dynamic_input(
    image_processing, data_type, label, input_key, input_data, default_val
):
    """Render the Streamlit widget matching `data_type` and return its value.

    Args:
        image_processing: Name of the technique; used to look up fallback
            defaults through `get_default_params` for "code" inputs.
        data_type: One of "bool", "int", "float", "tuple", "single_select",
            "none" or "code".
        label: Widget label (also the parameter name for the "code" fallback).
        input_key: Streamlit widget key.
        input_data: Python expression (as a string) describing the options;
            for "code" inputs it is shown verbatim as a sample.
        default_val: Python expression (as a string) for the default value;
            for "code" inputs it pre-fills the text box.

    Returns:
        The widget value, or None for "none" and for unsupported data types
        (an error message is displayed in the latter case).
    """
    # SECURITY NOTE: `input_data`, `default_val` and the text typed into the
    # "code" box are evaluated with eval(). The expressions are evaluated in
    # this module's namespace, so they can reference the imported libraries.
    # Only feed this function trusted configuration, and only expose the
    # "code" input to trusted users.
    if data_type == "bool":
        # Create a checkbox for bool input
        return st.checkbox(label=label, value=eval(default_val), key=input_key)

    if data_type in ("int", "float"):
        # Create a slider for integer / float input
        return st.select_slider(
            label=label,
            options=eval(input_data),
            value=eval(default_val),
            key=input_key,
        )

    if data_type == "tuple":
        # Create a range slider for tuple input; evaluate the default once
        default_range = eval(default_val)
        min_val, max_val = st.select_slider(
            label=label,
            options=eval(input_data),
            value=(default_range[0], default_range[1]),
            key=input_key,
        )
        return (min_val, max_val)

    if data_type == "single_select":
        # Create a selectbox for single select input; evaluate the options once
        options = eval(input_data)
        return st.selectbox(
            label=label,
            options=options,
            index=options.index(eval(default_val)),
            # Keyed like the other widgets so equal labels do not collide
            key=input_key,
        )

    if data_type == "none":
        # Return None without creating any input widget
        return None

    if data_type == "code":
        try:
            # Evaluate the code entered by the user in the text input
            input_code = eval(
                st.text_input(label=label, value=default_val, key=input_key)
            )
        except Exception:
            # Invalid Python (or an expression that fails to evaluate): warn
            # and fall back. `Exception` rather than a bare except, so that
            # BaseException-derived signals are not swallowed here.
            st.warning(f"Invalid input for {label}. Using default value.", icon="⚠️")
            # Use the default parameter value for this image processing technique
            input_code = get_default_params(image_processing)[label]

        # Show an example of the expected input format to the user
        st.write(f"Sample Input for {label}:")
        st.code(input_data, language="python", line_numbers=False)
        return input_code

    # Handle unsupported data types: display an error message and return None
    st.error(f"Unsupported data type: {data_type}")
    return None
# Returns a dictionary of default parameters for a given Albumentations augmentation class
def get_default_params(augmentation_name):
    """Collect the constructor defaults of the named Albumentations class.

    The attribute `augmentation_name` is looked up on the Albumentations
    module (falling back to None), the signature of its `__init__` is
    inspected, and every parameter that declares a default is returned as
    `{parameter_name: default_value}`.
    """
    augmentation_class = getattr(A, augmentation_name, None)
    signature = inspect.signature(augmentation_class.__init__)

    defaults = {}
    for name, parameter in signature.parameters.items():
        # Parameters without a default are left out
        if parameter.default is inspect.Parameter.empty:
            continue
        defaults[name] = parameter.default
    return defaults
# Function to resize the image so that its longer side equals max_size (512 by default) while maintaining aspect ratio
def resize_image(image, max_size=512):
    """Scale `image` so that its longer side becomes `max_size` pixels.

    Args:
        image: Array whose first two `shape` entries are (height, width).
        max_size: Target length of the longer side, in pixels.

    Returns:
        The image resized with `cv2.INTER_AREA` interpolation.
    """
    height, width = image.shape[:2]

    # One common ratio for both axes keeps the aspect ratio
    ratio = min(max_size / height, max_size / width)

    # int() truncates, so a very thin image could end up with a 0-pixel side,
    # which cv2.resize rejects; keep each side at least 1 pixel
    new_dimensions = (
        max(1, int(width * ratio)),
        max(1, int(height * ratio)),
    )

    # cv2.resize expects the target size as (width, height)
    return cv2.resize(image, new_dimensions, interpolation=cv2.INTER_AREA)
# Function to build the specified transformation with optional custom parameters
def apply_albumentation(custom_params, albumentation_name):
    """Instantiate the Albumentations class called `albumentation_name`.

    Args:
        custom_params: Keyword arguments for the constructor, or None to use
            the class defaults.
        albumentation_name: Attribute name looked up on the Albumentations
            module.

    Returns:
        The constructed transformation, or None when the module has no
        attribute with that name.
    """
    albumentation_class = getattr(A, albumentation_name, None)

    # Unknown transformation name: nothing to build
    if albumentation_class is None:
        return None

    if custom_params is not None:
        return albumentation_class(**custom_params)
    return albumentation_class()
# Function to process image parameters from a DataFrame and create interactive UI
# For every row of params_df an input widget is rendered through utils.dynamic_input,
# and the value it returns is stored under the row's "Parameter Name".
# Columns read from each row: "Parameter Name", "Input Type", "Input Values N",
# "Default Value N" and "Parameter Description".
# Returns the dictionary {parameter name: user input}; it stays empty when params_df is empty.
# NOTE(review): the indentation of this function was lost in this copy of the file, so the
# nesting of the trailing statements (description markdown, top_padding) must be confirmed
# against the original source before restructuring.
def process_image_parameters(params_df, image_processing):
# Dictionary to map UI input types to database values
type_mapping = {
"Boolean": "bool",
"Integer": "int",
"Float": "float",
"Tuple": "tuple",
"Single Select": "single_select",
"Code": "code",
"None": "none",
}
# Create a reverse mapping from database values to UI input types
reverse_type_mapping = {v: k for k, v in type_mapping.items()}
# Initialize a dictionary to store user-selected parameter values
image_processing_params = {}
if not params_df.empty:
for _, row in params_df.iterrows():
# Spacer
st.markdown("---")
param_name = row["Parameter Name"]
# "Input Type" may list several comma-separated database type names
input_types = [
input_type.strip() for input_type in str(row["Input Type"]).split(",")
]
if len(input_types) > 1:
# Several types are allowed: the user first picks one in a narrow left column
col1, col2 = st.columns([3, 7])
# reverse_type_mapping[ip] raises KeyError for a type name missing from type_mapping
selected_type_ui = col1.selectbox(
"Choose input type",
[reverse_type_mapping[ip] for ip in input_types],
key=f"{image_processing}_{param_name}_selectbox",
)
# Map the selected UI input type to database value
selected_type = type_mapping[selected_type_ui]
selected_index = input_types.index(selected_type)
with col2:
# The "Input Values N" / "Default Value N" columns are numbered from 1,
# hence selected_index+1
user_input = utils.dynamic_input(
image_processing=image_processing,
data_type=selected_type,
label=param_name,
input_data=str(row[f"Input Values {selected_index+1}"]),
default_val=str(row[f"Default Value {selected_index+1}"]),
input_key=f"{image_processing}_{param_name}_{selected_type}",
)
else:
# Single input type: always read the first values/default column pair
user_input = utils.dynamic_input(
image_processing=image_processing,
data_type=input_types[0],
label=param_name,
input_data=str(row[f"Input Values 1"]),
default_val=str(row[f"Default Value 1"]),
input_key=f"{image_processing}_{param_name}_{input_types[0]}",
)
# NOTE(review): the description is inserted into HTML without escaping
# (unsafe_allow_html=True) - confirm that params_df content is trusted
st.markdown(
f"<div style='text-align: justify;'><b>Description:</b> {row['Parameter Description']}</div>",
unsafe_allow_html=True,
)
image_processing_params[param_name] = user_input
# Top-padding
top_padding(2)
return image_processing_params
# Function to generate a DataFrame detailing image processing technique parameters and descriptions
@st.cache_resource(show_spinner=False)
def create_image_processings_dataframe(
    image_processings_params, image_processing_params_db
):
    """Tabulate every selected parameter together with its description.

    Args:
        image_processings_params: Dict of technique name -> dict of
            parameter name -> value.
        image_processing_params_db: DataFrame with the columns "Name",
            "Parameter Name" and "Parameter Description".

    Returns:
        DataFrame with the columns "image_processing", "Parameter", "Value"
        and "Description", one row per parameter. Parameters without a
        matching database row get "Description not available".
    """
    column_names = ["image_processing", "Parameter", "Value", "Description"]
    rows = []

    for aug_name, params in image_processings_params.items():
        for param_name, param_value in params.items():
            # Narrow the database to this technique, then to this parameter
            technique_rows = image_processing_params_db[
                image_processing_params_db["Name"] == aug_name
            ]
            matching_rows = technique_rows[
                technique_rows["Parameter Name"] == param_name
            ]

            if matching_rows.empty:
                description = "Description not available"
            else:
                # The first matching row provides the description
                description = matching_rows["Parameter Description"].iloc[0]

            rows.append([aug_name, param_name, param_value, description])

    return pd.DataFrame(rows, columns=column_names)
def _format_param_value(value):
    """Render a parameter value as source text for the generated script."""
    # Strings must be quoted, otherwise `mode=constant` would be emitted and
    # fail with a NameError; every other value keeps its str() form.
    return repr(value) if isinstance(value, str) else str(value)


def generate_python_code_images(
    augmentations_params,
    num_variations=1,
    include_original=False,
):
    """Generate a standalone Python script that applies the chosen pipeline.

    The script defines `process_image` (an `A.Compose` pipeline built from
    `augmentations_params`) and `process_batch` (which reads each .png/.jpg/
    .jpeg file of an input directory and writes the processed variations to
    an output directory), and finally calls `process_batch`.

    Args:
        augmentations_params: Dict of technique name -> dict of keyword
            arguments (or None for no arguments).
        num_variations: Number of processed variations written per image;
            baked into the generated loop.
        include_original: When True, the generated script also copies each
            original image into the output directory.

    Returns:
        str: The generated source code, indented with 4 spaces per level and
        terminated by a newline.
    """
    lines = [
        # Start with necessary library imports
        "# Importing necessary libraries",
        "import os",
        "import cv2",
        "import shutil",
        "import albumentations as A",
        "",
        # Paths for input and output directories
        "# Define the paths for input and output directories",
        "input_directory = 'path/to/input'",
        "output_directory = 'path/to/output'",
        "",
        # Function to create an augmentation pipeline
        "# Function to create an augmentation pipeline using Albumentations",
        "def process_image(image):",
        "    # Define the sequence of augmentation techniques",
        "    pipeline = A.Compose([",
    ]

    # One constructor call per selected technique
    for technique, params in augmentations_params.items():
        arguments = ", ".join(
            f"{name}={_format_param_value(value)}"
            for name, value in (params or {}).items()
        )
        lines.append(f"        A.{technique}({arguments}),")

    lines += [
        "    ])",
        "    # Apply the augmentation pipeline",
        "    return pipeline(image=image)['image']",
        "",
        # Function to process a batch of images
        "# Function to process a batch of images",
        "def process_batch(input_directory, output_directory, include_original=False, num_variations=1):",
        "    for filename in os.listdir(input_directory):",
        "        if filename.lower().endswith(('.png', '.jpg', '.jpeg')):",
        "            image_path = os.path.join(input_directory, filename)",
        "",
        "            # Read the image",
        "            image = cv2.imread(image_path)",
        "",
    ]

    # The copy step (and its comment) is only emitted when requested
    if include_original:
        lines += [
            "            # Include original image",
            "            shutil.copy2(image_path, output_directory)",
            "",
        ]

    lines += [
        "            # Generate variations for each image and process them",
        f"            for variation in range({num_variations}):",
        "                processed_image = process_image(image)",
        "",
        "                # Save the processed image",
        # Plain (non-f) string: the braces belong to the generated f-string
        "                output_filename = f'processed_{os.path.splitext(filename)[0]}_{variation}{os.path.splitext(filename)[1]}'",
        "                cv2.imwrite(os.path.join(output_directory, output_filename), processed_image)",
        "",
        # Execute the batch processing function
        "# Execute the batch processing function with the specified parameters",
        "process_batch(input_directory, output_directory)",
    ]

    return "\n".join(lines) + "\n"