|
import gradio as gr |
|
import paho.mqtt.client as mqtt |
|
import json |
|
import time |
|
import threading |
|
import os |
|
import base64 |
|
from PIL import Image |
|
import io |
|
import numpy as np |
|
import logging |
|
import sys |
|
import random |
|
import cv2 |
|
from PIL import ImageDraw |
|
import requests |
|
|
|
|
|
# Configure root logging once at import time: INFO and above are written both
# to stdout and to an "app.log" file (relative path, so it lands in the
# process working directory).
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout), logging.FileHandler("app.log")],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Named logger used throughout this file.
logger = logging.getLogger("bambu-analysis")
|
|
|
# Settings for the MQTT connection used to talk to the printer. Each value is
# read from the environment and falls back to a placeholder default.
BAMBU_HOST = os.getenv("BAMBU_HOST", "default_host")
BAMBU_PORT = int(os.getenv("BAMBU_PORT", 1883))
BAMBU_USERNAME = os.getenv("BAMBU_USERNAME", "default_user")
BAMBU_PASSWORD = os.getenv("BAMBU_PASSWORD", "default_pass")
# Serial used to build the request/response topic names when none is given.
DEFAULT_SERIAL = os.getenv("DEFAULT_SERIAL", "default_serial")

# Settings for the second MQTT connection (the "rpi" client).
RPI_HOST = os.getenv("RPI_HOST", "default_host")
RPI_PORT = int(os.getenv("RPI_PORT", 1883))
RPI_USERNAME = os.getenv("RPI_USERNAME", "default_user")
RPI_PASSWORD = os.getenv("RPI_PASSWORD", "default_pass")
|
|
|
# Record the printer broker endpoint and account name at startup; the
# password is deliberately not part of the message.
logger.info(
    "MQTT Configuration: HOST=%s, PORT=%s, USERNAME=%s",
    BAMBU_HOST,
    BAMBU_PORT,
    BAMBU_USERNAME,
)
|
|
|
# Most recent values received over MQTT. The message callbacks overwrite these
# entries in place; "N/A" marks a field that has not been received yet or that
# was reset while a request is pending.
latest_data = dict(
    bed_temperature="N/A",
    nozzle_temperature="N/A",
    status="N/A",
    update_time="Waiting for data...",
    image_url="N/A",
)

# MQTT client handles; they stay None until create_client() sets them.
bambu_client = None
rpi_client = None

# Response topic of the most recent get_data() call.
response_topic = None
|
|
|
|
|
def create_client(client_type, host, port, username, password):
    """Create, connect and start an MQTT client and store it in a module global.

    Args:
        client_type: ``"bambu"`` or ``"rpi"``; selects the message callback
            and which global (``bambu_client`` / ``rpi_client``) is set.
        host: Broker host name.
        port: Broker port.
        username: Broker user name.
        password: Broker password.

    Raises:
        Whatever ``connect()`` / ``tls_set()`` raise. In that case the
        corresponding global is left untouched, so callers that test it
        against ``None`` will retry instead of using a dead client.
    """
    global bambu_client, rpi_client

    message_handlers = {"bambu": bambu_on_message, "rpi": rpi_on_message}
    if client_type not in message_handlers:
        # Previously a silent no-op; keep returning None but make it visible.
        logger.error(f"Unknown MQTT client type: {client_type!r}")
        return

    client = mqtt.Client()
    client.username_pw_set(username, password)
    client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
    client.on_connect = on_connect
    client.on_message = message_handlers[client_type]
    client.connect(host, port)
    client.loop_start()

    # Publish the client only once it is connected and its network loop runs.
    if client_type == "bambu":
        bambu_client = client
    else:
        rpi_client = client
|
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """Connect callback installed on both MQTT clients: log the result code."""
    logger.info("Connected with result code %s", rc)
|
|
|
|
|
def bambu_on_message(client, userdata, message):
    """Message callback of the "bambu" client: copy a JSON status into latest_data.

    The payload is parsed as JSON; missing keys are stored as "N/A" and
    ``update_time`` is set to the local time of receipt. Any parsing error is
    logged and otherwise ignored.
    """
    logger.info("Received message")
    try:
        payload = json.loads(message.payload)
        for field in ("bed_temperature", "nozzle_temperature", "status"):
            latest_data[field] = payload.get(field, "N/A")
        received_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        latest_data["update_time"] = received_at
        latest_data["image_url"] = payload.get("image_url", "N/A")
    except Exception as e:
        logger.error(f"Error parsing MQTT message: {e}")
