import gradio as gr from client import CobotController from utils import get_credentials import json import threading import time from collections import deque import namesgenerator # Queue system setup SESSION_TIME = 120 class QueueSystem: def __init__(self): self.queue = deque() self.current_user = None self.session_start_time = None self.lock = threading.Lock() def enqueue_user(self, user_id): with self.lock: if user_id not in self.queue and user_id != self.current_user: self.queue.append(user_id) def dequeue_user(self): with self.lock: if self.queue: return self.queue.popleft() return None def get_queue_info(self, user_id): with self.lock: if user_id == self.current_user: remaining_time = max(0, SESSION_TIME - (time.time() - self.session_start_time)) return 0, remaining_time elif user_id in self.queue: position = list(self.queue).index(user_id) + 1 if self.session_start_time: wait_time = (position - 1) * SESSION_TIME + max(0, SESSION_TIME - (time.time() - self.session_start_time)) else: wait_time = position * SESSION_TIME return position, wait_time else: return None, None def start_session(self, user_id): with self.lock: if self.current_user is None: self.current_user = user_id self.session_start_time = time.time() return True return False def end_session(self): with self.lock: if self.current_user and time.time() - self.session_start_time >= SESSION_TIME: self.current_user = None self.session_start_time = None return True return False queue_system = QueueSystem() def queue_size(): return f"There are {len(queue_system.queue)} people in the queue." # Background timer thread to end session after SESSION_TIME def background_timer(): while True: time.sleep(1) if queue_system.end_session(): next_user = queue_system.dequeue_user() if next_user: queue_system.start_session(next_user) timer_thread = threading.Thread(target=background_timer, daemon=True) timer_thread.start() user, pwd, host, endpoint, port = get_credentials(False) client = CobotController(user, pwd, host, port, endpoint) CSS = """ #col { background-color: #161624; padding: 16px; border-radius: 8px; } #nogaprow { gap: 0px !important; } #nogapcol { padding: 0px !important; border: none !important; box-shadow: none !important; } """ """ checks the user position on the queue and returns a message along with whether the command should be executed. """ def authenticate_user(user_id): if queue_system.current_user is None: queue_system.start_session(user_id) queue_system.enqueue_user(user_id) position, wait_time = queue_system.get_queue_info(user_id) if position == 0: remaining_time_msg = f"Your turn!\nTime remaining: {wait_time:.2f} seconds." return True, remaining_time_msg elif position is not None: if position == 1: wait_msg = f"You are next!\nWait time: {wait_time:.2f} seconds." else: wait_msg = f"There are {position - 1} people ahead of you in the queue.\nWait time: {wait_time:.2f} seconds." return False, wait_msg else: return False, "Error: You are not in the queue." def enter_queue(user_id): _, msg = authenticate_user(user_id) return msg def query_angles(user_id): to_execute, queue_status_msg = authenticate_user(user_id) if to_execute: resp = client.get_angles() resp["command"] = "query/angles" return json.dumps(resp, indent=4), queue_status_msg else: return None, queue_status_msg def query_coords(user_id): to_execute, queue_status_msg = authenticate_user(user_id) if to_execute: resp = client.get_coords() resp["command"] = "query/coords" return json.dumps(resp, indent=4), queue_status_msg else: return None, queue_status_msg def query_gripper(user_id): to_execute, queue_status_msg = authenticate_user(user_id) if to_execute: resp = client.get_gripper_value() resp["command"] = "query/gripper" return json.dumps(resp, indent=4), queue_status_msg else: return None, queue_status_msg def query_camera(user_id): to_execute, queue_status_msg = authenticate_user(user_id) if to_execute: resp = client.get_camera() resp["command"] = "query/camera" if not resp["success"]: return json.dumps(resp, indent=4), None, queue_status_msg img = resp.pop("image") return json.dumps(resp, indent=4), gr.Image(visible=True, value=img), queue_status_msg else: return None, None, queue_status_msg def control_angles(user_id, angle0, angle1, angle2, angle3, angle4, angle5, movement_speed): to_execute, queue_status_msg = authenticate_user(user_id) if to_execute: resp = client.send_angles([angle0, angle1, angle2, angle3, angle4, angle5], movement_speed) resp["command"] = "control/angles" return json.dumps(resp, indent=4), queue_status_msg else: return None, queue_status_msg def control_coords(user_id, x, y, z, roll, pitch, yaw, movement_speed): to_execute, queue_status_msg = authenticate_user(user_id) if to_execute: resp = client.send_coords([x, y, z, roll, pitch, yaw], movement_speed) resp["command"] = "control/coords" return json.dumps(resp, indent=4), queue_status_msg else: return None, queue_status_msg def control_gripper(user_id, gripper_value, movement_speed): to_execute, queue_status_msg = authenticate_user(user_id) if to_execute: resp = client.send_gripper_value(gripper_value, movement_speed) resp["command"] = "control/gripper" return json.dumps(resp, indent=4), queue_status_msg else: return None, queue_status_msg def handle_request( command, gripper_value, gripper_speed, angle1, angle2, angle3, angle4, angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed,image_box ): if command == "enterqueue": return "", gr.Image(visible=False) elif command == "query/angles": resp = client.get_angles() resp["command"] = command return json.dumps(resp, indent=4), image_box elif command == "query/coords": resp = client.get_coords() resp["command"] = command return json.dumps(resp, indent=4), image_box elif command == "query/camera": resp = client.get_camera() resp["command"] = command if not resp["success"]: return json.dumps(resp, indent=4) img = resp.pop("image") return json.dumps(resp, indent=4), gr.Image(visible=True, value=img) elif command == "query/gripper": resp = client.get_gripper_value() resp["command"] = command return json.dumps(resp, indent=4), image_box elif command == "control/angles": resp = client.send_angles( angle_list = [angle1, angle2, angle3, angle4, angle5, angle6], speed = angle_speed ) resp["command"] = command image_resp = client.get_camera() image_ret = gr.Image(value=image_resp["image"], visible=True) if image_resp["success"] else gr.Image(visible=False) return json.dumps(resp, indent=4), image_ret elif command == "control/gripper": resp = client.send_gripper_value( value = gripper_value, speed = gripper_speed ) resp["command"] = command image_resp = client.get_camera() image_ret = gr.Image(value=image_resp["image"], visible=True) if image_resp["success"] else gr.Image(visible=False) return json.dumps(resp, indent=4), image_ret elif command == "control/coords": resp = client.send_coords( coord_list = [xcoord, ycoord, zcoord, roll, pitch, yaw], speed = coord_speed ) resp["command"] = command image_resp = client.get_camera() image_ret = gr.Image(value=image_resp["image"], visible=True) if image_resp["success"] else gr.Image(visible=False) return json.dumps(resp, indent=4), image_ret def process_user_auth(command, user_id, gripper_value, gripper_speed, angle1, angle2, angle3, angle4, angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed, output_image): if queue_system.current_user is None: queue_system.start_session(user_id) queue_system.enqueue_user(user_id) position, wait_time = queue_system.get_queue_info(user_id) if position == 0: result, response_image = handle_request( command, gripper_value, gripper_speed, angle1, angle2, angle3, angle4, angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed, output_image ) remaining_time_msg = f"Your turn!\nTime remaining: {wait_time:.2f} seconds." return result, response_image, remaining_time_msg elif position is not None: if position == 1: wait_msg = f"You are next!\nWait time: {wait_time:.2f} seconds." else: wait_msg = f"There are {position - 1} people ahead of you in the queue.\nWait time: {wait_time:.2f} seconds." return None, None, wait_msg else: return None, None, "Error: You are not in the queue." with gr.Blocks(css=CSS) as app: gr.Markdown("# MyCobot 280pi MQTT Control Demo") gr.Markdown("[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/AccelerationConsortium/ac-training-lab/blob/cobot-usage-docs/src/ac_training_lab/cobot280pi/gradio-client-demo.ipynb)") gr.Markdown("This is a demo that uses the MQTT protocol to communicate with the MyCobot 280pi over the internet. You can remotely send commands to the cobot and query it's current status.") with gr.Row(): with gr.Column(): user_id = gr.Textbox(label="User ID", info="Enter a unique user id of your choice or take note of the automatically generated user id. This user id will be placed into a queue to give you access to the cobot, so make sure you remember it!") enter_queue_button = gr.Button("Join queue") with gr.Column(): status_text = gr.Textbox(label="Queue status", value="", interactive=False, lines=5) with gr.Row(): # QUERY PANEL with gr.Column(elem_id="col"): gr.Markdown("## Query") gr.Markdown("Use buttons on this panel to query the current status of the cobot, including information like joint angles, coordinates, gripper state and what the onboard camera sees.") angle_query_button = gr.Button("Query Angles") coord_query_button = gr.Button("Query Coordinates") gripper_query_button = gr.Button("Query Gripper state") camera_query_button = gr.Button("Query Camera") # GRIPPER PANEL with gr.Column(elem_id="col"): gr.Markdown("## Gripper Control") gr.Markdown("Use this panel to control the gripper of the cobot.") gripper_value = gr.Slider(minimum=0.0, maximum=100.0, step=1.0, label="Gripper value") speed_gripper = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed") gripper_control_button = gr.Button("Send gripper command") with gr.Row(): # ANGLE PANEL with gr.Column(elem_id="col"): gr.Markdown("## Angle Control") gr.Markdown("Use this panel to control the joint angles of the cobot. Each angle corresponds to one of the 6 joints on the cobot.") with gr.Row(elem_id="nogaprow"): with gr.Column(elem_id="nogapcol"): angle1 = gr.Slider(value=0.0, label="Angle 1", step=1.0, minimum=-168, maximum=168) angle3 = gr.Slider(value=0.0, label="Angle 3", step=1.0, minimum=-135, maximum=135) angle5 = gr.Slider(value=0.0, label="Angle 5", step=1.0, minimum=-150, maximum=150) with gr.Column(elem_id="nogapcol"): angle2 = gr.Slider(value=0.0, label="Angle 2", step=1.0, minimum=-145, maximum=145) angle4 = gr.Slider(value=0.0, label="Angle 4", step=1.0, minimum=-165, maximum=165) angle6 = gr.Slider(value=0.0, label="Angle 6", step=1.0, minimum=-180, maximum=180) speed_angles = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed") angle_control_button = gr.Button("Send angle command") # COORD PANEL with gr.Column(elem_id="col"): gr.Markdown("## Coordinate Control") gr.Markdown("Use this panel to control the joint coordinates of the cobot head. The angles are in [6-DoF format](https://en.wikipedia.org/wiki/Six_degrees_of_freedom).") with gr.Row(elem_id="nogaprow"): with gr.Column(elem_id="nogapcol"): xcoord = gr.Slider(value=0.0, label="X coordinate", step=1.0, minimum=-350, maximum=350) ycoord = gr.Slider(value=0.0, label="Y coordinate", step=1.0, minimum=-350, maximum=350) zcoord = gr.Slider(value=0.0, label="Z coordinate", step=1.0, minimum=-70, maximum=523) with gr.Column(elem_id="nogapcol"): roll = gr.Slider(value=0.0, label="Roll", step=1.0, minimum=-180, maximum=180) pitch = gr.Slider(value=0.0, label="Pitch", step=1.0, minimum=-180, maximum=180) yaw = gr.Slider(value=0.0, label="Yaw", step=1.0, minimum=-180, maximum=180) speed_coords = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed") coord_control_button = gr.Button("Send coordinate command") response = gr.Textbox(label="Response") response_image = gr.Image(visible=False) # Queue-aware event handling angle_query_button.click( query_angles, inputs = [user_id], outputs = [response, status_text] ) coord_query_button.click( query_coords, inputs = [user_id], outputs = [response, status_text] ) gripper_query_button.click( query_gripper, inputs = [user_id], outputs = [response, status_text] ) camera_query_button.click( query_camera, inputs = [user_id], outputs = [response, response_image, status_text] ) gripper_control_button.click( control_gripper, inputs = [user_id, gripper_value, speed_gripper], outputs = [response, status_text] ) angle_control_button.click( control_angles, inputs = [user_id, angle1, angle2, angle3, angle4, angle5, angle6, speed_angles], outputs = [response, status_text] ) coord_control_button.click( control_coords, inputs = [user_id, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords], outputs = [response, status_text] ) enter_queue_button.click( enter_queue, inputs = [user_id], outputs = [status_text] ) app.load( namesgenerator.get_random_name, outputs=[user_id] ) app.load( queue_size, outputs=[status_text] ) app.queue(default_concurrency_limit=1, max_size=100) app.launch()