File size: 3,236 Bytes
7e745ee f48c8a8 7e745ee f48c8a8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 |
import paho.mqtt.client as paho
import json, io, base64
from queue import Queue
from PIL import Image
class CobotController:
    """Blocking MQTT client that sends JSON commands to a cobot and returns its replies.

    Commands are published to the topic named by ``cobot_id``; replies are
    read from ``<cobot_id>/response``. Every public command method publishes
    one JSON document of the form ``{"command": ..., "args": {...}}`` and
    then waits for the next message that arrives on the response topic.

    NOTE(review): replies are matched to commands purely by arrival order
    (a single FIFO queue), so the class presumably expects one caller at a
    time -- confirm before using it from several threads.
    """

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        cobot_id: str,
        response_timeout: "float | None" = None,
    ):
        """Connect to the broker, subscribe to the response topic and start the network loop.

        Args:
            hive_mq_username: Broker user name.
            hive_mq_password: Broker password.
            hive_mq_cloud: Broker host name passed to ``client.connect``.
            port: Broker port passed to ``client.connect``.
            cobot_id: Topic that commands are published to; ``"/response"``
                is appended to form the topic replies are read from.
            response_timeout: Default number of seconds to wait for a reply.
                ``None`` (the default) waits indefinitely, which is the
                historical behaviour.
        """
        self.publish_endpoint = cobot_id
        self.response_endpoint = cobot_id + "/response"
        self.response_timeout = response_timeout
        # Decoded replies are handed from the network thread to callers here.
        self.response_queue = Queue()

        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.tls_set()
        self.client.username_pw_set(hive_mq_username, hive_mq_password)
        self.client.connect(hive_mq_cloud, port)
        # Subscribe right away so the subscription request is issued before
        # any command can be published; _on_connect renews it on reconnects.
        self.client.subscribe(self.response_endpoint, qos=2)
        self.client.loop_start()

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """Announce the connection and (re)subscribe to the response topic."""
        print("Connection recieved")
        # Renewing the subscription on every (re)connect keeps replies
        # flowing after the client reconnects to the broker.
        client.subscribe(self.response_endpoint, qos=2)

    def _on_message(self, client, userdata, msg):
        """Decode an incoming JSON payload and queue it for the waiting caller."""
        self.response_queue.put(json.loads(msg.payload))

    def _request(self, command, **args):
        """Serialise ``command`` with its ``args`` and run one publish/response round trip."""
        payload = json.dumps({"command": command, "args": args})
        return self.handle_publish_and_response(payload)

    def handle_publish_and_response(self, payload, timeout=None):
        """Publish ``payload`` on the command topic and return the next queued reply.

        Args:
            payload: Message body to publish (the command methods pass a
                JSON string).
            timeout: Seconds to wait for a reply. ``None`` falls back to the
                ``response_timeout`` given to the constructor; if that is
                also ``None`` the call blocks until a reply arrives.

        Returns:
            The decoded JSON reply.

        Raises:
            TimeoutError: If a timeout is in effect and no reply arrived in
                time. A late reply stays in the queue and would be returned
                by the next call.
        """
        # Local import: the module itself only imports ``Queue`` from ``queue``.
        from queue import Empty

        wait = self.response_timeout if timeout is None else timeout
        self.client.publish(self.publish_endpoint, payload=payload, qos=2)
        try:
            return self.response_queue.get(block=True, timeout=wait)
        except Empty:
            raise TimeoutError(
                f"no response on {self.response_endpoint!r} within {wait} s"
            ) from None

    def send_angles(
        self,
        angle_list: "list[float] | None" = None,
        speed: int = 50,
    ):
        """Send a ``control/angles`` command and return the reply.

        Args:
            angle_list: Values sent as ``args.angles``; defaults to six zeros.
            speed: Value sent as ``args.speed``.
        """
        # None sentinel instead of a shared mutable default list.
        if angle_list is None:
            angle_list = [0.0] * 6
        return self._request("control/angles", angles=angle_list, speed=speed)

    def send_coords(
        self,
        coord_list: "list[float] | None" = None,
        speed: int = 50,
    ):
        """Send a ``control/coords`` command and return the reply.

        Args:
            coord_list: Values sent as ``args.coords``; defaults to six zeros.
            speed: Value sent as ``args.speed``.
        """
        # None sentinel instead of a shared mutable default list.
        if coord_list is None:
            coord_list = [0.0] * 6
        return self._request("control/coords", coords=coord_list, speed=speed)

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50,
    ):
        """Send a ``control/gripper`` command and return the reply.

        Args:
            value: Value sent as ``args.gripper_value``.
            speed: Value sent as ``args.speed``.
        """
        return self._request("control/gripper", gripper_value=value, speed=speed)

    def get_angles(self):
        """Send a ``query/angles`` command and return the reply."""
        return self._request("query/angles")

    def get_coords(self):
        """Send a ``query/coords`` command and return the reply."""
        return self._request("query/coords")

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the reply."""
        return self._request("query/gripper")

    def get_camera(self, quality=100, save_path=None):
        """Send a ``query/camera`` command and decode the returned image.

        Args:
            quality: Value sent as ``args.quality``.
            save_path: If not ``None``, the decoded image is also saved here.

        Returns:
            The reply dict. When ``reply["success"]`` is falsy it is returned
            untouched; otherwise ``reply["image"]`` (base64 text) is replaced
            by the ``PIL.Image`` opened from the decoded bytes.
        """
        response = self._request("query/camera", quality=quality)
        if not response["success"]:
            return response

        img = Image.open(io.BytesIO(base64.b64decode(response["image"])))
        response["image"] = img
        if save_path is not None:
            img.save(save_path)
        return response