ping-pong / app.py
SissiFeng's picture
Create app.py
8714c43 verified
import json
import gradio as gr
import paho.mqtt.client as mqtt
import time
import random
from queue import Queue
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import plotly.graph_objects as go
# Broker endpoint used by both the ping and the pong MQTT clients.
MQTT_HOST, MQTT_PORT = "broker.hivemq.com", 1883

# Hand-off queues between the MQTT callbacks and the UI handlers:
# on_ping_message fills response_queue, on_pong_message fills command_queue,
# and the check_* functions drain them.
response_queue, command_queue = Queue(), Queue()

# Client handles and the ping session id; all stay None until the matching
# initialize_* function has run.
mqtt_ping_client = mqtt_pong_client = session_id = None

# Simulated device state shown in the UI and updated by processed commands.
device_state = dict(
    rgb=dict(r=0, g=0, b=0),
    temperature=25.0,
    rpm=0,
)

# Recorded samples used for trend plots; every series starts empty.
history_data = {series: [] for series in ("rgb", "temperature", "timestamps")}
# MQTT callback functions
def on_ping_connect(client, userdata, flags, rc):
    """Connect callback for the ping client.

    Logs the result code and, when a session id has been set, subscribes the
    client to that session's response topic.
    """
    print(f"Ping connected with result code {rc}")
    if not session_id:
        return
    response_topic = f"pong/{session_id}/response"
    client.subscribe(response_topic)
def on_pong_connect(client, userdata, flags, rc):
    """Connect callback for the pong client.

    Logs the result code and subscribes the client to the command topic.
    """
    print(f"Pong connected with result code {rc}")
    command_topic = "ping/command"
    client.subscribe(command_topic)
def on_ping_message(client, userdata, msg):
    """Message callback for the ping client.

    Decodes the payload as JSON and queues it on ``response_queue``. Any
    failure is printed rather than raised, so nothing propagates to the caller
    of this callback.
    """
    try:
        payload_text = msg.payload.decode()
        parsed = json.loads(payload_text)
        response_queue.put(parsed)
        print(f"Ping received: {parsed}")
    except Exception as err:
        print(f"Ping error: {err}")
def on_pong_message(client, userdata, msg):
    """Message callback for the pong client.

    Decodes the payload as JSON and queues it on ``command_queue`` for later
    processing. Any failure is printed rather than raised.
    """
    try:
        payload_text = msg.payload.decode()
        parsed = json.loads(payload_text)
        command_queue.put(parsed)
        print(f"Pong received: {parsed}")
    except Exception as err:
        print(f"Pong error: {err}")
# Ping functionality
def initialize_ping():
    """Create and connect the ping (sender) MQTT client.

    A fresh session id of the form ``ping_<unix seconds>`` is generated on
    every call. Any previously created ping client is stopped first so that
    repeated initialization does not leave an old client running and still
    pushing messages into ``response_queue``.

    Returns:
        A status string containing the new session id.
    """
    global mqtt_ping_client, session_id

    # Shut down the previous client before replacing it; otherwise its
    # background loop (started by loop_start) would keep running.
    if mqtt_ping_client is not None:
        mqtt_ping_client.loop_stop()
        mqtt_ping_client.disconnect()
        mqtt_ping_client = None

    # on_ping_connect reads session_id, so it must be set before connecting.
    session_id = f"ping_{int(time.time())}"

    client = mqtt.Client()
    client.on_connect = on_ping_connect
    client.on_message = on_ping_message
    client.connect(MQTT_HOST, MQTT_PORT, 60)
    client.loop_start()

    # Publish the handle only once the client is connected and looping, so a
    # failed connect() never leaves a half-configured client behind.
    mqtt_ping_client = client
    return f"Ping initialized: {session_id}"
def send_command(command_type, data=None):
    """Publish a command of the given type on the ``ping/command`` topic.

    Args:
        command_type: Value stored under the payload's ``"type"`` key.
        data: Optional payload data; a falsy value is sent as ``{}``.

    Returns:
        A status string, or a prompt to initialize ping when no ping client
        exists yet.
    """
    if mqtt_ping_client:
        message = dict(
            type=command_type,
            data=data if data else {},
            session_id=session_id,
            timestamp=time.time(),
        )
        encoded = json.dumps(message)
        mqtt_ping_client.publish("ping/command", encoded)
        return f"Sent {command_type}"
    return "Please initialize ping first"
