linx5o's picture
add for custom
2ffdf2a
raw
history blame
31.8 kB
import gradio as gr
import paho.mqtt.client as mqtt
import os
import json
import time
# Connection settings for the default broker/reactor, read from the environment.
HOST = os.environ.get("host")
# Fall back to 8883 (the registered MQTT-over-TLS port) when "port" is unset or
# empty, instead of crashing at import time with int(None).
PORT = int(os.environ.get("port") or 8883)
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
PIOREACTOR = os.environ.get("pioreactor")

# MQTT clients: `client` is used by the *_default helpers, `client_custom` by
# the helpers that take explicit connection parameters.
client = None
client_custom = None

# Most recent worker status; written by on_message_worker and returned by the
# get_status helpers.
experiment = None
running = []
temp_mqtt_auto = None
rpm_mqtt = None
led_mqtt = None
experiments = []
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: log the result code reported by the broker."""
    print(f"Connected with result code {rc}")
def create_client(host, port, username, password):
    """Create an MQTT client, connect it over TLS and start its network loop.

    Args:
        host: Broker host name.
        port: Broker port; converted to ``int`` before connecting.
        username: Broker user name.
        password: Broker password.

    Returns:
        The connected ``mqtt.Client`` with its background loop started.
    """
    # Log the target only; the password must never end up on stdout/logs.
    print(host, port, username)
    mqtt_client = mqtt.Client()
    mqtt_client.username_pw_set(username, password)
    mqtt_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
    mqtt_client.on_connect = on_connect
    # A numeric UI field may deliver the port as a float (e.g. 8883.0), which
    # the socket layer rejects, so normalise it to an int here.
    mqtt_client.connect(host, int(port))
    mqtt_client.loop_start()
    return mqtt_client
def on_message_worker(client, userdata, message):
    """MQTT message callback: cache the worker status carried by *message*.

    The payload is decoded as UTF-8 JSON and its fields are stored in the
    module-level status globals. Missing fields fall back to ``None`` (or an
    empty list for ``running`` / ``experiments``). A payload that is not valid
    UTF-8 JSON, or is not a JSON object, is logged and ignored so the
    exception does not propagate out of the callback.
    """
    global experiment
    global running
    global temp_mqtt_auto
    global rpm_mqtt
    global led_mqtt
    global experiments

    try:
        data = json.loads(message.payload.decode("utf-8"))
    except ValueError as err:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        print(f"Ignoring malformed worker payload: {err}")
        return
    if not isinstance(data, dict):
        print("Ignoring worker payload that is not a JSON object")
        return

    experiment = data.get("experiment", None)
    running = data.get("running", [])
    temp_mqtt_auto = data.get("temperature_automation", None)
    rpm_mqtt = data.get("stirring", None)
    led_mqtt = data.get("leds", None)
    experiments = data.get("experiments", [])
def store_client():
    """Connect with the environment-provided settings and store the result in ``client``."""
    # Only `client` is rebound here; the settings are merely read.
    global client
    client = create_client(HOST, PORT, USERNAME, PASSWORD)
def custom_client(host, port, username, password):
    """Connect with the given settings and store the result in ``client_custom``."""
    global client_custom
    connection = create_client(host, port, username, password)
    client_custom = connection
def stirring(rpm, host, port, username, password, pioreactor, experiment, state):
    """Publish a stirring command for *pioreactor* through the custom client.

    The custom client is created from the connection arguments if none exists.
    *state* selects the command: ``"start"`` and ``"update"`` include *rpm*,
    ``"stop"`` does not. Any other value only prints ``Invalid state``.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    commands = {
        "start": "start_stirring",
        "stop": "stop_stirring",
        "update": "update_stirring_rpm",
    }
    command = commands.get(state)
    if command is None:
        print("Invalid state")
        return

    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
    }
    if state != "stop":
        message["rpm"] = rpm
    client_custom.publish("pioreactor/control", json.dumps(message))
