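# NOTE(review): the six lines below look like file-viewer page residue rather
# than source; they are kept as comments so the module can parse — confirm and remove.
# gursi26's picture
# update 2, app complete
# bf0b4b8
# raw
# history blame
# 10.3 kB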
import gradio as gr
from client import CobotController
from utils import get_credentials
import json
import threading
import time
from collections import deque
import namesgenerator
# Queue system setup
SESSION_TIME = 120  # length of one exclusive session, in seconds


class QueueSystem:
    """Lock-protected FIFO waiting line that grants one user at a time a timed session.

    Attributes:
        queue: deque of user ids waiting for a turn, oldest first.
        current_user: id of the user holding the session, or None when free.
        session_start_time: ``time.time()`` value recorded when the current
            session began, or None when no session is running.
        lock: mutex taken by every public method before touching the state.
    """

    def __init__(self):
        self.queue = deque()
        self.current_user = None
        self.session_start_time = None
        self.lock = threading.Lock()

    def _remaining_locked(self):
        """Return seconds left in the running session, or None if none is running.

        The caller must already hold ``self.lock``.
        """
        if self.session_start_time is None:
            return None
        return max(0, SESSION_TIME - (time.time() - self.session_start_time))

    def enqueue_user(self, user_id):
        """Append ``user_id`` to the queue unless it is already waiting or active."""
        with self.lock:
            if user_id not in self.queue and user_id != self.current_user:
                self.queue.append(user_id)

    def dequeue_user(self):
        """Remove and return the longest-waiting user id, or None if nobody waits."""
        with self.lock:
            if self.queue:
                return self.queue.popleft()
            return None

    def get_queue_info(self, user_id):
        """Report where ``user_id`` stands and how long until/within its turn.

        Returns:
            ``(0, remaining_seconds)`` if the user holds the session,
            ``(position, estimated_wait_seconds)`` with a 1-based position if
            the user is waiting, or ``(None, None)`` if the user is unknown.
        """
        with self.lock:
            remaining = self._remaining_locked()
            # Only treat the caller as active when a session is really running;
            # otherwise a None id would "match" the empty slot.
            if remaining is not None and user_id == self.current_user:
                return 0, remaining
            try:
                position = self.queue.index(user_id) + 1
            except ValueError:
                return None, None
            if remaining is None:
                # Nobody is active: every user ahead (and this one) gets a full slot.
                return position, position * SESSION_TIME
            return position, (position - 1) * SESSION_TIME + remaining

    def start_session(self, user_id):
        """Give the session to ``user_id`` if it is free.

        Returns:
            True if the session was granted, False if another user holds it.
        """
        with self.lock:
            if self.current_user is not None:
                return False
            # A user promoted to active must not stay in the waiting line,
            # otherwise they would be handed a second session later.
            if user_id in self.queue:
                self.queue.remove(user_id)
            self.current_user = user_id
            self.session_start_time = time.time()
            return True

    def end_session(self):
        """Release the session once it has lasted at least SESSION_TIME seconds.

        Returns:
            True if a session was ended by this call, False otherwise.
        """
        with self.lock:
            # Compare against None explicitly: a falsy id such as "" is still
            # a real occupant and must be allowed to expire.
            if self.current_user is None or self.session_start_time is None:
                return False
            if time.time() - self.session_start_time < SESSION_TIME:
                return False
            self.current_user = None
            self.session_start_time = None
            return True
# Single module-level queue shared by the background timer thread and the
# request handler (process_user_auth).
queue_system = QueueSystem()
# Background timer thread to end session after SESSION_TIME
def background_timer():
    """Poll once per second; when the session expires, promote the next user.

    Runs forever and never returns, so it is meant to be the target of a
    background thread. Each tick asks ``queue_system`` to end the current
    session; if one was ended, the head of the waiting queue (if any) is
    given the session.
    """
    while True:
        time.sleep(1)
        if not queue_system.end_session():
            continue
        next_user = queue_system.dequeue_user()
        # dequeue_user signals "empty" with None; test for that explicitly so a
        # falsy id (e.g. "") is not dequeued and then silently dropped.
        if next_user is None:
            continue
        if not queue_system.start_session(next_user):
            # Another thread claimed the free slot between end_session() and
            # start_session(). Put the user back at the front so they keep
            # their place instead of being lost.
            with queue_system.lock:
                if next_user not in queue_system.queue:
                    queue_system.queue.appendleft(next_user)
# Run the session-rotation loop on a daemon thread and start it at import time.
timer_thread = threading.Thread(target=background_timer, daemon=True)
timer_thread.start()
# Load connection settings and build the module-level cobot client that
# handle_request uses for every command.
# NOTE(review): get_credentials yields (..., endpoint, port) while
# CobotController is called with (..., port, endpoint); the variable names line
# up, but confirm the order against both signatures.
# NOTE(review): the meaning of the False argument is not visible here — confirm in utils.
user, pwd, host, endpoint, port = get_credentials(False)
client = CobotController(user, pwd, host, port, endpoint)
# Stylesheet handed to gr.Blocks(css=CSS). The selectors match the elem_id
# values used in the layout below: "col" for the four panel columns, and
# "nogaprow" / "nogapcol" for the nested row and columns that hold the
# angle and coordinate number inputs.
CSS = """
#col {
background-color: #161624;
padding: 16px;
border-radius: 8px;
}
#nogaprow {
gap: 0px !important;
}
#nogapcol {
padding: 0px !important;
border: none !important;
box-shadow: none !important;
}
"""
def handle_request(
    command, gripper_value, gripper_speed, angle1, angle2, angle3, angle4,
    angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed,
):
    """Run one cobot command through the shared client and format the reply.

    Args:
        command: One of "query/angles", "query/coords", "query/camera",
            "query/gripper", "control/angles", "control/gripper",
            "control/coords".
        gripper_value, gripper_speed: Sent with "control/gripper".
        angle1..angle6, angle_speed: Sent with "control/angles".
        xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed: Sent with
            "control/coords".

    Returns:
        A 2-tuple ``(response_json, image_component)``: the client's response
        (with the command name added under "command") as an indented JSON
        string, and a ``gr.Image`` that is visible only for a successful
        camera query. The tuple shape is the same on every path, including
        camera failures and unknown commands, so callers can always unpack it.
    """
    # Each entry is a zero-argument callable so that only the selected
    # command actually talks to the cobot.
    actions = {
        "query/angles": client.get_angles,
        "query/coords": client.get_coords,
        "query/camera": client.get_camera,
        "query/gripper": client.get_gripper_value,
        "control/angles": lambda: client.send_angles(
            angle_list=[angle1, angle2, angle3, angle4, angle5, angle6],
            speed=angle_speed,
        ),
        "control/gripper": lambda: client.send_gripper_value(
            value=gripper_value,
            speed=gripper_speed,
        ),
        "control/coords": lambda: client.send_coords(
            coord_list=[xcoord, ycoord, zcoord, roll, pitch, yaw],
            speed=coord_speed,
        ),
    }

    action = actions.get(command)
    if action is None:
        # Previously an unrecognised command fell through and returned None,
        # which broke tuple unpacking in the caller.
        error = {
            "success": False,
            "command": command,
            "error": f"Unknown command: {command}",
        }
        return json.dumps(error, indent=4), gr.Image(visible=False)

    resp = action()
    resp["command"] = command

    if command == "query/camera" and resp["success"]:
        # The image is shown in the image component, not in the JSON text.
        img = resp.pop("image")
        return json.dumps(resp, indent=4), gr.Image(visible=True, value=img)

    # Covers every non-camera command and the failed camera query (which
    # used to return a bare string instead of a 2-tuple).
    return json.dumps(resp, indent=4), gr.Image(visible=False)
def process_user_auth(command, user_id, gripper_value, gripper_speed, angle1, angle2, angle3, angle4,
                      angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed):
    """Gate a cobot command behind the session queue.

    The caller is given the session if it is free, otherwise placed in the
    waiting queue. Only the user holding the session has the command
    forwarded to ``handle_request``.

    Args:
        command: Command name forwarded to ``handle_request``.
        user_id: Identifier typed by the user; must be non-blank.
        (remaining arguments): Forwarded unchanged to ``handle_request``.

    Returns:
        A 3-tuple ``(response_text, response_image, status_message)``. The
        first two are None whenever the command was not executed.
    """
    # A blank id would be shared by every visitor who cleared the field, so
    # reject it before it can claim a session or a queue slot.
    if user_id is None or not str(user_id).strip():
        return None, None, "Error: Please enter a user ID."

    # start_session checks availability under the queue's lock and simply
    # returns False when someone else is active, so no pre-check is needed.
    queue_system.start_session(user_id)
    # No-op for the active user or for someone already waiting.
    queue_system.enqueue_user(user_id)

    position, wait_time = queue_system.get_queue_info(user_id)
    if position is None:
        return None, None, "Error: You are not in the queue."

    if position == 0:
        result, response_image = handle_request(
            command, gripper_value, gripper_speed, angle1, angle2, angle3, angle4,
            angle5, angle6, angle_speed, xcoord, ycoord, zcoord, roll, pitch, yaw, coord_speed
        )
        remaining_time_msg = f"Your turn!\nTime remaining: {wait_time:.2f} seconds."
        return result, response_image, remaining_time_msg

    if position == 1:
        wait_msg = f"You are next!\nWait time: {wait_time:.2f} seconds."
    else:
        wait_msg = f"There are {position - 1} people ahead of you in the queue.\nWait time: {wait_time:.2f} seconds."
    return None, None, wait_msg
# Gradio UI: layout, event wiring and launch.
# NOTE(review): the nesting of the `with` blocks below was reconstructed from a
# source whose indentation was lost — confirm the layout matches the intended one.
with gr.Blocks(css=CSS) as app:
    gr.Markdown("# MyCobot 280pi MQTT Control Demo")
    gr.Markdown("This is a demo that uses the MQTT protocol to communicate with the MyCobot 280pi over the internet. You can remotely send commands to the cobot and query its current status.")

    with gr.Row():
        with gr.Column():
            user_id = gr.Textbox(label="User ID", info="Enter a unique user id of your choice or take note of the current one. This user id will be placed into a queue to give you access to the cobot, so make sure you remember it!")
        with gr.Column():
            status_text = gr.Textbox(label="Queue status", value="", interactive=False, max_lines=2)

    with gr.Row():
        # QUERY PANEL
        with gr.Column(elem_id="col"):
            gr.Markdown("## Query")
            angle_query_button = gr.Button("Query Angles")
            coord_query_button = gr.Button("Query Coordinates")
            gripper_query_button = gr.Button("Query Gripper state")
            camera_query_button = gr.Button("Query Camera")
        # GRIPPER PANEL
        with gr.Column(elem_id="col"):
            gr.Markdown("## Gripper Control")
            gripper_value = gr.Slider(minimum=0.0, maximum=100.0, step=1.0, label="Gripper value")
            speed_gripper = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed")
            gripper_control_button = gr.Button("Send gripper command")

    with gr.Row():
        # ANGLE PANEL
        with gr.Column(elem_id="col"):
            gr.Markdown("## Angle Control")
            with gr.Row(elem_id="nogaprow"):
                with gr.Column(elem_id="nogapcol"):
                    angle1 = gr.Number(value=0.0, label="Angle 1", step=5.0)
                    angle3 = gr.Number(value=0.0, label="Angle 3", step=5.0)
                    angle5 = gr.Number(value=0.0, label="Angle 5", step=5.0)
                with gr.Column(elem_id="nogapcol"):
                    angle2 = gr.Number(value=0.0, label="Angle 2", step=5.0)
                    angle4 = gr.Number(value=0.0, label="Angle 4", step=5.0)
                    angle6 = gr.Number(value=0.0, label="Angle 6", step=5.0)
            speed_angles = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed")
            angle_control_button = gr.Button("Send angle command")
        # COORD PANEL
        with gr.Column(elem_id="col"):
            gr.Markdown("## Coordinate Control")
            with gr.Row(elem_id="nogaprow"):
                with gr.Column(elem_id="nogapcol"):
                    xcoord = gr.Number(value=0.0, label="X coordinate", step=5.0)
                    ycoord = gr.Number(value=0.0, label="Y coordinate", step=5.0)
                    zcoord = gr.Number(value=0.0, label="Z coordinate", step=5.0)
                with gr.Column(elem_id="nogapcol"):
                    roll = gr.Number(value=0.0, label="Roll", step=5.0)
                    pitch = gr.Number(value=0.0, label="Pitch", step=5.0)
                    yaw = gr.Number(value=0.0, label="Yaw", step=5.0)
            speed_coords = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Movement speed")
            coord_control_button = gr.Button("Send coordinate command")

    response = gr.Textbox(label="Response")
    response_image = gr.Image(visible=False)

    # Queue-aware event handling
    # Every button sends the same form state to process_user_auth; only the
    # command name differs. The order here must match the parameter order of
    # process_user_auth after its leading `command` argument.
    shared_inputs = [
        user_id, gripper_value, speed_gripper, angle1, angle2, angle3, angle4,
        angle5, angle6, speed_angles, xcoord, ycoord, zcoord, roll, pitch, yaw, speed_coords,
    ]
    shared_outputs = [response, response_image, status_text]
    button_commands = [
        (angle_query_button, "query/angles"),
        (coord_query_button, "query/coords"),
        (gripper_query_button, "query/gripper"),
        (camera_query_button, "query/camera"),
        (gripper_control_button, "control/gripper"),
        (angle_control_button, "control/angles"),
        (coord_control_button, "control/coords"),
    ]
    for button, command_name in button_commands:
        # The command name travels as the value of a hidden textbox that is
        # passed as the first input of the handler.
        button.click(
            process_user_auth,
            inputs=[gr.Textbox(value=command_name, visible=False), *shared_inputs],
            outputs=shared_outputs,
        )

    # Pre-fill the user id field with a generated name on page load.
    app.load(
        namesgenerator.get_random_name,
        outputs=[user_id]
    )

app.queue(default_concurrency_limit=1, max_size=100)
app.launch()