def send_rgb(r, g, b):
    """Publish an RGB command and also queue it locally for the pong side.

    Args:
        r, g, b: Channel values placed in the payload's ``"data"`` dict.

    Returns:
        A status string, or a prompt to initialize ping when no ping client
        exists yet.
    """
    if mqtt_ping_client:
        color = {"r": r, "g": g, "b": b}
        message = {
            "type": "RGB Command",
            "data": color,
            "session_id": session_id,
            "timestamp": time.time(),
        }
        # Send over MQTT ...
        mqtt_ping_client.publish("ping/command", json.dumps(message))
        # ... and hand the same payload straight to the local command queue so
        # the pong display has it without waiting for the broker round trip.
        # NOTE(review): if a pong client in this process is also subscribed to
        # "ping/command", the command presumably arrives a second time - confirm.
        command_queue.put(message)
        return f"Sent RGB Command: R={r}, G={g}, B={b}"
    return "Please initialize ping first"
def send_weight_request(rpm):
    """Send a "Weight Data" command that sets the RPM and requests a weight."""
    request = {"set_rpm": rpm, "request_weight": True}
    return send_command("Weight Data", request)
# Pong functionality
def initialize_pong():
    """Create and connect the pong (receiver) MQTT client.

    Any previously created pong client is stopped first so that pressing the
    start button repeatedly does not leave several clients subscribed to the
    command topic, each pushing the same command into ``command_queue``.

    Returns:
        The status string ``"Pong started"``.
    """
    global mqtt_pong_client

    # Shut down the previous client before replacing it; otherwise its
    # background loop (started by loop_start) would keep running.
    if mqtt_pong_client is not None:
        mqtt_pong_client.loop_stop()
        mqtt_pong_client.disconnect()
        mqtt_pong_client = None

    client = mqtt.Client()
    client.on_connect = on_pong_connect
    client.on_message = on_pong_message
    client.connect(MQTT_HOST, MQTT_PORT, 60)
    client.loop_start()

    # Publish the handle only once the client is connected and looping.
    mqtt_pong_client = client
    return "Pong started"
def _coerce_rgb(data):
    """Return ``{"r", "g", "b"}`` as ints clamped to 0-255, or None if invalid."""
    rgb = {}
    for channel in ("r", "g", "b"):
        try:
            level = int(data.get(channel, 0))
        except (TypeError, ValueError, OverflowError):
            return None
        rgb[channel] = max(0, min(255, level))
    return rgb


def process_command(command):
    """Apply one queued command to the simulated device and build its response.

    Commands are whatever JSON arrived on the ``ping/command`` topic (or was
    queued locally), so their shape is not trusted: a non-dict command, a
    non-dict ``data`` field, an unusable timestamp or non-numeric RGB/RPM
    values produce an error response instead of raising.

    Args:
        command: Decoded command; expected to be a dict with ``type``,
            ``data``, ``session_id`` and ``timestamp`` keys.

    Returns:
        The response as an indented JSON string. When a pong client exists and
        the command carried a session id, the response is also published on
        ``pong/<session_id>/response``.
    """
    global device_state

    if not isinstance(command, dict):
        command = {}
    command_type = command.get("type")
    data = command.get("data")
    if not isinstance(data, dict):
        data = {}
    # Named differently from the module-level session_id on purpose: this is
    # the sender's id taken from the command, not this process's own session.
    reply_session = command.get("session_id")
    try:
        timestamp = datetime.fromtimestamp(command.get("timestamp", time.time()))
    except (TypeError, ValueError, OverflowError, OSError):
        # Unusable sender timestamp: fall back to the time of processing.
        timestamp = datetime.now()

    if command_type == "RGB Command":
        rgb = _coerce_rgb(data)
        if rgb is None:
            response_data = {"error": "Invalid RGB values"}
        else:
            # Update device state
            device_state = {**device_state, "rgb": rgb}
            print(f"Processing RGB command: {data}")
            response_data = {
                "current_state": "applied",
                "power_consumption": random.uniform(0.1, 1.0),
                "applied_values": device_state["rgb"],
            }
            # Record RGB value (using the channel average for simplicity)
            record_data("rgb", sum(rgb.values()) / 3, timestamp)
    elif command_type == "Temperature Reading":
        # Simulated sensor drift around the current temperature.
        device_state["temperature"] += random.uniform(-0.5, 0.5)
        response_data = {
            "current_temperature": device_state["temperature"],
            "humidity": random.uniform(40, 60),
            "pressure": random.uniform(980, 1020),
        }
        # Record temperature
        record_data("temperature", device_state["temperature"], timestamp)
    elif command_type == "Weight Data":
        # With no "set_rpm" in the command the current RPM is kept.
        requested_rpm = data.get("set_rpm", device_state["rpm"])
        # bool is an int subclass but is not a meaningful RPM value.
        if isinstance(requested_rpm, bool) or not isinstance(requested_rpm, (int, float)):
            response_data = {"error": "Invalid RPM value"}
        else:
            device_state["rpm"] = requested_rpm
            response_data = {
                "calibrated_weight": random.uniform(95, 105),
                "current_rpm": device_state["rpm"],
                "stability": random.uniform(0.98, 1.02),
            }
    else:
        response_data = {"error": "Unknown command type"}

    response = {
        "type": command_type,
        "data": response_data,
        "timestamp": time.time(),
        "session_id": reply_session,
    }
    # Without a session id there is no response topic anyone listens on.
    if mqtt_pong_client and reply_session is not None:
        mqtt_pong_client.publish(f"pong/{reply_session}/response", json.dumps(response))
    return json.dumps(response, indent=2)