def toggle_temperature_input(selected_option):
    """Return a Gradio update making the target visible only for "Heat To Temp"."""
    show_slider = selected_option == "Heat To Temp"
    return gr.update(visible=show_slider)
def temp_automation(temperature, host, port, username, password, pioreactor, experiment, state, option):
    """Publish a temperature-automation command through the custom client.

    The custom client is created from the connection arguments if none exists.
    *option* ``"Heat To Temp"`` maps to the ``"thermostat"`` automation, any
    other value to ``"only_record_temperature"``. *state* is one of
    ``"start"``, ``"stop"``, ``"update"`` or ``"restart"``; any other value
    only prints ``Invalid state``.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    automation = "only_record_temperature"
    if option == "Heat To Temp":
        automation = "thermostat"

    if state == "start":
        message = {
            "command": "set_temperature_automation",
            "experiment": experiment,
            "reactor": pioreactor,
            "temp": temperature,
            "automation": automation,
        }
    elif state in ("stop", "update"):
        # Both variants use the same command and differ only in their settings.
        if state == "stop":
            settings = {"$state": "disconnected"}
        else:
            settings = {"target_temperature": temperature}
        message = {
            "command": "temp_update",
            "experiment": experiment,
            "reactor": pioreactor,
            "settings": settings,
        }
    elif state == "restart":
        message = {
            "command": "temp_restart",
            "experiment": experiment,
            "reactor": pioreactor,
            "automation": automation,
            "temp": temperature,
        }
    else:
        print("Invalid state")
        return

    client_custom.publish("pioreactor/control", json.dumps(message))
def od(host, port, username, password, pioreactor, experiment, state):
    """Publish a start/stop OD-reading command through the custom client.

    The custom client is created from the connection arguments if none exists.
    Any *state* other than ``"start"`` or ``"stop"`` only prints
    ``Invalid state``.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    if state == "start":
        command = "start_od_reading"
    elif state == "stop":
        command = "stop_od_reading"
    else:
        print("Invalid state")
        return

    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
    }
    client_custom.publish("pioreactor/control", json.dumps(message))
def grf(host, port, username, password, pioreactor, experiment, state):
    """Publish a start/stop growth-rate command through the custom client.

    The custom client is created from the connection arguments if none exists.
    Any *state* other than ``"start"`` or ``"stop"`` only prints
    ``Invalid state``.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    if state == "start":
        command = "start_growth_rate"
    elif state == "stop":
        command = "stop_growth_rate"
    else:
        print("Invalid state")
        return

    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
    }
    client_custom.publish("pioreactor/control", json.dumps(message))
def led1fn(led1, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED "A" with brightness *led1*.

    The custom client is created from the connection arguments if none exists.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": pioreactor,
        "led": "A",
        "brightness": led1,
    })
    client_custom.publish("pioreactor/control", message)
def led2fn(led2, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED "B" with brightness *led2*.

    The custom client is created from the connection arguments if none exists.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": pioreactor,
        "led": "B",
        "brightness": led2,
    })
    client_custom.publish("pioreactor/control", message)
def led3fn(led3, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED "C" with brightness *led3*.

    The custom client is created from the connection arguments if none exists.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": pioreactor,
        "led": "C",
        "brightness": led3,
    })
    client_custom.publish("pioreactor/control", message)
def led4fn(led4, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED "D" with brightness *led4*.

    The custom client is created from the connection arguments if none exists.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": pioreactor,
        "led": "D",
        "brightness": led4,
    })
    client_custom.publish("pioreactor/control", message)
def add_media(media, host, port, username, password, pioreactor, experiment):
    """Publish a ``pump_add_media`` command with ``volume`` set to *media*.

    The custom client is created from the connection arguments if none exists.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    message = json.dumps({
        "command": "pump_add_media",
        "experiment": experiment,
        "reactor": pioreactor,
        "volume": media,
    })
    client_custom.publish("pioreactor/control", message)
