|
import json |
|
import gradio as gr |
|
import paho.mqtt.client as mqtt |
|
import time |
|
import random |
|
from queue import Queue |
|
import numpy as np |
|
import pandas as pd |
|
from datetime import datetime, timedelta |
|
import plotly.graph_objects as go |
|
|
|
|
|
MQTT_HOST = "broker.hivemq.com" |
|
MQTT_PORT = 1883 |
|
|
|
|
|
response_queue = Queue() |
|
command_queue = Queue() |
|
mqtt_ping_client = None |
|
mqtt_pong_client = None |
|
session_id = None |
|
device_state = { |
|
"rgb": {"r": 0, "g": 0, "b": 0}, |
|
"temperature": 25.0, |
|
"rpm": 0 |
|
} |
|
|
|
|
|
history_data = { |
|
"rgb": [], |
|
"temperature": [], |
|
"timestamps": [] |
|
} |
|
|
|
|
|
def on_ping_connect(client, userdata, flags, rc): |
|
print(f"Ping connected with result code {rc}") |
|
if session_id: |
|
client.subscribe(f"pong/{session_id}/response") |
|
|
|
def on_pong_connect(client, userdata, flags, rc): |
|
print(f"Pong connected with result code {rc}") |
|
client.subscribe("ping/command") |
|
|
|
def on_ping_message(client, userdata, msg): |
|
try: |
|
response = json.loads(msg.payload.decode()) |
|
response_queue.put(response) |
|
print(f"Ping received: {response}") |
|
except Exception as e: |
|
print(f"Ping error: {e}") |
|
|
|
def on_pong_message(client, userdata, msg): |
|
try: |
|
command = json.loads(msg.payload.decode()) |
|
command_queue.put(command) |
|
print(f"Pong received: {command}") |
|
except Exception as e: |
|
print(f"Pong error: {e}") |
|
|
|
|
|
def initialize_ping(): |
|
global mqtt_ping_client, session_id |
|
session_id = f"ping_{int(time.time())}" |
|
mqtt_ping_client = mqtt.Client() |
|
mqtt_ping_client.on_connect = on_ping_connect |
|
mqtt_ping_client.on_message = on_ping_message |
|
mqtt_ping_client.connect(MQTT_HOST, MQTT_PORT, 60) |
|
mqtt_ping_client.loop_start() |
|
return f"Ping initialized: {session_id}" |
|
|
|
def send_command(command_type, data=None): |
|
if not mqtt_ping_client: |
|
return "Please initialize ping first" |
|
|
|
payload = { |
|
"type": command_type, |
|
"data": data or {}, |
|
"session_id": session_id, |
|
"timestamp": time.time() |
|
} |
|
mqtt_ping_client.publish("ping/command", json.dumps(payload)) |
|
return f"Sent {command_type}" |
|
|
|
def send_rgb(r, g, b): |
|
if not mqtt_ping_client: |
|
return "Please initialize ping first" |
|
|
|
payload = { |
|
"type": "RGB Command", |
|
"data": {"r": r, "g": g, "b": b}, |
|
"session_id": session_id, |
|
"timestamp": time.time() |
|
} |
|
|
|
mqtt_ping_client.publish("ping/command", json.dumps(payload)) |
|
|
|
command_queue.put(payload) |
|
return f"Sent RGB Command: R={r}, G={g}, B={b}" |
|
|
|
def send_weight_request(rpm): |
|
return send_command("Weight Data", {"set_rpm": rpm, "request_weight": True}) |
|
|
|
|
|
def initialize_pong(): |
|
global mqtt_pong_client |
|
mqtt_pong_client = mqtt.Client() |
|
mqtt_pong_client.on_connect = on_pong_connect |
|
mqtt_pong_client.on_message = on_pong_message |
|
mqtt_pong_client.connect(MQTT_HOST, MQTT_PORT, 60) |
|
mqtt_pong_client.loop_start() |
|
return "Pong started" |
|
|
|
def process_command(command): |
|
global device_state |
|
command_type = command.get("type") |
|
data = command.get("data", {}) |
|
session_id = command.get("session_id") |
|
timestamp = datetime.fromtimestamp(command.get("timestamp", time.time())) |
|
|
|
if command_type == "RGB Command": |
|
|
|
device_state = { |
|
**device_state, |
|
"rgb": { |
|
"r": int(data.get("r", 0)), |
|
"g": int(data.get("g", 0)), |
|
"b": int(data.get("b", 0)) |
|
} |
|
} |
|
print(f"Processing RGB command: {data}") |
|
response_data = { |
|
"current_state": "applied", |
|
"power_consumption": random.uniform(0.1, 1.0), |
|
"applied_values": device_state["rgb"] |
|
} |
|
|
|
rgb_avg = sum([int(data.get(k, 0)) for k in ['r', 'g', 'b']]) / 3 |
|
record_data("rgb", rgb_avg, timestamp) |
|
elif command_type == "Temperature Reading": |
|
device_state["temperature"] += random.uniform(-0.5, 0.5) |
|
response_data = { |
|
"current_temperature": device_state["temperature"], |
|
"humidity": random.uniform(40, 60), |
|
"pressure": random.uniform(980, 1020) |
|
} |
|
|
|
record_data("temperature", device_state["temperature"], timestamp) |
|
elif command_type == "Weight Data": |
|
if "set_rpm" in data: |
|
device_state["rpm"] = data["set_rpm"] |
|
response_data = { |
|
"calibrated_weight": random.uniform(95, 105), |
|
"current_rpm": device_state["rpm"], |
|
"stability": random.uniform(0.98, 1.02) |
|
} |
|
else: |
|
response_data = {"error": "Unknown command type"} |
|
|
|
response = { |
|
"type": command_type, |
|
"data": response_data, |
|
"timestamp": time.time(), |
|
"session_id": session_id |
|
} |
|
|
|
if mqtt_pong_client: |
|
mqtt_pong_client.publish(f"pong/{session_id}/response", json.dumps(response)) |
|
return json.dumps(response, indent=2) |
|
|
|
def check_ping_responses(): |
|
"""Check the ping response queue""" |
|
responses = [] |
|
while not response_queue.empty(): |
|
response = response_queue.get_nowait() |
|
responses.append(json.dumps(response, indent=2)) |
|
return "\n".join(responses) if responses else "No new responses" |
|
|
|
def check_pong_commands(): |
|
"""Check and process command queue""" |
|
responses = [] |
|
while not command_queue.empty(): |
|
command = command_queue.get_nowait() |
|
response = process_command(command) |
|
responses.append(response) |
|
return "\n".join(responses) if responses else "No new commands" |
|
|
|
|
|
def stop_mqtt(): |
|
global mqtt_ping_client, mqtt_pong_client |
|
if mqtt_ping_client: |
|
mqtt_ping_client.loop_stop() |
|
mqtt_ping_client.disconnect() |
|
if mqtt_pong_client: |
|
mqtt_pong_client.loop_stop() |
|
mqtt_pong_client.disconnect() |
|
return "Both Ping and Pong clients stopped" |
|
|
|
|
|
def update_rgb_preview(r, g, b): |
|
"""Real-time update RGB preview values""" |
|
global device_state |
|
device_state = { |
|
**device_state, |
|
"rgb": { |
|
"r": int(r), |
|
"g": int(g), |
|
"b": int(b) |
|
} |
|
} |
|
return gr.update(value=device_state) |
|
|
|
def update_rpm_preview(rpm): |
|
"""Real-time update RPM preview values""" |
|
global device_state |
|
device_state = { |
|
**device_state, |
|
"rpm": int(rpm) |
|
} |
|
return gr.update(value=device_state) |
|
|
|
def update_temperature_preview(temperature): |
|
"""Real-time update temperature preview values""" |
|
global device_state |
|
device_state = { |
|
**device_state, |
|
"temperature": float(temperature) |
|
} |
|
return gr.update(value=device_state) |
|
|
|
|
|
def record_data(data_type, value, timestamp): |
|
"""Record historical data""" |
|
history_data[data_type].append(value) |
|
history_data["timestamps"].append(timestamp) |
|
|
|
if len(history_data[data_type]) > 100: |
|
history_data[data_type] = history_data[data_type][-100:] |
|
history_data["timestamps"] = history_data["timestamps"][-100:] |
|
|
|
def generate_prediction(data_type): |
|
"""Generate simple prediction""" |
|
if len(history_data[data_type]) < 2: |
|
return None |
|
|
|
|
|
x = np.arange(len(history_data[data_type])) |
|
y = np.array(history_data[data_type]) |
|
z = np.polyfit(x, y, 1) |
|
p = np.poly1d(z) |
|
|
|
|
|
future_x = np.arange(len(x), len(x) + 5) |
|
future_y = p(future_x) |
|
|
|
return { |
|
"historical": history_data[data_type], |
|
"predicted": future_y.tolist(), |
|
"timestamps": history_data["timestamps"] |
|
} |
|
|
|
def plot_data_with_prediction(data_type): |
|
"""Create chart with prediction""" |
|
if len(history_data[data_type]) < 2: |
|
return None |
|
|
|
|
|
x = np.arange(len(history_data[data_type])) |
|
y = np.array(history_data[data_type]) |
|
z = np.polyfit(x, y, 1) |
|
p = np.poly1d(z) |
|
|
|
|
|
future_x = np.arange(len(x), len(x) + 5) |
|
future_y = p(future_x) |
|
|
|
|
|
fig = go.Figure() |
|
|
|
|
|
fig.add_trace(go.Scatter( |
|
x=[t.strftime('%H:%M:%S') for t in history_data["timestamps"]], |
|
y=history_data[data_type], |
|
mode='lines+markers', |
|
name='Historical' |
|
)) |
|
|
|
|
|
future_times = [ |
|
(history_data["timestamps"][-1] + timedelta(minutes=i)).strftime('%H:%M:%S') |
|
for i in range(1, 6) |
|
] |
|
fig.add_trace(go.Scatter( |
|
x=future_times, |
|
y=future_y, |
|
mode='lines', |
|
line=dict(dash='dash'), |
|
name='Predicted' |
|
)) |
|
|
|
|
|
fig.update_layout( |
|
title=f"{data_type.upper()} Trend Analysis", |
|
xaxis_title="Time", |
|
yaxis_title="Value", |
|
showlegend=True |
|
) |
|
|
|
return fig |
|
|
|
|
|
def refresh_all(): |
|
"""Manually refresh all states""" |
|
commands = check_pong_commands() |
|
print(f"Current device state: {device_state}") |
|
return [commands, gr.update(value=device_state)] |
|
|
|
|
|
with gr.Blocks(title="MQTT Ping-Pong System", theme=gr.themes.Base( |
|
primary_hue=gr.themes.colors.Color( |
|
c50="#edf2f0", |
|
c100="#dbE5E1", |
|
c200="#B8CCC6", |
|
c300="#96B3AB", |
|
c400="#7FA595", |
|
c500="#5C8072", |
|
c600="#4A665B", |
|
c700="#374D45", |
|
c800="#25332E", |
|
c900="#121917", |
|
c950="#080C0B", |
|
) |
|
)) as demo: |
|
gr.Markdown("# π MQTT Ping-Pong Communication System <span class='pong-emoji'>π</span>") |
|
|
|
|
|
gr.HTML(""" |
|
<style> |
|
/* Basic styles */ |
|
.ping-panel { |
|
background-color: #5C8072 !important; |
|
border-radius: 8px; |
|
padding: 20px; |
|
} |
|
.pong-panel { |
|
background-color: #7FA595 !important; |
|
border-radius: 8px; |
|
padding: 20px; |
|
} |
|
|
|
/* Font settings */ |
|
* { |
|
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, |
|
"Helvetica Neue", Arial, sans-serif; |
|
} |
|
|
|
/* Title styles */ |
|
h1, h2, h3, .header { |
|
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, |
|
"Helvetica Neue", Arial, sans-serif; |
|
font-weight: 600; |
|
color: #2c3e50; |
|
} |
|
|
|
/* Button styles */ |
|
.gr-button { |
|
margin: 10px 0; |
|
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, |
|
"Helvetica Neue", Arial, sans-serif; |
|
font-weight: 500; |
|
} |
|
|
|
/* Group styles */ |
|
.gr-group { |
|
background-color: rgba(255, 255, 255, 0.1); |
|
border-radius: 8px; |
|
padding: 15px; |
|
margin: 10px 0; |
|
} |
|
|
|
/* Label styles */ |
|
label { |
|
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, |
|
"Helvetica Neue", Arial, sans-serif; |
|
font-weight: 500; |
|
} |
|
|
|
/* Pong emoji flip - applied to all elements with pong-emoji class */ |
|
.pong-emoji { |
|
display: inline-block; |
|
transform: scaleX(-1); |
|
margin-left: 5px; /* Add a little space */ |
|
} |
|
|
|
/* Step title styles */ |
|
.step-header { |
|
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, |
|
"Helvetica Neue", Arial, sans-serif; |
|
font-weight: 600; |
|
color: #2c3e50; |
|
margin: 15px 0 10px 0; |
|
} |
|
</style> |
|
""") |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(scale=1, variant="panel", elem_classes=["ping-panel"]): |
|
gr.Markdown("### π Ping Control (Sender)") |
|
with gr.Group(): |
|
gr.Markdown("**Step 1: Initialize Connection**") |
|
ping_init_btn = gr.Button("Initialize Ping", variant="primary", size="lg") |
|
ping_status = gr.Textbox(label="Connection Status", lines=2) |
|
|
|
with gr.Tabs(): |
|
with gr.TabItem("RGB Control"): |
|
with gr.Group(): |
|
gr.Markdown("**Step 2: Configure RGB Values**") |
|
r = gr.Slider(0, 255, 128, label="Red Value", interactive=True) |
|
g = gr.Slider(0, 255, 128, label="Green Value", interactive=True) |
|
b = gr.Slider(0, 255, 128, label="Blue Value", interactive=True) |
|
gr.Markdown("**Step 3: Send Command**") |
|
send_rgb_btn = gr.Button("Send RGB Command", variant="secondary", size="lg") |
|
rgb_status = gr.Textbox(label="RGB Status", lines=2) |
|
|
|
with gr.TabItem("Weight Control"): |
|
with gr.Group(): |
|
gr.Markdown("**Step 2: Set RPM Value**") |
|
rpm = gr.Slider(0, 5000, 1000, label="RPM Setting", interactive=True) |
|
gr.Markdown("**Step 3: Send Request**") |
|
send_weight_btn = gr.Button("Send Weight Request", variant="secondary", size="lg") |
|
weight_status = gr.Textbox(label="Weight Status", lines=2) |
|
|
|
|
|
with gr.TabItem("Temperature Control"): |
|
with gr.Group(): |
|
gr.Markdown("**Step 2: Set Temperature Value**") |
|
temperature = gr.Slider(0, 50, 25, label="Temperature (Β°C)", interactive=True) |
|
gr.Markdown("**Step 3: Send Request**") |
|
send_temp_btn = gr.Button("Send Temperature Request", variant="secondary", size="lg") |
|
temp_status = gr.Textbox(label="Temperature Status", lines=2) |
|
|
|
with gr.Group(): |
|
gr.Markdown("**Step 4: Check Responses**") |
|
check_ping_btn = gr.Button("Check Responses", variant="secondary", size="lg") |
|
ping_responses = gr.Textbox( |
|
label="Response Log", |
|
lines=10, |
|
show_copy_button=True |
|
) |
|
|
|
|
|
with gr.Column(scale=1, variant="panel", elem_classes=["pong-panel"]): |
|
gr.Markdown("### Pong Monitor (Receiver) <span class='pong-emoji'>π</span>") |
|
with gr.Group(): |
|
gr.Markdown("**Step 1: Start System**") |
|
with gr.Row(): |
|
pong_init_btn = gr.Button("Start Pong", variant="primary", size="lg") |
|
pong_stop_btn = gr.Button("Stop Pong", variant="secondary", size="lg") |
|
pong_status = gr.Textbox(label="System Status", lines=2) |
|
|
|
with gr.Group(): |
|
gr.Markdown("**Step 2: Monitor Device Status**") |
|
refresh_btn = gr.Button("π Refresh All", variant="primary", size="lg") |
|
device_info = gr.JSON( |
|
label="Current Device State", |
|
value=device_state, |
|
show_label=True |
|
) |
|
|
|
with gr.Group(): |
|
gr.Markdown("**Step 3: Check Incoming Commands**") |
|
check_pong_btn = gr.Button("Check Commands", variant="secondary", size="lg") |
|
pong_commands = gr.Textbox( |
|
label="Command Log", |
|
lines=10, |
|
show_copy_button=True |
|
) |
|
|
|
|
|
with gr.Group(): |
|
gr.Markdown("**Step 4: Data Analysis (Optional)**") |
|
with gr.Row(): |
|
analyze_rgb_btn = gr.Button("Analyze RGB Trend", variant="secondary", size="lg") |
|
analyze_temp_btn = gr.Button("Analyze Temperature Trend", variant="secondary", size="lg") |
|
|
|
plot_output = gr.Plot( |
|
label="Trend Analysis", |
|
show_label=True |
|
) |
|
|
|
|
|
analyze_rgb_btn.click( |
|
lambda: plot_data_with_prediction("rgb"), |
|
outputs=plot_output |
|
) |
|
analyze_temp_btn.click( |
|
lambda: plot_data_with_prediction("temperature"), |
|
outputs=plot_output |
|
) |
|
|
|
|
|
ping_init_btn.click(initialize_ping, outputs=ping_status) |
|
pong_init_btn.click(initialize_pong, outputs=pong_status) |
|
pong_stop_btn.click(stop_mqtt, outputs=pong_status) |
|
|
|
|
|
send_rgb_btn.click( |
|
send_rgb, |
|
[r, g, b], |
|
rgb_status |
|
).then( |
|
check_pong_commands, |
|
outputs=[pong_commands] |
|
).then( |
|
check_ping_responses, |
|
outputs=[ping_responses] |
|
) |
|
|
|
send_weight_btn.click( |
|
send_weight_request, |
|
rpm, |
|
weight_status |
|
).then( |
|
refresh_all, |
|
outputs=[pong_commands, device_info] |
|
) |
|
|
|
|
|
refresh_btn.click( |
|
refresh_all, |
|
outputs=[pong_commands, device_info] |
|
) |
|
|
|
check_ping_btn.click( |
|
check_ping_responses, |
|
outputs=ping_responses |
|
) |
|
check_pong_btn.click(check_pong_commands, outputs=pong_commands) |
|
|
|
|
|
r.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info) |
|
g.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info) |
|
b.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info) |
|
|
|
|
|
rpm.change(update_rpm_preview, inputs=rpm, outputs=device_info) |
|
|
|
|
|
temperature.change(update_temperature_preview, inputs=temperature, outputs=device_info) |
|
|
|
|
|
send_temp_btn.click( |
|
lambda temp: send_command("Temperature Reading", {"temperature": temp}), |
|
inputs=[temperature], |
|
outputs=temp_status |
|
).then( |
|
check_pong_commands, |
|
outputs=[pong_commands] |
|
).then( |
|
check_ping_responses, |
|
outputs=[ping_responses] |
|
) |
|
|
|
demo.load(lambda: None) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |