cobot280pi-gradio / client.py
gursi26's picture
quick changes to check multi user
f48c8a8
raw
history blame
3.24 kB
import paho.mqtt.client as paho
import json, io, base64
from queue import Queue
from PIL import Image
class CobotController:
    """MQTT client that sends JSON commands to a cobot and returns its replies.

    Every command is a JSON object ``{"command": ..., "args": {...}}``
    published to the topic named by ``cobot_id``. Replies are read from the
    topic ``cobot_id + "/response"``, decoded from JSON and handed back to the
    caller in arrival order.

    NOTE(review): replies are paired with requests purely by arrival order on
    one queue, so concurrent callers sharing a controller (or several clients
    driving the same ``cobot_id``) can receive each other's replies.
    """

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        cobot_id: str,
    ):
        """Connect to the broker and start the background network loop.

        Args:
            hive_mq_username: Username sent to the broker.
            hive_mq_password: Password sent to the broker.
            hive_mq_cloud: Broker host name.
            port: Broker port.
            cobot_id: Topic that commands are published to; replies are read
                from ``cobot_id + "/response"``.
        """
        self.publish_endpoint = cobot_id
        self.response_endpoint = cobot_id + "/response"
        # Filled by the network thread, drained by handle_publish_and_response.
        self.response_queue = Queue()

        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.tls_set()
        self.client.username_pw_set(hive_mq_username, hive_mq_password)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.connect(hive_mq_cloud, port)
        # Subscribe up front so the request is queued ahead of any command
        # published right after construction; _on_connect renews it later.
        self.client.subscribe(self.response_endpoint, qos=2)
        self.client.loop_start()

    def _on_connect(self, client, userdata, flags, rc, properties=None):
        """Announce the connection and (re)subscribe to the reply topic.

        Subscribing here as well means the subscription is renewed whenever
        the client reconnects, instead of being made only once at start-up.
        """
        print("Connection recieved")
        client.subscribe(self.response_endpoint, qos=2)

    def _on_message(self, client, userdata, msg):
        """Decode an incoming reply and queue it for the waiting caller.

        A payload that is not valid JSON is turned into a failure response
        rather than raised: this callback runs on the network thread, and an
        exception here would stop replies from ever reaching the caller.
        """
        try:
            reply = json.loads(msg.payload)
        except ValueError as exc:  # JSONDecodeError and UnicodeDecodeError
            # NOTE(review): the "success" key mirrors what get_camera reads;
            # the "error" key is this client's own addition.
            reply = {
                "success": False,
                "error": f"malformed response payload: {exc}",
            }
        self.response_queue.put(reply)

    def handle_publish_and_response(self, payload, timeout=None):
        """Publish ``payload`` and wait for the next reply.

        Args:
            payload: Already-serialized message to publish on the command
                topic.
            timeout: Maximum number of seconds to wait for a reply. ``None``
                (the default) waits indefinitely.

        Returns:
            The next decoded reply taken from the response queue.

        Raises:
            queue.Empty: If ``timeout`` elapses before a reply arrives.
        """
        self.client.publish(self.publish_endpoint, payload=payload, qos=2)
        return self.response_queue.get(block=True, timeout=timeout)

    def _request(self, command, args):
        """Serialize one command with its arguments, send it, return the reply."""
        payload = json.dumps({"command": command, "args": args})
        return self.handle_publish_and_response(payload)

    def send_angles(
        self,
        angle_list: "list[float] | None" = None,
        speed: int = 50,
    ):
        """Send a ``control/angles`` command and return the reply.

        Args:
            angle_list: Values sent as ``angles``; defaults to six zeros.
            speed: Value sent as ``speed``.
        """
        if angle_list is None:
            # Build the default per call so no list is shared between calls.
            angle_list = [0.0] * 6
        return self._request(
            "control/angles", {"angles": angle_list, "speed": speed}
        )

    def send_coords(
        self,
        coord_list: "list[float] | None" = None,
        speed: int = 50,
    ):
        """Send a ``control/coords`` command and return the reply.

        Args:
            coord_list: Values sent as ``coords``; defaults to six zeros.
            speed: Value sent as ``speed``.
        """
        if coord_list is None:
            # Build the default per call so no list is shared between calls.
            coord_list = [0.0] * 6
        return self._request(
            "control/coords", {"coords": coord_list, "speed": speed}
        )

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50,
    ):
        """Send a ``control/gripper`` command and return the reply.

        Args:
            value: Value sent as ``gripper_value``.
            speed: Value sent as ``speed``.
        """
        return self._request(
            "control/gripper", {"gripper_value": value, "speed": speed}
        )

    def get_angles(self):
        """Send a ``query/angles`` command and return the reply."""
        return self._request("query/angles", {})

    def get_coords(self):
        """Send a ``query/coords`` command and return the reply."""
        return self._request("query/coords", {})

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the reply."""
        return self._request("query/gripper", {})

    def get_camera(self, quality=100, save_path=None):
        """Send a ``query/camera`` command and decode the returned image.

        Args:
            quality: Value sent as ``quality``.
            save_path: If not ``None``, the decoded image is also saved here.

        Returns:
            The reply. When its ``"success"`` entry is falsy it is returned
            untouched; otherwise its ``"image"`` entry (base64 text) is
            replaced by the opened PIL image.
        """
        response = self._request("query/camera", {"quality": quality})
        if not response["success"]:
            return response

        raw_image = base64.b64decode(response["image"])
        img = Image.open(io.BytesIO(raw_image))
        response["image"] = img
        if save_path is not None:
            img.save(save_path)
        return response