|
import gradio as gr |
|
import paho.mqtt.client as mqtt |
|
import os |
|
import json |
|
import time |
|
|
|
# Connection settings for the "Default" tab, read from (lower-case)
# environment variables.  Each value is None when its variable is unset.
HOST = os.environ.get("host")

# int(None) would raise TypeError at import time when "port" is unset (and
# int("") a ValueError when it is empty), so fall back to 8883, the
# conventional MQTT-over-TLS port (create_client() always enables TLS).
PORT = int(os.environ.get("port") or 8883)

USERNAME = os.environ.get("username")

PASSWORD = os.environ.get("password")

# Sent as the "reactor" field of commands issued from the "Default" tab.
PIOREACTOR = os.environ.get("pioreactor")
|
|
|
|
|
# Invisible Gradio components pre-filled with the environment-provided
# settings.  The "Default" tab passes them as event inputs so its handlers
# receive the same argument list as the handlers on the "Custom" tab.
# NOTE(review): these are instantiated outside any gr.Blocks context --
# confirm Gradio accepts them as event inputs without an explicit .render().
HOST_gr = gr.Textbox(
    value=HOST,
    label="Host",
    visible=False,
)
PORT_gr = gr.Number(
    value=PORT,
    label="Port",
    visible=False,
)
USERNAME_gr = gr.Textbox(
    value=USERNAME,
    label="Username",
    visible=False,
)
PASSWORD_gr = gr.Textbox(
    value=PASSWORD,
    label="Password",
    visible=False,
)
PIOREACTOR_gr = gr.Textbox(
    value=PIOREACTOR,
    label="Pioreactor",
    visible=False,
)
|
|
|
# Module-level MQTT client slots, both empty until a connection is made:
#   client        -- assigned by store_client()
#   client_custom -- assigned by custom_client(), cleared by void_client()
client = client_custom = None
|
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """Log the result code reported when the MQTT connection is established.

    Installed as ``client.on_connect`` by ``create_client``; only ``rc`` is
    used, the remaining arguments are accepted to match the callback shape.
    """
    print(f"Connected with result code {rc}")
|
|
|
def create_client(host, port, username, password):
    """Create an MQTT client, connect it over TLS and start its network loop.

    Args:
        host: Broker host name.
        port: Broker port.  Coerced to ``int`` because the value can
            originate from a ``gr.Number`` field, which may deliver a float.
        username: User name for broker authentication.
        password: Password for broker authentication.

    Returns:
        The ``mqtt.Client`` instance, with ``loop_start()`` already called.

    Raises:
        TypeError, ValueError: If ``port`` cannot be converted to an integer.
        Any exception raised by ``connect()`` propagates unchanged.
    """
    port = int(port)

    # Log the connection target only; the password is deliberately left out
    # so that credentials do not end up in the application logs.
    print(host, port, username)

    client = mqtt.Client()
    client.username_pw_set(username, password)
    client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
    client.on_connect = on_connect
    client.connect(host, port)

    # Run the network loop on a background thread so the caller can publish
    # immediately without servicing the connection itself.
    client.loop_start()
    return client
|
|
|
def on_message_worker(client, userdata, message):
    """Cache the fields of a JSON status message in module-level globals.

    The signature has the (client, userdata, message) shape of an MQTT
    message callback; only ``message.payload`` is read.  The payload is
    decoded as UTF-8 and parsed as a JSON object, then each known key is
    copied into a global of its own, using ``None`` (or an empty list for
    the list-valued keys) when the key is absent.

    NOTE(review): nothing in the visible code registers this function as a
    callback -- confirm whether it is still used.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments

    status = json.loads(message.payload.decode("utf-8"))

    experiment = status.get("experiment", None)
    running = status.get("running", [])
    temp_mqtt_auto = status.get("temperature_automation", None)
    rpm_mqtt = status.get("stirring", None)
    led_mqtt = status.get("leds", None)
    experiments = status.get("experiments", [])
|
|
|
|
|
def store_client():
    """Connect with the environment-provided settings.

    The resulting client is kept in the module-level ``client`` variable.
    """
    global client
    # HOST, PORT, USERNAME and PASSWORD are only read here, so they need no
    # ``global`` declaration.
    client = create_client(HOST, PORT, USERNAME, PASSWORD)
|
|
|
def custom_client(host, port, username, password):
    """Connect with caller-supplied settings.

    The resulting client is kept in the module-level ``client_custom``
    variable, where the command handlers look for it.
    """
    global client_custom
    connection = create_client(host, port, username, password)
    client_custom = connection
|
|
|
def stirring(rpm, host, port, username, password, pioreactor, experiment, state):
    """Publish a stirring command on the ``pioreactor/control`` topic.

    Args:
        rpm: Stirring speed; included in the payload for "start" and "update".
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
        state: One of "start", "stop" or "update".  Any other value prints
            "Invalid state" and publishes nothing.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    # state -> (command name, whether the payload carries the rpm value)
    commands = {
        "start": ("start_stirring", True),
        "stop": ("stop_stirring", False),
        "update": ("update_stirring_rpm", True),
    }
    if state not in commands:
        print("Invalid state")
        return

    command, with_rpm = commands[state]
    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
    }
    if with_rpm:
        message["rpm"] = rpm

    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def toggle_temperature_input(selected_option):
    """Return a Gradio update toggling the temperature slider's visibility.

    The slider is made visible only when ``selected_option`` equals
    "Heat To Temp"; any other selection hides it.
    """
    show_slider = selected_option == "Heat To Temp"
    return gr.update(visible=show_slider)
