Spaces:
Sleeping
Sleeping
# base python packages
import base64
import io
import json
import uuid
from datetime import datetime
from queue import Empty, Queue

# installed packages
import paho.mqtt.client as paho
from PIL import Image
def print_with_timestamp(message):
    """Print *message* to stdout prefixed with the current local time.

    The prefix is wrapped in square brackets and formatted as
    abbreviated month, day, then hours:minutes:seconds.
    """
    stamp = datetime.now().strftime('%b %d, %H:%M:%S')
    print(f"[{stamp}] {message}")
class CobotController:
    """Client that sends commands to a cobot over MQTT via a HiveMQ broker.

    Every command is published as a JSON document of the form
    ``{"command": <name>, "args": {...}}`` on ``<device_endpoint>/<user_id>``
    and the reply is read from ``<device_endpoint>/<user_id>/response``.
    Replies are decoded from JSON by the MQTT callback and handed to the
    calling thread through ``self.response_queue``.
    """

    # Generated once when the class is defined and shared by all instances;
    # __init__ overwrites it (on the class) when an explicit user_id is given,
    # so later instances in the same process reuse that identity.
    user_id = str(uuid.uuid4())

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        device_endpoint: str,
        user_id: str = None
    ):
        """Connect to the broker and block until the server grants access.

        Args:
            hive_mq_username: Broker username.
            hive_mq_password: Broker password.
            hive_mq_cloud: Broker host name passed to ``client.connect``.
            port: Broker port passed to ``client.connect``.
            device_endpoint: Base MQTT topic of the device.
            user_id: Optional identity to use instead of the class-level one.
                When given it is also stored on the class.
        """
        # setup client and response queues
        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.tls_set()
        self.client.username_pw_set(hive_mq_username, hive_mq_password)
        self.client.connect(hive_mq_cloud, port)
        self.response_queue = Queue()

        def on_message(client, userdata, msg):
            # Runs on the MQTT network thread. A payload that is not valid
            # JSON is reported and dropped so that a decoding error does not
            # escape from the callback.
            try:
                payload_dict = json.loads(msg.payload)
            except ValueError:
                print_with_timestamp("Ignoring a message that is not valid JSON.")
                return
            self.response_queue.put(payload_dict)

        def on_connect(client, userdata, flags, rc, properties=None):
            print_with_timestamp("Connected to HiveMQ broker...")

        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.loop_start()

        # initialize user id and endpoints
        if user_id is not None:
            CobotController.user_id = user_id
        self.user_id = CobotController.user_id
        self.device_endpoint = device_endpoint
        self.init_endpoint = self.device_endpoint + "/init"
        self.publish_endpoint = self.device_endpoint + "/" + self.user_id
        self.incoming_endpoint = self.publish_endpoint + "/response"
        self.client.subscribe(self.incoming_endpoint, qos=2)

        # if the server already answers our commands there is nothing to set up
        if self.check_connection_status():
            return

        # send an init request
        print_with_timestamp("Sending a connection request...")
        pub_handle = self.client.publish(
            self.init_endpoint,
            payload=json.dumps({"id": self.user_id}),
            qos=2
        )
        pub_handle.wait_for_publish()

        # get a response for the init message, if no response, have to wait
        # for current users time to end
        print_with_timestamp("Waiting for cobot access...")
        prev_pos = None
        while True:
            try:
                payload = self.response_queue.get(timeout=10)
            except Empty:
                # Nothing arrived within the window: ask the server for our
                # position in the waiting queue.
                resp = self.handle_publish_and_response(
                    payload=json.dumps({"id": self.user_id}),
                    custom_endpoint=self.device_endpoint + "/queuequery"
                )
                # NOTE(review): a reply without "queue_pos" ends the wait
                # without the acknowledgement below being sent; if that reply
                # can be the "ready" notice itself, confirm the server does
                # not need the acknowledgement in that case.
                if "queue_pos" not in resp:
                    break
                pos = resp["queue_pos"]
                if pos == prev_pos:
                    # only report the position when it changes
                    continue
                prev_pos = pos
                print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")
            else:
                # Messages without a "ready" status are skipped and the wait
                # continues.
                if isinstance(payload, dict) and payload.get("status") == "ready":
                    # presumably the acknowledgement the server expects after
                    # a "ready" notice -- confirm against the server side
                    self.client.publish(
                        self.publish_endpoint,
                        payload=json.dumps({"yeehaw": []}),
                        qos=2
                    )
                    print_with_timestamp("Connected to server successfully.")
                    break

    def check_connection_status(self, timeout=5):
        """Return True if the server answers a ``query/angles`` probe.

        Args:
            timeout: Seconds to wait for any reply on the response queue.

        Returns:
            True when a reply arrived within ``timeout`` (the reply itself is
            discarded), False when the wait timed out.
        """
        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"command":"query/angles", "args": {}}),
            qos=2
        )
        try:  # if we receive any response, it means the server is currently servicing our requests
            self.response_queue.get(timeout=timeout)
        except Empty:
            return False
        return True

    def handle_publish_and_response(self, payload, custom_endpoint=None, timeout=None):
        """Publish ``payload`` and return the next message from the response queue.

        Args:
            payload: Already-serialized message body.
            custom_endpoint: Topic to publish on; defaults to
                ``self.publish_endpoint`` when None.
            timeout: Seconds to wait for a reply. None (the default) waits
                indefinitely.

        Returns:
            The next decoded message taken from ``self.response_queue``.

        Raises:
            queue.Empty: If ``timeout`` is given and no reply arrives in time.
        """
        endpoint = self.publish_endpoint if custom_endpoint is None else custom_endpoint
        self.client.publish(endpoint, payload=payload, qos=2)
        return self.response_queue.get(block=True, timeout=timeout)

    def send_angles(
        self,
        angle_list: list[float] = None,
        speed: int = 50
    ):
        """Send a ``control/angles`` command and return the server's reply.

        Args:
            angle_list: Angles to send; six zeros when omitted.
            speed: Speed value forwarded in the command arguments.
        """
        if angle_list is None:
            # fresh list per call instead of a shared mutable default
            angle_list = [0.0] * 6
        payload = json.dumps({"command": "control/angles",
                              "args": {"angles": angle_list, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def send_coords(
        self,
        coord_list: list[float] = None,
        speed: int = 50
    ):
        """Send a ``control/coords`` command and return the server's reply.

        Args:
            coord_list: Coordinates to send; six zeros when omitted.
            speed: Speed value forwarded in the command arguments.
        """
        if coord_list is None:
            # fresh list per call instead of a shared mutable default
            coord_list = [0.0] * 6
        payload = json.dumps({"command": "control/coords",
                              "args": {"coords": coord_list, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50
    ):
        """Send a ``control/gripper`` command and return the server's reply.

        Args:
            value: Gripper value forwarded as ``gripper_value``.
            speed: Speed value forwarded in the command arguments.
        """
        payload = json.dumps({"command": "control/gripper",
                              "args": {"gripper_value": value, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def get_angles(self):
        """Send a ``query/angles`` command and return the server's reply."""
        payload = json.dumps({"command": "query/angles", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_coords(self):
        """Send a ``query/coords`` command and return the server's reply."""
        payload = json.dumps({"command": "query/coords", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the server's reply."""
        payload = json.dumps({"command": "query/gripper", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_camera(self, quality=100, save_path=None):
        """Send a ``query/camera`` command and decode the returned image.

        Args:
            quality: Value forwarded as the ``quality`` command argument.
            save_path: When not None, the decoded image is also saved there.

        Returns:
            The reply dict. If its ``"success"`` entry is falsy it is returned
            unchanged; otherwise its ``"image"`` entry (base64 text) is
            replaced by the opened PIL image.
        """
        payload = json.dumps({"command": "query/camera", "args": {"quality": quality}})
        response = self.handle_publish_and_response(payload)
        if not response["success"]:
            return response

        # the image travels as base64 text inside the JSON reply
        raw_bytes = base64.b64decode(response["image"])
        img = Image.open(io.BytesIO(raw_bytes))
        response["image"] = img
        if save_path is not None:
            img.save(save_path)
        return response