# TheKnight115's picture
# Update app.py
# 3889969 verified
# raw
# history blame
# 5.03 kB
import streamlit as st
from processor import process_frame, process_image, process_video
import os
from PIL import Image
import tempfile
import cv2
from streamlit_webrtc import VideoTransformerBase, webrtc_streamer, WebRtcMode, ClientSettings
import av
import asyncio
# Page setup: browser-tab title and wide layout first, then the visible heading.
st.set_page_config(layout="wide", page_title="🚦 Traffic Violation Detection")
st.title("🚦 Traffic Violation Detection App")

# Sidebar: the selected mode decides which branch of the script runs below.
processing_modes = ("Image", "Video", "Live Camera")
st.sidebar.title("Choose an Option")
option = st.sidebar.radio("Select the processing type:", processing_modes)
if option == "Image":
    # Image mode: upload a picture, run it through process_image() on demand,
    # show the annotated result and offer it as a JPEG download.
    st.header("🖼️ Image Processing")
    uploaded_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"])
    if uploaded_file is not None:
        # Display the uploaded image
        st.image(uploaded_file, caption='Uploaded Image.', use_column_width=True)

        # Process the image
        if st.button("Process Image"):
            with st.spinner("Processing..."):
                font_path = "fonts/alfont_com_arial-1.ttf"  # Update the path as needed

                # process_image() is called with a file path, so the upload is
                # written to disk first. This happens only after the button is
                # pressed (the script reruns on every interaction) and the file
                # is removed afterwards so temp files do not pile up.
                # Keep the upload's own extension; fall back to ".jpg" if it has none.
                suffix = os.path.splitext(uploaded_file.name)[1].lower() or ".jpg"
                with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_image:
                    # getvalue() returns the whole upload regardless of the
                    # stream's current read position.
                    temp_image.write(uploaded_file.getvalue())
                    temp_image_path = temp_image.name
                try:
                    processed_image = process_image(temp_image_path, font_path)
                finally:
                    os.remove(temp_image_path)

                if processed_image is not None:
                    # Convert the processed image from BGR to RGB for display and saving
                    processed_image_rgb = cv2.cvtColor(processed_image, cv2.COLOR_BGR2RGB)
                    st.image(processed_image_rgb, caption='Processed Image.', use_column_width=True)

                    # Encode the result as JPEG through an anonymous temp file,
                    # which is deleted automatically when the block exits.
                    with tempfile.TemporaryFile() as jpeg_buffer:
                        Image.fromarray(processed_image_rgb).save(jpeg_buffer, format="JPEG")
                        jpeg_buffer.seek(0)
                        jpeg_bytes = jpeg_buffer.read()

                    # Download button
                    st.download_button(
                        label="📥 Download Processed Image",
                        data=jpeg_bytes,
                        file_name="processed_image.jpg",
                        mime="image/jpeg",
                    )
                else:
                    st.error("Failed to process the image.")
elif option == "Video":
st.header("πŸŽ₯ Video Processing")
video_files = [f for f in os.listdir("videos") if f.endswith(('.mp4', '.avi', '.mov'))]
if not video_files:
st.warning("No predefined videos found in the 'videos/' directory.")
else:
selected_video = st.selectbox("Select a video to process:", video_files)
video_path = os.path.join("videos", selected_video)
st.video(video_path)
if st.button("Process Video"):
with st.spinner("Processing..."):
font_path = "fonts/alfont_com_arial-1.ttf" # Update the path as needed
processed_video_path = process_video(video_path, font_path)
if processed_video_path and os.path.exists(processed_video_path):
st.success("Video processed successfully!")
st.video(processed_video_path)
# Provide download button
with open(processed_video_path, "rb") as file:
btn = st.download_button(
label="πŸ“₯ Download Processed Video",
data=file,
file_name="processed_video.mp4",
mime="video/mp4"
)
else:
st.error("Failed to process the video.")
elif option == "Live Camera":
st.header("πŸ“· Live Camera Processing")
st.warning("Live camera processing is in progress. Please allow camera access.")
# Define settings for WebRTC
WEBRTC_CLIENT_SETTINGS = ClientSettings(
rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
media_stream_constraints={"video": True, "audio": False},
)
class VideoTransformer(VideoTransformerBase):
def __init__(self):
self.font_path = "fonts/alfont_com_arial-1.ttf" # Update the path as needed
def transform(self, frame):
img = frame.to_ndarray(format="bgr24")
processed_img = process_frame(img, self.font_path)
if processed_img is not None:
return processed_img
return img
webrtc_ctx = webrtc_streamer(
key="live-camera",
mode=WebRtcMode.SENDRECV,
client_settings=WEBRTC_CLIENT_SETTINGS,
video_transformer_factory=VideoTransformer,
async_transform=True,
)
st.info("Live camera feed is being processed. Detected violations will be annotated on the video stream.")