# base python packages
import base64
import io
import json
import uuid
from datetime import datetime
from queue import Empty, Queue
from typing import Optional

# installed packages
import paho.mqtt.client as paho
from PIL import Image


def print_with_timestamp(message) -> None:
    """Print *message* to stdout prefixed with the local time.

    The prefix has the form ``[Mon DD, HH:MM:SS]``.
    """
    print(f"[{datetime.now().strftime('%b %d, %H:%M:%S')}] {message}")


class CobotController:
    """MQTT client that sends commands to a cobot server and reads its replies.

    Every request is published as JSON to ``<device_endpoint>/<user_id>`` and
    every message arriving on ``<device_endpoint>/<user_id>/response`` is
    decoded from JSON and placed on ``response_queue``.  Requests and replies
    are matched purely by order: a request returns the next item taken from
    that queue.
    """

    # Generated once when the class is created and shared by every instance;
    # overwritten (for all later instances too) when a caller passes an
    # explicit ``user_id`` to the constructor.
    user_id = str(uuid.uuid4())

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        device_endpoint: str,
        user_id: Optional[str] = None,
    ):
        """Connect to the broker and block until the server grants access.

        Args:
            hive_mq_username: Broker username.
            hive_mq_password: Broker password.
            hive_mq_cloud: Broker host name.
            port: Broker port (the connection always uses TLS).
            device_endpoint: Base MQTT topic of the target device.
            user_id: Identity to use instead of the class-level default.
                When given, it also replaces ``CobotController.user_id``.
        """
        self._setup_client(hive_mq_username, hive_mq_password, hive_mq_cloud, port)

        # initialize user id and endpoints
        if user_id is not None:
            CobotController.user_id = user_id
        self.user_id = CobotController.user_id

        self.device_endpoint = device_endpoint
        self.init_endpoint = self.device_endpoint + "/init"
        self.publish_endpoint = self.device_endpoint + "/" + self.user_id
        self.incoming_endpoint = self.publish_endpoint + "/response"
        self.client.subscribe(self.incoming_endpoint, qos=2)

        if self.check_connection_status():
            return

        # send an init request
        print_with_timestamp("Sending a connection request...")
        pub_handle = self.client.publish(
            self.init_endpoint,
            payload=json.dumps({"id": self.user_id}),
            qos=2,
        )
        pub_handle.wait_for_publish()

        self._wait_for_access()

    def _setup_client(self, username: str, password: str, host: str, port: int) -> None:
        """Create the TLS MQTT client, the response queue, and start the network loop."""
        self.response_queue = Queue()

        def on_message(client, userdata, msg):
            try:
                payload_dict = json.loads(msg.payload)
            except ValueError as err:
                # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
                # Drop the message instead of letting the exception escape the
                # callback, where it could stop the background network loop.
                print_with_timestamp(f"Ignoring undecodable message: {err}")
                return
            self.response_queue.put(payload_dict)

        def on_connect(client, userdata, flags, rc, properties=None):
            print_with_timestamp("Connected to HiveMQ broker...")

        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.tls_set()
        self.client.username_pw_set(username, password)
        # Register callbacks before connecting so no event can be missed.
        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.connect(host, port)
        self.client.loop_start()

    def _wait_for_access(self) -> None:
        """Block until the server reports ``"ready"`` or stops reporting a queue position.

        If no status message arrives within 10 seconds (or the message carries
        no ``"status"`` field), the queue position is requested from
        ``<device_endpoint>/queuequery`` and printed whenever it changes.
        """
        # get a response for the init message, if no response, have to wait
        # for the current user's time to end
        print_with_timestamp("Waiting for cobot access...")
        prev_pos = None
        while True:
            try:
                payload = self.response_queue.get(timeout=10)
                status = payload["status"]
            except (Empty, KeyError, TypeError):
                resp = self.handle_publish_and_response(
                    payload=json.dumps({"id": self.user_id}),
                    custom_endpoint=self.device_endpoint + "/queuequery",
                )
                # NOTE(review): a reply without "queue_pos" ends the wait without
                # the "ready" handshake below — confirm this is what the server
                # intends (e.g. the user is no longer queued).
                if "queue_pos" not in resp:
                    break
                pos = resp["queue_pos"]
                if pos == prev_pos:
                    # Only report the position when it is new or has changed.
                    continue
                prev_pos = pos
                print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")
            else:
                if status == "ready":
                    # presumably the acknowledgement the server expects after
                    # "ready" — TODO confirm against the server implementation
                    self.client.publish(
                        self.publish_endpoint,
                        payload=json.dumps({"yeehaw": []}),
                        qos=2,
                    )
                    print_with_timestamp("Connected to server successfully.")
                    break

    def check_connection_status(self) -> bool:
        """Return True if the server answers a ``query/angles`` probe within 5 seconds."""
        probe = json.dumps({"command": "query/angles", "args": {}})
        try:
            # if we receive any response, it means the server is currently
            # servicing our requests
            self.handle_publish_and_response(probe, timeout=5)
        except Empty:
            return False
        return True

    def handle_publish_and_response(
        self,
        payload,
        custom_endpoint=None,
        timeout: Optional[float] = None,
    ):
        """Publish *payload* with QoS 2 and return the next queued response.

        Args:
            payload: Message body to publish (the callers here pass JSON text).
            custom_endpoint: Topic to publish to; defaults to ``publish_endpoint``.
            timeout: Seconds to wait for a response.  ``None`` (the default)
                waits indefinitely.

        Returns:
            The next decoded message taken from ``response_queue``.

        Raises:
            queue.Empty: If *timeout* is given and no response arrives in time.
        """
        endpoint = self.publish_endpoint if custom_endpoint is None else custom_endpoint
        self.client.publish(endpoint, payload=payload, qos=2)
        return self.response_queue.get(block=True, timeout=timeout)

    def _send_command(self, command: str, args: dict):
        """Publish ``{"command": command, "args": args}`` and return the response."""
        payload = json.dumps({"command": command, "args": args})
        return self.handle_publish_and_response(payload)

    def send_angles(
        self,
        angle_list: Optional[list[float]] = None,
        speed: int = 50,
    ):
        """Send a ``control/angles`` command; *angle_list* defaults to six zeros."""
        if angle_list is None:
            angle_list = [0.0] * 6
        return self._send_command("control/angles", {"angles": angle_list, "speed": speed})

    def send_coords(
        self,
        coord_list: Optional[list[float]] = None,
        speed: int = 50,
    ):
        """Send a ``control/coords`` command; *coord_list* defaults to six zeros."""
        if coord_list is None:
            coord_list = [0.0] * 6
        return self._send_command("control/coords", {"coords": coord_list, "speed": speed})

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50,
    ):
        """Send a ``control/gripper`` command with the given value and speed."""
        return self._send_command("control/gripper", {"gripper_value": value, "speed": speed})

    def get_angles(self):
        """Send a ``query/angles`` command and return the server's response."""
        return self._send_command("query/angles", {})

    def get_coords(self):
        """Send a ``query/coords`` command and return the server's response."""
        return self._send_command("query/coords", {})

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the server's response."""
        return self._send_command("query/gripper", {})

    def get_camera(self, quality=100, save_path=None):
        """Send a ``query/camera`` command and decode the returned image.

        Args:
            quality: Forwarded to the server as ``args.quality`` (presumably an
                image-compression quality — confirm against the server).
            save_path: If given, the decoded image is also saved to this path.

        Returns:
            The response unchanged when ``response["success"]`` is falsy;
            otherwise the response with ``"image"`` replaced by a
            ``PIL.Image.Image`` decoded from the base64 payload.
        """
        response = self._send_command("query/camera", {"quality": quality})
        if not response["success"]:
            return response

        img = Image.open(io.BytesIO(base64.b64decode(response["image"])))
        response["image"] = img
        if save_path is not None:
            img.save(save_path)
        return response