"""Gradio demo of an MQTT "ping-pong" command/response loop.

The *ping* side publishes JSON commands to a shared topic and listens for
responses on a per-session topic.  The *pong* side subscribes to the command
topic, simulates a device (RGB LED, temperature sensor, scale) and publishes
a response for every command it processes.  Both sides run in this process
and are driven from one Gradio page.
"""

import json
import gradio as gr
import paho.mqtt.client as mqtt
import time
import random
from collections import deque
from queue import Empty, Queue
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import plotly.graph_objects as go

# Global configuration
MQTT_HOST = "broker.hivemq.com"
MQTT_PORT = 1883
COMMAND_TOPIC = "ping/command"
HISTORY_LIMIT = 100      # data points kept per series
PREDICTION_STEPS = 5     # points extrapolated by the trend analysis

# Global state
response_queue = Queue()
command_queue = Queue()
mqtt_ping_client = None
mqtt_pong_client = None
session_id = None
device_state = {
    "rgb": {"r": 0, "g": 0, "b": 0},
    "temperature": 25.0,
    "rpm": 0
}

# Historical data storage.  Every series has its OWN timestamp list so that
# values and timestamps of a series always have the same length; a single
# shared list goes out of step as soon as two series are recorded.
history_data = {
    "rgb": [],
    "temperature": [],
    "timestamps": {"rgb": [], "temperature": []}
}

# Keys of recently processed commands.  send_rgb() hands its command to the
# pong side both over MQTT and through the local queue, so the same command
# can arrive twice; this bounded window lets the drain loop drop the copy.
_recent_command_keys = deque(maxlen=256)


def _new_client():
    """Create a paho client that works on both paho-mqtt 1.x and 2.x.

    paho-mqtt 2.x added a ``callback_api_version`` constructor argument; the
    callbacks in this file use the version-1 signatures, so that version is
    requested explicitly when the enum exists.
    """
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        return mqtt.Client()
    return mqtt.Client(api_versions.VERSION1)


def _shutdown_client(client):
    """Stop the network loop of *client* and disconnect it (no-op for None)."""
    if client is None:
        return
    client.loop_stop()
    client.disconnect()


# MQTT callback functions
def on_ping_connect(client, userdata, flags, rc):
    """Subscribe the ping client to the response topic of its session."""
    print(f"Ping connected with result code {rc}")
    if session_id:
        client.subscribe(f"pong/{session_id}/response")


def on_pong_connect(client, userdata, flags, rc):
    """Subscribe the pong client to the shared command topic."""
    print(f"Pong connected with result code {rc}")
    client.subscribe(COMMAND_TOPIC)


def on_ping_message(client, userdata, msg):
    """Decode a response message and queue it for the ping panel."""
    try:
        response = json.loads(msg.payload.decode())
    except ValueError as e:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        print(f"Ping error: {e}")
        return
    response_queue.put(response)
    print(f"Ping received: {response}")


def on_pong_message(client, userdata, msg):
    """Decode a command message and queue it for processing.

    The command topic lives on a public broker, so payloads are untrusted:
    anything that is not a JSON object is dropped here instead of crashing
    process_command() later.
    """
    try:
        command = json.loads(msg.payload.decode())
    except ValueError as e:
        print(f"Pong error: {e}")
        return
    if not isinstance(command, dict):
        print(f"Pong error: ignoring non-object command {command!r}")
        return
    command_queue.put(command)
    print(f"Pong received: {command}")


# Ping functionality
def initialize_ping():
    """Start a fresh ping session and connect its MQTT client.

    Any previous ping client is shut down first so repeated clicks do not
    leak network threads.  The module-level client is only installed once the
    connection attempt succeeded.

    Returns:
        A status string for the UI.
    """
    global mqtt_ping_client, session_id
    _shutdown_client(mqtt_ping_client)
    mqtt_ping_client = None

    # Must be set before connecting: on_ping_connect reads it.
    session_id = f"ping_{int(time.time())}"
    client = _new_client()
    client.on_connect = on_ping_connect
    client.on_message = on_ping_message
    try:
        client.connect(MQTT_HOST, MQTT_PORT, 60)
    except OSError as exc:
        return f"Ping connection failed: {exc}"
    client.loop_start()
    mqtt_ping_client = client
    return f"Ping initialized: {session_id}"


