linx5o's picture
remove unused show_image function and directly display Pioreactor image in the interface
69fee00
raw
history blame
43.8 kB
import gradio as gr
import paho.mqtt.client as mqtt
import os
import json
import time
import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import pytz
import pandas as pd
import plotly.graph_objs as go
# Connection settings for the default Pioreactor, read from environment variables.
HOST = os.environ.get("host")
# NOTE(review): if "port" is unset, os.environ.get returns None and int(None)
# raises TypeError at import time — confirm the deployment always defines it.
PORT = int(os.environ.get("port"))
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
PIOREACTOR = os.environ.get("pioreactor")
# MQTT client used by the *_default functions (assigned while the UI is built
# and by store_client()).
client = None
# MQTT client used by the custom-connection functions; created on first use by
# custom_client() and reset to None by void_client().
client_custom = None
# Most recent worker status fields, written by on_message_worker().
experiment = None
running = []
temp_mqtt_auto = None
rpm_mqtt = None
led_mqtt = None
experiments = []
# Most recent reading series, written by on_reading().
temp_graph = None
od_graph = None
norm_od_graph = None
growth_rate_graph = None
def on_connect(client, userdata, flags, rc):
    """paho-mqtt connect callback: print the result code of the connection attempt."""
    print(f"Connected with result code {rc!s}")
def create_client(host, port, username, password):
    """Create a TLS MQTT client, connect it and start its background loop.

    Args:
        host: Broker hostname.
        port: Broker port; coerced to ``int`` before connecting.
        username: Broker username.
        password: Broker password.

    Returns:
        The connected ``mqtt.Client`` with its network loop running.
    """
    # Log where we are connecting, but never the password: stdout ends up in
    # hosted application logs.
    print(host, port, username)
    client = mqtt.Client()
    client.username_pw_set(username, password)
    client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
    client.on_connect = on_connect
    # The Custom tab supplies the port from a gr.Number, which may arrive as a
    # float; the socket layer needs an integer port.
    client.connect(host, int(port))
    client.loop_start()
    return client
def on_message_worker(client, userdata, message):
    """MQTT message callback that stores a worker status snapshot in module globals.

    The payload is expected to be a UTF-8 JSON object. Missing keys fall back
    to ``None`` (or an empty list for ``running`` / ``experiments``).
    Malformed payloads are reported and ignored so that an exception does not
    escape the MQTT callback.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments

    try:
        data = json.loads(message.payload.decode("utf-8"))
    except ValueError as err:  # UnicodeDecodeError and JSONDecodeError
        print(f"Ignoring malformed worker status message: {err}")
        return
    if not isinstance(data, dict):
        print("Ignoring worker status message that is not a JSON object")
        return

    running = data.get("running", [])
    temp_mqtt_auto = data.get("temperature_automation", None)
    rpm_mqtt = data.get("stirring", None)
    led_mqtt = data.get("leds", None)
    experiments = data.get("experiments", [])
    # `experiment` is the value get_status()/get_status_default() poll on, so
    # it is assigned last: once it is non-None the rest of the snapshot is
    # already in place.
    experiment = data.get("experiment", None)
def store_client():
    """Connect to the default broker and keep the client in the module-level ``client``."""
    global client
    # HOST/PORT/USERNAME/PASSWORD are only read here, so they need no
    # ``global`` declaration.
    default_connection = (HOST, PORT, USERNAME, PASSWORD)
    client = create_client(*default_connection)
def custom_client(host, port, username, password):
    """Connect with user-supplied broker settings and store the client in ``client_custom``."""
    global client_custom
    new_connection = create_client(host, port, username, password)
    client_custom = new_connection
def stirring(rpm, host, port, username, password, pioreactor, experiment, state):
    """Publish a stirring command through the custom client.

    ``state`` selects the command: "start", "stop" or "update". Any other
    value prints "Invalid state" and publishes nothing. The custom client is
    created first if it does not exist yet, regardless of ``state``.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    command_by_state = {
        "start": "start_stirring",
        "stop": "stop_stirring",
        "update": "update_stirring_rpm",
    }
    if state not in command_by_state:
        print("Invalid state")
        return

    message = {
        "command": command_by_state[state],
        "experiment": experiment,
        "reactor": pioreactor,
    }
    # Only "start" and "update" carry a speed.
    if state != "stop":
        message["rpm"] = rpm
    client_custom.publish("pioreactor/control", json.dumps(message))