|
|
|
|
|
def rpi_on_message(client, userdata, message):
    """Message callback of the "rpi" client: store the reported image URL.

    Only ``image_url`` is read from the JSON payload ("N/A" when absent).
    Parsing errors are logged and otherwise ignored.
    """
    logger.info("Received message")
    try:
        payload = json.loads(message.payload)
    except Exception as e:
        logger.error(f"Error parsing MQTT message: {e}")
        return

    try:
        latest_data["image_url"] = payload.get("image_url", "N/A")
    except Exception as e:
        logger.error(f"Error parsing MQTT message: {e}")
|
|
|
|
|
def get_data(serial=DEFAULT_SERIAL):
    """Request a status update for *serial* and wait up to ~10 s for the reply.

    Subscribes to the response topic, publishes a request, resets
    ``latest_data["bed_temperature"]`` to "N/A" and polls once per second
    until the message callback fills it in or ten polls have elapsed.

    Returns:
        ``(status, bed_temperature, nozzle_temperature, update_time)`` as
        currently held in ``latest_data``.
    """
    global response_topic

    # Lazily bring up the printer client on first use.
    if bambu_client is None:
        create_client("bambu", BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD)

    request_topic = f"bambu_a1_mini/request/{serial}"
    response_topic = f"bambu_a1_mini/response/{serial}"

    logger.info(f"Subscribing to {response_topic}")
    bambu_client.subscribe(response_topic)

    logger.info(f"Publishing request to {request_topic}")
    bambu_client.publish(request_topic, json.dumps("HI"))

    # "N/A" acts as the "no reply yet" marker for the polling loop below.
    latest_data["bed_temperature"] = "N/A"
    for _ in range(10):
        if latest_data["bed_temperature"] != "N/A":
            break
        time.sleep(1)

    reported_fields = ("status", "bed_temperature", "nozzle_temperature", "update_time")
    return tuple(latest_data[field] for field in reported_fields)
|
|
|
|
|
def send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed):
    """Publish a ``set_parameters`` command for the default printer serial.

    Args:
        nozzle_temp: Nozzle temperature to send.
        bed_temp: Bed temperature to send.
        print_speed: Print speed to send.
        fan_speed: Fan speed to send.

    Returns:
        A human-readable status string: success, "MQTT not connected", or an
        error description. Exceptions are logged and reported in the string
        rather than raised.
    """
    serial = DEFAULT_SERIAL
    logger.info(
        f"Sending parameters to {serial}: nozzle={nozzle_temp}, bed={bed_temp}, speed={print_speed}, fan={fan_speed}"
    )

    if not bambu_client:
        logger.warning("MQTT not connected, parameters not sent")
        return "MQTT not connected, parameters not sent"

    try:
        params = {
            "nozzle_temp": nozzle_temp,
            "bed_temp": bed_temp,
            "print_speed": print_speed,
            "fan_speed": fan_speed,
        }
        request_topic = f"bambu_a1_mini/request/{serial}"
        publish_info = bambu_client.publish(
            request_topic,
            json.dumps({"command": "set_parameters", "parameters": params}),
        )
        # publish() reports failures (e.g. no connection) through its result
        # code instead of raising, so success must be checked explicitly.
        if publish_info.rc != mqtt.MQTT_ERR_SUCCESS:
            logger.error(f"MQTT publish failed with result code {publish_info.rc}")
            return f"Error sending parameters: MQTT publish failed (rc={publish_info.rc})"
    except Exception as e:
        logger.error(f"Error sending parameters: {e}")
        return f"Error sending parameters: {e}"

    logger.info("Parameters sent successfully")
    return "Parameters sent successfully"
|
|
|
|
|
def get_image_base64(image):
    """Encode *image* as a base64 PNG string.

    Args:
        image: A PIL image or a NumPy array (converted with
            ``Image.fromarray``), or ``None``.

    Returns:
        The base64 text, or ``None`` when *image* is ``None`` or encoding
        fails (the failure is logged).
    """
    if image is None:
        logger.warning("No image to encode")
        return None

    try:
        pil_image = Image.fromarray(image) if isinstance(image, np.ndarray) else image
        png_buffer = io.BytesIO()
        pil_image.save(png_buffer, format="PNG")
        encoded = base64.b64encode(png_buffer.getvalue()).decode("utf-8")
    except Exception as e:
        logger.error(f"Error encoding image: {e}")
        return None

    logger.info(f"Image encoded to base64 (length: {len(encoded)})")
    return encoded