def remove_waste(waste, host, port, username, password, pioreactor, experiment):
    """Publish a ``pump_remove_media`` command with ``volume`` set to *waste*.

    The custom client is created from the connection arguments if none exists.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    message = json.dumps({
        "command": "pump_remove_media",
        "experiment": experiment,
        "reactor": pioreactor,
        "volume": waste,
    })
    client_custom.publish("pioreactor/control", message)
def cycle_media(cycle, host, port, username, password, pioreactor, experiment):
    """Publish a ``circulate_media`` command with ``duration`` set to *cycle*.

    The custom client is created from the connection arguments if none exists.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    message = json.dumps({
        "command": "circulate_media",
        "experiment": experiment,
        "reactor": pioreactor,
        "duration": cycle,
    })
    client_custom.publish("pioreactor/control", message)
def void_client():
    """Disconnect and forget the custom client, if one exists."""
    global client_custom
    if client_custom is None:
        return  # nothing to tear down
    client_custom.disconnect()
    client_custom = None
    print("Client disconnected")
def stirring_default(rpm, experiment, state):
    """Publish a stirring command for the default reactor through ``client``.

    *state* selects the command: ``"start"`` and ``"update"`` include *rpm*,
    ``"stop"`` does not. Any other value only prints ``Invalid state``.
    """
    commands = {
        "start": "start_stirring",
        "stop": "stop_stirring",
        "update": "update_stirring_rpm",
    }
    if state not in commands:
        print("Invalid state")
        return

    message = {
        "command": commands[state],
        "experiment": experiment,
        "reactor": PIOREACTOR,
    }
    if state != "stop":
        message["rpm"] = rpm
    client.publish("pioreactor/control", json.dumps(message))
def temp_automation_default(temperature, experiment, state, option):
    """Publish a temperature-automation command for the default reactor.

    *option* ``"Heat To Temp"`` maps to the ``"thermostat"`` automation, any
    other value to ``"only_record_temperature"``. *state* is one of
    ``"start"``, ``"stop"``, ``"update"`` or ``"restart"``; any other value
    only prints ``Invalid state``.
    """
    automation = "only_record_temperature"
    if option == "Heat To Temp":
        automation = "thermostat"

    if state == "start":
        message = {
            "command": "set_temperature_automation",
            "experiment": experiment,
            "reactor": PIOREACTOR,
            "temp": temperature,
            "automation": automation,
        }
    elif state in ("stop", "update"):
        # Both variants use the same command and differ only in their settings.
        if state == "stop":
            settings = {"$state": "disconnected"}
        else:
            settings = {"target_temperature": temperature}
        message = {
            "command": "temp_update",
            "experiment": experiment,
            "reactor": PIOREACTOR,
            "settings": settings,
        }
    elif state == "restart":
        message = {
            "command": "temp_restart",
            "experiment": experiment,
            "reactor": PIOREACTOR,
            "automation": automation,
            "temp": temperature,
        }
    else:
        print("Invalid state")
        return

    client.publish("pioreactor/control", json.dumps(message))
def od_default(experiment, state):
    """Publish a start/stop OD-reading command for the default reactor.

    Any *state* other than ``"start"`` or ``"stop"`` only prints
    ``Invalid state``.
    """
    if state == "start":
        command = "start_od_reading"
    elif state == "stop":
        command = "stop_od_reading"
    else:
        print("Invalid state")
        return

    message = {
        "command": command,
        "experiment": experiment,
        "reactor": PIOREACTOR,
    }
    client.publish("pioreactor/control", json.dumps(message))
def grf_default(experiment, state):
    """Publish a start/stop growth-rate command for the default reactor.

    Any *state* other than ``"start"`` or ``"stop"`` only prints
    ``Invalid state``.
    """
    if state == "start":
        command = "start_growth_rate"
    elif state == "stop":
        command = "stop_growth_rate"
    else:
        print("Invalid state")
        return

    message = {
        "command": command,
        "experiment": experiment,
        "reactor": PIOREACTOR,
    }
    client.publish("pioreactor/control", json.dumps(message))