def _publish_command(command_type, data):
    """Build a command payload, publish it and return the payload.

    The caller is responsible for checking that the ping client exists.
    """
    payload = {
        "type": command_type,
        "data": data,
        "session_id": session_id,
        "timestamp": time.time()
    }
    mqtt_ping_client.publish(COMMAND_TOPIC, json.dumps(payload))
    return payload


def send_command(command_type, data=None):
    """Publish a command of *command_type* with optional *data* dict.

    Returns:
        A status string for the UI.
    """
    if mqtt_ping_client is None:
        return "Please initialize ping first"
    _publish_command(command_type, data or {})
    return f"Sent {command_type}"


def send_rgb(r, g, b):
    """Publish an RGB command and mirror it into the local command queue.

    The local copy makes the command visible on the pong panel immediately,
    even when no pong client is running; the copy that may later arrive over
    MQTT is dropped by check_pong_commands().

    Returns:
        A status string for the UI.
    """
    if mqtt_ping_client is None:
        return "Please initialize ping first"
    payload = _publish_command("RGB Command", {"r": r, "g": g, "b": b})
    # Put command into command queue for pong display
    command_queue.put(payload)
    return f"Sent RGB Command: R={r}, G={g}, B={b}"


def send_weight_request(rpm):
    """Publish a weight request that also sets the device RPM."""
    return send_command("Weight Data", {"set_rpm": rpm, "request_weight": True})


# Pong functionality
def initialize_pong():
    """Connect the pong MQTT client, replacing any previous one.

    Returns:
        A status string for the UI.
    """
    global mqtt_pong_client
    # A second live pong client would queue every command twice.
    _shutdown_client(mqtt_pong_client)
    mqtt_pong_client = None

    client = _new_client()
    client.on_connect = on_pong_connect
    client.on_message = on_pong_message
    try:
        client.connect(MQTT_HOST, MQTT_PORT, 60)
    except OSError as exc:
        return f"Pong connection failed: {exc}"
    client.loop_start()
    mqtt_pong_client = client
    return "Pong started"


def _command_time(raw_timestamp):
    """Convert a command's epoch timestamp to datetime, falling back to now."""
    try:
        return datetime.fromtimestamp(raw_timestamp)
    except (TypeError, ValueError, OverflowError, OSError):
        return datetime.now()


def _channel_value(data, key):
    """Read colour channel *key* from *data* as int; 0 if missing or invalid."""
    try:
        return int(data.get(key, 0))
    except (TypeError, ValueError, OverflowError):
        return 0


def process_command(command):
    """Apply *command* to the simulated device and publish the response.

    Args:
        command: dict with "type", and optionally "data", "session_id" and
            "timestamp" (epoch seconds).

    Returns:
        The response as an indented JSON string.
    """
    global device_state
    command_type = command.get("type")
    data = command.get("data")
    if not isinstance(data, dict):
        data = {}
    reply_session = command.get("session_id")
    timestamp = _command_time(command.get("timestamp", time.time()))

    if command_type == "RGB Command":
        # Update device state
        device_state = {
            **device_state,
            "rgb": {key: _channel_value(data, key) for key in ("r", "g", "b")}
        }
        print(f"Processing RGB command: {data}")
        response_data = {
            "current_state": "applied",
            "power_consumption": random.uniform(0.1, 1.0),
            "applied_values": device_state["rgb"]
        }
        # Record RGB value (using average for simplicity)
        rgb_avg = sum(device_state["rgb"].values()) / 3
        record_data("rgb", rgb_avg, timestamp)
    elif command_type == "Temperature Reading":
        device_state["temperature"] += random.uniform(-0.5, 0.5)
        response_data = {
            "current_temperature": device_state["temperature"],
            "humidity": random.uniform(40, 60),
            "pressure": random.uniform(980, 1020)
        }
        # Record temperature
        record_data("temperature", device_state["temperature"], timestamp)
    elif command_type == "Weight Data":
        if "set_rpm" in data:
            device_state["rpm"] = data["set_rpm"]
        response_data = {
            "calibrated_weight": random.uniform(95, 105),
            "current_rpm": device_state["rpm"],
            "stability": random.uniform(0.98, 1.02)
        }
    else:
        response_data = {"error": "Unknown command type"}

    response = {
        "type": command_type,
        "data": response_data,
        "timestamp": time.time(),
        "session_id": reply_session
    }
    if mqtt_pong_client:
        try:
            mqtt_pong_client.publish(
                f"pong/{reply_session}/response", json.dumps(response)
            )
        except ValueError as e:
            # paho rejects topics containing wildcards; the session id comes
            # from the (untrusted) command, so do not let that abort the run.
            print(f"Pong error: {e}")
    return json.dumps(response, indent=2)


