File size: 5,058 Bytes
2554ae8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# base python packages
import base64
import io
import json
import uuid
from datetime import datetime
from queue import Empty, Queue

# installed packages
import paho.mqtt.client as paho
from PIL import Image

def print_with_timestamp(message):
    """Print *message* to stdout, prefixed with the current local time.

    The prefix is the result of ``datetime.now().strftime('%b %d, %H:%M:%S')``
    wrapped in square brackets, followed by a single space.
    """
    stamp = datetime.now().strftime('%b %d, %H:%M:%S')
    line = f"[{stamp}] {message}"
    print(line)

class CobotController:
    """MQTT client that drives a cobot through a HiveMQ broker.

    Commands are published as JSON to ``<device_endpoint>/<user_id>`` and
    replies are received on ``<device_endpoint>/<user_id>/response``. Every
    decoded reply is pushed onto ``response_queue`` in arrival order and each
    request method simply pops the next entry, so replies are not correlated
    with requests: issue one request at a time.
    """

    # Default id shared by every controller created in this process; it is
    # overwritten when an explicit ``user_id`` is passed to ``__init__``.
    user_id = str(uuid.uuid4())

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        device_endpoint: str,
        user_id: str = None
    ):
        """Connect to the broker and block until the server grants access.

        Args:
            hive_mq_username: Broker username.
            hive_mq_password: Broker password.
            hive_mq_cloud: Broker host name passed to ``client.connect``.
            port: Broker port (the connection is made with TLS enabled).
            device_endpoint: Base topic of the target device.
            user_id: Optional id to use instead of the class-level default.
                When given it also replaces ``CobotController.user_id``.
        """
        # setup client and response queues
        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.tls_set()
        self.client.username_pw_set(hive_mq_username, hive_mq_password)
        self.client.connect(hive_mq_cloud, port)

        self.response_queue = Queue()

        def on_message(client, userdata, msg):
            try:
                payload_dict = json.loads(msg.payload)
            except ValueError as err:
                # Covers JSONDecodeError and UnicodeDecodeError. Letting it
                # escape would propagate into paho's network loop (paho
                # re-raises callback exceptions unless told to suppress them),
                # so drop the bad message instead.
                print_with_timestamp(f"Ignoring malformed message: {err}")
                return
            self.response_queue.put(payload_dict)

        def on_connect(client, userdata, flags, rc, properties=None):
            print_with_timestamp("Connected to HiveMQ broker...")

        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.loop_start()

        # initialize user id and endpoints
        if user_id is not None:
            CobotController.user_id = user_id
        self.user_id = CobotController.user_id

        self.device_endpoint = device_endpoint
        self.init_endpoint = self.device_endpoint + "/init"
        self.publish_endpoint = self.device_endpoint + "/" + self.user_id
        self.incoming_endpoint = self.publish_endpoint + "/response"
        self.client.subscribe(self.incoming_endpoint, qos=2)

        # If the server already answers our commands there is nothing to set up.
        if self.check_connection_status():
            return

        # send an init request
        print_with_timestamp("Sending a connection request...")
        pub_handle = self.client.publish(
            self.init_endpoint,
            payload=json.dumps({"id": self.user_id}),
            qos=2
        )
        pub_handle.wait_for_publish()

        # get a response for the init message, if no response, have to wait
        # for the current user's time to end
        print_with_timestamp("Waiting for cobot access...")
        self._wait_for_access()

    @staticmethod
    def _is_ready(message) -> bool:
        """Return True when *message* is a dict whose ``status`` is ``"ready"``."""
        return isinstance(message, dict) and message.get("status") == "ready"

    def _acknowledge_ready(self):
        """Publish the ``{"yeehaw": []}`` message and report success.

        NOTE(review): presumably the server treats this message as the
        client's acknowledgement of the ``ready`` grant; confirm server-side.
        """
        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"yeehaw": []}),
            qos=2
        )
        print_with_timestamp("Connected to server successfully.")

    def _wait_for_access(self):
        """Block until the server reports ``ready``.

        Waits up to 10 seconds for a message. A ``ready`` message is
        acknowledged and ends the wait; any other message is ignored. When
        nothing arrives in time, the queue position is requested on
        ``<device_endpoint>/queuequery`` and printed whenever it changes. A
        query reply without ``queue_pos`` ends the wait.
        """
        prev_pos = None
        while True:
            try:
                payload = self.response_queue.get(timeout=10)
            except Empty:
                resp = self.handle_publish_and_response(
                    payload=json.dumps({"id": self.user_id}),
                    custom_endpoint=self.device_endpoint + "/queuequery"
                )
                # The grant may arrive while we are waiting for the query
                # reply; it must still be acknowledged.
                if self._is_ready(resp):
                    self._acknowledge_ready()
                    break
                if "queue_pos" not in resp:
                    break
                pos = resp["queue_pos"]
                if prev_pos is not None and pos == prev_pos:
                    continue
                prev_pos = pos
                print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")
                continue

            if self._is_ready(payload):
                self._acknowledge_ready()
                break

    def check_connection_status(self):
        """Return True if any message arrives within 5 seconds of a probe.

        Publishes a ``query/angles`` command and treats the next queued
        message, whatever its content, as proof that the server is currently
        servicing this client's requests.
        """
        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"command": "query/angles", "args": {}}),
            qos=2
        )
        try:  # if we receive any response, the server is servicing our requests
            self.response_queue.get(timeout=5)
        except Empty:
            return False
        return True

    def handle_publish_and_response(self, payload, custom_endpoint=None, timeout=None):
        """Publish *payload* and return the next message from the queue.

        Args:
            payload: Already-serialised message body.
            custom_endpoint: Topic to publish to; defaults to
                ``self.publish_endpoint``.
            timeout: Seconds to wait for a reply. ``None`` (the default) waits
                indefinitely.

        Returns:
            The next decoded message taken from ``response_queue``.

        Raises:
            queue.Empty: If *timeout* is given and no message arrives in time.
        """
        endpoint = self.publish_endpoint if custom_endpoint is None else custom_endpoint
        self.client.publish(endpoint, payload=payload, qos=2)
        return self.response_queue.get(block=True, timeout=timeout)

    def send_angles(
        self,
        angle_list: list[float] = [0.0] * 6,
        speed: int = 50
    ):
        """Send a ``control/angles`` command and return the server's reply.

        The default list is only serialised, never mutated.
        """
        payload = json.dumps({"command": "control/angles",
                              "args": {"angles": angle_list, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def send_coords(
        self,
        coord_list: list[float] = [0.0] * 6,
        speed: int = 50
    ):
        """Send a ``control/coords`` command and return the server's reply.

        The default list is only serialised, never mutated.
        """
        payload = json.dumps({"command": "control/coords",
                              "args": {"coords": coord_list, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50
    ):
        """Send a ``control/gripper`` command and return the server's reply."""
        payload = json.dumps({"command": "control/gripper",
                              "args": {"gripper_value": value, "speed": speed}})
        return self.handle_publish_and_response(payload)

    def get_angles(self):
        """Send a ``query/angles`` command and return the server's reply."""
        payload = json.dumps({"command": "query/angles", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_coords(self):
        """Send a ``query/coords`` command and return the server's reply."""
        payload = json.dumps({"command": "query/coords", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the server's reply."""
        payload = json.dumps({"command": "query/gripper", "args": {}})
        return self.handle_publish_and_response(payload)

    def get_camera(self, quality=100, save_path=None):
        """Request a camera frame and decode it into a PIL image.

        Args:
            quality: Forwarded to the server as the ``quality`` argument.
            save_path: When not ``None``, the decoded image is also saved here.

        Returns:
            The server's reply. When ``reply["success"]`` is falsy it is
            returned untouched; otherwise ``reply["image"]`` (base64 text) is
            replaced by the decoded ``PIL.Image`` object.
        """
        payload = json.dumps({"command": "query/camera", "args": {"quality": quality}})
        response = self.handle_publish_and_response(payload)
        if not response["success"]:
            return response

        b64_bytes = base64.b64decode(response["image"])
        img_bytes = io.BytesIO(b64_bytes)
        img = Image.open(img_bytes)

        response["image"] = img
        if save_path is not None:
            img.save(save_path)

        return response