|
|
|
|
|
def get_test_image(image_name=None):
    """Open an image from the ``test_images`` directory next to this file.

    Args:
        image_name: File name to open. When empty or not present among the
            directory's image files, a random image file is chosen instead.

    Returns:
        The opened PIL image, or ``None`` when the directory is missing,
        holds no image files, or the file cannot be opened.
    """
    test_dir = os.path.join(os.path.dirname(__file__), "test_images")
    if not os.path.exists(test_dir):
        logger.error(f"Test images directory not found: {test_dir}")
        return None

    image_suffixes = (".png", ".jpg", ".jpeg", ".bmp")
    candidates = [
        entry for entry in os.listdir(test_dir) if entry.lower().endswith(image_suffixes)
    ]
    if not candidates:
        logger.error("No test images found")
        return None

    # Honour the requested name only when it is one of the listed files.
    wanted = bool(image_name) and image_name in candidates
    chosen = image_name if wanted else random.choice(candidates)
    image_path = os.path.join(test_dir, chosen)
    logger.info(f"Using test image: {image_path}")

    try:
        return Image.open(image_path)
    except Exception as e:
        logger.error(f"Failed to open test image: {e}")
        return None
|
|
|
|
|
def _request_image_url(timeout_s=20):
    """Publish a ``capture_image`` request over MQTT and wait for an image URL.

    Returns the URL stored in ``latest_data["image_url"]`` by the message
    callback, or "N/A" when none arrives within *timeout_s* seconds.
    """
    if rpi_client is None:
        # Use the RPi broker settings, matching the start-up initialisation.
        create_client("rpi", RPI_HOST, RPI_PORT, RPI_USERNAME, RPI_PASSWORD)

    serial = DEFAULT_SERIAL
    request_topic = f"bambu_a1_mini/request/{serial}"
    response_topic = f"bambu_a1_mini/response/{serial}"

    logger.info(f"Subscribing to {response_topic}")
    rpi_client.subscribe(response_topic)

    # Reset the marker BEFORE publishing so a fast reply cannot be overwritten.
    latest_data["image_url"] = "N/A"
    rpi_client.publish(request_topic, json.dumps({"command": "capture_image"}))

    remaining = timeout_s
    while latest_data["image_url"] == "N/A" and remaining > 0:
        time.sleep(1)
        remaining -= 1

    return latest_data["image_url"]


def _download_image(url):
    """Fetch *url* over HTTP and open it as a PIL image; return None on failure."""
    try:
        logger.info(f"Capturing image from URL: {url}")
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return Image.open(io.BytesIO(response.content))
        logger.error(f"Failed to get image from URL: {response.status_code}")
    except Exception as e:
        logger.error(f"Error capturing image from URL: {e}")
    return None


def capture_image(url=None, use_test_image=False, test_image_name=None):
    """Return an image from a test file, a given URL, or a fresh MQTT capture.

    Resolution order:
      1. If *use_test_image* is set, a test image (no MQTT traffic needed).
      2. If *url* is given, that URL is downloaded directly.
      3. Otherwise a capture is requested over MQTT and the reported URL is
         downloaded.
      4. If all of that fails, a random test image.

    Args:
        url: Optional image URL supplied by the caller.
        use_test_image: Prefer a local test image.
        test_image_name: Name of the test image to use, if any.

    Returns:
        A PIL image, or ``None`` when even the test-image fallback fails.
    """
    if use_test_image:
        logger.info("Using test image instead of URL")
        test_img = get_test_image(test_image_name)
        if test_img:
            return test_img
        logger.warning("Failed to get specified test image, trying URL")

    if not url:
        url = _request_image_url()

    if url and url != "N/A":
        # NOTE(review): a caller-supplied URL is fetched server-side as-is;
        # restrict allowed hosts here if this endpoint is publicly reachable.
        image = _download_image(url)
        if image is not None:
            return image

    logger.info("URL capture failed or not provided, using random test image")
    return get_test_image()