|
|
|
def temp_automation(temperature, host, port, username, password, pioreactor, experiment, state, option):
    """Publish a temperature-automation command on ``pioreactor/control``.

    Args:
        temperature: Temperature value placed in the payload for the
            "start", "update" and "restart" states.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
        state: One of "start", "stop", "update" or "restart".  Any other
            value prints "Invalid state" and publishes nothing.
        option: "Heat To Temp" selects the "thermostat" automation; every
            other value selects "only_record_temperature".
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    if option == "Heat To Temp":
        automation = "thermostat"
    else:
        automation = "only_record_temperature"

    # state -> (command name, state-specific fields appended after the
    # common command/experiment/reactor fields, in this exact order)
    variants = {
        "start": (
            "set_temperature_automation",
            {"temp": temperature, "automation": automation},
        ),
        "stop": (
            "temp_update",
            {"settings": {"$state": "disconnected"}},
        ),
        "update": (
            "temp_update",
            {"settings": {"target_temperature": temperature}},
        ),
        "restart": (
            "temp_restart",
            {"automation": automation, "temp": temperature},
        ),
    }

    variant = variants.get(state)
    if variant is None:
        print("Invalid state")
        return

    command, extras = variant
    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
        **extras,
    }
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def od(host, port, username, password, pioreactor, experiment, state):
    """Publish an OD-reading start/stop command on ``pioreactor/control``.

    Args:
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
        state: "start" or "stop".  Any other value prints "Invalid state"
            and publishes nothing.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    command_for_state = {
        "start": "start_od_reading",
        "stop": "stop_od_reading",
    }
    command = command_for_state.get(state)
    if command is None:
        print("Invalid state")
        return

    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
    }
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def grf(host, port, username, password, pioreactor, experiment, state):
    """Publish a growth-rate start/stop command on ``pioreactor/control``.

    Args:
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
        state: "start" or "stop".  Any other value prints "Invalid state"
            and publishes nothing.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    command_for_state = {
        "start": "start_growth_rate",
        "stop": "stop_growth_rate",
    }
    command = command_for_state.get(state)
    if command is None:
        print("Invalid state")
        return

    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
    }
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def led1fn(led1, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED channel "A".

    Args:
        led1: Value sent as the ``"brightness"`` field.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="A",
        brightness=led1,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def led2fn(led2, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED channel "B".

    Args:
        led2: Value sent as the ``"brightness"`` field.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="B",
        brightness=led2,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def led3fn(led3, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED channel "C".

    Args:
        led3: Value sent as the ``"brightness"`` field.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="C",
        brightness=led3,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def led4fn(led4, host, port, username, password, pioreactor, experiment):
    """Publish a ``set_led_intensity`` command for LED channel "D".

    Args:
        led4: Value sent as the ``"brightness"`` field.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="D",
        brightness=led4,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def add_media(media, host, port, username, password, pioreactor, experiment):
    """Publish a ``pump_add_media`` command on ``pioreactor/control``.

    Args:
        media: Value sent as the ``"volume"`` field.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    message = dict(
        command="pump_add_media",
        experiment=experiment,
        reactor=pioreactor,
        volume=media,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def remove_waste(waste, host, port, username, password, pioreactor, experiment):
    """Publish a ``pump_remove_media`` command on ``pioreactor/control``.

    Args:
        waste: Value sent as the ``"volume"`` field.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    message = dict(
        command="pump_remove_media",
        experiment=experiment,
        reactor=pioreactor,
        volume=waste,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def cycle_media(cycle, host, port, username, password, pioreactor, experiment):
    """Publish a ``circulate_media`` command on ``pioreactor/control``.

    Args:
        cycle: Value sent as the ``"duration"`` field.
        host, port, username, password: Used to open the shared custom
            client when none is cached yet.
        pioreactor: Sent as the ``"reactor"`` field.
        experiment: Sent as the ``"experiment"`` field.
    """
    if client_custom is None:
        # custom_client() stores the new connection in ``client_custom``.
        custom_client(host, port, username, password)

    message = dict(
        command="circulate_media",
        experiment=experiment,
        reactor=pioreactor,
        duration=cycle,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
|
|
|
def void_client():
    """Disconnect and discard the cached custom MQTT client, if there is one.

    Wired to the ``input`` event of the connection fields so that the next
    command reconnects instead of reusing the cached client.  Does nothing
    when no client is cached.

    Raises:
        Any exception raised by ``disconnect()`` propagates, but the cached
        client has already been cleared and its network loop stopped.
    """
    global client_custom

    if client_custom is None:
        return

    # Clear the global first so a failure below cannot leave a half-closed
    # client cached for the next command to reuse.
    stale, client_custom = client_custom, None
    try:
        stale.disconnect()
    finally:
        # create_client() started a background network thread with
        # loop_start(); stop it explicitly instead of relying on it to
        # wind down on its own after the disconnect.
        stale.loop_stop()

    print("Client disconnected")
|
|
|
|
|
# Two-tab control panel.  "Default" sends commands with the connection
# settings held in the hidden *_gr components; "Custom" lets the user type
# the connection settings and exposes the full set of commands.
# NOTE(review): the layout nesting below (which widgets sit inside each
# Row/Column) was reconstructed from statement order -- confirm it against
# the rendered page.
with gr.Blocks() as demo:
    with gr.Tab("Default"):
        experiment_input = gr.Textbox(label="Experiment", value="Ed")
        # Editing the experiment name drops the cached MQTT client.
        experiment_input.input(fn=void_client)

        # Connection-related inputs passed to every handler on this tab.
        default_conn = [
            HOST_gr,
            PORT_gr,
            USERNAME_gr,
            PASSWORD_gr,
            PIOREACTOR_gr,
            experiment_input,
        ]

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")

            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")

                st = gr.Button("Send Command")

            st.click(
                fn=stirring,
                inputs=[rpm, *default_conn, stir_state],
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")

            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )

            # Hidden until "Heat To Temp" is selected (see the change handler).
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)

            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")

            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )

            temp.click(
                fn=temp_automation,
                inputs=[temperature_slider, *default_conn, temp_state, temp_option],
            )

    with gr.Tab("Custom"):

        with gr.Row():
            host_input = gr.Textbox(label="Host")
            port_input = gr.Number(label="Port")

        # Any edit to a connection field drops the cached MQTT client.
        for field in (host_input, port_input):
            field.input(fn=void_client)

        with gr.Row():
            username_input = gr.Textbox(label="Username")
            password_input = gr.Textbox(label="Password")
            pioreactor_input = gr.Textbox(label="Pioreactor")
            experiment_input = gr.Textbox(label="Experiment")

        for field in (username_input, password_input, pioreactor_input, experiment_input):
            field.input(fn=void_client)

        # Connection-related inputs passed to every handler on this tab.
        custom_conn = [
            host_input,
            port_input,
            username_input,
            password_input,
            pioreactor_input,
            experiment_input,
        ]

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")

            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")

                st = gr.Button("Send Command")

            st.click(
                fn=stirring,
                inputs=[rpm_input, *custom_conn, stir_state],
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")

            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )

            # Hidden until "Heat To Temp" is selected (see the change handler).
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)

            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")

            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )

            temp.click(
                fn=temp_automation,
                inputs=[temperature_slider, *custom_conn, temp_state, temp_option],
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")

                    od_button.click(
                        fn=od,
                        inputs=[*custom_conn, od_state],
                    )

                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")

                    gr_button.click(
                        fn=grf,
                        inputs=[*custom_conn, gr_state],
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")

            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")

                    led1_button.click(
                        fn=led1fn,
                        inputs=[led1, *custom_conn],
                    )

                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")

                    led2_button.click(
                        fn=led2fn,
                        inputs=[led2, *custom_conn],
                    )

                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")

                    led3_button.click(
                        fn=led3fn,
                        inputs=[led3, *custom_conn],
                    )

                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")

                    led4_button.click(
                        fn=led4fn,
                        inputs=[led4, *custom_conn],
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")

            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")

                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")

                    add_media_button.click(
                        fn=add_media,
                        inputs=[add_media_ml, *custom_conn],
                    )

                with gr.Column():
                    gr.Markdown("### Remove Waste")

                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")

                    remove_waste_button.click(
                        fn=remove_waste,
                        inputs=[remove_waste_ml, *custom_conn],
                    )

                with gr.Column():
                    gr.Markdown("### Cycle Media")

                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")

                    cycle_media_button.click(
                        fn=cycle_media,
                        inputs=[cycle_media_sec, *custom_conn],
                    )


demo.launch()