gursi26 commited on
Commit
f48c8a8
·
1 Parent(s): 7df42d3

quick changes to check multi user

Browse files
Files changed (2) hide show
  1. app.py +243 -129
  2. client.py +91 -175
app.py CHANGED
@@ -1,130 +1,244 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import gradio as gr
2
- from PIL import Image
3
- from client import CobotController
4
- from utils import get_credentials
5
- import json
6
- import uuid
7
-
8
-
9
-
10
- options = [
11
- "query/angles", "query/coords", "query/gripper", "query/camera",
12
- "control/angles", "control/coords", "control/gripper"
13
- ]
14
-
15
- with gr.Blocks() as app:
16
- user, pwd, host, endpoint, port = get_credentials(False)
17
- client = CobotController(user, pwd, host, port, endpoint)
18
-
19
- def handle_request(
20
- selected_command, angle1=0.0, angle2=0.0, angle3=0.0, angle4=0.0,
21
- angle5=0.0, angle6=0.0, coord1=0.0, coord2=0.0, coord3=0.0, coord4=0.0,
22
- coord5=0.0, coord6=0.0, gripper_value=0.0, speed_angles=0.0, speed_coords=0.0,
23
- speed_gripper=0.0
24
- ):
25
- img = None
26
- if selected_command == "query/angles":
27
- response = client.get_angles()
28
- elif selected_command == "query/coords":
29
- response = client.get_coords()
30
- elif selected_command == "query/gripper":
31
- response = client.get_gripper_value()
32
- elif selected_command == "query/camera":
33
- response = client.get_camera()
34
- img = response.pop("image")
35
- elif selected_command == "control/angles":
36
- response = client.send_angles(
37
- angle_list = [angle1, angle2, angle3, angle4, angle5, angle6],
38
- speed = speed_angles
39
- )
40
- elif selected_command == "control/coords":
41
- response = client.send_coords(
42
- angle_list = [coord1, coord2, coord3, coord4, coord5, coord6],
43
- speed = speed_coords
44
- )
45
- elif selected_command == "control/gripper":
46
- response = client.send_gripper_value(
47
- value = gripper_value,
48
- speed = speed_gripper
49
- )
50
-
51
- text_output = json.dumps(response, indent=4)
52
- if img is not None:
53
- return text_output, gr.update(value=img, visible=True)
54
- return text_output, None
55
-
56
- gr.Markdown("# MyCobot 280pi MQTT Control demo")
57
- gr.Markdown("This is a public demo of the MyCobot 280pi...")
58
-
59
- selected_command = gr.Dropdown(choices=options, label="Select command")
60
-
61
- with gr.Group(visible=False) as query_angles_group:
62
- pass
63
-
64
- with gr.Group(visible=False) as query_coords_group:
65
- pass
66
-
67
- with gr.Group(visible=False) as query_camera_group:
68
- pass
69
-
70
- with gr.Group(visible=False) as query_gripper_group:
71
- pass
72
-
73
- with gr.Group(visible=False) as control_angles_group:
74
- angle1 = gr.Number(value=0.0, label="Angle 1", step=5.0)
75
- angle2 = gr.Number(value=0.0, label="Angle 2", step=5.0)
76
- angle3 = gr.Number(value=0.0, label="Angle 3", step=5.0)
77
- angle4 = gr.Number(value=0.0, label="Angle 4", step=5.0)
78
- angle5 = gr.Number(value=0.0, label="Angle 5", step=5.0)
79
- angle6 = gr.Number(value=0.0, label="Angle 6", step=5.0)
80
- speed_angles = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Speed")
81
-
82
- with gr.Group(visible=False) as control_coords_group:
83
- coord1 = gr.Number(value=0.0, label="X coordinate", step=5.0)
84
- coord2 = gr.Number(value=0.0, label="Y coordinate", step=5.0)
85
- coord3 = gr.Number(value=0.0, label="Z coordinate", step=5.0)
86
- coord4 = gr.Number(value=0.0, label="Rotation X", step=5.0)
87
- coord5 = gr.Number(value=0.0, label="Rotation Y", step=5.0)
88
- coord6 = gr.Number(value=0.0, label="Rotation Z", step=5.0)
89
- speed_coords = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Speed")
90
-
91
- with gr.Group(visible=False) as control_gripper_group:
92
- gripper_value = gr.Slider(minimum=0.0, maximum=100.0, step=1.0, label="Gripper value")
93
- speed_gripper = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Speed")
94
-
95
- def toggle_visibility(command):
96
- return (
97
- gr.update(visible=command == "query/angles"),
98
- gr.update(visible=command == "query/coords"),
99
- gr.update(visible=command == "query/camera"),
100
- gr.update(visible=command == "query/gripper"),
101
- gr.update(visible=command == "control/angles"),
102
- gr.update(visible=command == "control/coords"),
103
- gr.update(visible=command == "control/gripper"),
104
- gr.update(visible=False),
105
- gr.update(value="")
106
- )
107
-
108
- submit_button = gr.Button("Send Command")
109
- output_text = gr.Textbox(label="")
110
- output_image = gr.Image(visible=False)
111
-
112
- selected_command.change(
113
- toggle_visibility,
114
- selected_command,
115
- [
116
- query_angles_group, query_coords_group, query_camera_group, query_gripper_group,
117
- control_angles_group, control_coords_group, control_gripper_group, output_image,
118
- output_text
119
- ]
120
- )
121
-
122
- submit_button.click(
123
- handle_request,
124
- inputs=[selected_command, angle1, angle2, angle3, angle4, angle5, angle6,
125
- coord1, coord2, coord3, coord4, coord5, coord6, gripper_value, speed_angles,
126
- speed_coords, speed_gripper],
127
- outputs=[output_text, output_image]
128
- )
129
-
130
- app.launch()
 