def toggle_temperature_input(selected_option):
    """Return a Gradio update making the slider visible only for "Heat To Temp"."""
    heating_selected = selected_option == "Heat To Temp"
    return gr.update(visible=heating_selected)
def temp_automation(temperature, host, port, username, password, pioreactor, experiment, state, option):
    """Publish a temperature-automation command through the custom client.

    ``option`` "Heat To Temp" maps to the "thermostat" automation; anything
    else maps to "only_record_temperature". ``state`` is one of "start",
    "stop", "update" or "restart"; other values print "Invalid state".
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    automation = "thermostat" if option == "Heat To Temp" else "only_record_temperature"

    # state -> (command name, fields appended after the common ones)
    variants = {
        "start": ("set_temperature_automation", {"temp": temperature, "automation": automation}),
        "stop": ("temp_update", {"settings": {"$state": "disconnected"}}),
        "update": ("temp_update", {"settings": {"target_temperature": temperature}}),
        "restart": ("temp_restart", {"automation": automation, "temp": temperature}),
    }
    chosen = variants.get(state)
    if chosen is None:
        print("Invalid state")
        return

    command, extra_fields = chosen
    message = {
        "command": command,
        "experiment": experiment,
        "reactor": pioreactor,
        **extra_fields,
    }
    client_custom.publish("pioreactor/control", json.dumps(message))
def od(host, port, username, password, pioreactor, experiment, state):
    """Publish a start/stop OD-reading command through the custom client.

    Any ``state`` other than "start" or "stop" prints "Invalid state".
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    command = {"start": "start_od_reading", "stop": "stop_od_reading"}.get(state)
    if command is None:
        print("Invalid state")
        return

    message = {"command": command, "experiment": experiment, "reactor": pioreactor}
    client_custom.publish("pioreactor/control", json.dumps(message))
