import base64
import io
import json
import os
import queue
import time

import paho.mqtt.client as mqtt
import streamlit as st
from PIL import Image
def create_paho_client(tls=True):
    """Build an MQTT v5 paho client, optionally configured for TLS.

    Args:
        tls: When true, call ``tls_set`` with ``PROTOCOL_TLS_CLIENT`` before
            returning the client.

    Returns:
        The newly created ``mqtt.Client``. No connection is opened here.
    """
    mqtt_client = mqtt.Client(client_id="", userdata=None, protocol=mqtt.MQTTv5)
    if not tls:
        return mqtt_client

    # The ssl module is reached through paho's own namespace.
    mqtt_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
    return mqtt_client
# Hand-off point for device replies: the on_message callback registered in
# setup_paho_client puts decoded payloads here, and publish_and_wait blocks
# on it until a reply arrives or its timeout expires.
response_queue: queue.Queue = queue.Queue()
def setup_paho_client(
    client, username, password, cloud, port, device_id, connect_timeout=10.0
):
    """Authenticate, connect and subscribe ``client``, then wait until it is connected.

    Incoming messages on ``<device_id>/response`` are JSON-decoded and pushed
    onto the module-level ``response_queue``.

    Args:
        client: paho MQTT client (as returned by ``create_paho_client``).
        username: Broker username passed to ``username_pw_set``.
        password: Broker password passed to ``username_pw_set``.
        cloud: Broker host name passed to ``client.connect``.
        port: Broker port passed to ``client.connect``.
        device_id: Topic prefix; the subscription is ``device_id + "/response"``.
        connect_timeout: Maximum number of seconds to wait for
            ``client.is_connected()`` to become true.

    Raises:
        TimeoutError: If the client does not report a connection within
            ``connect_timeout`` seconds. The network loop is stopped first.
    """

    def on_message(client, userdata, msg):
        payload_dict = json.loads(msg.payload)
        response_queue.put(payload_dict)

    client.username_pw_set(username, password)
    client.connect(cloud, port)
    client.subscribe(device_id + "/response")
    client.on_message = on_message
    client.loop_start()

    # Poll with a short sleep instead of spinning: a bare `pass` loop pins a
    # CPU core and never terminates when the broker rejects the connection.
    deadline = time.monotonic() + connect_timeout
    while not client.is_connected():
        if time.monotonic() >= deadline:
            client.loop_stop()
            raise TimeoutError(
                f"MQTT client did not connect to {cloud}:{port} "
                f"within {connect_timeout} seconds"
            )
        time.sleep(0.05)
def publish_and_wait(client, device_id, payload):
    """Publish ``payload`` to the device topic and wait for one reply.

    Args:
        client: Connected paho MQTT client.
        device_id: Topic the JSON-encoded payload is published to (QoS 2).
        payload: JSON-serialisable object to send.

    Returns:
        The next item taken from ``response_queue``, or ``None`` if nothing
        arrives within 10 seconds.
    """
    message = json.dumps(payload)
    client.publish(device_id, payload=message, qos=2)
    try:
        return response_queue.get(block=True, timeout=10)
    except queue.Empty:
        return None
def handle_image_display(image):
    """Decode a base64-encoded image and render it in the Streamlit page.

    Args:
        image: Base64 text (or bytes) containing an encoded image file.
    """
    raw_bytes = base64.b64decode(image)
    img_bytes = io.BytesIO(raw_bytes)
    # The assignment was left without a right-hand side; the in-memory buffer
    # is opened with PIL so Streamlit receives an image object.
    img = Image.open(img_bytes)
    st.image(img, caption="Cobot camera view")
# ---------------------------------------------------------------------------
# Streamlit page: pick a cobot command, send it over MQTT, show the reply.
# ---------------------------------------------------------------------------
st.title("MyCobot280 Pi control demo")
st.markdown("""
This app is a public demo of ...
""")

