cobot280pi / client.py
gursi26's picture
added logs
2554ae8
raw
history blame
5.06 kB
# installed packages
from PIL import Image
import paho.mqtt.client as paho
# base python packages
import json, io, base64
from queue import Queue
from datetime import datetime
import uuid
def print_with_timestamp(message):
print(f"[{datetime.now().strftime('%b %d, %H:%M:%S')}] {message}")
class CobotController:
user_id = str(uuid.uuid4())
def __init__(
self,
hive_mq_username: str,
hive_mq_password: str,
hive_mq_cloud: str,
port: int,
device_endpoint: str,
user_id: str = None
):
# setup client and response queues
self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
self.client.tls_set()
self.client.username_pw_set(hive_mq_username, hive_mq_password)
self.client.connect(hive_mq_cloud, port)
self.response_queue = Queue()
def on_message(client, userdata, msg):
payload_dict = json.loads(msg.payload)
self.response_queue.put(payload_dict)
def on_connect(client, userdata, flags, rc, properties=None):
print_with_timestamp("Connected to HiveMQ broker...")
self.client.on_connect = on_connect
self.client.on_message = on_message
self.client.loop_start()
# initialize user id and endpoints
if user_id is not None:
CobotController.user_id = user_id
self.user_id = CobotController.user_id
self.device_endpoint = device_endpoint
self.init_endpoint = self.device_endpoint + "/init"
self.publish_endpoint = self.device_endpoint + "/" + self.user_id
self.incoming_endpoint = self.publish_endpoint + "/response"
self.client.subscribe(self.incoming_endpoint, qos=2)
connected = self.check_connection_status()
if connected:
return
# send an init request
print_with_timestamp("Sending a connection request...")
pub_handle = self.client.publish(
self.init_endpoint,
payload=json.dumps({"id": self.user_id}),
qos=2
)
pub_handle.wait_for_publish()
# get a response for the init message, if no response, have to wait for current users time to end
print_with_timestamp("Waiting for cobot access...")
prev_pos = None
while True:
try:
payload = self.response_queue.get(timeout=10)
if payload["status"] == "ready":
self.client.publish(
self.publish_endpoint,
payload=json.dumps({"yeehaw": []}),
qos=2
)
print_with_timestamp("Connected to server successfully.")
break
except Exception as e:
resp = self.handle_publish_and_response(
payload=json.dumps({"id": self.user_id}),
custom_endpoint=self.device_endpoint + "/queuequery"
)
if "queue_pos" not in resp:
break
pos = resp["queue_pos"]
if prev_pos == None:
prev_pos = pos
elif prev_pos == pos:
continue
prev_pos = pos
print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")
def check_connection_status(self):
self.client.publish(
self.publish_endpoint,
payload=json.dumps({"command":"query/angles", "args": {}}),
qos=2
)
try: # if we recieve any response, it means the server is currently servicing our requests
_ = self.response_queue.get(timeout=5)
return True
except Exception as _:
return False
def handle_publish_and_response(self, payload, custom_endpoint=None):
if custom_endpoint is None:
self.client.publish(self.publish_endpoint, payload=payload, qos=2)
else:
self.client.publish(custom_endpoint, payload=payload, qos=2)
return self.response_queue.get(block=True)
def send_angles(
self,
angle_list: list[float] = [0.0] * 6,
speed: int = 50
):
payload = json.dumps({"command": "control/angles",
"args": {"angles": angle_list, "speed": speed}})
return self.handle_publish_and_response(payload)
def send_coords(
self,
coord_list: list[float] = [0.0] * 6,
speed: int = 50
):
payload = json.dumps({"command": "control/coords",
"args": {"coords": coord_list, "speed": speed}})
return self.handle_publish_and_response(payload)
def send_gripper_value(
self,
value: int = 100,
speed: int = 50
):
payload = json.dumps({"command": "control/gripper",
"args": {"gripper_value": value, "speed": speed}})
return self.handle_publish_and_response(payload)
def get_angles(self):
payload = json.dumps({"command": "query/angles", "args": {}})
return self.handle_publish_and_response(payload)
def get_coords(self):
payload = json.dumps({"command": "query/coords", "args": {}})
return self.handle_publish_and_response(payload)
def get_gripper_value(self):
payload = json.dumps({"command": "query/gripper", "args": {}})
return self.handle_publish_and_response(payload)
def get_camera(self, quality=100, save_path=None):
payload = json.dumps({"command": "query/camera", "args": {"quality": quality}})
response = self.handle_publish_and_response(payload)
if not response["success"]:
return response
b64_bytes = base64.b64decode(response["image"])
img_bytes = io.BytesIO(b64_bytes)
img = Image.open(img_bytes)
response["image"] = img
if save_path is not None:
img.save(save_path)
return response