def grf(host, port, username, password, pioreactor, experiment, state):
    """Publish a start/stop growth-rate command through the custom client.

    Any ``state`` other than "start" or "stop" prints "Invalid state".
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    command = {"start": "start_growth_rate", "stop": "stop_growth_rate"}.get(state)
    if command is None:
        print("Invalid state")
        return

    message = {"command": command, "experiment": experiment, "reactor": pioreactor}
    client_custom.publish("pioreactor/control", json.dumps(message))
def led1fn(led1, host, port, username, password, pioreactor, experiment):
    """Publish brightness ``led1`` for LED channel "A" through the custom client."""
    if client_custom is None:
        # Connect on first use.
        custom_client(host, port, username, password)
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": pioreactor,
            "led": "A",
            "brightness": led1,
        }
    )
    client_custom.publish("pioreactor/control", message)
def led2fn(led2, host, port, username, password, pioreactor, experiment):
    """Publish brightness ``led2`` for LED channel "B" through the custom client."""
    if client_custom is None:
        # Connect on first use.
        custom_client(host, port, username, password)
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": pioreactor,
            "led": "B",
            "brightness": led2,
        }
    )
    client_custom.publish("pioreactor/control", message)
def led3fn(led3, host, port, username, password, pioreactor, experiment):
    """Publish brightness ``led3`` for LED channel "C" through the custom client."""
    if client_custom is None:
        # Connect on first use.
        custom_client(host, port, username, password)
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": pioreactor,
            "led": "C",
            "brightness": led3,
        }
    )
    client_custom.publish("pioreactor/control", message)
def led4fn(led4, host, port, username, password, pioreactor, experiment):
    """Publish brightness ``led4`` for LED channel "D" through the custom client."""
    if client_custom is None:
        # Connect on first use.
        custom_client(host, port, username, password)
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": pioreactor,
            "led": "D",
            "brightness": led4,
        }
    )
    client_custom.publish("pioreactor/control", message)
def add_media(media, host, port, username, password, pioreactor, experiment):
    """Publish a "pump_add_media" command with volume ``media`` through the custom client."""
    if client_custom is None:
        custom_client(host, port, username, password)
    request = {
        "command": "pump_add_media",
        "experiment": experiment,
        "reactor": pioreactor,
        "volume": media,
    }
    encoded = json.dumps(request)
    client_custom.publish("pioreactor/control", encoded)
def remove_waste(waste, host, port, username, password, pioreactor, experiment):
    """Publish a "pump_remove_media" command with volume ``waste`` through the custom client."""
    if client_custom is None:
        custom_client(host, port, username, password)
    request = {
        "command": "pump_remove_media",
        "experiment": experiment,
        "reactor": pioreactor,
        "volume": waste,
    }
    encoded = json.dumps(request)
    client_custom.publish("pioreactor/control", encoded)
def cycle_media(cycle, host, port, username, password, pioreactor, experiment):
    """Publish a "circulate_media" command with duration ``cycle`` through the custom client."""
    if client_custom is None:
        custom_client(host, port, username, password)
    request = {
        "command": "circulate_media",
        "experiment": experiment,
        "reactor": pioreactor,
        "duration": cycle,
    }
    encoded = json.dumps(request)
    client_custom.publish("pioreactor/control", encoded)
def void_client():
    """Disconnect and forget the custom client, if one exists.

    The next custom command then reconnects with whatever connection settings
    are current.
    """
    global client_custom
    if client_custom is None:
        return
    client_custom.disconnect()
    client_custom = None
    print("Client disconnected")
def stirring_default(rpm, experiment, state):
    """Publish a stirring command for the default Pioreactor.

    ``state`` is "start", "stop" or "update"; any other value prints
    "Invalid state" and publishes nothing.
    """
    command_by_state = {
        "start": "start_stirring",
        "stop": "stop_stirring",
        "update": "update_stirring_rpm",
    }
    if state not in command_by_state:
        print("Invalid state")
        return

    message = {
        "command": command_by_state[state],
        "experiment": experiment,
        "reactor": PIOREACTOR,
    }
    # Only "start" and "update" carry a speed.
    if state != "stop":
        message["rpm"] = rpm
    client.publish("pioreactor/control", json.dumps(message))
def temp_automation_default(temperature, experiment, state, option):
    """Publish a temperature-automation command for the default Pioreactor.

    ``option`` "Heat To Temp" maps to the "thermostat" automation; anything
    else maps to "only_record_temperature". ``state`` is one of "start",
    "stop", "update" or "restart"; other values print "Invalid state".
    """
    automation = "thermostat" if option == "Heat To Temp" else "only_record_temperature"

    # state -> (command name, fields appended after the common ones)
    variants = {
        "start": ("set_temperature_automation", {"temp": temperature, "automation": automation}),
        "stop": ("temp_update", {"settings": {"$state": "disconnected"}}),
        "update": ("temp_update", {"settings": {"target_temperature": temperature}}),
        "restart": ("temp_restart", {"automation": automation, "temp": temperature}),
    }
    chosen = variants.get(state)
    if chosen is None:
        print("Invalid state")
        return

    command, extra_fields = chosen
    message = {
        "command": command,
        "experiment": experiment,
        "reactor": PIOREACTOR,
        **extra_fields,
    }
    client.publish("pioreactor/control", json.dumps(message))
def od_default(experiment, state):
    """Publish a start/stop OD-reading command for the default Pioreactor.

    Any ``state`` other than "start" or "stop" prints "Invalid state".
    """
    command = {"start": "start_od_reading", "stop": "stop_od_reading"}.get(state)
    if command is None:
        print("Invalid state")
        return

    message = {"command": command, "experiment": experiment, "reactor": PIOREACTOR}
    client.publish("pioreactor/control", json.dumps(message))
def grf_default(experiment, state):
    """Publish a start/stop growth-rate command for the default Pioreactor.

    Any ``state`` other than "start" or "stop" prints "Invalid state".
    """
    command = {"start": "start_growth_rate", "stop": "stop_growth_rate"}.get(state)
    if command is None:
        print("Invalid state")
        return

    message = {"command": command, "experiment": experiment, "reactor": PIOREACTOR}
    client.publish("pioreactor/control", json.dumps(message))
def led1fn_default(led1, experiment):
    """Publish brightness ``led1`` for LED channel "A" on the default Pioreactor."""
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": PIOREACTOR,
            "led": "A",
            "brightness": led1,
        }
    )
    client.publish("pioreactor/control", message)
def led2fn_default(led2, experiment):
    """Publish brightness ``led2`` for LED channel "B" on the default Pioreactor."""
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": PIOREACTOR,
            "led": "B",
            "brightness": led2,
        }
    )
    client.publish("pioreactor/control", message)
def led3fn_default(led3, experiment):
    """Publish brightness ``led3`` for LED channel "C" on the default Pioreactor."""
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": PIOREACTOR,
            "led": "C",
            "brightness": led3,
        }
    )
    client.publish("pioreactor/control", message)
def led4fn_default(led4, experiment):
    """Publish brightness ``led4`` for LED channel "D" on the default Pioreactor."""
    message = json.dumps(
        {
            "command": "set_led_intensity",
            "experiment": experiment,
            "reactor": PIOREACTOR,
            "led": "D",
            "brightness": led4,
        }
    )
    client.publish("pioreactor/control", message)
def add_media_default(media, experiment):
    """Publish a "pump_add_media" command with volume ``media`` for the default Pioreactor."""
    request = {
        "command": "pump_add_media",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "volume": media,
    }
    encoded = json.dumps(request)
    client.publish("pioreactor/control", encoded)
def remove_waste_default(waste, experiment):
    """Publish a "pump_remove_media" command with volume ``waste`` for the default Pioreactor."""
    request = {
        "command": "pump_remove_media",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "volume": waste,
    }
    encoded = json.dumps(request)
    client.publish("pioreactor/control", encoded)
def cycle_media_default(cycle, experiment):
    """Publish a "circulate_media" command with duration ``cycle`` for the default Pioreactor."""
    request = {
        "command": "circulate_media",
        "experiment": experiment,
        "reactor": PIOREACTOR,
        "duration": cycle,
    }
    encoded = json.dumps(request)
    client.publish("pioreactor/control", encoded)
def get_status(host, port, username, password, pioreactor, exp):
    """Request the worker status over the custom client and wait up to 10 s for it.

    Subscribes to the reactor's worker topic, publishes a "get_worker"
    command, then polls the module-level ``experiment`` (filled in by
    ``on_message_worker``) until it is set or the timeout elapses.

    Returns:
        Tuple ``(experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt,
        experiments)`` taken from the module globals.
    """
    global experiment
    if client_custom is None:
        custom_client(host, port, username, password)

    worker_topic = f"pioreactor/{pioreactor}/worker"
    client_custom.subscribe(worker_topic)
    client_custom.on_message = on_message_worker

    # Clear the polled value so a reply to *this* request is what ends the wait.
    experiment = None
    request = {
        "command": "get_worker",
        "experiment": exp,
        "reactor": pioreactor,
    }
    client_custom.publish("pioreactor/control", json.dumps(request))

    wait_limit = 10
    began = time.time()
    while experiment is None and (time.time() - began) < wait_limit:
        time.sleep(0.1)

    client_custom.unsubscribe(worker_topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
def get_status_default(exp):
    """Request the default Pioreactor's worker status and wait up to 10 s for it.

    Subscribes to the worker topic, publishes a "get_worker" command, then
    polls the module-level ``experiment`` (filled in by ``on_message_worker``)
    until it is set or the timeout elapses.

    Returns:
        Tuple ``(experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt,
        experiments)`` taken from the module globals.
    """
    global experiment

    worker_topic = f"pioreactor/{PIOREACTOR}/worker"
    client.subscribe(worker_topic)
    client.on_message = on_message_worker

    # Clear the polled value so a reply to *this* request is what ends the wait.
    experiment = None
    request = {
        "command": "get_worker",
        "experiment": exp,
        "reactor": PIOREACTOR,
    }
    client.publish("pioreactor/control", json.dumps(request))

    wait_limit = 10
    began = time.time()
    while experiment is None and (time.time() - began) < wait_limit:
        time.sleep(0.1)

    client.unsubscribe(worker_topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
def new_experiment_default(new_exp, exp):
    """Publish a "new_experiment" command named ``new_exp`` for the default Pioreactor.

    ``exp`` is accepted but not used.
    """
    request = {
        "command": "new_experiment",
        "experiment": new_exp,
        "reactor": PIOREACTOR,
    }
    encoded = json.dumps(request)
    client.publish("pioreactor/control", encoded)
def remove_experiment_default(rem_exp, exp):
    """Publish a "delete_experiment" command for ``rem_exp`` on the default Pioreactor.

    ``exp`` is accepted but not used.
    """
    request = {
        "command": "delete_experiment",
        "experiment": rem_exp,
        "reactor": PIOREACTOR,
    }
    encoded = json.dumps(request)
    client.publish("pioreactor/control", encoded)
def change_experiment_default(ch_exp, exp):
    """Publish a "change_experiment" command from ``exp`` to ``ch_exp`` on the default Pioreactor."""
    request = {
        "command": "change_experiment",
        "experiment": exp,
        "experiment_new": ch_exp,
        "reactor": PIOREACTOR,
    }
    encoded = json.dumps(request)
    client.publish("pioreactor/control", encoded)
def new_experiment(new_exp, host, port, username, password, pioreactor, exp):
    """Publish a "new_experiment" command named ``new_exp`` through the custom client.

    ``exp`` is accepted but not used.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    request = {
        "command": "new_experiment",
        "experiment": new_exp,
        "reactor": pioreactor,
    }
    encoded = json.dumps(request)
    client_custom.publish("pioreactor/control", encoded)