def check_ping_responses():
    """Drain the ping response queue and return its contents as text.

    Returns:
        Every queued response as indented JSON, joined by newlines, or
        ``"No new responses"`` when the queue held nothing.
    """
    from queue import Empty

    responses = []
    while True:
        # Ask forgiveness instead of checking empty() first: another consumer
        # may take the last item between the check and the get.
        try:
            response = response_queue.get_nowait()
        except Empty:
            break
        responses.append(json.dumps(response, indent=2))
    return "\n".join(responses) if responses else "No new responses"
def check_pong_commands():
    """Drain the command queue, processing each command in arrival order.

    A command that makes ``process_command`` raise is reported as an error
    entry in the output, so one bad message does not stop the commands queued
    behind it from being processed.

    Returns:
        The per-command results joined by newlines, or ``"No new commands"``
        when the queue held nothing.
    """
    from queue import Empty

    results = []
    while True:
        # Ask forgiveness instead of checking empty() first: another consumer
        # may take the last item between the check and the get.
        try:
            command = command_queue.get_nowait()
        except Empty:
            break
        try:
            results.append(process_command(command))
        except Exception as exc:
            # UI boundary: log and surface the failure, then keep draining.
            print(f"Pong error: failed to process {command!r}: {exc}")
            results.append(
                json.dumps({"error": f"Failed to process command: {exc}"}, indent=2)
            )
    return "\n".join(results) if results else "No new commands"
# Add stop_mqtt function
def stop_mqtt():
    """Stop and disconnect both MQTT clients, then forget them.

    The module-level handles are reset to None afterwards so that later send
    attempts report "Please initialize ping first" instead of publishing on a
    stopped client, and so that calling this twice is harmless.

    Returns:
        The status string ``"Both Ping and Pong clients stopped"``.
    """
    global mqtt_ping_client, mqtt_pong_client

    if mqtt_ping_client:
        mqtt_ping_client.loop_stop()
        mqtt_ping_client.disconnect()
        mqtt_ping_client = None
    if mqtt_pong_client:
        mqtt_pong_client.loop_stop()
        mqtt_pong_client.disconnect()
        mqtt_pong_client = None
    return "Both Ping and Pong clients stopped"
# Add real-time update functions
def update_rgb_preview(r, g, b):
    """Store the given RGB values in the device state and return a UI update.

    The module-level ``device_state`` is rebound to a new dict whose ``"rgb"``
    entry holds the three values converted to int.
    """
    global device_state
    channels = {name: int(level) for name, level in zip("rgb", (r, g, b))}
    refreshed = dict(device_state)
    refreshed["rgb"] = channels
    device_state = refreshed
    return gr.update(value=device_state)
