|
import gradio as gr |
|
from client import CobotController |
|
from utils import get_credentials |
|
import json |
|
import threading |
|
import time |
|
from collections import deque |
|
import namesgenerator |
|
|
|
|
|
SESSION_TIME = 120 |
|
|
|
class QueueSystem: |
|
def __init__(self): |
|
self.queue = deque() |
|
self.current_user = None |
|
self.session_start_time = None |
|
self.lock = threading.Lock() |
|
|
|
def enqueue_user(self, user_id): |
|
with self.lock: |
|
if user_id not in self.queue and user_id != self.current_user: |
|
self.queue.append(user_id) |
|
|
|
def dequeue_user(self): |
|
with self.lock: |
|
if self.queue: |
|
return self.queue.popleft() |
|
return None |
|
|
|
def get_queue_info(self, user_id): |
|
with self.lock: |
|
if user_id == self.current_user: |
|
remaining_time = max(0, SESSION_TIME - (time.time() - self.session_start_time)) |
|
return 0, remaining_time |
|
elif user_id in self.queue: |
|
position = list(self.queue).index(user_id) + 1 |
|
if self.session_start_time: |
|
wait_time = (position - 1) * SESSION_TIME + max(0, SESSION_TIME - (time.time() - self.session_start_time)) |
|
else: |
|
wait_time = position * SESSION_TIME |
|
return position, wait_time |
|
else: |
|
return None, None |
|
|
|
def start_session(self, user_id): |
|
with self.lock: |
|
if self.current_user is None: |
|
self.current_user = user_id |
|
self.session_start_time = time.time() |
|
return True |
|
return False |
|
|
|
def end_session(self): |
|
with self.lock: |
|
if self.current_user and time.time() - self.session_start_time >= SESSION_TIME: |
|
self.current_user = None |
|
self.session_start_time = None |
|
return True |
|
return False |
|
|
|
queue_system = QueueSystem() |
|
|
|
def queue_size(): |
|
return f"There are {len(queue_system.queue)} people in the queue." |
|
|
|
|
|
def background_timer(): |
|
while True: |
|
time.sleep(1) |
|
if queue_system.end_session(): |
|
next_user = queue_system.dequeue_user() |
|
if next_user: |
|
queue_system.start_session(next_user) |
|
|
|
timer_thread = threading.Thread(target=background_timer, daemon=True) |
|
timer_thread.start() |
|
|
|
user, pwd, host, endpoint, port = get_credentials(False) |
|
client = CobotController(user, pwd, host, port, endpoint) |
|
|
|
CSS = """ |
|
#col { |
|
background-color: #161624; |
|
padding: 16px; |
|
border-radius: 8px; |
|
} |
|
#nogaprow { |
|
gap: 0px !important; |
|
} |
|
#nogapcol { |
|
padding: 0px !important; |
|
border: none !important; |
|
box-shadow: none !important; |
|
} |
|
""" |
|
|
|
def handle_request( |
|
command, gripper_value, gripper_speed, angle1, angle2, angle3, angle4, |
|
angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed,image_box |
|
): |
|
if command == "enterqueue": |
|
return "", gr.Image(visible=False) |
|
elif command == "query/angles": |
|
resp = client.get_angles() |
|
resp["command"] = command |
|
return json.dumps(resp, indent=4), image_box |
|
elif command == "query/coords": |
|
resp = client.get_coords() |
|
resp["command"] = command |
|
return json.dumps(resp, indent=4), image_box |
|
elif command == "query/camera": |
|
resp = client.get_camera() |
|
resp["command"] = command |
|
if not resp["success"]: |
|
return json.dumps(resp, indent=4) |
|
img = resp.pop("image") |
|
return json.dumps(resp, indent=4), gr.Image(visible=True, value=img) |
|
elif command == "query/gripper": |
|
resp = client.get_gripper_value() |
|
resp["command"] = command |
|
return json.dumps(resp, indent=4), image_box |
|
elif command == "control/angles": |
|
resp = client.send_angles( |
|
angle_list = [angle1, angle2, angle3, angle4, angle5, angle6], |
|
speed = angle_speed |
|
) |
|
resp["command"] = command |
|
|
|
image_resp = client.get_camera() |
|
image_ret = gr.Image(value=image_resp["image"], visible=True) if image_resp["success"] else gr.Image(visible=False) |
|
return json.dumps(resp, indent=4), image_ret |
|
elif command == "control/gripper": |
|
resp = client.send_gripper_value( |
|
value = gripper_value, |
|
speed = gripper_speed |
|
) |
|
resp["command"] = command |
|
|
|
image_resp = client.get_camera() |
|
image_ret = gr.Image(value=image_resp["image"], visible=True) if image_resp["success"] else gr.Image(visible=False) |
|
return json.dumps(resp, indent=4), image_ret |
|
elif command == "control/coords": |
|
resp = client.send_coords( |
|
coord_list = [xcoord, ycoord, zcoord, roll, pitch, yaw], |
|
speed = coord_speed |
|
) |
|
resp["command"] = command |
|
|
|
image_resp = client.get_camera() |
|
image_ret = gr.Image(value=image_resp["image"], visible=True) if image_resp["success"] else gr.Image(visible=False) |
|
return json.dumps(resp, indent=4), image_ret |
|
|
|
def process_user_auth(command, user_id, gripper_value, gripper_speed, angle1, angle2, angle3, angle4, |
|
angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed, output_image): |
|
if queue_system.current_user is None: |
|
queue_system.start_session(user_id) |
|
queue_system.enqueue_user(user_id) |
|
position, wait_time = queue_system.get_queue_info(user_id) |
|
|
|
if position == 0: |
|
result, response_image = handle_request( |
|
command, gripper_value, gripper_speed, angle1, angle2, angle3, angle4, |
|
angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed, output_image |
|
) |
|
remaining_time_msg = f"Your turn!\nTime remaining: {wait_time:.2f} seconds." |
|
return result, response_image, remaining_time_msg |
|
elif position is not None: |
|
if position == 1: |
|
wait_msg = f"You are next!\nWait time: {wait_time:.2f} seconds." |
|
else: |
|
wait_msg = f"There are {position - 1} people ahead of you in the queue.\nWait time: {wait_time:.2f} seconds." |
|
return None, None, wait_msg |
|
else: |
|
return None, None, "Error: You are not in the queue." |
|
|
|
|
|
with gr.Blocks(css=CSS) as app: |
|
gr.Markdown("# MyCobot 280pi MQTT Control Demo") |
|
gr.Markdown("This is a demo that uses the MQTT protocol to communicate with the MyCobot 280pi over the internet. You can remotely send commands to the cobot and query it's current status.") |
|
|
|
with gr.Row(): |
|
with gr.Column(): |
|
user_id = gr.Textbox(label="User ID", info="Enter a unique user id of your choice or take note of the automatically generated user id. This user id will be placed into a queue to give you access to the cobot, so make sure you remember it!") |
|
enter_queue_button = gr.Button("Join queue") |
|
with gr.Column(): |
|
status_text = gr.Textbox(label="Queue status", value="", interactive=False, lines=5) |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(elem_id="col"): |
|
gr.Markdown("## Query") |
|
gr.Markdown("Use buttons on this panel to query the current status of the cobot, including information like joint angles, coordinates, gripper state and what the onboard camera sees.") |
|
angle_query_button = gr.Button("Query Angles") |
|
coord_query_button = gr.Button("Query Coordinates") |
|
gripper_query_button = gr.Button("Query Gripper state") |
|
camera_query_button = gr.Button("Query Camera") |
|
|
|
|
|
with gr.Column(elem_id="col"): |
|
gr.Markdown("## Gripper Control") |
|
gr.Markdown("Use this panel to control the gripper of the cobot.") |
|
gripper_value = gr.Slider(minimum=0.0, maximum=100.0, step=1.0, label="Gripper value") |
|
speed_gripper = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed") |
|
gripper_control_button = gr.Button("Send gripper command") |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(elem_id="col"): |
|
gr.Markdown("## Angle Control") |
|
gr.Markdown("Use this panel to control the joint angles of the cobot. Each angle corresponds to one of the 6 joints on the cobot.") |
|
with gr.Row(elem_id="nogaprow"): |
|
with gr.Column(elem_id="nogapcol"): |
|
angle1 = gr.Slider(value=0.0, label="Angle 1", step=1.0, minimum=-168, maximum=168) |
|
angle3 = gr.Slider(value=0.0, label="Angle 3", step=1.0, minimum=-135, maximum=135) |
|
angle5 = gr.Slider(value=0.0, label="Angle 5", step=1.0, minimum=-150, maximum=150) |
|
with gr.Column(elem_id="nogapcol"): |
|
angle2 = gr.Slider(value=0.0, label="Angle 2", step=1.0, minimum=-145, maximum=145) |
|
angle4 = gr.Slider(value=0.0, label="Angle 4", step=1.0, minimum=-165, maximum=165) |
|
angle6 = gr.Slider(value=0.0, label="Angle 6", step=1.0, minimum=-180, maximum=180) |
|
speed_angles = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed") |
|
angle_control_button = gr.Button("Send angle command") |
|
|
|
|
|
with gr.Column(elem_id="col"): |
|
gr.Markdown("## Coordinate Control") |
|
gr.Markdown("Use this panel to control the joint coordinates of the cobot head. The angles are in [6-DoF format](https://en.wikipedia.org/wiki/Six_degrees_of_freedom).") |
|
with gr.Row(elem_id="nogaprow"): |
|
with gr.Column(elem_id="nogapcol"): |
|
xcoord = gr.Slider(value=0.0, label="X coordinate", step=1.0, minimum=-350, maximum=350) |
|
ycoord = gr.Slider(value=0.0, label="Y coordinate", step=1.0, minimum=-350, maximum=350) |
|
zcoord = gr.Slider(value=0.0, label="Z coordinate", step=1.0, minimum=-70, maximum=523) |
|
with gr.Column(elem_id="nogapcol"): |
|
roll = gr.Slider(value=0.0, label="Roll", step=1.0, minimum=-180, maximum=180) |
|
pitch = gr.Slider(value=0.0, label="Pitch", step=1.0, minimum=-180, maximum=180) |
|
yaw = gr.Slider(value=0.0, label="Yaw", step=1.0, minimum=-180, maximum=180) |
|
speed_coords = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed") |
|
coord_control_button = gr.Button("Send coordinate command") |
|
|
|
response = gr.Textbox(label="Response") |
|
response_image = gr.Image(visible=False) |
|
|
|
|
|
angle_query_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="query/angles", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
coord_query_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="query/coords", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
gripper_query_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="query/gripper", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
camera_query_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="query/camera", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
gripper_control_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="control/gripper", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
angle_control_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="control/angles", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
coord_control_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="control/coords", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
enter_queue_button.click( |
|
process_user_auth, |
|
inputs = [gr.Textbox(value="enterqueue", visible=False), user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4, |
|
angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords, response_image], |
|
outputs = [response, response_image, status_text] |
|
) |
|
|
|
app.load( |
|
namesgenerator.get_random_name, |
|
outputs=[user_id] |
|
) |
|
|
|
app.load( |
|
queue_size, |
|
outputs=[status_text] |
|
) |
|
|
|
app.queue(default_concurrency_limit=1, max_size=100) |
|
app.launch() |
|
|