def led1fn_default(led1, experiment):
    """Publish a ``set_led_intensity`` command for LED "A" on the default reactor."""
    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "led": "A",
        "brightness": led1,
    })
    client.publish("pioreactor/control", message)
def led2fn_default(led2, experiment):
    """Publish a ``set_led_intensity`` command for LED "B" on the default reactor."""
    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "led": "B",
        "brightness": led2,
    })
    client.publish("pioreactor/control", message)
def led3fn_default(led3, experiment):
    """Publish a ``set_led_intensity`` command for LED "C" on the default reactor."""
    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "led": "C",
        "brightness": led3,
    })
    client.publish("pioreactor/control", message)
def led4fn_default(led4, experiment):
    """Publish a ``set_led_intensity`` command for LED "D" on the default reactor."""
    message = json.dumps({
        "command": "set_led_intensity",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "led": "D",
        "brightness": led4,
    })
    client.publish("pioreactor/control", message)
def add_media_default(media, experiment):
    """Publish a ``pump_add_media`` command (``volume`` = *media*) for the default reactor."""
    message = json.dumps({
        "command": "pump_add_media",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "volume": media,
    })
    client.publish("pioreactor/control", message)
def remove_waste_default(waste, experiment):
    """Publish a ``pump_remove_media`` command (``volume`` = *waste*) for the default reactor."""
    message = json.dumps({
        "command": "pump_remove_media",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "volume": waste,
    })
    client.publish("pioreactor/control", message)
def cycle_media_default(cycle, experiment):
    """Publish a ``circulate_media`` command (``duration`` = *cycle*) for the default reactor."""
    message = json.dumps({
        "command": "circulate_media",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "duration": cycle,
    })
    client.publish("pioreactor/control", message)