def update_rpm_preview(rpm):
    """Store the given RPM (as int) in the device state and return a UI update."""
    global device_state
    refreshed = dict(device_state)
    refreshed["rpm"] = int(rpm)
    device_state = refreshed
    return gr.update(value=device_state)
def update_temperature_preview(temperature):
    """Store the given temperature (as float) in the device state and return a UI update."""
    global device_state
    refreshed = dict(device_state)
    refreshed["temperature"] = float(temperature)
    device_state = refreshed
    return gr.update(value=device_state)
# Add data recording and prediction functions
# Maximum number of samples kept per series.
_HISTORY_LIMIT = 100
# Number of future points extrapolated by the linear fit.
_PREDICTION_STEPS = 5


def _timestamps_key(data_type):
    """Return the history_data key holding the timestamps of one series."""
    return f"{data_type}_timestamps"


def record_data(data_type, value, timestamp):
    """Record one sample of a series together with its own timestamp.

    Each series keeps its timestamps under ``"<data_type>_timestamps"`` so a
    series and its timestamps always have the same length, even when RGB and
    temperature samples are interleaved. The shared ``"timestamps"`` list is
    still appended to, as a combined log of all samples.

    Args:
        data_type: Series name; must already be a key of ``history_data``.
        value: Sample value to append.
        timestamp: ``datetime`` at which the sample was taken.
    """
    values = history_data[data_type]
    stamps = history_data.setdefault(_timestamps_key(data_type), [])
    combined = history_data["timestamps"]

    values.append(value)
    stamps.append(timestamp)
    combined.append(timestamp)

    # Keep only the most recent samples; a no-op while under the limit.
    for series in (values, stamps, combined):
        del series[:-_HISTORY_LIMIT]


def generate_prediction(data_type):
    """Fit a straight line to a series and extrapolate the next points.

    Args:
        data_type: Series name; must be a key of ``history_data``.

    Returns:
        None when fewer than two samples exist. Otherwise a dict with
        ``"historical"`` (the recorded values), ``"predicted"`` (the
        extrapolated values as a list) and ``"timestamps"`` (the timestamps of
        the recorded values, one per value).
    """
    values = list(history_data[data_type])
    if len(values) < 2:
        return None
    stamps = list(history_data.get(_timestamps_key(data_type), []))

    # Simple linear regression over the sample index.
    x = np.arange(len(values))
    line = np.poly1d(np.polyfit(x, np.asarray(values, dtype=float), 1))
    future_x = np.arange(len(values), len(values) + _PREDICTION_STEPS)

    return {
        "historical": values,
        "predicted": line(future_x).tolist(),
        "timestamps": stamps,
    }


def plot_data_with_prediction(data_type):
    """Build a Plotly figure of a series and its linear extrapolation.

    Args:
        data_type: Series name; must be a key of ``history_data``.

    Returns:
        None when fewer than two samples exist, otherwise a figure with a
        "Historical" trace and a dashed "Predicted" trace.
    """
    prediction = generate_prediction(data_type)
    if prediction is None:
        return None
    stamps = prediction["timestamps"]

    fig = go.Figure()

    # Historical data
    fig.add_trace(go.Scatter(
        x=[t.strftime('%H:%M:%S') for t in stamps],
        y=prediction["historical"],
        mode='lines+markers',
        name='Historical'
    ))

    # Predicted data: labelled one minute apart, starting after the last sample.
    last_seen = stamps[-1]
    future_times = [
        (last_seen + timedelta(minutes=step)).strftime('%H:%M:%S')
        for step in range(1, _PREDICTION_STEPS + 1)
    ]
    fig.add_trace(go.Scatter(
        x=future_times,
        y=prediction["predicted"],
        mode='lines',
        line=dict(dash='dash'),
        name='Predicted'
    ))

    fig.update_layout(
        title=f"{data_type.upper()} Trend Analysis",
        xaxis_title="Time",
        yaxis_title="Value",
        showlegend=True
    )
    return fig
# Add manual refresh function
def refresh_all():
    """Process pending commands and return fresh values for the pong panel.

    Returns:
        A two-item list: the text from ``check_pong_commands`` and a UI update
        carrying the current device state.
    """
    processed = check_pong_commands()
    print(f"Current device state: {device_state}")
    state_update = gr.update(value=device_state)
    return [processed, state_update]
