gursi26's picture
added coord/angle reset and update buttons, refactor
39dd3bf
import gradio as gr
from client import CobotController
from utils import get_credentials
import json
import threading
import time
from collections import deque
import namesgenerator
# Queue system setup
SESSION_TIME = 120
class QueueSystem:
def __init__(self):
self.queue = deque()
self.current_user = None
self.session_start_time = None
self.lock = threading.Lock()
def enqueue_user(self, user_id):
with self.lock:
if user_id not in self.queue and user_id != self.current_user:
self.queue.append(user_id)
def dequeue_user(self):
with self.lock:
if self.queue:
return self.queue.popleft()
return None
def get_queue_info(self, user_id):
with self.lock:
if user_id == self.current_user:
remaining_time = max(0, SESSION_TIME - (time.time() - self.session_start_time))
return 0, remaining_time
elif user_id in self.queue:
position = list(self.queue).index(user_id) + 1
if self.session_start_time:
wait_time = (position - 1) * SESSION_TIME + max(0, SESSION_TIME - (time.time() - self.session_start_time))
else:
wait_time = position * SESSION_TIME
return position, wait_time
else:
return None, None
def start_session(self, user_id):
with self.lock:
if self.current_user is None:
self.current_user = user_id
self.session_start_time = time.time()
return True
return False
def end_session(self):
with self.lock:
if self.current_user and time.time() - self.session_start_time >= SESSION_TIME:
self.current_user = None
self.session_start_time = None
return True
return False
queue_system = QueueSystem()
def queue_size():
return f"There are {len(queue_system.queue)} people in the queue."
# Background timer thread to end session after SESSION_TIME
def background_timer():
while True:
time.sleep(1)
if queue_system.end_session():
next_user = queue_system.dequeue_user()
if next_user:
queue_system.start_session(next_user)
timer_thread = threading.Thread(target=background_timer, daemon=True)
timer_thread.start()
user, pwd, host, endpoint, port = get_credentials(False)
client = CobotController(user, pwd, host, port, endpoint)
CSS = """
#col {
background-color: #161624;
padding: 16px;
border-radius: 8px;
}
#nogaprow {
gap: 0px !important;
}
#nogapcol {
padding: 0px !important;
border: none !important;
box-shadow: none !important;
}
"""
"""
checks the user position on the queue and returns a message along with
whether the command should be executed.
"""
def authenticate_user(user_id):
if queue_system.current_user is None:
queue_system.start_session(user_id)
queue_system.enqueue_user(user_id)
position, wait_time = queue_system.get_queue_info(user_id)
if position == 0:
remaining_time_msg = f"Your turn!\nTime remaining: {wait_time:.2f} seconds."
return True, remaining_time_msg
elif position is not None:
if position == 1:
wait_msg = f"You are next!\nWait time: {wait_time:.2f} seconds."
else:
wait_msg = f"There are {position - 1} people ahead of you in the queue.\nWait time: {wait_time:.2f} seconds."
return False, wait_msg
else:
return False, "Error: You are not in the queue."
def enter_queue(user_id):
_, msg = authenticate_user(user_id)
return msg
def query_angles(user_id):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp = client.get_angles()
resp["command"] = "query/angles"
return json.dumps(resp, indent=4), queue_status_msg
else:
return None, queue_status_msg
def query_coords(user_id):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp = client.get_coords()
resp["command"] = "query/coords"
return json.dumps(resp, indent=4), queue_status_msg
else:
return None, queue_status_msg
def query_gripper(user_id):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp = client.get_gripper_value()
resp["command"] = "query/gripper"
return json.dumps(resp, indent=4), queue_status_msg
else:
return None, queue_status_msg
def query_camera(user_id):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp = client.get_camera()
resp["command"] = "query/camera"
if not resp["success"]:
return json.dumps(resp, indent=4), None, queue_status_msg
img = resp.pop("image")
return json.dumps(resp, indent=4), gr.Image(visible=True, value=img), queue_status_msg
else:
return None, None, queue_status_msg
def control_angles(user_id, angle0, angle1, angle2, angle3, angle4, angle5, movement_speed):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp = client.send_angles([angle0, angle1, angle2, angle3, angle4, angle5], movement_speed)
resp["command"] = "control/angles"
return json.dumps(resp, indent=4), queue_status_msg
else:
return None, queue_status_msg
def control_coords(user_id, x, y, z, roll, pitch, yaw, movement_speed):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp = client.send_coords([x, y, z, roll, pitch, yaw], movement_speed)
resp["command"] = "control/coords"
return json.dumps(resp, indent=4), queue_status_msg
else:
return None, queue_status_msg
def control_gripper(user_id, gripper_value, movement_speed):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp = client.send_gripper_value(gripper_value, movement_speed)
resp["command"] = "control/gripper"
return json.dumps(resp, indent=4), queue_status_msg
else:
return None, queue_status_msg
def set_coords_to_current(user_id):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp, _ = query_coords(user_id)
resp = json.loads(resp)
if not resp["success"]:
return None, None, None, None, None, None, queue_status_msg
return resp["coords"] + [queue_status_msg]
else:
return None, None, None, None, None, None, queue_status_msg
def set_angles_to_current(user_id):
to_execute, queue_status_msg = authenticate_user(user_id)
if to_execute:
resp, _ = query_angles(user_id)
resp = json.loads(resp)
if not resp["success"]:
return None, None, None, None, None, None, queue_status_msg
return resp["angles"] + [queue_status_msg]
else:
return None, None, None, None, None, None, queue_status_msg
def reset():
return 0, 0, 0, 0, 0, 0, 50
with gr.Blocks(css=CSS) as app:
gr.Markdown("# MyCobot 280pi MQTT Control Demo")
gr.HTML('''
<a href="https://colab.research.google.com/github/AccelerationConsortium/ac-training-lab/blob/cobot-usage-docs/src/ac_training_lab/cobot280pi/gradio-client-demo.ipynb" target="_blank">
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="117" height="20"><linearGradient id="b" x2="0" y2="100%"><stop offset="0" stop-color="#bbb" stop-opacity=".1"/><stop offset="1" stop-opacity=".1"/></linearGradient><clipPath id="a"><rect width="117" height="20" rx="3" fill="#fff"/></clipPath><g clip-path="url(#a)"><path fill="#555" d="M0 0h30v20H0z"/><path fill="#007ec6" d="M30 0h87v20H30z"/><path fill="url(#b)" d="M0 0h117v20H0z"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="110"><svg x="4px" y="0px" width="22px" height="20px" viewBox="-2 0 28 24" style="background-color: #fff;border-radius: 1px;"><path style="fill:#e8710a;" d="M1.977,16.77c-2.667-2.277-2.605-7.079,0-9.357C2.919,8.057,3.522,9.075,4.49,9.691c-1.152,1.6-1.146,3.201-0.004,4.803C3.522,15.111,2.918,16.126,1.977,16.77z"/><path style="fill:#f9ab00;" d="M12.257,17.114c-1.767-1.633-2.485-3.658-2.118-6.02c0.451-2.91,2.139-4.893,4.946-5.678c2.565-0.718,4.964-0.217,6.878,1.819c-0.884,0.743-1.707,1.547-2.434,2.446C18.488,8.827,17.319,8.435,16,8.856c-2.404,0.767-3.046,3.241-1.494,5.644c-0.241,0.275-0.493,0.541-0.721,0.826C13.295,15.939,12.511,16.3,12.257,17.114z"/><path style="fill:#e8710a;" d="M19.529,9.682c0.727-0.899,1.55-1.703,2.434-2.446c2.703,2.783,2.701,7.031-0.005,9.764c-2.648,2.674-6.936,2.725-9.701,0.115c0.254-0.814,1.038-1.175,1.528-1.788c0.228-0.285,0.48-0.552,0.721-0.826c1.053,0.916,2.254,1.268,3.6,0.83C20.502,14.551,21.151,11.927,19.529,9.682z"/><path style="fill:#f9ab00;" d="M4.49,9.691C3.522,9.075,2.919,8.057,1.977,7.413c2.209-2.398,5.721-2.942,8.476-1.355c0.555,0.32,0.719,0.606,0.285,1.128c-0.157,0.188-0.258,0.422-0.391,0.631c-0.299,0.47-0.509,1.067-0.929,1.371C8.933,9.539,8.523,8.847,8.021,8.746C6.673,8.475,5.509,8.787,4.49,9.691z"/><path style="fill:#f9ab00;" d="M1.977,16.77c0.941-0.644,1.545-1.659,2.509-2.277c1.373,1.152,2.85,1.433,4.45,0.499c0.332-0.194,0.503-0.088,0.673,0.19c0.386,0.635,0.753,1.285,1.181,1.89c0.34,0.48,0.222,0.715-0.253,1.006C7.84,19.73,4.205,19.188,1.977,16.77z"/></svg><text x="245" y="140" transform="scale(.1)" textLength="30"> </text><text x="725" y="150" fill="#010101" fill-opacity=".3" transform="scale(.1)" textLength="770">Open in Colab</text><text x="725" y="140" transform="scale(.1)" textLength="770">Open in Colab</text></g> </svg>
</a>
''')
gr.Markdown("This is a demo that uses the MQTT protocol to communicate with the MyCobot 280pi over the internet. You can remotely send commands to the cobot and query it's current status.")
with gr.Row():
with gr.Column():
user_id = gr.Textbox(label="User ID", info="Enter a unique user id of your choice or take note of the automatically generated user id. This user id will be placed into a queue to give you access to the cobot, so make sure you remember it!")
enter_queue_button = gr.Button("Join queue")
with gr.Column():
status_text = gr.Textbox(label="Queue status", value="", interactive=False, lines=5)
with gr.Row():
# QUERY PANEL
with gr.Column(elem_id="col"):
gr.Markdown("## Query")
gr.Markdown("Use buttons on this panel to query the current status of the cobot, including information like joint angles, coordinates, gripper state and what the onboard camera sees.")
angle_query_button = gr.Button("Query Angles")
coord_query_button = gr.Button("Query Coordinates")
gripper_query_button = gr.Button("Query Gripper state")
camera_query_button = gr.Button("Query Camera")
# GRIPPER PANEL
with gr.Column(elem_id="col"):
gr.Markdown("## Gripper Control")
gr.Markdown("Use this panel to control the gripper of the cobot.")
gripper_value = gr.Slider(minimum=0.0, maximum=100.0, step=1.0, label="Gripper value")
speed_gripper = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed")
gripper_control_button = gr.Button("Send gripper command")
with gr.Row():
# ANGLE PANEL
with gr.Column(elem_id="col"):
gr.Markdown("## Angle Control")
gr.Markdown("Use this panel to control the joint angles of the cobot. Each angle corresponds to one of the 6 joints on the cobot.")
angle_set_button = gr.Button("Set to current angles")
reset_angle_button = gr.Button("Reset angles")
with gr.Row(elem_id="nogaprow"):
with gr.Column(elem_id="nogapcol"):
angle1 = gr.Slider(value=0.0, label="Angle 1", step=1.0, minimum=-168, maximum=168)
angle3 = gr.Slider(value=0.0, label="Angle 3", step=1.0, minimum=-135, maximum=135)
angle5 = gr.Slider(value=0.0, label="Angle 5", step=1.0, minimum=-150, maximum=150)
with gr.Column(elem_id="nogapcol"):
angle2 = gr.Slider(value=0.0, label="Angle 2", step=1.0, minimum=-145, maximum=145)
angle4 = gr.Slider(value=0.0, label="Angle 4", step=1.0, minimum=-165, maximum=165)
angle6 = gr.Slider(value=0.0, label="Angle 6", step=1.0, minimum=-180, maximum=180)
speed_angles = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed")
angle_control_button = gr.Button("Send angle command")
# COORD PANEL
with gr.Column(elem_id="col"):
gr.Markdown("## Coordinate Control")
gr.Markdown("Use this panel to control the joint coordinates of the cobot head. The angles are in [6-DoF format](https://en.wikipedia.org/wiki/Six_degrees_of_freedom).")
coord_set_button = gr.Button("Set to current coords")
reset_coords_button = gr.Button("Reset coordinates")
with gr.Row(elem_id="nogaprow"):
with gr.Column(elem_id="nogapcol"):
xcoord = gr.Slider(value=0.0, label="X coordinate", step=1.0, minimum=-350, maximum=350)
ycoord = gr.Slider(value=0.0, label="Y coordinate", step=1.0, minimum=-350, maximum=350)
zcoord = gr.Slider(value=0.0, label="Z coordinate", step=1.0, minimum=-70, maximum=523)
with gr.Column(elem_id="nogapcol"):
roll = gr.Slider(value=0.0, label="Roll", step=1.0, minimum=-180, maximum=180)
pitch = gr.Slider(value=0.0, label="Pitch", step=1.0, minimum=-180, maximum=180)
yaw = gr.Slider(value=0.0, label="Yaw", step=1.0, minimum=-180, maximum=180)
speed_coords = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed")
coord_control_button = gr.Button("Send coordinate command")
response = gr.Textbox(label="Response")
response_image = gr.Image(visible=False)
# Queue-aware event handling
angle_query_button.click(
query_angles,
inputs = [user_id],
outputs = [response, status_text]
)
coord_query_button.click(
query_coords,
inputs = [user_id],
outputs = [response, status_text]
)
gripper_query_button.click(
query_gripper,
inputs = [user_id],
outputs = [response, status_text]
)
camera_query_button.click(
query_camera,
inputs = [user_id],
outputs = [response, response_image, status_text]
)
gripper_control_button.click(
control_gripper,
inputs = [user_id, gripper_value, speed_gripper],
outputs = [response, status_text]
)
angle_control_button.click(
control_angles,
inputs = [user_id, angle1, angle2, angle3, angle4, angle5, angle6, speed_angles],
outputs = [response, status_text]
)
coord_control_button.click(
control_coords,
inputs = [user_id, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords],
outputs = [response, status_text]
)
coord_set_button.click(
set_coords_to_current,
inputs = [user_id],
outputs = [xcoord, ycoord, zcoord, roll, pitch, yaw, status_text]
)
angle_set_button.click(
set_angles_to_current,
inputs = [user_id],
outputs = [angle1, angle2, angle3, angle4, angle5, angle6, status_text]
)
reset_angle_button.click(
reset,
outputs = [angle1, angle2, angle3, angle4, angle5, angle6, speed_angles]
)
reset_coords_button.click(
reset,
outputs = [xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords]
)
enter_queue_button.click(
enter_queue,
inputs = [user_id],
outputs = [status_text]
)
app.load(
namesgenerator.get_random_name,
outputs=[user_id]
)
app.load(
queue_size,
outputs=[status_text]
)
app.queue(default_concurrency_limit=1, max_size=100)
app.launch()