def check_ping_responses():
    """Check the ping response queue"""
    responses = []
    while True:
        try:
            response = response_queue.get_nowait()
        except Empty:
            break
        responses.append(json.dumps(response, indent=2))
    return "\n".join(responses) if responses else "No new responses"


def _is_repeated_command(command):
    """Return True if *command* was already seen; remember it otherwise.

    Commands are identified by (session_id, timestamp, type).  Commands that
    lack a session id or a timestamp are never treated as repeats.
    """
    sender = command.get("session_id")
    stamp = command.get("timestamp")
    if sender is None or stamp is None:
        return False
    key = (sender, stamp, command.get("type"))
    if key in _recent_command_keys:
        return True
    _recent_command_keys.append(key)
    return False


def check_pong_commands():
    """Check and process command queue"""
    responses = []
    while True:
        try:
            command = command_queue.get_nowait()
        except Empty:
            break
        if _is_repeated_command(command):
            continue
        responses.append(process_command(command))
    return "\n".join(responses) if responses else "No new commands"


def stop_mqtt():
    """Stop and forget both MQTT clients.

    The globals are reset so that later sends report "not initialized"
    instead of publishing on a disconnected client.
    """
    global mqtt_ping_client, mqtt_pong_client
    _shutdown_client(mqtt_ping_client)
    _shutdown_client(mqtt_pong_client)
    mqtt_ping_client = None
    mqtt_pong_client = None
    return "Both Ping and Pong clients stopped"


# Real-time update functions
def update_rgb_preview(r, g, b):
    """Real-time update RGB preview values"""
    global device_state
    device_state = {
        **device_state,
        "rgb": {"r": int(r), "g": int(g), "b": int(b)}
    }
    return gr.update(value=device_state)


def update_rpm_preview(rpm):
    """Real-time update RPM preview values"""
    global device_state
    device_state = {**device_state, "rpm": int(rpm)}
    return gr.update(value=device_state)


def update_temperature_preview(temperature):
    """Real-time update temperature preview values"""
    global device_state
    device_state = {**device_state, "temperature": float(temperature)}
    return gr.update(value=device_state)


# Data recording and prediction functions
def record_data(data_type, value, timestamp):
    """Record historical data

    Appends *value* and *timestamp* to the series named *data_type* and keeps
    only the last HISTORY_LIMIT points of that series.

    Raises:
        KeyError: if *data_type* is not a known series.
    """
    values = history_data[data_type]
    stamps = history_data["timestamps"].setdefault(data_type, [])
    values.append(value)
    stamps.append(timestamp)

    # Trim values and timestamps together so they stay aligned.
    overflow = len(values) - HISTORY_LIMIT
    if overflow > 0:
        del values[:overflow]
        del stamps[:overflow]


def generate_prediction(data_type):
    """Generate simple prediction

    Fits a straight line through the recorded values (indexed 0..n-1) and
    extrapolates the next PREDICTION_STEPS points.

    Returns:
        None when fewer than two points are recorded, otherwise a dict with
        "historical" (values), "predicted" (list of floats) and "timestamps"
        (timestamps of the historical values).
    """
    values = history_data[data_type]
    if len(values) < 2:
        return None

    # Use simple linear regression for prediction
    x = np.arange(len(values))
    trend = np.poly1d(np.polyfit(x, np.array(values), 1))

    future_x = np.arange(len(values), len(values) + PREDICTION_STEPS)
    return {
        "historical": list(values),
        "predicted": trend(future_x).tolist(),
        "timestamps": list(history_data["timestamps"].get(data_type, []))
    }