|
|
|
|
|
def health_check():
    """Return (and log) a small status dictionary for the application.

    The two ``*_mqtt_connected`` flags only report whether the corresponding
    client object has been created, not the live connection state.
    """
    status = dict(
        app="running",
        time=time.strftime("%Y-%m-%d %H:%M:%S"),
        bambu_mqtt_connected=bambu_client is not None,
        rpi_mqtt_connected=rpi_client is not None,
        latest_update=latest_data["update_time"],
    )
    logger.info(f"Health check: {status}")
    return status
|
|
|
|
|
# Build the web UI. Components and event listeners are declared inside the
# Blocks context so that they are attached to `demo`.
with gr.Blocks(title="Bambu A1 Mini Print Control") as demo:
    gr.Markdown("# Bambu A1 Mini Print Control")

    with gr.Row():
        refresh_btn = gr.Button("Refresh Status")

    # Read-only fields filled from get_data() when "Refresh Status" is clicked.
    with gr.Row():
        current_status = gr.Textbox(
            label="Printer Status", value="N/A", interactive=False
        )
        current_bed_temp = gr.Textbox(
            label="Current Bed Temperature", value="N/A", interactive=False
        )
        current_nozzle_temp = gr.Textbox(
            label="Current Nozzle Temperature", value="N/A", interactive=False
        )
        last_update = gr.Textbox(label="Last Update", value="N/A", interactive=False)

    with gr.Row():
        # Left: the most recently captured image and the button that fetches it.
        with gr.Column(scale=2):
            captured_image = gr.Image(label="Current Print Image", type="pil")
            capture_btn = gr.Button("Capture Image")

        # Right: embedded livestream in a 16:9 responsive container.
        with gr.Column(scale=1):
            gr.Markdown("### YouTube Livestream")
            iframe_html = """
            <div style="position: relative; width: 100%; padding-top: 56.25%;">
                <iframe
                    style="position: absolute; top: 0; left: 0; width: 100%; height: 100%;"
                    src="https://www.youtube.com/embed/mGlbI54SxQg"
                    title="Bambu A1mini Livestream"
                    frameborder="0"
                    allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
                    referrerpolicy="strict-origin-when-cross-origin"
                    allowfullscreen>
                </iframe>
            </div>
            """
            gr.HTML(iframe_html)

    # Print parameters the user can send to the printer.
    with gr.Row():
        with gr.Column():
            nozzle_temp = gr.Slider(
                label="Nozzle Temperature (°C)",
                minimum=180,
                maximum=250,
                step=1,
                value=200,
            )
            bed_temp = gr.Slider(
                label="Bed Temperature (°C)",
                minimum=40,
                maximum=100,
                step=1,
                value=60,
            )
            print_speed = gr.Slider(
                label="Print Speed (mm/s)",
                minimum=20,
                maximum=150,
                step=1,
                value=60,
            )
            fan_speed = gr.Slider(
                label="Fan Speed (%)",
                minimum=0,
                maximum=100,
                step=1,
                value=100,
            )

            send_params_btn = gr.Button("Send Print Parameters")

    refresh_btn.click(
        fn=get_data,
        outputs=[current_status, current_bed_temp, current_nozzle_temp, last_update],
    )

    # Disable the capture button while a capture is running, then re-enable it.
    capture_btn.click(
        fn=lambda: gr.update(interactive=False),
        outputs=capture_btn,
    ).then(
        fn=capture_image,
        outputs=[captured_image],
    ).then(
        fn=lambda: gr.update(interactive=True),
        outputs=capture_btn,
    )

    # The returned status message is shown in the "Printer Status" field.
    send_params_btn.click(
        fn=send_print_parameters,
        inputs=[nozzle_temp, bed_temp, print_speed, fan_speed],
        outputs=[current_status],
    )
|
|
|
def api_get_data():
    """API wrapper: log the call and return ``get_data()`` for the default serial."""
    logger.info("API call: get_data")
    printer_snapshot = get_data()
    return printer_snapshot
|
|
|
def api_capture_frame(url=None, use_test_image=False, test_image_name=None):
    """API wrapper around ``capture_image`` returning the frame as base64 JPEG.

    Args:
        url: Optional image URL, forwarded to ``capture_image``.
        use_test_image: Forwarded to ``capture_image``.
        test_image_name: Forwarded to ``capture_image``.

    Returns:
        ``{"success": True, "image": <base64 JPEG>}`` on success, otherwise
        ``{"success": False, "error": <message>}``. Exceptions are logged and
        reported in the dictionary rather than raised.
    """
    logger.info(
        f"API call: capture_frame with URL: {url}, use_test_image: {use_test_image}"
    )

    try:
        img = capture_image(url, use_test_image, test_image_name)
        if not img:
            return {"success": False, "error": "Failed to capture image"}

        # JPEG can only store L, RGB and CMYK data; convert everything else
        # (RGBA, palette, LA, ...) instead of only handling RGBA.
        if img.mode not in ("RGB", "L", "CMYK"):
            img = img.convert("RGB")

        buffered = io.BytesIO()
        img.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        return {"success": True, "image": img_str}
    except Exception as e:
        logger.error(f"Error in capture_frame: {e}")
        return {"success": False, "error": str(e)}