# Gradio interface
# Page-wide CSS, injected through an HTML component at the top of the page.
_CUSTOM_STYLE_HTML = """
<style>
/* Basic styles */
.ping-panel {
background-color: #5C8072 !important;
border-radius: 8px;
padding: 20px;
}
.pong-panel {
background-color: #7FA595 !important;
border-radius: 8px;
padding: 20px;
}
/* Font settings */
* {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
"Helvetica Neue", Arial, sans-serif;
}
/* Title styles */
h1, h2, h3, .header {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
"Helvetica Neue", Arial, sans-serif;
font-weight: 600;
color: #2c3e50;
}
/* Button styles */
.gr-button {
margin: 10px 0;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
"Helvetica Neue", Arial, sans-serif;
font-weight: 500;
}
/* Group styles */
.gr-group {
background-color: rgba(255, 255, 255, 0.1);
border-radius: 8px;
padding: 15px;
margin: 10px 0;
}
/* Label styles */
label {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
"Helvetica Neue", Arial, sans-serif;
font-weight: 500;
}
/* Pong emoji flip - applied to all elements with pong-emoji class */
.pong-emoji {
display: inline-block;
transform: scaleX(-1);
margin-left: 5px; /* Add a little space */
}
/* Step title styles */
.step-header {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
"Helvetica Neue", Arial, sans-serif;
font-weight: 600;
color: #2c3e50;
margin: 15px 0 10px 0;
}
</style>
"""

