cobot280pi / app.py
gursi26's picture
maybe
5821d03
raw
history blame
6.18 kB
import streamlit as st
import paho.mqtt.client as mqtt
import queue, json, base64, io, os
from PIL import Image
@st.cache_resource
def create_paho_client(tls=True):
client = mqtt.Client(client_id="", userdata=None, protocol=mqtt.MQTTv5)
if tls:
client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT)
return client
def setup_paho_client(client, username, password, cloud, port, device_id, response_queue):
def on_message(client, userdata, msg):
payload_dict = json.loads(msg.payload)
response_queue.put(payload_dict)
client.username_pw_set(username, password)
client.connect(cloud, port)
client.subscribe(device_id + "/response")
client.on_message = on_message
client.loop_start()
def publish_and_wait(client, device_id, payload, response_queue):
client.publish(device_id, payload=json.dumps(payload), qos=2)
try:
response = response_queue.get(block=True, timeout=10)
except queue.Empty:
response = None
return response
def handle_image_display(image):
b64_bytes = base64.b64decode(image)
img_bytes = io.BytesIO(b64_bytes)
img = Image.open(img_bytes)
st.image(img, caption="Cobot camera view")
st.title("MyCobot280 Pi control demo")
st.markdown("""
This app is a public demo of ...
""")
use_own_creds = st.checkbox("Use your own credentials", value=False)
if use_own_creds:
HIVEMQ_USERNAME = st.text_input("HIVEMQ Username", type="password")
HIVEMQ_PASSWORD = st.text_input("HIVEMQ Password", type="password")
HIVEMQ_HOST = st.text_input("HIVEMQ Host", type="password")
DEVICE_ID = st.text_input("Device ID", type="password")
PORT = st.number_input("Port", min_value=1, step=1, value=8883)
else:
HIVEMQ_USERNAME = os.environ.get("HIVEMQ_USERNAME")
HIVEMQ_PASSWORD = os.environ.get("HIVEMQ_PASSWORD")
HIVEMQ_HOST = os.environ.get("HIVEMQ_HOST")
DEVICE_ID = os.environ.get("DEVICE_ID")
PORT = int(os.environ.get("PORT", 8883))
# Initialize or update the MQTT client and response queue in session state
if 'client' not in st.session_state:
st.session_state['client'] = None
st.session_state['response_queue'] = None
st.session_state['HIVEMQ_USERNAME'] = None
st.session_state['HIVEMQ_PASSWORD'] = None
st.session_state['HIVEMQ_HOST'] = None
st.session_state['DEVICE_ID'] = None
st.session_state['PORT'] = None
credentials_changed = (
st.session_state['HIVEMQ_USERNAME'] != HIVEMQ_USERNAME or
st.session_state['HIVEMQ_PASSWORD'] != HIVEMQ_PASSWORD or
st.session_state['HIVEMQ_HOST'] != HIVEMQ_HOST or
st.session_state['DEVICE_ID'] != DEVICE_ID or
st.session_state['PORT'] != PORT
)
if st.session_state['client'] is None or credentials_changed:
st.session_state['HIVEMQ_USERNAME'] = HIVEMQ_USERNAME
st.session_state['HIVEMQ_PASSWORD'] = HIVEMQ_PASSWORD
st.session_state['HIVEMQ_HOST'] = HIVEMQ_HOST
st.session_state['DEVICE_ID'] = DEVICE_ID
st.session_state['PORT'] = PORT
st.session_state['response_queue'] = queue.Queue()
st.session_state['client'] = create_paho_client()
setup_paho_client(
st.session_state['client'],
HIVEMQ_USERNAME,
HIVEMQ_PASSWORD,
HIVEMQ_HOST,
PORT,
DEVICE_ID,
st.session_state['response_queue']
)
st.markdown("## Commands")
commands = [
"query/angles",
"query/coords",
"query/gripper",
"query/camera",
"control/angles",
"control/coords",
"control/gripper"
]
selected = st.selectbox("Select command: ", commands, key="selected_command", disabled=False)
args = {}
if st.session_state.selected_command == "query/angles":
pass
elif st.session_state.selected_command == "query/coords":
pass
elif st.session_state.selected_command == "query/gripper":
pass
elif st.session_state.selected_command == "query/camera":
quality = st.slider("Image quality", 1, 100, 50)
args = {"quality": quality}
elif st.session_state.selected_command == "control/angles":
col1, col2, col3 = st.columns(3)
with col1:
angle_1 = st.number_input("Angle 1", format="%.2f")
angle_4 = st.number_input("Angle 4", format="%.2f")
with col2:
angle_2 = st.number_input("Angle 2", format="%.2f")
angle_5 = st.number_input("Angle 5", format="%.2f")
with col3:
angle_3 = st.number_input("Angle 3", format="%.2f")
angle_6 = st.number_input("Angle 6", format="%.2f")
speed = st.slider("Speed", 1, 100, 50)
args = {"angles": [angle_1, angle_2, angle_3, angle_4, angle_5, angle_6], "speed": speed}
elif st.session_state.selected_command == "control/coords":
col1, col2, col3 = st.columns(3)
with col1:
x = st.number_input("x", format="%.2f")
rx = st.number_input("rx (Rotation x)", format="%.2f")
with col2:
y = st.number_input("y", format="%.2f")
ry = st.number_input("ry (Rotation y)", format="%.2f")
with col3:
z = st.number_input("z", format="%.2f")
rz = st.number_input("rz (Rotation z)", format="%.2f")
speed = st.slider("Speed", 1, 100, 50)
args = {"coords": [x, y, z, rx, ry, rz], "speed": speed}
elif st.session_state.selected_command == "control/gripper":
gripper_value = st.slider("Gripper value", 1, 100, 50)
speed = st.slider("Speed", 1, 100, 50)
args = {"gripper_value": gripper_value, "speed": speed}
with st.form("mqtt_form"):
if st.form_submit_button("Send Command"):
payload = {"command": st.session_state.selected_command, "args": args}
st.markdown("### Payload")
st.write(payload)
response = publish_and_wait(
st.session_state['client'],
DEVICE_ID,
payload,
st.session_state['response_queue']
)
st.markdown("### Response")
if response is not None:
if "image" in response:
image = response.pop("image")
st.write(response)
handle_image_display(image)
else:
st.write(response)
else:
st.write("Timed out waiting for response. Is the on-device server running?")