def remove_experiment(rem_exp, host, port, username, password, pioreactor, exp):
    """Publish a "delete_experiment" command for ``rem_exp`` through the custom client.

    ``exp`` is accepted but not used.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    request = {
        "command": "delete_experiment",
        "experiment": rem_exp,
        "reactor": pioreactor,
    }
    encoded = json.dumps(request)
    client_custom.publish("pioreactor/control", encoded)
def change_experiment(ch_exp, host, port, username, password, pioreactor, exp):
    """Publish a "change_experiment" command from ``exp`` to ``ch_exp`` through the custom client."""
    if client_custom is None:
        custom_client(host, port, username, password)
    request = {
        "command": "change_experiment",
        "experiment": exp,
        "experiment_new": ch_exp,
        "reactor": pioreactor,
    }
    encoded = json.dumps(request)
    client_custom.publish("pioreactor/control", encoded)
def on_reading(client, userdata, message):
    """MQTT message callback that stores the reading series in module globals.

    The payload is expected to be a UTF-8 JSON object with optional
    "temperature", "od", "normalized_od" and "growth_rate" entries; missing
    entries become ``None``. Malformed payloads are reported and ignored so
    that an exception does not escape the MQTT callback.
    """
    global temp_graph, od_graph, norm_od_graph, growth_rate_graph

    try:
        data = json.loads(message.payload.decode("utf-8"))
    except ValueError as err:  # UnicodeDecodeError and JSONDecodeError
        print(f"Ignoring malformed readings message: {err}")
        return
    if not isinstance(data, dict):
        print("Ignoring readings message that is not a JSON object")
        return

    od_graph = data.get("od", None)
    norm_od_graph = data.get("normalized_od", None)
    growth_rate_graph = data.get("growth_rate", None)
    # `temp_graph` is the value get_data_default() polls on, so it is assigned
    # last: once it is non-None the other series are already in place.
    temp_graph = data.get("temperature", None)
def _to_local_time_label(timestamp, local_tz):
    """Convert a UTC "%Y-%m-%dT%H:%M:%S.%fZ" string to a "%Y-%m-%d %H:%M:%S" label in ``local_tz``."""
    utc_time = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=pytz.utc)
    return utc_time.astimezone(local_tz).strftime("%Y-%m-%d %H:%M:%S")


def _build_reading_figure(readings, label):
    """Return a Plotly line figure of ``label`` vs time, or ``None`` when there is no data."""
    # None means "nothing received". An empty list would otherwise fail in
    # set_index("x") because the resulting frame has no columns.
    if not readings:
        return None

    local_tz = pytz.timezone("America/New_York")  # looked up once per series
    for point in readings:
        # Rewritten in place, as before: each "x" becomes a local-time label.
        point["x"] = _to_local_time_label(point["x"], local_tz)

    df = pd.DataFrame(readings).set_index("x")
    figure = go.Figure()
    figure.add_trace(go.Scatter(x=df.index, y=df["y"], mode="lines", name=label))
    figure.update_layout(title=f"{label} vs Time", xaxis_title="Time", yaxis_title=label)
    return figure


def get_data_default(time_scale, exp):
    """Fetch readings for the default Pioreactor and turn them into four plots.

    Publishes a "get_readings" command, waits up to 10 seconds for
    ``on_reading`` to fill the module-level series (the wait ends as soon as
    ``temp_graph`` is set), then builds one figure per series.

    Args:
        time_scale: Value sent as every "amount*" field of the request.
        exp: Experiment name sent with the request.

    Returns:
        Tuple ``(temperature, od, normalized_od, growth_rate)`` of Plotly
        figures; an entry is ``None`` when that series is missing or empty.
    """
    global temp_graph, od_graph, norm_od_graph, growth_rate_graph

    payload = {
        "command": "get_readings",
        "experiment": exp,
        "filter_mod": 1,
        "lookback": 1000000,
        "filter_mod2": 1,
        "lookback2": 1000000,
        "filter_mod3": 1,
        "lookback3": 1000000,
        "filter_mod4": 1,
        "lookback4": 1000000,
        "amount": time_scale,
        "amount2": time_scale,
        "amount3": time_scale,
        "amount4": time_scale,
        "reactor": PIOREACTOR,
    }
    readings_topic = f"pioreactor/{PIOREACTOR}/readings"

    client.on_message = on_reading
    client.subscribe(readings_topic)

    timeout = 10
    start = time.time()

    # Clear previous results so the wait below only ends on a fresh reply.
    temp_graph = None
    od_graph = None
    norm_od_graph = None
    growth_rate_graph = None

    client.publish("pioreactor/control", json.dumps(payload))
    while temp_graph is None and (time.time() - start) < timeout:
        time.sleep(0.1)
    client.unsubscribe(readings_topic)

    return (
        _build_reading_figure(temp_graph, "Temperature"),
        _build_reading_figure(od_graph, "OD"),
        _build_reading_figure(norm_od_graph, "Normalized OD"),
        _build_reading_figure(growth_rate_graph, "Growth Rate"),
    )
# Define the interface components.
# NOTE(review): this section was recovered from a whitespace-stripped copy, so
# the exact Row/Column/Blocks membership of some components is a best guess —
# confirm the layout against the deployed app. Event wiring is unchanged.
with gr.Blocks() as demo:
    gr.Markdown("# Pioreactor Control Panel")

    # ---- Default tab: controls the Pioreactor configured via the environment ----
    with gr.Tab("Default"):
        with gr.Blocks():
            gr.HTML("""