def get_status(host, port, username, password, pioreactor, exp):
    """Request the worker status of *pioreactor* through the custom client.

    The custom client is created from the connection arguments if none exists.
    A ``get_worker`` command is published and the function then polls, for up
    to 10 seconds, until the message handler has stored an experiment.

    Returns:
        Tuple ``(experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt,
        experiments)`` taken from the module-level status globals. If no reply
        arrives in time, these are the cleared defaults.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments

    if client_custom is None:
        custom_client(host, port, username, password)

    # Clear every cached field (not just `experiment`) so a timed-out request
    # cannot return values left over from an earlier request.
    experiment = None
    running = []
    temp_mqtt_auto = None
    rpm_mqtt = None
    led_mqtt = None
    experiments = []

    topic = f"pioreactor/{pioreactor}/worker"
    # Install the handler before subscribing so no reply can arrive unhandled.
    client_custom.on_message = on_message_worker
    client_custom.subscribe(topic)

    request = {
        "command": "get_worker",
        "experiment": exp,
        "reactor": pioreactor,
    }
    client_custom.publish("pioreactor/control", json.dumps(request))

    # on_message_worker runs on the client's network thread and sets
    # `experiment`; poll until it does or the deadline passes.
    deadline = time.monotonic() + 10
    while experiment is None and time.monotonic() < deadline:
        time.sleep(0.1)

    client_custom.unsubscribe(topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
def get_status_default(exp):
    """Request the worker status of the default reactor through ``client``.

    A ``get_worker`` command is published and the function then polls, for up
    to 10 seconds, until the message handler has stored an experiment.

    Returns:
        Tuple ``(experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt,
        experiments)`` taken from the module-level status globals. If no reply
        arrives in time, these are the cleared defaults.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments

    # Clear every cached field (not just `experiment`) so a timed-out request
    # cannot return values left over from an earlier request.
    experiment = None
    running = []
    temp_mqtt_auto = None
    rpm_mqtt = None
    led_mqtt = None
    experiments = []

    topic = f"pioreactor/{PIOREACTOR}/worker"
    # Install the handler before subscribing so no reply can arrive unhandled.
    client.on_message = on_message_worker
    client.subscribe(topic)

    request = {
        "command": "get_worker",
        "experiment": exp,
        "reactor": PIOREACTOR,
    }
    client.publish("pioreactor/control", json.dumps(request))

    # on_message_worker runs on the client's network thread and sets
    # `experiment`; poll until it does or the deadline passes.
    deadline = time.monotonic() + 10
    while experiment is None and time.monotonic() < deadline:
        time.sleep(0.1)

    client.unsubscribe(topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
# Build the Gradio interface: a "Default" tab that drives the reactor configured
# through the environment, and a "Custom" tab that takes connection details
# from the user.
with gr.Blocks() as demo:
    with gr.Tab("Default"):
        experiment_input = gr.Textbox(label="Experiment", value="Ed")
        experiment_input.input(fn=void_client)
        # Connect the default client using the environment-provided settings.
        client = create_client(HOST, PORT, USERNAME, PASSWORD)

        with gr.Blocks():
            gr.Markdown("# Status")
            # Named *_button so the module-level get_status() function is not
            # shadowed by a UI component.
            get_status_button = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_button.click(
                fn=get_status_default,
                inputs=[experiment_input],
                outputs=[experiment_output, running_output, temp_output, rpm_output, led_output, experiments_output]
            )

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring_default,
                inputs=[rpm, experiment_input, stir_state]
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only"
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider
            )
            temp.click(
                fn=temp_automation_default,
                inputs=[temperature_slider, experiment_input, temp_state, temp_option]
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od_default,
                        inputs=[experiment_input, od_state]
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf_default,
                        inputs=[experiment_input, gr_state]
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn_default,
                        inputs=[led1, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn_default,
                        inputs=[led2, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn_default,
                        inputs=[led3, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn_default,
                        inputs=[led4, experiment_input]
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media_default,
                        inputs=[add_media_ml, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste_default,
                        inputs=[remove_waste_ml, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media_default,
                        inputs=[cycle_media_sec, experiment_input]
                    )

    with gr.Tab("Custom"):
        # Connection inputs. Editing any of them drops the cached custom client
        # so the next command reconnects with the new values.
        with gr.Row():
            host_input = gr.Textbox(label="Host")
            # precision=0 makes the component deliver an int rather than a float.
            port_input = gr.Number(label="Port", precision=0)
        host_input.input(fn=void_client)
        port_input.input(fn=void_client)
        with gr.Row():
            username_input = gr.Textbox(label="Username")
            # Mask the password instead of echoing it in clear text.
            password_input = gr.Textbox(label="Password", type="password")
            pioreactor_input = gr.Textbox(label="Pioreactor")
            experiment_input = gr.Textbox(label="Experiment")
        username_input.input(fn=void_client)
        password_input.input(fn=void_client)
        pioreactor_input.input(fn=void_client)
        experiment_input.input(fn=void_client)

        # Connection arguments shared by every custom-tab handler, in the order
        # the handler signatures expect them.
        custom_connection = [host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]

        with gr.Blocks():
            gr.Markdown("# Status")
            custom_status_button = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            # Query the user-supplied broker/reactor, not the default one.
            custom_status_button.click(
                fn=get_status,
                inputs=custom_connection,
                outputs=[experiment_output, running_output, temp_output, rpm_output, led_output, experiments_output]
            )

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring,
                inputs=[rpm_input, *custom_connection, stir_state]
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only"
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider
            )
            temp.click(
                fn=temp_automation,
                inputs=[temperature_slider, *custom_connection, temp_state, temp_option]
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od,
                        inputs=[*custom_connection, od_state]
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf,
                        inputs=[*custom_connection, gr_state]
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():  # Row for all channels
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn,
                        inputs=[led1, *custom_connection]
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn,
                        inputs=[led2, *custom_connection]
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn,
                        inputs=[led3, *custom_connection]
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn,
                        inputs=[led4, *custom_connection]
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media,
                        inputs=[add_media_ml, *custom_connection]
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste,
                        inputs=[remove_waste_ml, *custom_connection]
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media,
                        inputs=[cycle_media_sec, *custom_connection]
                    )

# Launch the interface
demo.launch()