1
+ # import gradio as gr
2
+ # from client import CobotController
3
+ # from utils import get_credentials
4
+ # import json
5
+
6
+ # options = [
7
+ # "query/angles", "query/coords", "query/gripper", "query/camera",
8
+ # "control/angles", "control/coords", "control/gripper"
9
+ # ]
10
+
11
+ # with gr.Blocks() as app:
12
+ # user, pwd, host, endpoint, port = get_credentials(False)
13
+ # client = CobotController(user, pwd, host, port, endpoint)
14
+
15
+ # def handle_request(
16
+ # selected_command, angle1=0.0, angle2=0.0, angle3=0.0, angle4=0.0,
17
+ # angle5=0.0, angle6=0.0, coord1=0.0, coord2=0.0, coord3=0.0, coord4=0.0,
18
+ # coord5=0.0, coord6=0.0, gripper_value=0.0, speed_angles=0.0, speed_coords=0.0,
19
+ # speed_gripper=0.0
20
+ # ):
21
+ # img = None
22
+ # if selected_command == "query/angles":
23
+ # response = client.get_angles()
24
+ # elif selected_command == "query/coords":
25
+ # response = client.get_coords()
26
+ # elif selected_command == "query/gripper":
27
+ # response = client.get_gripper_value()
28
+ # elif selected_command == "query/camera":
29
+ # response = client.get_camera()
30
+ # img = response.pop("image")
31
+ # elif selected_command == "control/angles":
32
+ # response = client.send_angles(
33
+ # angle_list = [angle1, angle2, angle3, angle4, angle5, angle6],
34
+ # speed = speed_angles
35
+ # )
36
+ # elif selected_command == "control/coords":
37
+ # response = client.send_coords(
38
+ # angle_list = [coord1, coord2, coord3, coord4, coord5, coord6],
39
+ # speed = speed_coords
40
+ # )
41
+ # elif selected_command == "control/gripper":
42
+ # response = client.send_gripper_value(
43
+ # value = gripper_value,
44
+ # speed = speed_gripper
45
+ # )
46
+
47
+ # text_output = json.dumps(response, indent=4)
48
+ # if img is not None:
49
+ # return text_output, gr.update(value=img, visible=True)
50
+ # return text_output, None
51
+
52
+ # gr.Markdown("# MyCobot 280pi MQTT Control demo")
53
+ # gr.Markdown("This is a public demo of the MyCobot 280pi...")
54
+
55
+ # selected_command = gr.Dropdown(choices=options, label="Select command")
56
+
57
+ # with gr.Group(visible=False) as query_angles_group:
58
+ # pass
59
+
60
+ # with gr.Group(visible=False) as query_coords_group:
61
+ # pass
62
+
63
+ # with gr.Group(visible=False) as query_camera_group:
64
+ # pass
65
+
66
+ # with gr.Group(visible=False) as query_gripper_group:
67
+ # pass
68
+
69
+ # with gr.Group(visible=False) as control_angles_group:
70
+ # angle1 = gr.Number(value=0.0, label="Angle 1", step=5.0)
71
+ # angle2 = gr.Number(value=0.0, label="Angle 2", step=5.0)
72
+ # angle3 = gr.Number(value=0.0, label="Angle 3", step=5.0)
73
+ # angle4 = gr.Number(value=0.0, label="Angle 4", step=5.0)
74
+ # angle5 = gr.Number(value=0.0, label="Angle 5", step=5.0)
75
+ # angle6 = gr.Number(value=0.0, label="Angle 6", step=5.0)
76
+ # speed_angles = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Speed")
77
+
78
+ # with gr.Group(visible=False) as control_coords_group:
79
+ # coord1 = gr.Number(value=0.0, label="X coordinate", step=5.0)
80
+ # coord2 = gr.Number(value=0.0, label="Y coordinate", step=5.0)
81
+ # coord3 = gr.Number(value=0.0, label="Z coordinate", step=5.0)
82
+ # coord4 = gr.Number(value=0.0, label="Rotation X", step=5.0)
83
+ # coord5 = gr.Number(value=0.0, label="Rotation Y", step=5.0)
84
+ # coord6 = gr.Number(value=0.0, label="Rotation Z", step=5.0)
85
+ # speed_coords = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Speed")
86
+
87
+ # with gr.Group(visible=False) as control_gripper_group:
88
+ # gripper_value = gr.Slider(minimum=0.0, maximum=100.0, step=1.0, label="Gripper value")
89
+ # speed_gripper = gr.Slider(value=50.0, minimum=0.0, maximum=100.0, step=1.0, label="Speed")
90
+
91
+ # def toggle_visibility(command):
92
+ # return (
93
+ # gr.update(visible=command == "query/angles"),
94
+ # gr.update(visible=command == "query/coords"),
95
+ # gr.update(visible=command == "query/camera"),
96
+ # gr.update(visible=command == "query/gripper"),
97
+ # gr.update(visible=command == "control/angles"),
98
+ # gr.update(visible=command == "control/coords"),
99
+ # gr.update(visible=command == "control/gripper"),
100
+ # gr.update(visible=False),
101
+ # gr.update(value="")
102
+ # )
103
+
104
+ # submit_button = gr.Button("Send Command")
105
+ # output_text = gr.Textbox(label="")
106
+ # output_image = gr.Image(visible=False)
107
+
108
+ # selected_command.change(
109
+ # toggle_visibility,
110
+ # selected_command,
111
+ # [
112
+ # query_angles_group, query_coords_group, query_camera_group, query_gripper_group,
113
+ # control_angles_group, control_coords_group, control_gripper_group, output_image,
114
+ # output_text
115
+ # ]
116
+ # )
117
+
118
+ # submit_button.click(
119
+ # handle_request,
120
+ # inputs=[selected_command, angle1, angle2, angle3, angle4, angle5, angle6,
121
+ # coord1, coord2, coord3, coord4, coord5, coord6, gripper_value, speed_angles,
122
+ # speed_coords, speed_gripper],
123
+ # outputs=[output_text, output_image],
124
+ # concurrency_id="single_user",
125
+ # concurrency_limit=1
126
+ # )
127
+
128
+ # app.queue(default_concurrency_limit=1, max_size=100)
129
+ # app.launch()
130
  import gradio as gr