def plot_data_with_prediction(data_type):
    """Create chart with prediction

    Returns:
        A Plotly figure, or None when fewer than two points are recorded.
    """
    prediction = generate_prediction(data_type)
    if prediction is None:
        return None
    stamps = prediction["timestamps"]

    # Create Plotly chart
    fig = go.Figure()

    # Add historical data
    fig.add_trace(go.Scatter(
        x=[t.strftime('%H:%M:%S') for t in stamps],
        y=prediction["historical"],
        mode='lines+markers',
        name='Historical'
    ))

    # Add predicted data; predicted points are labelled one minute apart,
    # starting after the last recorded timestamp.
    future_times = [
        (stamps[-1] + timedelta(minutes=i)).strftime('%H:%M:%S')
        for i in range(1, PREDICTION_STEPS + 1)
    ]
    fig.add_trace(go.Scatter(
        x=future_times,
        y=prediction["predicted"],
        mode='lines',
        line=dict(dash='dash'),
        name='Predicted'
    ))

    # Update layout
    fig.update_layout(
        title=f"{data_type.upper()} Trend Analysis",
        xaxis_title="Time",
        yaxis_title="Value",
        showlegend=True
    )
    return fig


def refresh_all():
    """Manually refresh all states"""
    commands = check_pong_commands()
    print(f"Current device state: {device_state}")
    return [commands, gr.update(value=device_state)]


