"""Streamlit front end that sends MyCobot280 Pi commands over MQTT.

The page lets the user pick a command and its arguments, publishes the command
as JSON to the ``<device_id>`` topic and renders whatever arrives on the
``<device_id>/response`` topic (including an optional base64-encoded image).
"""

import base64
import io
import json
import os
import queue
import ssl
import time
from typing import Any, Optional

import paho.mqtt.client as mqtt
import streamlit as st
from PIL import Image


@st.cache_resource
def create_paho_client(tls=True):
    """Create the MQTT v5 client used by this app.

    The function is wrapped in ``st.cache_resource``, so calls with the same
    ``tls`` argument return the previously created client object.

    Args:
        tls: When true, enable TLS on the client before returning it.

    Returns:
        The configured (not yet connected) ``mqtt.Client``.
    """
    client = mqtt.Client(client_id="", userdata=None, protocol=mqtt.MQTTv5)
    if tls:
        # Use the stdlib module directly instead of reaching for ``mqtt.ssl``,
        # which is only an import inside paho's namespace.
        client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
    return client


# Decoded responses are handed from the MQTT callback to the script through
# this queue.
response_queue = queue.Queue()


def setup_paho_client(
    client, username, password, cloud, port, device_id, timeout=10.0
):
    """Connect ``client`` and subscribe it to ``<device_id>/response``.

    Args:
        client: The paho client to configure.
        username: Broker username.
        password: Broker password.
        cloud: Broker host name.
        port: Broker port.
        device_id: Device identifier; responses are read from
            ``device_id + "/response"``.
        timeout: Maximum number of seconds to wait for the connection to be
            reported as established.

    Raises:
        TimeoutError: If the client is still not connected after ``timeout``
            seconds (previously this case spun forever).
    """

    def on_message(_client, _userdata, message):
        # This callback is invoked by paho, not by the Streamlit script, so an
        # exception raised here would never reach the UI. Turn an undecodable
        # payload into a response the user can actually see instead.
        try:
            decoded = json.loads(message.payload)
        except ValueError:
            decoded = {
                "error": "Response was not valid JSON",
                "raw": repr(message.payload[:200]),
            }
        response_queue.put(decoded)

    # Install the handler before subscribing: the client is cached, so its
    # network loop may already be running from an earlier submission.
    client.on_message = on_message
    client.username_pw_set(username, password)
    client.connect(cloud, port)
    client.subscribe(device_id + "/response")
    client.loop_start()

    # Poll with a short sleep and a deadline rather than a tight ``pass`` loop,
    # which pegged a CPU core and never returned if the connection failed.
    deadline = time.monotonic() + timeout
    while not client.is_connected():
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"MQTT connection was not established within {timeout} seconds"
            )
        time.sleep(0.05)


def publish_and_wait(client, device_id, payload, timeout=10):
    """Publish ``payload`` to ``device_id`` and wait for one response.

    Args:
        client: A connected paho client.
        device_id: Topic to publish to.
        payload: JSON-serialisable object to send.
        timeout: Seconds to wait for a response to appear on the queue.

    Returns:
        The next item from ``response_queue``, or ``None`` if nothing arrived
        within ``timeout`` seconds.
    """
    client.publish(device_id, payload=json.dumps(payload), qos=2)
    try:
        return response_queue.get(block=True, timeout=timeout)
    except queue.Empty:
        return None


def handle_image_display(image):
    """Decode a base64-encoded image and show it on the page.

    Args:
        image: Base64 data accepted by ``base64.b64decode``.
    """
    raw_bytes = base64.b64decode(image)
    picture = Image.open(io.BytesIO(raw_bytes))
    st.image(picture, caption="Cobot camera view")


def _port_from_env(name: str = "PORT") -> Optional[int]:
    """Return the integer value of environment variable ``name``, or ``None``.

    ``None`` is returned both when the variable is unset and when its value is
    not an integer, so the script can report the problem instead of crashing
    while it is still building the page.
    """
    raw = os.environ.get(name)
    if raw is None:
        return None
    try:
        return int(raw)
    except ValueError:
        return None