# Gradio interface: a two-column page with the ping (sender) controls on the
# left and the pong (receiver) monitor on the right, followed by the event
# wiring that connects the widgets to the functions above.
with gr.Blocks(
    title="MQTT Ping-Pong System",
    theme=gr.themes.Base(
        primary_hue=gr.themes.colors.Color(
            c50="#edf2f0",
            c100="#dbE5E1",
            c200="#B8CCC6",
            c300="#96B3AB",
            c400="#7FA595",  # Right (pong) panel main color
            c500="#5C8072",  # Left (ping) panel main color
            c600="#4A665B",
            c700="#374D45",
            c800="#25332E",
            c900="#121917",
            c950="#080C0B",
        )
    ),
) as demo:
    gr.Markdown("# 🏓 MQTT Ping-Pong Communication System <span class='pong-emoji'>🏓</span>")
    gr.HTML(_CUSTOM_STYLE_HTML)

    with gr.Row():
        # Ping part (left)
        with gr.Column(scale=1, variant="panel", elem_classes=["ping-panel"]):
            gr.Markdown("### 🏓 Ping Control (Sender)")
            with gr.Group():
                gr.Markdown("**Step 1: Initialize Connection**")
                ping_init_btn = gr.Button("Initialize Ping", variant="primary", size="lg")
                ping_status = gr.Textbox(label="Connection Status", lines=2)
            with gr.Tabs():
                with gr.TabItem("RGB Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Configure RGB Values**")
                        r = gr.Slider(0, 255, 128, label="Red Value", interactive=True)
                        g = gr.Slider(0, 255, 128, label="Green Value", interactive=True)
                        b = gr.Slider(0, 255, 128, label="Blue Value", interactive=True)
                        gr.Markdown("**Step 3: Send Command**")
                        send_rgb_btn = gr.Button("Send RGB Command", variant="secondary", size="lg")
                        rgb_status = gr.Textbox(label="RGB Status", lines=2)
                with gr.TabItem("Weight Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Set RPM Value**")
                        rpm = gr.Slider(0, 5000, 1000, label="RPM Setting", interactive=True)
                        gr.Markdown("**Step 3: Send Request**")
                        send_weight_btn = gr.Button("Send Weight Request", variant="secondary", size="lg")
                        weight_status = gr.Textbox(label="Weight Status", lines=2)
                with gr.TabItem("Temperature Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Set Temperature Value**")
                        temperature = gr.Slider(0, 50, 25, label="Temperature (°C)", interactive=True)
                        gr.Markdown("**Step 3: Send Request**")
                        send_temp_btn = gr.Button("Send Temperature Request", variant="secondary", size="lg")
                        temp_status = gr.Textbox(label="Temperature Status", lines=2)
            with gr.Group():
                gr.Markdown("**Step 4: Check Responses**")
                check_ping_btn = gr.Button("Check Responses", variant="secondary", size="lg")
                ping_responses = gr.Textbox(
                    label="Response Log",
                    lines=10,
                    show_copy_button=True
                )

        # Pong part (right)
        with gr.Column(scale=1, variant="panel", elem_classes=["pong-panel"]):
            gr.Markdown("### Pong Monitor (Receiver) <span class='pong-emoji'>🏓</span>")
            with gr.Group():
                gr.Markdown("**Step 1: Start System**")
                with gr.Row():
                    pong_init_btn = gr.Button("Start Pong", variant="primary", size="lg")
                    pong_stop_btn = gr.Button("Stop Pong", variant="secondary", size="lg")
                pong_status = gr.Textbox(label="System Status", lines=2)
            with gr.Group():
                gr.Markdown("**Step 2: Monitor Device Status**")
                refresh_btn = gr.Button("🔄 Refresh All", variant="primary", size="lg")
                device_info = gr.JSON(
                    label="Current Device State",
                    value=device_state,
                    show_label=True
                )
            with gr.Group():
                gr.Markdown("**Step 3: Check Incoming Commands**")
                check_pong_btn = gr.Button("Check Commands", variant="secondary", size="lg")
                pong_commands = gr.Textbox(
                    label="Command Log",
                    lines=10,
                    show_copy_button=True
                )
            with gr.Group():
                gr.Markdown("**Step 4: Data Analysis (Optional)**")
                with gr.Row():
                    analyze_rgb_btn = gr.Button("Analyze RGB Trend", variant="secondary", size="lg")
                    analyze_temp_btn = gr.Button("Analyze Temperature Trend", variant="secondary", size="lg")
                plot_output = gr.Plot(
                    label="Trend Analysis",
                    show_label=True
                )

    # Trend analysis buttons share one plot output.
    analyze_rgb_btn.click(
        lambda: plot_data_with_prediction("rgb"),
        outputs=plot_output
    )
    analyze_temp_btn.click(
        lambda: plot_data_with_prediction("temperature"),
        outputs=plot_output
    )

    # Connection management. Note that the "Stop Pong" button calls stop_mqtt,
    # which stops the ping client as well as the pong client.
    ping_init_btn.click(initialize_ping, outputs=ping_status)
    pong_init_btn.click(initialize_pong, outputs=pong_status)
    pong_stop_btn.click(stop_mqtt, outputs=pong_status)

    # Every send button follows the same chain: send the command, process
    # whatever is pending on the pong side (refreshing the command log and the
    # device state), then show any responses that have arrived for ping.
    send_rgb_btn.click(
        send_rgb,
        [r, g, b],
        rgb_status
    ).then(
        refresh_all,
        outputs=[pong_commands, device_info]
    ).then(
        check_ping_responses,
        outputs=[ping_responses]
    )
    send_weight_btn.click(
        send_weight_request,
        rpm,
        weight_status
    ).then(
        refresh_all,
        outputs=[pong_commands, device_info]
    ).then(
        check_ping_responses,
        outputs=[ping_responses]
    )
    send_temp_btn.click(
        lambda temp: send_command("Temperature Reading", {"temperature": temp}),
        inputs=[temperature],
        outputs=temp_status
    ).then(
        refresh_all,
        outputs=[pong_commands, device_info]
    ).then(
        check_ping_responses,
        outputs=[ping_responses]
    )

    # Manual refresh / polling buttons.
    refresh_btn.click(
        refresh_all,
        outputs=[pong_commands, device_info]
    )
    check_ping_btn.click(
        check_ping_responses,
        outputs=ping_responses
    )
    check_pong_btn.click(check_pong_commands, outputs=pong_commands)

    # Live preview: moving a slider writes straight into the device state view.
    r.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)
    g.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)
    b.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)
    rpm.change(update_rpm_preview, inputs=rpm, outputs=device_info)
    temperature.change(update_temperature_preview, inputs=temperature, outputs=device_info)

    # Registers a load handler that does nothing.
    demo.load(lambda: None)

if __name__ == "__main__":
    demo.launch()