# Gradio interface
with gr.Blocks(title="MQTT Ping-Pong System", theme=gr.themes.Base(
    primary_hue=gr.themes.colors.Color(
        c50="#edf2f0",
        c100="#dbE5E1",
        c200="#B8CCC6",
        c300="#96B3AB",
        c400="#7FA595",  # Right main color
        c500="#5C8072",  # Left main color
        c600="#4A665B",
        c700="#374D45",
        c800="#25332E",
        c900="#121917",
        c950="#080C0B",
    )
)) as demo:
    gr.Markdown("# 🏓 MQTT Ping-Pong Communication System 🏓")

    # Add CSS, including font settings
    gr.HTML(""" """)

    with gr.Row():
        # Ping part (left)
        with gr.Column(scale=1, variant="panel", elem_classes=["ping-panel"]):
            gr.Markdown("### 🏓 Ping Control (Sender)")

            with gr.Group():
                gr.Markdown("**Step 1: Initialize Connection**")
                ping_init_btn = gr.Button("Initialize Ping", variant="primary", size="lg")
                ping_status = gr.Textbox(label="Connection Status", lines=2)

            with gr.Tabs():
                with gr.TabItem("RGB Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Configure RGB Values**")
                        r = gr.Slider(0, 255, 128, label="Red Value", interactive=True)
                        g = gr.Slider(0, 255, 128, label="Green Value", interactive=True)
                        b = gr.Slider(0, 255, 128, label="Blue Value", interactive=True)
                        gr.Markdown("**Step 3: Send Command**")
                        send_rgb_btn = gr.Button("Send RGB Command", variant="secondary", size="lg")
                        rgb_status = gr.Textbox(label="RGB Status", lines=2)

                with gr.TabItem("Weight Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Set RPM Value**")
                        rpm = gr.Slider(0, 5000, 1000, label="RPM Setting", interactive=True)
                        gr.Markdown("**Step 3: Send Request**")
                        send_weight_btn = gr.Button("Send Weight Request", variant="secondary", size="lg")
                        weight_status = gr.Textbox(label="Weight Status", lines=2)

                # Temperature and humidity control tab
                with gr.TabItem("Temperature Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Set Temperature Value**")
                        temperature = gr.Slider(0, 50, 25, label="Temperature (°C)", interactive=True)
                        gr.Markdown("**Step 3: Send Request**")
                        send_temp_btn = gr.Button("Send Temperature Request", variant="secondary", size="lg")
                        temp_status = gr.Textbox(label="Temperature Status", lines=2)

            with gr.Group():
                gr.Markdown("**Step 4: Check Responses**")
                check_ping_btn = gr.Button("Check Responses", variant="secondary", size="lg")
                ping_responses = gr.Textbox(
                    label="Response Log",
                    lines=10,
                    show_copy_button=True
                )

        # Pong part (right)
        with gr.Column(scale=1, variant="panel", elem_classes=["pong-panel"]):
            gr.Markdown("### Pong Monitor (Receiver) 🏓")

            with gr.Group():
                gr.Markdown("**Step 1: Start System**")
                with gr.Row():
                    pong_init_btn = gr.Button("Start Pong", variant="primary", size="lg")
                    pong_stop_btn = gr.Button("Stop Pong", variant="secondary", size="lg")
                pong_status = gr.Textbox(label="System Status", lines=2)

            with gr.Group():
                gr.Markdown("**Step 2: Monitor Device Status**")
                refresh_btn = gr.Button("🔄 Refresh All", variant="primary", size="lg")
                device_info = gr.JSON(
                    label="Current Device State",
                    value=device_state,
                    show_label=True
                )

            with gr.Group():
                gr.Markdown("**Step 3: Check Incoming Commands**")
                check_pong_btn = gr.Button("Check Commands", variant="secondary", size="lg")
                pong_commands = gr.Textbox(
                    label="Command Log",
                    lines=10,
                    show_copy_button=True
                )

            # Step 4: Data Analysis (Optional)
            with gr.Group():
                gr.Markdown("**Step 4: Data Analysis (Optional)**")
                with gr.Row():
                    analyze_rgb_btn = gr.Button("Analyze RGB Trend", variant="secondary", size="lg")
                    analyze_temp_btn = gr.Button("Analyze Temperature Trend", variant="secondary", size="lg")
                plot_output = gr.Plot(
                    label="Trend Analysis",
                    show_label=True
                )

    # Event handling for analysis buttons
    analyze_rgb_btn.click(
        lambda: plot_data_with_prediction("rgb"),
        outputs=plot_output
    )
    analyze_temp_btn.click(
        lambda: plot_data_with_prediction("temperature"),
        outputs=plot_output
    )

    # Connection lifecycle
    ping_init_btn.click(initialize_ping, outputs=ping_status)
    pong_init_btn.click(initialize_pong, outputs=pong_status)
    pong_stop_btn.click(stop_mqtt, outputs=pong_status)

    # Automatic refresh of pong state when sending commands
    send_rgb_btn.click(
        send_rgb,
        [r, g, b],
        rgb_status
    ).then(
        check_pong_commands,  # Update pong command display
        outputs=[pong_commands]
    ).then(
        check_ping_responses,  # Update ping response display
        outputs=[ping_responses]
    )

    send_weight_btn.click(
        send_weight_request,
        rpm,
        weight_status
    ).then(
        refresh_all,
        outputs=[pong_commands, device_info]
    )

    # Manual refresh button
    refresh_btn.click(
        refresh_all,
        outputs=[pong_commands, device_info]
    )

    check_ping_btn.click(
        check_ping_responses,
        outputs=ping_responses
    )
    check_pong_btn.click(check_pong_commands, outputs=pong_commands)

    # Real-time update when slider values change
    r.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)
    g.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)
    b.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)

    # RPM slider real-time update
    rpm.change(update_rpm_preview, inputs=rpm, outputs=device_info)

    # Temperature slider real-time update
    temperature.change(update_temperature_preview, inputs=temperature, outputs=device_info)

    # Temperature command sending
    send_temp_btn.click(
        lambda temp: send_command("Temperature Reading", {"temperature": temp}),
        inputs=[temperature],
        outputs=temp_status
    ).then(
        check_pong_commands,
        outputs=[pong_commands]
    ).then(
        check_ping_responses,
        outputs=[ping_responses]
    )

    demo.load(lambda: None)

# Initialize
if __name__ == "__main__":
    demo.launch()