def _send_and_render(payload: Any, username, password, host, port, device_id):
    """Send ``payload`` to the device and render the payload and the response.

    Connection settings that are required to reach the broker are checked
    first; problems are shown with ``st.error`` rather than raised.
    """
    required = {"HIVEMQ_HOST": host, "DEVICE_ID": device_id, "PORT": port}
    missing = [label for label, value in required.items() if not value]
    if missing:
        st.error("Missing or invalid connection settings: " + ", ".join(missing))
        return

    client = create_paho_client()
    try:
        setup_paho_client(client, username, password, host, port, device_id)
    except (OSError, ValueError) as exc:
        # OSError covers socket/TLS failures and the TimeoutError raised above;
        # ValueError is what paho raises for an invalid host or port.
        st.error(f"Could not connect to the MQTT broker: {exc}")
        return

    st.markdown("### Payload")
    st.write(payload)

    response = publish_and_wait(client, device_id, payload)

    st.markdown("### Response")
    if response is None:
        st.write("Timed out waiting for response. Is the on-device server running?")
        return

    # Only a JSON object can carry an "image" entry; other JSON values are
    # written out as they are.
    if isinstance(response, dict) and "image" in response:
        image = response.pop("image")
        st.write(response)
        handle_image_display(image)
    else:
        st.write(response)


st.title("MyCobot280 Pi control demo")

st.markdown(" This app is a public demo of ... \n")

use_own_creds = st.checkbox("Use your own credentials", value=False)

if use_own_creds:
    HIVEMQ_USERNAME = st.text_input("HIVEMQ Username", type="password")
    HIVEMQ_PASSWORD = st.text_input("HIVEMQ Password", type="password")
    HIVEMQ_HOST = st.text_input("HIVEMQ Host", type="password")
    DEVICE_ID = st.text_input("Device ID", type="password")
    PORT = st.number_input("Port", min_value=1, step=1)
else:
    HIVEMQ_USERNAME = os.environ.get("HIVEMQ_USERNAME")
    HIVEMQ_PASSWORD = os.environ.get("HIVEMQ_PASSWORD")
    HIVEMQ_HOST = os.environ.get("HIVEMQ_HOST")
    DEVICE_ID = os.environ.get("DEVICE_ID")
    # An unset PORT used to raise TypeError here; it is now reported when the
    # user submits a command.
    PORT = _port_from_env("PORT")

st.markdown("## Commands")

commands = [
    "query/angles",
    "query/coords",
    "query/gripper",
    "query/camera",
    "control/angles",
    "control/coords",
    "control/gripper",
]

command = st.selectbox(
    "Select command: ", commands, key="selected_command", disabled=False
)

# Commands without a branch below ("query/angles", "query/coords",
# "query/gripper") are sent with empty arguments.
args = {}

if command == "query/camera":
    quality = st.slider("Image quality", 1, 100, 50)
    args = {"quality": quality}

elif command == "control/angles":
    col1, col2, col3 = st.columns(3)
    with col1:
        angle_1 = st.number_input("Angle 1", format="%.2f")
        angle_4 = st.number_input("Angle 4", format="%.2f")
    with col2:
        angle_2 = st.number_input("Angle 2", format="%.2f")
        angle_5 = st.number_input("Angle 5", format="%.2f")
    with col3:
        angle_3 = st.number_input("Angle 3", format="%.2f")
        angle_6 = st.number_input("Angle 6", format="%.2f")
    speed = st.slider("Speed", 1, 100, 50)
    args = {
        "angles": [angle_1, angle_2, angle_3, angle_4, angle_5, angle_6],
        "speed": speed,
    }

elif command == "control/coords":
    col1, col2, col3 = st.columns(3)
    with col1:
        x = st.number_input("x", format="%.2f")
        rx = st.number_input("rx (Rotation x)", format="%.2f")
    with col2:
        y = st.number_input("y", format="%.2f")
        ry = st.number_input("ry (Rotation y)", format="%.2f")
    with col3:
        z = st.number_input("z", format="%.2f")
        rz = st.number_input("rz (Rotation z)", format="%.2f")
    speed = st.slider("Speed", 1, 100, 50)
    args = {"coords": [x, y, z, rx, ry, rz], "speed": speed}

elif command == "control/gripper":
    gripper_value = st.slider("Gripper value", 1, 100, 50)
    speed = st.slider("Speed", 1, 100, 50)
    args = {"gripper_value": gripper_value, "speed": speed}

with st.form("mqtt_form"):
    if st.form_submit_button("Send Command"):
        _send_and_render(
            {"command": command, "args": args},
            HIVEMQ_USERNAME,
            HIVEMQ_PASSWORD,
            HIVEMQ_HOST,
            PORT,
            DEVICE_ID,
        )