131
+ from collections import deque
132
+ import threading
133
+ import time
134
+
135
+ SESSION_TIME = 30
136
+
137
+ class QueueSystem:
138
+ def __init__(self):
139
+ self.queue = deque()
140
+ self.current_user = None
141
+ self.session_start_time = None
142
+ self.lock = threading.Lock()
143
+
144
+ def enqueue_user(self, user_id):
145
+ with self.lock:
146
+ if user_id not in self.queue and user_id != self.current_user:
147
+ self.queue.append(user_id)
148
+
149
+ def dequeue_user(self):
150
+ with self.lock:
151
+ if self.queue:
152
+ return self.queue.popleft()
153
+ return None
154
+
155
+ def get_queue_info(self, user_id):
156
+ with self.lock:
157
+ if user_id == self.current_user:
158
+ remaining_time = max(0, SESSION_TIME - (time.time() - self.session_start_time))
159
+ return 0, remaining_time
160
+ elif user_id in self.queue:
161
+ position = list(self.queue).index(user_id) + 1
162
+ wait_time = (position - 1) * SESSION_TIME + max(0, SESSION_TIME - (time.time() - self.session_start_time))
163
+ return position, wait_time
164
+ else:
165
+ return None, None
166
+
167
+ def start_session(self, user_id):
168
+ with self.lock:
169
+ if self.current_user is None:
170
+ self.current_user = user_id
171
+ self.session_start_time = time.time()
172
+ return True
173
+ return False
174
+
175
+ def end_session(self):
176
+ with self.lock:
177
+ if self.current_user and time.time() - self.session_start_time >= SESSION_TIME:
178
+ self.current_user = None
179
+ self.session_start_time = None
180
+ return True
181
+ return False
182
+
183
+ queue_system = QueueSystem()
184
+
185
+ def process_input(input_text, user_id):
186
+ update_queue()
187
+ if queue_system.current_user is None:
188
+ queue_system.start_session(user_id)
189
+
190
+ queue_system.enqueue_user(user_id)
191
+ position, wait_time = queue_system.get_queue_info(user_id)
192
+
193
+ if position == 0: # User is currently being served
194
+ result = f"Processed: {input_text}"
195
+ remaining_time = wait_time
196
+ return result, f"Your turn! Time remaining: {remaining_time:.0f} seconds."
197
+ elif position is not None:
198
+ return None, f"You are in position {position}. Estimated wait time: {wait_time:.0f} seconds."
199
+ else:
200
+ return None, "Error: You are not in the queue."
201
+
202
+ def update_queue():
203
+ if queue_system.end_session():
204
+ next_user = queue_system.dequeue_user()
205
+ if next_user:
206
+ queue_system.start_session(next_user)
207
+
208
+ def gradio_function(input_text, user_id):
209
+ result, status = process_input(input_text, user_id)
210
+ return result, status
211
+
212
+ def get_queue_status(user_id):
213
+ update_queue()
214
+ if not user_id:
215
+ return "Enter a User ID and submit to join the queue."
216
+ position, wait_time = queue_system.get_queue_info(user_id)
217
+ if position == 0:
218
+ return f"Your turn! Time remaining: {wait_time:.0f} seconds."
219
+ elif position is not None:
220
+ return f"You are in position {position}. Estimated wait time: {wait_time:.0f} seconds."
221
+ else:
222
+ return "You are not in the queue. Submit to join."
223
+
224
+ with gr.Blocks() as demo:
225
+ input_text = gr.Textbox(label="Input")
226
+ user_id = gr.Textbox(label="User ID")
227
+ output_text = gr.Textbox(label="Output")
228
+ status_text = gr.Markdown("Enter a User ID and submit to join the queue.")
229
+ submit_btn = gr.Button("Submit")
230
+
231
+ submit_btn.click(
232
+ fn=gradio_function,
233
+ inputs=[input_text, user_id],
234
+ outputs=[output_text, status_text]
235
+ )
236
+
237
+ demo.load(
238
+ fn=get_queue_status,
239
+ inputs=user_id,
240
+ outputs=status_text,
241
+ )
242
+
243
+ demo.queue(default_concurrency_limit=1, max_size=100)
244
+ demo.launch()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
client.py CHANGED
@@ -1,181 +1,97 @@
1
- # installed packages
2
- from PIL import Image
3
  import paho.mqtt.client as paho