|
|
|
def _estimate_defect_rates(param_1, param_3, param_4):
    """Map print settings to a quality tier and that tier's fixed defect rates.

    Returns ``(quality_level, missing_rate, excess_rate, stringing_rate)``.
    """
    if 190 <= param_1 <= 210 and param_3 <= 50 and param_4 >= 80:
        return "high", 0.02, 0.01, 0.01
    if 185 <= param_1 <= 215 and param_3 <= 70 and param_4 >= 60:
        return "medium", 0.05, 0.03, 0.02
    return "low", 0.10, 0.07, 0.05


def _clamp_unit(value):
    """Clamp *value* to the closed interval [0, 1]."""
    return max(0, min(1, value))


def api_lambda(
    img_data=None,
    param_1=200,
    param_2=60,
    param_3=60,
    param_4=100,
    use_test_image=False,
    test_image_name=None,
):
    """Score a print from its parameters and return an annotated image.

    The scores are derived from the parameters alone; the image is only
    annotated with the resulting rates, not analysed.

    Args:
        img_data: An ``http(s)://`` URL (passed to ``capture_image``) or a
            base64-encoded image. Ignored when *use_test_image* is set.
        param_1: Nozzle temperature.
        param_2: Bed temperature (logged, not used in the scores).
        param_3: Print speed.
        param_4: Fan speed.
        use_test_image: Use a local test image instead of *img_data*.
        test_image_name: Name of the test image to use, if any.

    Returns:
        On success a dict with ``"success": True``, the three defect rates,
        the derived scores and ``"image"`` (base64 JPEG). On failure
        ``{"success": False, "error": <message>}``.
    """
    logger.info(
        f"API call: lambda with params: {param_1}, {param_2}, {param_3}, {param_4}, use_test_image: {use_test_image}, test_image_name: {test_image_name}"
    )
    try:
        img = None

        if use_test_image:
            logger.info(f"Lambda using test image: {test_image_name}")
            img = get_test_image(test_image_name)
        elif img_data and isinstance(img_data, str):
            if img_data.startswith(("http://", "https://")):
                logger.info(f"Lambda received image URL: {img_data}")
                img = capture_image(img_data)
            else:
                try:
                    logger.info("Lambda received base64 image data")
                    img_bytes = base64.b64decode(img_data)
                    img = Image.open(io.BytesIO(img_bytes))
                except Exception as e:
                    logger.error(f"Failed to decode base64 image: {e}")

        if img is None:
            logger.info("No valid image data received, using default test image")
            img = get_test_image()

        if not img:
            return {"success": False, "error": "Failed to get image"}

        quality_level, missing_rate, excess_rate, stringing_rate = (
            _estimate_defect_rates(param_1, param_3, param_4)
        )

        uniformity_score = 1.0 - (missing_rate + excess_rate + stringing_rate)
        print_quality_score = _clamp_unit(
            1.0 - (missing_rate * 2.0 + excess_rate * 1.5 + stringing_rate * 1.0)
        )
        print_speed_score = _clamp_unit(param_3 / 150.0)
        material_efficiency_score = _clamp_unit(1.0 - excess_rate * 3.0)
        total_performance_score = (
            0.5 * print_quality_score
            + 0.3 * print_speed_score
            + 0.2 * material_efficiency_score
        )

        # Annotate an RGB copy: the red RGB text colour and the JPEG encoder
        # are not usable with every source mode (palette, alpha, ...), so
        # normalise up front rather than only converting RGBA afterwards.
        img_draw = img.copy() if img.mode == "RGB" else img.convert("RGB")
        draw = ImageDraw.Draw(img_draw)
        red = (255, 0, 0)
        draw.text((10, 10), f"Quality: {quality_level.upper()}", fill=red)
        draw.text((10, 30), f"Missing: {missing_rate:.2f}", fill=red)
        draw.text((10, 50), f"Excess: {excess_rate:.2f}", fill=red)
        draw.text((10, 70), f"Stringing: {stringing_rate:.2f}", fill=red)

        buffered = io.BytesIO()
        img_draw.save(buffered, format="JPEG")

        return {
            "success": True,
            "missing_rate": missing_rate,
            "excess_rate": excess_rate,
            "stringing_rate": stringing_rate,
            "uniformity_score": uniformity_score,
            "print_quality_score": print_quality_score,
            "print_speed_score": print_speed_score,
            "material_efficiency_score": material_efficiency_score,
            "total_performance_score": total_performance_score,
            "image": base64.b64encode(buffered.getvalue()).decode(),
        }
    except Exception as e:
        logger.error(f"Error in lambda: {e}")
        # Keep the error shape consistent with the other failure returns.
        return {"success": False, "error": str(e)}