<a target="_blank" href="https://colab.research.google.com/github/AccelerationConsortium/ac-training-lab/blob/51-pioreactor-mqtt-and-streamlit/notebooks/Pioreactor_Example_Workflow.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>
""")
            gr.Markdown("This is a control panel for the [Pioreactor](https://pioreactor.com/en-ca). You can use this to control the Pioreactor. You can also view the status of the Pioreactor and view graphs of the data. To use a custom Pioreactor, click on the Custom tab.")
            gr.Image("pioreactor.webp")
            gr.Markdown("The experiment name is required for all commands. You can get the experiment name from the status tab. When changing experiments, remember to update the experiment name.")
            experiment_input = gr.Textbox(label="Experiment", placeholder="Get Experiment Name From Status")
            experiment_input.input(fn=void_client)
            # Connect the default client once, while the UI is being built.
            client = create_client(HOST, PORT, USERNAME, PASSWORD)

        with gr.Blocks():
            gr.Markdown("# Status")
            get_status_a = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_a.click(
                fn=get_status_default,
                inputs=[experiment_input],
                outputs=[experiment_output, running_output, temp_output, rpm_output, led_output, experiments_output]
            )

        with gr.Blocks():
            gr.Markdown("# Experiments")
            with gr.Row():
                new_experiment_a = gr.Textbox(label="New Experiment")
                new_experiment_button = gr.Button("Add Experiment")
            with gr.Row():
                remove_experiment_a = gr.Textbox(label="Remove Experiment")
                remove_experiment_button = gr.Button("Remove Experiment")
            with gr.Row():
                change_experiment_a = gr.Textbox(label="Change Experiment")
                change_experiment_button = gr.Button("Change Experiment")
            new_experiment_button.click(
                fn=new_experiment_default,
                inputs=[new_experiment_a, experiment_input]
            )
            remove_experiment_button.click(
                fn=remove_experiment_default,
                inputs=[remove_experiment_a, experiment_input]
            )
            change_experiment_button.click(
                fn=change_experiment_default,
                inputs=[change_experiment_a, experiment_input]
            )

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State", value="start")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring_default,
                inputs=[rpm_input, experiment_input, stir_state]
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only"
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State", value="start")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider
            )
            temp.click(
                fn=temp_automation_default,
                inputs=[temperature_slider, experiment_input, temp_state, temp_option]
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State", value="start")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od_default,
                        inputs=[experiment_input, od_state]
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State", value="start")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf_default,
                        inputs=[experiment_input, gr_state]
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn_default,
                        inputs=[led1, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn_default,
                        inputs=[led2, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn_default,
                        inputs=[led3, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn_default,
                        inputs=[led4, experiment_input]
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media_default,
                        inputs=[add_media_ml, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste_default,
                        inputs=[remove_waste_ml, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media_default,
                        inputs=[cycle_media_sec, experiment_input]
                    )

    # ---- Default Graphs tab: plots for the default Pioreactor ----
    with gr.Tab("Default Graphs"):
        with gr.Row():
            time_scale = gr.Radio(label="Time Scale", choices=["1 hour", "24 hour", "All Data"], value="All Data")
            get_graphs = gr.Button("Get Graphs")
        plot1 = gr.Plot()
        plot2 = gr.Plot()
        plot3 = gr.Plot()
        plot4 = gr.Plot()
        # `experiment_input` here is still the Default tab's textbox; it is
        # rebound to a new textbox further down, in the Custom tab.
        get_graphs.click(
            fn=get_data_default,
            inputs=[time_scale, experiment_input],
            outputs=[plot1, plot2, plot3, plot4]
        )

    # ---- Custom tab: same controls against a user-supplied broker ----
    with gr.Tab("Custom"):
        gr.Markdown("Remember to change the experiment name when changing experiments.")
        # Input components. Editing any of them drops the cached custom client
        # so the next command reconnects with the new settings.
        with gr.Row():
            host_input = gr.Textbox(label="Host")
            # precision=0 makes Gradio hand the port over as an int.
            port_input = gr.Number(label="Port", precision=0)
        host_input.input(fn=void_client)
        port_input.input(fn=void_client)
        with gr.Row():
            username_input = gr.Textbox(label="Username")
            # Mask the broker password instead of showing it in clear text.
            password_input = gr.Textbox(label="Password", type="password")
        pioreactor_input = gr.Textbox(label="Pioreactor")
        experiment_input = gr.Textbox(label="Experiment")
        username_input.input(fn=void_client)
        password_input.input(fn=void_client)
        pioreactor_input.input(fn=void_client)
        experiment_input.input(fn=void_client)

        with gr.Blocks():
            gr.Markdown("# Status")
            get_status_b = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_b.click(
                fn=get_status,
                inputs=[host_input, port_input, username_input, password_input, pioreactor_input, experiment_input],
                outputs=[experiment_output, running_output, temp_output, rpm_output, led_output, experiments_output]
            )

        with gr.Blocks():
            gr.Markdown("# Experiments")
            with gr.Row():
                new_experiment_b = gr.Textbox(label="New Experiment")
                new_experiment_button_b = gr.Button("Add Experiment")
            with gr.Row():
                remove_experiment_b = gr.Textbox(label="Remove Experiment")
                remove_experiment_button_b = gr.Button("Remove Experiment")
            with gr.Row():
                change_experiment_b = gr.Textbox(label="Change Experiment")
                change_experiment_button_b = gr.Button("Change Experiment")
            new_experiment_button_b.click(
                fn=new_experiment,
                inputs=[new_experiment_b, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
            )
            remove_experiment_button_b.click(
                fn=remove_experiment,
                inputs=[remove_experiment_b, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
            )
            change_experiment_button_b.click(
                fn=change_experiment,
                inputs=[change_experiment_b, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
            )

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State", value="start")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring,
                inputs=[rpm_input, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, stir_state]
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only"
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State", value="start")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider
            )
            temp.click(
                fn=temp_automation,
                inputs=[temperature_slider, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, temp_state, temp_option]
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State", value="start")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od,
                        inputs=[host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, od_state]
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State", value="start")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf,
                        inputs=[host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, gr_state]
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():  # Row for all channels
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn,
                        inputs=[led1, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn,
                        inputs=[led2, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn,
                        inputs=[led3, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn,
                        inputs=[led4, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media,
                        inputs=[add_media_ml, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste,
                        inputs=[remove_waste_ml, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media,
                        inputs=[cycle_media_sec, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )

# Launch the interface
demo.launch()