4
-
5
- # base python packages
6
  import json, io, base64
7
  from queue import Queue
8
- from datetime import datetime
9
- import uuid
10
 
11
- def print_with_timestamp(message):
12
- print(f"[{datetime.now().strftime('%b %d, %H:%M:%S')}] {message}")
13
 
14
  class CobotController:
15
- user_id = str(uuid.uuid4())
16
-
17
- def __init__(
18
- self,
19
- hive_mq_username: str,
20
- hive_mq_password: str,
21
- hive_mq_cloud: str,
22
- port: int,
23
- device_endpoint: str,
24
- user_id: str = None
25
- ):
26
- # setup client and response queues
27
- self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
28
- self.client.tls_set()
29
- self.client.username_pw_set(hive_mq_username, hive_mq_password)
30
- self.client.connect(hive_mq_cloud, port)
31
-
32
- self.response_queue = Queue()
33
- def on_message(client, userdata, msg):
34
- payload_dict = json.loads(msg.payload)
35
- self.response_queue.put(payload_dict)
36
-
37
- def on_connect(client, userdata, flags, rc, properties=None):
38
- print_with_timestamp("Connected to HiveMQ broker...")
39
-
40
- self.client.on_connect = on_connect
41
- self.client.on_message = on_message
42
- self.client.loop_start()
43
-
44
- # initialize user id and endpoints
45
- if user_id is not None:
46
- CobotController.user_id = user_id
47
- self.user_id = CobotController.user_id
48
-
49
- self.device_endpoint = device_endpoint
50
- self.init_endpoint = self.device_endpoint + "/init"
51
- self.publish_endpoint = self.device_endpoint + "/" + self.user_id
52
- self.incoming_endpoint = self.publish_endpoint + "/response"
53
- self.client.subscribe(self.incoming_endpoint, qos=2)
54
-
55
- connected = self.check_connection_status()
56
- if connected:
57
- return
58
-
59
- # send an init request
60
- print_with_timestamp("Sending a connection request...")
61
- pub_handle = self.client.publish(
62
- self.init_endpoint,
63
- payload=json.dumps({"id": self.user_id}),
64
- qos=2
65
- )
66
- pub_handle.wait_for_publish()
67
-
68
- # get a response for the init message, if no response, have to wait for current users time to end
69
- print_with_timestamp("Waiting for cobot access...")
70
- prev_pos = None
71
- while True:
72
- try:
73
- payload = self.response_queue.get(timeout=10)
74
- if payload["status"] == "ready":
75
- self.client.publish(
76
- self.publish_endpoint,
77
- payload=json.dumps({"yeehaw": []}),
78
- qos=2
79
- )
80
- print_with_timestamp("Connected to server successfully.")
81
- break
82
-
83
- except Exception as e:
84
- resp = self.handle_publish_and_response(
85
- payload=json.dumps({"id": self.user_id}),
86
- custom_endpoint=self.device_endpoint + "/queuequery"
87
- )
88
- if "queue_pos" not in resp:
89
- break
90
- pos = resp["queue_pos"]
91
- if prev_pos == None:
92
- prev_pos = pos
93
- elif prev_pos == pos:
94
- continue
95
- prev_pos = pos
96
- print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")
97
-
98
- def check_connection_status(self):
99
- self.client.publish(
100
- self.publish_endpoint,
101
- payload=json.dumps({"command":"query/angles", "args": {}}),
102
- qos=2
103
- )
104
- try: # if we recieve any response, it means the server is currently servicing our requests
105
- _ = self.response_queue.get(timeout=5)
106
- return True
107
- except Exception as _:
108
- return False
109
-
110
- def handle_publish_and_response(self, payload, custom_endpoint=None):
111
- if custom_endpoint is None:
112
- self.client.publish(self.publish_endpoint, payload=payload, qos=2)
113
- else:
114
- self.client.publish(custom_endpoint, payload=payload, qos=2)
115
- return self.response_queue.get(block=True)
116
-
117
- def send_angles(
118
- self,
119
- angle_list: list[float] = [0.0] * 6,
120
- speed: int = 50
121
- ):
122
- payload = json.dumps({"command": "control/angles",
123
- "args": {"angles": angle_list, "speed": speed}})
124
- return self.handle_publish_and_response(payload)
125
-
126
- def send_coords(
127
- self,
128
- coord_list: list[float] = [0.0] * 6,
129
- speed: int = 50
130
- ):
131
- payload = json.dumps({"command": "control/coords",
132
- "args": {"coords": coord_list, "speed": speed}})
133
- return self.handle_publish_and_response(payload)
134
-
135
- def send_gripper_value(
136
- self,
137
- value: int = 100,
138
- speed: int = 50
139
- ):
140
- payload = json.dumps({"command": "control/gripper",
141
- "args": {"gripper_value": value, "speed": speed}})
142
- return self.handle_publish_and_response(payload)
143
-
144
- def get_angles(self):
145
- payload = json.dumps({"command": "query/angles", "args": {}})
146
- return self.handle_publish_and_response(payload)
147
-
148
- def get_coords(self):
149
- payload = json.dumps({"command": "query/coords", "args": {}})
150
- return self.handle_publish_and_response(payload)
151
-
152
- def get_gripper_value(self):
153
- payload = json.dumps({"command": "query/gripper", "args": {}})
154
- return self.handle_publish_and_response(payload)
155
-
156
- def get_camera(self, quality=100, save_path=None):
157
- payload = json.dumps({"command": "query/camera", "args": {"quality": quality}})
158
- response = self.handle_publish_and_response(payload)
159
- if not response["success"]:
160
- return response
161
-
162
- b64_bytes = base64.b64decode(response["image"])
163
- img_bytes = io.BytesIO(b64_bytes)
164
- img = Image.open(img_bytes)
165
-
166
- response["image"] = img
167
- if save_path is not None:
168
- img.save(save_path)
169
-
170
- return response
171
-
172
- if __name__ == "__main__":
173
- from my_secrets import *
174
-
175
- cobot = CobotController(
176
- HIVEMQ_USERNAME,
177
- HIVEMQ_PASSWORD,
178
- HIVEMQ_HOST,
179
- DEVICE_PORT,
180
- DEVICE_ENDPOINT,
181
- )
 
 
 
