import gradio as gr import paho.mqtt.client as mqtt import json import time import threading import os import base64 from PIL import Image import io import numpy as np import logging import sys import random import cv2 from PIL import ImageDraw import requests logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler("app.log")], ) logger = logging.getLogger("bambu-analysis") BAMBU_HOST = os.environ.get("BAMBU_HOST", "default_host") BAMBU_PORT = int(os.environ.get("BAMBU_PORT", 1883)) BAMBU_USERNAME = os.environ.get("BAMBU_USERNAME", "default_user") BAMBU_PASSWORD = os.environ.get("BAMBU_PASSWORD", "default_pass") DEFAULT_SERIAL = os.environ.get("DEFAULT_SERIAL", "default_serial") RPI_HOST = os.environ.get("RPI_HOST", "default_host") RPI_PORT = int(os.environ.get("RPI_PORT", 1883)) RPI_USERNAME = os.environ.get("RPI_USERNAME", "default_user") RPI_PASSWORD = os.environ.get("RPI_PASSWORD", "default_pass") logger.info( f"MQTT Configuration: HOST={BAMBU_HOST}, PORT={BAMBU_PORT}, USERNAME={BAMBU_USERNAME}" ) latest_data = { "bed_temperature": "N/A", "nozzle_temperature": "N/A", "status": "N/A", "update_time": "Waiting for data...", "image_url": "N/A", } bambu_client = None rpi_client = None response_topic = None # Will be set dynamically def create_client(client_type, host, port, username, password): global bambu_client, rpi_client if client_type == "bambu": bambu_client = mqtt.Client() bambu_client.username_pw_set(username, password) bambu_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS) bambu_client.on_connect = on_connect bambu_client.on_message = bambu_on_message bambu_client.connect(host, port) bambu_client.loop_start() elif client_type == "rpi": rpi_client = mqtt.Client() rpi_client.username_pw_set(username, password) rpi_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS) rpi_client.on_connect = on_connect rpi_client.on_message = rpi_on_message rpi_client.connect(host, port) rpi_client.loop_start() def on_connect(client, userdata, flags, rc): logger.info(f"Connected with result code {rc}") def bambu_on_message(client, userdata, message): global latest_data logger.info("Received message") try: data = json.loads(message.payload) latest_data["bed_temperature"] = data.get("bed_temperature", "N/A") latest_data["nozzle_temperature"] = data.get("nozzle_temperature", "N/A") latest_data["status"] = data.get("status", "N/A") latest_data["update_time"] = time.strftime( "%Y-%m-%d %H:%M:%S", time.localtime() ) latest_data["image_url"] = data.get("image_url", "N/A") except Exception as e: logger.error(f"Error parsing MQTT message: {e}") def rpi_on_message(client, userdata, message): global latest_data logger.info("Received message") try: data = json.loads(message.payload) latest_data["image_url"] = data.get("image_url", "N/A") except Exception as e: logger.error(f"Error parsing MQTT message: {e}") def get_data(serial=DEFAULT_SERIAL): global bambu_client, response_topic if bambu_client is None: create_client("bambu", BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD) request_topic = f"bambu_a1_mini/request/{serial}" response_topic = f"bambu_a1_mini/response/{serial}" logger.info(f"Subscribing to {response_topic}") bambu_client.subscribe(response_topic) logger.info(f"Publishing request to {request_topic}") bambu_client.publish(request_topic, json.dumps("HI")) global latest_data latest_data["bed_temperature"] = "N/A" timeout = 10 while latest_data["bed_temperature"] == "N/A" and timeout > 0: time.sleep(1) timeout -= 1 return ( latest_data["status"], latest_data["bed_temperature"], latest_data["nozzle_temperature"], latest_data["update_time"], ) def send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed): global bambu_client serial = DEFAULT_SERIAL logger.info( f"Sending parameters to {serial}: nozzle={nozzle_temp}, bed={bed_temp}, speed={print_speed}, fan={fan_speed}" ) try: params = { "nozzle_temp": nozzle_temp, "bed_temp": bed_temp, "print_speed": print_speed, "fan_speed": fan_speed, } request_topic = f"bambu_a1_mini/request/{serial}" if bambu_client: bambu_client.publish( request_topic, json.dumps({"command": "set_parameters", "parameters": params}), ) logger.info("Parameters sent successfully") return "Parameters sent successfully" else: logger.warning("MQTT not connected, parameters not sent") return "MQTT not connected, parameters not sent" except Exception as e: logger.error(f"Error sending parameters: {e}") return f"Error sending parameters: {e}" def get_image_base64(image): if image is None: logger.warning("No image to encode") return None try: if isinstance(image, np.ndarray): image = Image.fromarray(image) buffer = io.BytesIO() image.save(buffer, format="PNG") img_str = base64.b64encode(buffer.getvalue()).decode("utf-8") logger.info(f"Image encoded to base64 (length: {len(img_str)})") return img_str except Exception as e: logger.error(f"Error encoding image: {e}") return None def get_test_image(image_name=None): test_dir = os.path.join(os.path.dirname(__file__), "test_images") if not os.path.exists(test_dir): logger.error(f"Test images directory not found: {test_dir}") return None image_files = [ f for f in os.listdir(test_dir) if f.lower().endswith((".png", ".jpg", ".jpeg", ".bmp")) ] if not image_files: logger.error("No test images found") return None if image_name and image_name in image_files: image_path = os.path.join(test_dir, image_name) else: image_path = os.path.join(test_dir, random.choice(image_files)) logger.info(f"Using test image: {image_path}") try: return Image.open(image_path) except Exception as e: logger.error(f"Failed to open test image: {e}") return None def capture_image(url=None, use_test_image=False, test_image_name=None): global rpi_client, latest_data if rpi_client is None: create_client("rpi", BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD) serial = DEFAULT_SERIAL request_topic = f"bambu_a1_mini/request/{serial}" response_topic = f"bambu_a1_mini/response/{serial}" logger.info(f"Subscribing to {response_topic}") rpi_client.subscribe(response_topic) rpi_client.publish( request_topic, json.dumps( { "command": "capture_image", } ), ) latest_data["image_url"] = "N/A" timeout = 20 while latest_data["image_url"] == "N/A" and timeout > 0: print("timeout", timeout) time.sleep(1) timeout -= 1 url = latest_data["image_url"] if use_test_image: logger.info("Using test image instead of URL") test_img = get_test_image(test_image_name) if test_img: return test_img else: logger.warning("Failed to get specified test image, trying URL") if url != "N/A": try: logger.info(f"Capturing image from URL: {url}") response = requests.get(url, timeout=10) if response.status_code == 200: return Image.open(io.BytesIO(response.content)) else: logger.error(f"Failed to get image from URL: {response.status_code}") except Exception as e: logger.error(f"Error capturing image from URL: {e}") logger.info("URL capture failed or not provided, using random test image") return get_test_image() def health_check(): status = { "app": "running", "time": time.strftime("%Y-%m-%d %H:%M:%S"), "bambu_mqtt_connected": bambu_client is not None, "rpi_mqtt_connected": rpi_client is not None, "latest_update": latest_data["update_time"], } logger.info(f"Health check: {status}") return status demo = gr.Blocks(title="Bambu A1 Mini Print Control") with demo: gr.Markdown("# Bambu A1 Mini Print Control") with gr.Row(): refresh_btn = gr.Button("Refresh Status") with gr.Row(): current_status = gr.Textbox( label="Printer Status", value="N/A", interactive=False ) current_bed_temp = gr.Textbox( label="Current Bed Temperature", value="N/A", interactive=False ) current_nozzle_temp = gr.Textbox( label="Current Nozzle Temperature", value="N/A", interactive=False ) last_update = gr.Textbox(label="Last Update", value="N/A", interactive=False) with gr.Row(): # Left column for image and capture button with gr.Column(scale=2): captured_image = gr.Image(label="Current Print Image", type="pil") capture_btn = gr.Button("Capture Image") # Right column for queue status and livestream with gr.Column(scale=1): gr.Markdown("### YouTube Livestream") iframe_html = """