# Connection settings come either from the visitor (typed into the page) or
# from the environment of the process hosting the app.
use_own_creds = st.checkbox("Use your own credentials", value=False)
if use_own_creds:
    HIVEMQ_USERNAME = st.text_input("HIVEMQ Username", type="password")
    HIVEMQ_PASSWORD = st.text_input("HIVEMQ Password", type="password")
    HIVEMQ_HOST = st.text_input("HIVEMQ Host", type="password")
    DEVICE_ID = st.text_input("Device ID", type="password")
    PORT = st.number_input("Port", min_value=1, step=1)
else:
    # Username and password were never defined on this path, so the connect
    # step could not run with the default (unchecked) setting.
    # NOTE(review): the two variable names below follow the HIVEMQ_HOST
    # pattern -- confirm they match the deployment configuration.
    HIVEMQ_USERNAME = os.environ.get("HIVEMQ_USERNAME")
    HIVEMQ_PASSWORD = os.environ.get("HIVEMQ_PASSWORD")
    HIVEMQ_HOST = os.environ.get("HIVEMQ_HOST")
    DEVICE_ID = os.environ.get("DEVICE_ID")
    PORT = int(os.environ.get("PORT"))

st.markdown("## Commands")
commands = [
    "query/angles",
    "query/coords",
    "query/gripper",
    "query/camera",
    "control/angles",
    "control/coords",
    "control/gripper",
]
command = st.selectbox(
    "Select command: ", commands, key="selected_command", disabled=False
)

# Build the command-specific arguments. The query/angles, query/coords and
# query/gripper commands take none, so `args` stays empty for them.
args = {}
if command == "query/camera":
    quality = st.slider("Image quality", 1, 100, 50)
    args = {"quality": quality}
elif command == "control/angles":
    col1, col2, col3 = st.columns(3)
    with col1:
        angle_1 = st.number_input("Angle 1", format="%.2f")
        angle_4 = st.number_input("Angle 4", format="%.2f")
    with col2:
        angle_2 = st.number_input("Angle 2", format="%.2f")
        angle_5 = st.number_input("Angle 5", format="%.2f")
    with col3:
        angle_3 = st.number_input("Angle 3", format="%.2f")
        angle_6 = st.number_input("Angle 6", format="%.2f")
    speed = st.slider("Speed", 1, 100, 50)
    args = {
        "angles": [angle_1, angle_2, angle_3, angle_4, angle_5, angle_6],
        "speed": speed,
    }
elif command == "control/coords":
    col1, col2, col3 = st.columns(3)
    with col1:
        x = st.number_input("x", format="%.2f")
        rx = st.number_input("rx (Rotation x)", format="%.2f")
    with col2:
        y = st.number_input("y", format="%.2f")
        ry = st.number_input("ry (Rotation y)", format="%.2f")
    with col3:
        z = st.number_input("z", format="%.2f")
        rz = st.number_input("rz (Rotation z)", format="%.2f")
    speed = st.slider("Speed", 1, 100, 50)
    args = {"coords": [x, y, z, rx, ry, rz], "speed": speed}
elif command == "control/gripper":
    gripper_value = st.slider("Gripper value", 1, 100, 50)
    speed = st.slider("Speed", 1, 100, 50)
    args = {"gripper_value": gripper_value, "speed": speed}

with st.form("mqtt_form"):
    if st.form_submit_button("Send Command"):
        client = create_paho_client()
        try:
            # setup_paho_client needs all six arguments; passing only the
            # client and port raised TypeError before anything was sent.
            setup_paho_client(
                client,
                username=HIVEMQ_USERNAME,
                password=HIVEMQ_PASSWORD,
                cloud=HIVEMQ_HOST,
                port=PORT,
                device_id=DEVICE_ID,
            )

            payload = {"command": command, "args": args}
            st.markdown("### Payload")
            st.write(payload)

            response = publish_and_wait(client, DEVICE_ID, payload)
        finally:
            # A new client is created per submit, so release its connection
            # and stop its network loop instead of leaking them.
            client.disconnect()
            client.loop_stop()

        st.markdown("### Response")
        if response is None:
            st.write("Timed out waiting for response. Is the on-device server running?")
        elif "image" in response:
            # Show the textual fields first, then the decoded camera frame.
            image = response.pop("image")
            st.write(response)
            handle_image_display(image)
        else:
            st.write(response)