1
  import paho.mqtt.client as paho
 
 
2
  import json, io, base64
3
  from queue import Queue
4
+ from PIL import Image
 
5
 
 
 
6
 
7
  class CobotController:
8
+
9
+ def __init__(
10
+ self,
11
+ hive_mq_username: str,
12
+ hive_mq_password: str,
13
+ hive_mq_cloud: str,
14
+ port: int,
15
+ cobot_id: str,
16
+ ):
17
+ self.publish_endpoint = cobot_id
18
+ self.response_endpoint = cobot_id + "/response"
19
+ self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
20
+ self.client.tls_set()
21
+ self.client.username_pw_set(hive_mq_username, hive_mq_password)
22
+ self.client.connect(hive_mq_cloud, port)
23
+
24
+ response_queue = Queue()
25
+
26
+ def on_message(client, userdata, msg):
27
+ payload_dict = json.loads(msg.payload)
28
+ response_queue.put(payload_dict)
29
+
30
+ self.response_queue = response_queue
31
+
32
+ def on_connect(client, userdata, flags, rc, properties=None):
33
+ print("Connection recieved")
34
+
35
+ self.client.on_connect = on_connect
36
+ self.client.on_message = on_message
37
+ self.client.subscribe(self.response_endpoint, qos=2)
38
+ self.client.loop_start()
39
+
40
+ def handle_publish_and_response(self, payload):
41
+ self.client.publish(self.publish_endpoint, payload=payload, qos=2)
42
+ return self.response_queue.get(block=True)
43
+
44
+ def send_angles(
45
+ self,
46
+ angle_list: list[float] = [0.0] * 6,
47
+ speed: int = 50
48
+ ):
49
+ payload = json.dumps({"command": "control/angles",
50
+ "args": {"angles": angle_list, "speed": speed}})
51
+ return self.handle_publish_and_response(payload)
52
+
53
+ def send_coords(
54
+ self,
55
+ coord_list: list[float] = [0.0] * 6,
56
+ speed: int = 50
57
+ ):
58
+ payload = json.dumps({"command": "control/coords",
59
+ "args": {"coords": coord_list, "speed": speed}})
60
+ return self.handle_publish_and_response(payload)
61
+
62
+ def send_gripper_value(
63
+ self,
64
+ value: int = 100,
65
+ speed: int = 50
66
+ ):
67
+ payload = json.dumps({"command": "control/gripper",
68
+ "args": {"gripper_value": value, "speed": speed}})
69
+ return self.handle_publish_and_response(payload)
70
+
71
+ def get_angles(self):
72
+ payload = json.dumps({"command": "query/angles", "args": {}})
73
+ return self.handle_publish_and_response(payload)
74
+
75
+ def get_coords(self):
76
+ payload = json.dumps({"command": "query/coords", "args": {}})
77
+ return self.handle_publish_and_response(payload)
78
+
79
+ def get_gripper_value(self):
80
+ payload = json.dumps({"command": "query/gripper", "args": {}})
81
+ return self.handle_publish_and_response(payload)
82
+
83
+ def get_camera(self, quality=100, save_path=None):
84
+ payload = json.dumps({"command": "query/camera", "args": {"quality": quality}})
85
+ response = self.handle_publish_and_response(payload)
86
+ if not response["success"]:
87
+ return response
88
+
89
+ b64_bytes = base64.b64decode(response["image"])
90
+ img_bytes = io.BytesIO(b64_bytes)
91
+ img = Image.open(img_bytes)
92
+
93
+ response["image"] = img
94
+ if save_path is not None:
95
+ img.save(save_path)
96
+
97
+ return response