import gradio as gr import paho.mqtt.client as mqtt import queue, json, base64, io, os from PIL import Image def create_paho_client(tls=True): client = mqtt.Client(client_id="", userdata=None, protocol=mqtt.MQTTv5) if tls: client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS_CLIENT) return client def setup_paho_client(client, username, password, cloud, port, device_id, response_queue): def on_message(client, userdata, msg): payload_dict = json.loads(msg.payload) response_queue.put(payload_dict) client.username_pw_set(username, password) client.connect(cloud, port) client.subscribe(device_id + "/response") client.on_message = on_message client.loop_start() def publish_and_wait(client, device_id, payload, response_queue): client.publish(device_id, payload=json.dumps(payload), qos=2) try: response = response_queue.get(block=True, timeout=10) except queue.Empty: response = None return response def handle_image_display(image): b64_bytes = base64.b64decode(image) img_bytes = io.BytesIO(b64_bytes) img = Image.open(img_bytes) return img with gr.Blocks() as demo: gr.Markdown("# MyCobot280 Pi Control Demo") gr.Markdown(""" This app is a public demo of ... """) use_own_creds = gr.Checkbox(label="Use custom broker", value=False) # Placeholders for credentials inputs HIVEMQ_USERNAME = gr.Textbox(label="HIVEMQ Username", type="password", visible=False) HIVEMQ_PASSWORD = gr.Textbox(label="HIVEMQ Password", type="password", visible=False) HIVEMQ_HOST = gr.Textbox(label="HIVEMQ Host", visible=False) DEVICE_ID = gr.Textbox(label="Device ID", visible=False) PORT = gr.Number(label="Port", value=8883, visible=False) # Define function to update visibility of credential inputs def toggle_credentials(use_own_creds): visible = use_own_creds return { HIVEMQ_USERNAME: gr.update(visible=visible), HIVEMQ_PASSWORD: gr.update(visible=visible), HIVEMQ_HOST: gr.update(visible=visible), DEVICE_ID: gr.update(visible=visible), PORT: gr.update(visible=visible) } use_own_creds.change(fn=toggle_credentials, inputs=use_own_creds, outputs=[HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, DEVICE_ID, PORT]) # Commands gr.Markdown("## Commands") commands = [ "query/angles", "query/coords", "query/gripper", "query/camera", "control/angles", "control/coords", "control/gripper" ] selected_command = gr.Dropdown(choices=commands, label="Select command") # Placeholder for command arguments # We'll need to conditionally display inputs based on the selected command # Create a dictionary to store argument inputs for each command command_args = {} # For each command, define the inputs required with gr.Column(visible=False) as query_camera_args: quality = gr.Slider(label="Image quality", minimum=1, maximum=100, value=50) command_args["query/camera"] = [quality] # Similarly for other commands with gr.Column(visible=False) as control_angles_args: angle_1 = gr.Number(label="Angle 1", value=0.0) angle_2 = gr.Number(label="Angle 2", value=0.0) angle_3 = gr.Number(label="Angle 3", value=0.0) angle_4 = gr.Number(label="Angle 4", value=0.0) angle_5 = gr.Number(label="Angle 5", value=0.0) angle_6 = gr.Number(label="Angle 6", value=0.0) speed_angles = gr.Slider(label="Speed", minimum=1, maximum=100, value=50) command_args["control/angles"] = [angle_1, angle_2, angle_3, angle_4, angle_5, angle_6, speed_angles] with gr.Column(visible=False) as control_coords_args: x = gr.Number(label="x", value=0.0) y = gr.Number(label="y", value=0.0) z = gr.Number(label="z", value=0.0) rx = gr.Number(label="rx (Rotation x)", value=0.0) ry = gr.Number(label="ry (Rotation y)", value=0.0) rz = gr.Number(label="rz (Rotation z)", value=0.0) speed_coords = gr.Slider(label="Speed", minimum=1, maximum=100, value=50) command_args["control/coords"] = [x, y, z, rx, ry, rz, speed_coords] with gr.Column(visible=False) as control_gripper_args: gripper_value = gr.Slider(label="Gripper value", minimum=1, maximum=100, value=50) speed_gripper = gr.Slider(label="Speed", minimum=1, maximum=100, value=50) command_args["control/gripper"] = [gripper_value, speed_gripper] # Define function to update visibility of argument inputs based on selected command def update_args(selected_command): # First, set all arg groups to be hidden updates = { query_camera_args: gr.update(visible=False), control_angles_args: gr.update(visible=False), control_coords_args: gr.update(visible=False), control_gripper_args: gr.update(visible=False) } if selected_command == "query/camera": updates[query_camera_args] = gr.update(visible=True) elif selected_command == "control/angles": updates[control_angles_args] = gr.update(visible=True) elif selected_command == "control/coords": updates[control_coords_args] = gr.update(visible=True) elif selected_command == "control/gripper": updates[control_gripper_args] = gr.update(visible=True) return [updates[query_camera_args], updates[control_angles_args], updates[control_coords_args], updates[control_gripper_args]] selected_command.change(fn=update_args, inputs=selected_command, outputs=[query_camera_args, control_angles_args, control_coords_args, control_gripper_args]) # Now, define a "Send Command" button send_command_button = gr.Button("Send Command") # Placeholder for displaying payload and response payload_display = gr.Markdown() response_display = gr.Markdown() image_display = gr.Image() # Define state variables client_state = gr.State(None) response_queue_state = gr.State(None) previous_credentials_state = gr.State({"username": None, "password": None, "host": None, "device_id": None, "port": None}) # Define function to handle send command def send_command(use_own_creds, username, password, host, device_id, port, selected_command, quality, angle_1, angle_2, angle_3, angle_4, angle_5, angle_6, speed_angles, x, y, z, rx, ry, rz, speed_coords, gripper_value, speed_gripper, client_state, response_queue_state, previous_credentials_state): # Prepare args based on selected_command args = {} if selected_command == "query/angles": pass elif selected_command == "query/coords": pass elif selected_command == "query/gripper": pass elif selected_command == "query/camera": args = {"quality": quality} elif selected_command == "control/angles": args = {"angles": [angle_1, angle_2, angle_3, angle_4, angle_5, angle_6], "speed": speed_angles} elif selected_command == "control/coords": args = {"coords": [x, y, z, rx, ry, rz], "speed": speed_coords} elif selected_command == "control/gripper": args = {"gripper_value": gripper_value, "speed": speed_gripper} else: pass # Handle invalid command # Get credentials if use_own_creds: HIVEMQ_USERNAME = username HIVEMQ_PASSWORD = password HIVEMQ_HOST = host DEVICE_ID = device_id PORT = port else: HIVEMQ_USERNAME = os.environ.get("HIVEMQ_USERNAME") HIVEMQ_PASSWORD = os.environ.get("HIVEMQ_PASSWORD") HIVEMQ_HOST = os.environ.get("HIVEMQ_HOST") DEVICE_ID = os.environ.get("DEVICE_ID") PORT = int(os.environ.get("PORT", 8883)) # Check if client is already connected and credentials haven't changed if client_state is not None: credentials_changed = ( previous_credentials_state['username'] != HIVEMQ_USERNAME or previous_credentials_state['password'] != HIVEMQ_PASSWORD or previous_credentials_state['host'] != HIVEMQ_HOST or previous_credentials_state['device_id'] != DEVICE_ID or previous_credentials_state['port'] != PORT ) if credentials_changed: client_state.disconnect() client_state = None if client_state is None: response_queue = queue.Queue() client = create_paho_client() try: setup_paho_client( client, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, PORT, DEVICE_ID, response_queue ) except Exception as e: payload_md = "" response_md = f"### Error\nFailed to connect to broker: {e}" return payload_md, response_md, None, client_state, response_queue_state, previous_credentials_state client_state = client response_queue_state = response_queue previous_credentials_state = { 'username': HIVEMQ_USERNAME, 'password': HIVEMQ_PASSWORD, 'host': HIVEMQ_HOST, 'device_id': DEVICE_ID, 'port': PORT } else: client = client_state response_queue = response_queue_state # Prepare payload payload = {"command": selected_command, "args": args} payload_md = f"### Payload\n```json\n{json.dumps(payload, indent=2)}\n```" # Publish and wait for response response = publish_and_wait( client, DEVICE_ID, payload, response_queue ) # Prepare response display if response is not None: if "image" in response: image_b64 = response.pop("image") img = handle_image_display(image_b64) response_md = f"### Response\n```json\n{json.dumps(response, indent=2)}\n```" return payload_md, response_md, img, client_state, response_queue_state, previous_credentials_state else: response_md = f"### Response\n```json\n{json.dumps(response, indent=2)}\n```" return payload_md, response_md, None, client_state, response_queue_state, previous_credentials_state else: response_md = "### Response\nTimed out waiting for response. Is the on-device server running?" return payload_md, response_md, None, client_state, response_queue_state, previous_credentials_state # Now, set up the function to be called when the "Send Command" button is clicked send_command_button.click( fn=send_command, inputs=[ use_own_creds, HIVEMQ_USERNAME, HIVEMQ_PASSWORD, HIVEMQ_HOST, DEVICE_ID, PORT, selected_command, quality, angle_1, angle_2, angle_3, angle_4, angle_5, angle_6, speed_angles, x, y, z, rx, ry, rz, speed_coords, gripper_value, speed_gripper, client_state, response_queue_state, previous_credentials_state ], outputs=[ payload_display, response_display, image_display, client_state, response_queue_state, previous_credentials_state ] ) demo.launch()