|
|
|
import base64
import io
import json
import uuid
from datetime import datetime
from queue import Empty, Queue

import paho.mqtt.client as paho
from PIL import Image
|
|
|
def print_with_timestamp(message):
    """Print *message* to stdout behind a ``[Mon DD, HH:MM:SS]`` prefix.

    The timestamp is the current local time as returned by
    ``datetime.now()``.
    """
    stamp = datetime.now().strftime("%b %d, %H:%M:%S")
    line = "[{}] {}".format(stamp, message)
    print(line)
|
|
|
class CobotController:
    """MQTT client for driving a remote cobot through a HiveMQ broker.

    Commands are published as JSON to ``<device_endpoint>/<user_id>`` and
    replies are read from ``<device_endpoint>/<user_id>/response``. Every
    command method publishes one message, then blocks until the next message
    arrives on the response topic and returns it as the decoded JSON value.
    """

    # Generated once when the class is created and shared by all instances.
    # Passing ``user_id`` to the constructor replaces it for the whole class.
    user_id = str(uuid.uuid4())

    # Seconds of silence on the response topic before the queue position is
    # (re)queried while waiting for access.
    _QUEUE_POLL_TIMEOUT = 10
    # Seconds to wait for a reply to the initial connection probe.
    _PROBE_TIMEOUT = 5

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        device_endpoint: str,
        user_id: str = None,
    ):
        """Connect to the broker and obtain access to the cobot.

        The constructor connects over TLS, subscribes to the response topic
        and probes the server with an angle query. If the probe is answered
        it returns immediately. Otherwise it publishes ``{"id": user_id}`` to
        ``<device_endpoint>/init`` and blocks until access is granted.

        Args:
            hive_mq_username: Broker user name.
            hive_mq_password: Broker password.
            hive_mq_cloud: Broker host name.
            port: Broker port.
            device_endpoint: Base topic of the device.
            user_id: Identifier used in the topic names. When given it also
                overwrites ``CobotController.user_id``; when omitted the
                class-level identifier is used.
        """
        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.tls_set()
        self.client.username_pw_set(hive_mq_username, hive_mq_password)
        self.client.connect(hive_mq_cloud, port)

        # Every message received on the subscribed topic ends up here.
        self.response_queue = Queue()

        def on_message(client, userdata, msg):
            # Invoked by the paho network loop started below. A malformed
            # payload must not raise out of the callback, so it is reported
            # and dropped instead.
            try:
                payload_dict = json.loads(msg.payload)
            except ValueError:
                print_with_timestamp("Ignoring a response that is not valid JSON.")
                return
            self.response_queue.put(payload_dict)

        def on_connect(client, userdata, flags, rc, properties=None):
            print_with_timestamp("Connected to HiveMQ broker...")

        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.loop_start()

        if user_id is not None:
            CobotController.user_id = user_id
        self.user_id = CobotController.user_id

        self.device_endpoint = device_endpoint
        self.init_endpoint = self.device_endpoint + "/init"
        self.publish_endpoint = self.device_endpoint + "/" + self.user_id
        self.incoming_endpoint = self.publish_endpoint + "/response"
        self.client.subscribe(self.incoming_endpoint, qos=2)

        # An answered probe means the server already serves this user id.
        if self.check_connection_status():
            return

        print_with_timestamp("Sending a connection request...")
        pub_handle = self.client.publish(
            self.init_endpoint,
            payload=json.dumps({"id": self.user_id}),
            qos=2,
        )
        pub_handle.wait_for_publish()

        print_with_timestamp("Waiting for cobot access...")
        self._wait_for_access()

    def _acknowledge_ready(self):
        """Send the acknowledgement that follows a ``"ready"`` status message."""
        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"yeehaw": []}),
            qos=2,
        )
        print_with_timestamp("Connected to server successfully.")

    def _wait_for_access(self):
        """Block until the server grants access or stops reporting a position.

        All incoming messages are handled in one place, so a ``"ready"``
        message is acknowledged even when it arrives right after a queue
        query was sent. Whenever the response topic stays silent for
        ``_QUEUE_POLL_TIMEOUT`` seconds, the queue position is queried on
        ``<device_endpoint>/queuequery``. The wait ends without an
        acknowledgement if the first message after a query carries neither
        ``"ready"`` nor ``"queue_pos"``.
        """
        query = json.dumps({"id": self.user_id})
        query_endpoint = self.device_endpoint + "/queuequery"
        prev_pos = None
        awaiting_position = False

        while True:
            try:
                message = self.response_queue.get(timeout=self._QUEUE_POLL_TIMEOUT)
            except Empty:
                # Only a timeout triggers a query; the reply is picked up by
                # the next iteration, so a lost reply cannot block forever.
                self.client.publish(query_endpoint, payload=query, qos=2)
                awaiting_position = True
                continue

            if not isinstance(message, dict):
                continue

            if message.get("status") == "ready":
                self._acknowledge_ready()
                return

            if "queue_pos" not in message:
                if awaiting_position:
                    # The server answered the query without a position.
                    return
                continue

            awaiting_position = False
            pos = message["queue_pos"]
            if pos != prev_pos:
                # Report the position only when it changes.
                prev_pos = pos
                print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")

    def check_connection_status(self):
        """Probe whether the server already answers commands for this user.

        Publishes an angle query and waits up to ``_PROBE_TIMEOUT`` seconds
        for any message on the response topic. The message is discarded.

        Returns:
            True if a message arrived in time, False on timeout.
        """
        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"command": "query/angles", "args": {}}),
            qos=2,
        )
        try:
            self.response_queue.get(timeout=self._PROBE_TIMEOUT)
        except Empty:
            return False
        return True

    def handle_publish_and_response(self, payload, custom_endpoint=None, timeout=None):
        """Publish *payload* and return the next message from the response topic.

        Args:
            payload: Already serialized message body.
            custom_endpoint: Topic to publish to; defaults to the command
                topic of this user.
            timeout: Seconds to wait for a response. ``None`` (the default)
                waits indefinitely.

        Returns:
            The decoded JSON value of the next received message.

        Raises:
            queue.Empty: If *timeout* is given and no message arrives in time.
        """
        endpoint = self.publish_endpoint if custom_endpoint is None else custom_endpoint
        self.client.publish(endpoint, payload=payload, qos=2)
        return self.response_queue.get(block=True, timeout=timeout)

    def _send_command(self, command, **args):
        """Publish ``{"command": command, "args": args}`` and return the reply."""
        payload = json.dumps({"command": command, "args": args})
        return self.handle_publish_and_response(payload)

    def send_angles(
        self,
        angle_list: list[float] = None,
        speed: int = 50,
    ):
        """Send a ``control/angles`` command and return the server's reply.

        Args:
            angle_list: Values sent as ``args.angles``; six zeros when omitted.
            speed: Value sent as ``args.speed``.
        """
        if angle_list is None:
            angle_list = [0.0] * 6
        return self._send_command("control/angles", angles=angle_list, speed=speed)

    def send_coords(
        self,
        coord_list: list[float] = None,
        speed: int = 50,
    ):
        """Send a ``control/coords`` command and return the server's reply.

        Args:
            coord_list: Values sent as ``args.coords``; six zeros when omitted.
            speed: Value sent as ``args.speed``.
        """
        if coord_list is None:
            coord_list = [0.0] * 6
        return self._send_command("control/coords", coords=coord_list, speed=speed)

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50,
    ):
        """Send a ``control/gripper`` command and return the server's reply.

        Args:
            value: Value sent as ``args.gripper_value``.
            speed: Value sent as ``args.speed``.
        """
        return self._send_command("control/gripper", gripper_value=value, speed=speed)

    def get_angles(self):
        """Send a ``query/angles`` command and return the server's reply."""
        return self._send_command("query/angles")

    def get_coords(self):
        """Send a ``query/coords`` command and return the server's reply."""
        return self._send_command("query/coords")

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the server's reply."""
        return self._send_command("query/gripper")

    def get_camera(self, quality=100, save_path=None):
        """Request a camera frame and decode it into a PIL image.

        Args:
            quality: Value sent as ``args.quality``.
            save_path: When given, the decoded image is also saved there.

        Returns:
            The server's reply. On success its ``"image"`` entry (base64 in
            the reply) is replaced by the opened ``PIL.Image``. A reply whose
            ``"success"`` entry is missing or falsy is returned unchanged.
        """
        response = self._send_command("query/camera", quality=quality)
        if not response.get("success"):
            return response

        img = Image.open(io.BytesIO(base64.b64decode(response["image"])))
        response["image"] = img
        if save_path is not None:
            img.save(save_path)

        return response
|
|
|
if __name__ == "__main__":
    # Import the connection settings by name instead of with a wildcard, so
    # it is visible which values ``my_secrets`` has to define and a missing
    # one fails right here with an ImportError.
    from my_secrets import (
        DEVICE_ENDPOINT,
        DEVICE_PORT,
        HIVEMQ_HOST,
        HIVEMQ_PASSWORD,
        HIVEMQ_USERNAME,
    )

    # Constructing the controller connects to the broker and blocks until
    # access to the cobot has been obtained.
    cobot = CobotController(
        HIVEMQ_USERNAME,
        HIVEMQ_PASSWORD,
        HIVEMQ_HOST,
        DEVICE_PORT,
        DEVICE_ENDPOINT,
    )