|
|
|
def api_send_print_parameters(
    nozzle_temp=200, bed_temp=60, print_speed=60, fan_speed=100
):
    """API wrapper: log the call and forward to ``send_print_parameters``.

    Returns the status string produced by ``send_print_parameters``.
    """
    call_summary = (
        f"API call: send_print_parameters with nozzle={nozzle_temp}, bed={bed_temp}, speed={print_speed}, fan={fan_speed}"
    )
    logger.info(call_summary)
    outcome = send_print_parameters(nozzle_temp, bed_temp, print_speed, fan_speed)
    return outcome
|
|
|
# Register the named API endpoints. Components and event listeners have to be
# created inside the Blocks context to be attached to `demo`.
with demo:
    api_json_output = gr.JSON()
    api_text_output = gr.Textbox()

    # Each endpoint is bound to the click event of a hidden button rather than
    # to `demo.load`: a load listener runs whenever the page is opened, which
    # would publish default print parameters to the printer and block on MQTT
    # round-trips on every visit. The `api_name`s are unchanged.
    capture_frame_api = gr.Button("capture_frame", visible=False).click(
        fn=api_capture_frame,
        inputs=[
            gr.Textbox(label="Image URL"),
            gr.Checkbox(label="Use Test Image", value=False),
            gr.Textbox(label="Test Image Name", value=""),
        ],
        outputs=api_json_output,
        api_name="capture_frame",
    )

    lambda_api = gr.Button("lambda", visible=False).click(
        fn=api_lambda,
        inputs=[
            gr.Textbox(label="Image Base64 or URL"),
            gr.Number(label="Nozzle Temperature", value=200),
            gr.Number(label="Bed Temperature", value=60),
            gr.Number(label="Print Speed", value=60),
            gr.Number(label="Fan Speed", value=100),
            gr.Checkbox(label="Use Test Image", value=False),
            gr.Textbox(label="Test Image Name", value=""),
        ],
        outputs=api_json_output,
        api_name="lambda",
    )

    get_data_api = gr.Button("get_data", visible=False).click(
        fn=api_get_data,
        inputs=None,
        outputs=api_json_output,
        api_name="get_data",
    )

    send_params_api = gr.Button("send_print_parameters", visible=False).click(
        fn=api_send_print_parameters,
        inputs=[
            gr.Number(label="Nozzle Temperature", value=200),
            gr.Number(label="Bed Temperature", value=60),
            gr.Number(label="Print Speed", value=60),
            gr.Number(label="Fan Speed", value=100),
        ],
        outputs=api_text_output,
        api_name="send_print_parameters",
    )
|
|
|
if __name__ == "__main__":
    logger.info("Starting Bambu A1 Mini Print Control application")

    # Bring up each MQTT client independently so that a failure on one broker
    # does not prevent the other from being initialised. Failures are logged
    # and the UI is launched regardless.
    logger.info("Initializing MQTT client")
    try:
        create_client("bambu", BAMBU_HOST, BAMBU_PORT, BAMBU_USERNAME, BAMBU_PASSWORD)
    except Exception as e:
        logger.error(f"Failed to initialize MQTT: {e}")

    try:
        create_client("rpi", RPI_HOST, RPI_PORT, RPI_USERNAME, RPI_PASSWORD)
    except Exception as e:
        logger.error(f"Failed to initialize MQTT: {e}")

    # Listen on all interfaces, port 7860, with request queuing enabled.
    demo.queue().launch(
        show_error=True, share=False, server_name="0.0.0.0", server_port=7860
    )
|
|