# Spaces: Sleeping
# File size: 5,058 Bytes
# 2554ae8
# base python packages
import base64
import io
import json
import uuid
from datetime import datetime
from queue import Empty, Queue

# installed packages
import paho.mqtt.client as paho
from PIL import Image
def print_with_timestamp(message):
    """Print *message* prefixed with the current local time.

    The prefix has the form ``[Mon DD, HH:MM:SS]`` (``strftime`` pattern
    ``%b %d, %H:%M:%S``), followed by a space and the message.
    """
    stamp = datetime.now().strftime('%b %d, %H:%M:%S')
    print(f"[{stamp}] {message}")
class CobotController:
    """MQTT client that sends JSON commands to a cobot server and returns its replies.

    Commands are published to ``<device_endpoint>/<user_id>`` and replies are
    read from ``<device_endpoint>/<user_id>/response``. Every decoded reply is
    pushed onto ``response_queue`` by the network thread and consumed by the
    request helpers below, one reply per request.
    """

    # Generated once when the class is defined and reused by every instance
    # unless a caller passes its own ``user_id`` (which then replaces it).
    user_id = str(uuid.uuid4())

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        device_endpoint: str,
        user_id: str = None
    ):
        """Connect to the broker and block until the server grants access.

        Args:
            hive_mq_username: Broker username.
            hive_mq_password: Broker password.
            hive_mq_cloud: Broker host name.
            port: Broker port (the connection uses TLS).
            device_endpoint: Base topic of the target device.
            user_id: Optional id to use instead of the class-level one; when
                given it also overwrites ``CobotController.user_id``.
        """
        # setup client and response queue
        # NOTE(review): paho-mqtt 2.x constructors take a callback_api_version
        # argument — confirm this call matches the installed paho version.
        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.tls_set()
        self.client.username_pw_set(hive_mq_username, hive_mq_password)
        self.client.connect(hive_mq_cloud, port)
        self.response_queue = Queue()
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.loop_start()

        # initialize user id and endpoints
        if user_id is not None:
            CobotController.user_id = user_id
        self.user_id = CobotController.user_id
        self.device_endpoint = device_endpoint
        self.init_endpoint = self.device_endpoint + "/init"
        self.publish_endpoint = self.device_endpoint + "/" + self.user_id
        self.incoming_endpoint = self.publish_endpoint + "/response"
        self.client.subscribe(self.incoming_endpoint, qos=2)

        # If the server already answers our commands there is nothing to request.
        if self.check_connection_status():
            return

        # send an init request
        print_with_timestamp("Sending a connection request...")
        pub_handle = self.client.publish(
            self.init_endpoint,
            payload=json.dumps({"id": self.user_id}),
            qos=2
        )
        pub_handle.wait_for_publish()

        # get a response for the init message; if there is no response we have
        # to wait for the current user's time to end
        print_with_timestamp("Waiting for cobot access...")
        self._wait_for_access()

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """Log that the broker connection has been established."""
        print_with_timestamp("Connected to HiveMQ broker...")

    def _on_message(self, client, userdata, msg):
        """Decode an incoming JSON message and queue it for the waiting caller."""
        try:
            payload_dict = json.loads(msg.payload)
        except ValueError as err:
            # A malformed payload must not raise inside the network thread;
            # report it and drop the message instead.
            print_with_timestamp(f"Ignoring malformed message on {msg.topic}: {err}")
            return
        self.response_queue.put(payload_dict)

    def _acknowledge_ready(self):
        """Publish the acknowledgement sent once the server reports "ready"."""
        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"yeehaw": []}),
            qos=2
        )
        print_with_timestamp("Connected to server successfully.")

    def _wait_for_access(self):
        """Block until the server reports "ready", printing queue position changes."""
        prev_pos = None
        while True:
            try:
                payload = self.response_queue.get(timeout=10)
            except Empty:
                payload = None

            if isinstance(payload, dict) and "status" in payload:
                if payload["status"] == "ready":
                    self._acknowledge_ready()
                    break
                continue

            # Nothing usable arrived (timeout, or a message without a status):
            # ask the server where we are in the queue.
            resp = self.handle_publish_and_response(
                payload=json.dumps({"id": self.user_id}),
                custom_endpoint=self.device_endpoint + "/queuequery"
            )
            if "queue_pos" not in resp:
                # The reply we picked up may be the "ready" notice itself;
                # acknowledge it exactly like the main path does.
                # NOTE(review): assumes the server expects the same ack here — confirm.
                if isinstance(resp, dict) and resp.get("status") == "ready":
                    self._acknowledge_ready()
                break

            pos = resp["queue_pos"]
            if prev_pos is not None and prev_pos == pos:
                continue  # position unchanged, stay quiet
            prev_pos = pos
            print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")

    def check_connection_status(self):
        """Return True if the server answers a probe command within 5 seconds.

        Any reply at all is taken to mean the server is currently servicing
        our requests; the reply itself is discarded.
        """
        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"command": "query/angles", "args": {}}),
            qos=2
        )
        try:
            self.response_queue.get(timeout=5)
        except Empty:
            return False
        return True

    def handle_publish_and_response(self, payload, custom_endpoint=None, timeout: float = None):
        """Publish *payload* and return the next reply from the response queue.

        Args:
            payload: Already-serialized message body.
            custom_endpoint: Topic to publish to; defaults to ``publish_endpoint``.
            timeout: Seconds to wait for a reply. ``None`` (the default) waits
                indefinitely.

        Raises:
            queue.Empty: If *timeout* is given and no reply arrives in time.
        """
        endpoint = self.publish_endpoint if custom_endpoint is None else custom_endpoint
        self.client.publish(endpoint, payload=payload, qos=2)
        return self.response_queue.get(block=True, timeout=timeout)

    def send_angles(
        self,
        angle_list: list[float] = None,
        speed: int = 50
    ):
        """Send a ``control/angles`` command; defaults to six zeros at speed 50."""
        if angle_list is None:
            angle_list = [0.0] * 6
        payload = json.dumps({"command": "control/angles",
                              "args": {"angles": angle_list, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def send_coords(
        self,
        coord_list: list[float] = None,
        speed: int = 50
    ):
        """Send a ``control/coords`` command; defaults to six zeros at speed 50."""
        if coord_list is None:
            coord_list = [0.0] * 6
        payload = json.dumps({"command": "control/coords",
                              "args": {"coords": coord_list, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50
    ):
        """Send a ``control/gripper`` command and return the server's reply."""
        payload = json.dumps({"command": "control/gripper",
                              "args": {"gripper_value": value, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def get_angles(self):
        """Send a ``query/angles`` command and return the server's reply."""
        payload = json.dumps({"command": "query/angles", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_coords(self):
        """Send a ``query/coords`` command and return the server's reply."""
        payload = json.dumps({"command": "query/coords", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the server's reply."""
        payload = json.dumps({"command": "query/gripper", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_camera(self, quality=100, save_path=None):
        """Send a ``query/camera`` command and decode the returned image.

        On success the reply's base64 ``"image"`` field is replaced by a PIL
        image, which is also written to *save_path* when one is given. An
        unsuccessful reply is returned unchanged.
        """
        payload = json.dumps({"command": "query/camera", "args": {"quality": quality}})
        response = self.handle_publish_and_response(payload)
        if not response["success"]:
            return response

        b64_bytes = base64.b64decode(response["image"])
        img_bytes = io.BytesIO(b64_bytes)
        img = Image.open(img_bytes)
        response["image"] = img
        if save_path is not None:
            img.save(save_path)
        return response