linx5o's picture
try again
920b951
raw
history blame
41.3 kB
import gradio as gr
import paho.mqtt.client as mqtt
import os
import json
import time
import matplotlib.pyplot as plt
import numpy as np
import datetime
import pytz
import pandas as pd
import plotly.tools as tls
import plotly.io as pio
import plotly.graph_objects as go
# Connection settings for the default broker, read from environment variables.
HOST = os.environ.get("host")
# int(None) raises TypeError at import time when "port" is unset, so fall back
# to 8883, the registered MQTT-over-TLS port (create_client always enables TLS).
PORT = int(os.environ.get("port") or 8883)
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
# Reactor name used in topics and payloads by the *_default helpers.
PIOREACTOR = os.environ.get("pioreactor")

# MQTT connections: `client` is used by the *_default helpers, `client_custom`
# by the helpers that take explicit host/port/credentials.
client = None
client_custom = None

# Latest worker status, written by on_message_worker.
experiment = None
running = []
temp_mqtt_auto = None
rpm_mqtt = None
led_mqtt = None
experiments = []

# Latest readings, written by on_message.
temp_graph = None
od_graph = None
norm_od_graph = None
growth_rate_graph = None
def on_connect(client, userdata, flags, rc):
    """Connection callback: report the broker's result code on stdout."""
    print(f"Connected with result code {rc!s}")
def create_client(host, port, username, password):
    """Create an MQTT client, connect it over TLS and start its network loop.

    Args:
        host: Broker host name.
        port: Broker port; coerced to ``int`` because a numeric UI field may
            deliver it as a float, which the socket layer rejects.
        username: Broker user name.
        password: Broker password (never written to stdout).

    Returns:
        The connected client with its background loop already started.

    Raises:
        TypeError/ValueError: if ``port`` cannot be converted to an integer.
    """
    import ssl  # local import keeps this block self-contained

    port = int(port)
    # Log the connection target for debugging, but mask the secret.
    print(host, port, username, "********")
    client = mqtt.Client()
    client.username_pw_set(username, password)
    client.tls_set(tls_version=ssl.PROTOCOL_TLS)
    client.on_connect = on_connect
    client.connect(host, port)
    client.loop_start()
    return client
def on_message_worker(client, userdata, message):
    """Store the fields of a worker-status message in the module-level status globals.

    The payload is expected to be a UTF-8 JSON object; missing keys fall back
    to ``None`` (or an empty list for ``running`` / ``experiments``).
    Payloads that cannot be decoded, or that are not JSON objects, are
    reported on stdout and ignored so the MQTT callback never raises.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
    try:
        data = json.loads(message.payload.decode("utf-8"))
    except ValueError as err:  # UnicodeDecodeError and JSONDecodeError
        print(f"Ignoring malformed worker message: {err}")
        return
    if not isinstance(data, dict):
        print("Ignoring worker message that is not a JSON object")
        return

    running = data.get("running", [])
    temp_mqtt_auto = data.get("temperature_automation", None)
    rpm_mqtt = data.get("stirring", None)
    led_mqtt = data.get("leds", None)
    experiments = data.get("experiments", [])
    # Assigned last: the status pollers wait for `experiment` to become
    # non-None, so every other field must already be in place by then.
    experiment = data.get("experiment", None)
def store_client():
    """(Re)create the default MQTT connection from the environment-derived settings."""
    global client
    # HOST, PORT, USERNAME and PASSWORD are only read here, so they need no
    # `global` declaration.
    client = create_client(HOST, PORT, USERNAME, PASSWORD)
def custom_client(host, port, username, password):
    """Open a connection with the given settings and keep it in `client_custom`."""
    global client_custom
    connection = create_client(host, port, username, password)
    client_custom = connection
def stirring(rpm, host, port, username, password, pioreactor, experiment, state):
    """Publish a stirring command on the custom connection.

    ``state`` selects the command: "start", "stop" or "update"; anything else
    prints "Invalid state" and publishes nothing. ``rpm`` is sent with "start"
    and "update" only. The connection settings are used only to open the
    custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    # state -> (command name, whether the payload carries the rpm)
    commands = {
        "start": ("start_stirring", True),
        "stop": ("stop_stirring", False),
        "update": ("update_stirring_rpm", True),
    }
    if state not in commands:
        print("Invalid state")
        return

    command, with_rpm = commands[state]
    message = {"command": command, "experiment": experiment, "reactor": pioreactor}
    if with_rpm:
        message["rpm"] = rpm
    client_custom.publish("pioreactor/control", json.dumps(message))
def toggle_temperature_input(selected_option):
    """Return a Gradio update that shows the temperature slider only for "Heat To Temp"."""
    show_slider = selected_option == "Heat To Temp"
    return gr.update(visible=show_slider)
def temp_automation(temperature, host, port, username, password, pioreactor, experiment, state, option):
    """Publish a temperature-automation command on the custom connection.

    ``option`` "Heat To Temp" maps to the "thermostat" automation; any other
    value maps to "only_record_temperature". ``state`` is one of "start",
    "stop", "update" or "restart"; anything else prints "Invalid state" and
    publishes nothing. The connection settings are used only to open the
    custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    automation = "only_record_temperature"
    if option == "Heat To Temp":
        automation = "thermostat"

    target = {"experiment": experiment, "reactor": pioreactor}
    variants = {
        "start": {
            "command": "set_temperature_automation",
            **target,
            "temp": temperature,
            "automation": automation,
        },
        "stop": {
            "command": "temp_update",
            **target,
            "settings": {"$state": "disconnected"},
        },
        "update": {
            "command": "temp_update",
            **target,
            "settings": {"target_temperature": temperature},
        },
        "restart": {
            "command": "temp_restart",
            **target,
            "automation": automation,
            "temp": temperature,
        },
    }
    message = variants.get(state)
    if message is None:
        print("Invalid state")
        return
    client_custom.publish("pioreactor/control", json.dumps(message))
def od(host, port, username, password, pioreactor, experiment, state):
    """Publish a start/stop OD-reading command on the custom connection.

    ``state`` must be "start" or "stop"; anything else prints "Invalid state"
    and publishes nothing.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    command = {"start": "start_od_reading", "stop": "stop_od_reading"}.get(state)
    if command is None:
        print("Invalid state")
        return
    message = {"command": command, "experiment": experiment, "reactor": pioreactor}
    client_custom.publish("pioreactor/control", json.dumps(message))
def grf(host, port, username, password, pioreactor, experiment, state):
    """Publish a start/stop growth-rate command on the custom connection.

    ``state`` must be "start" or "stop"; anything else prints "Invalid state"
    and publishes nothing.
    """
    if client_custom is None:
        custom_client(host, port, username, password)

    command_by_state = {
        "start": "start_growth_rate",
        "stop": "stop_growth_rate",
    }
    if state not in command_by_state:
        print("Invalid state")
        return
    message = {
        "command": command_by_state[state],
        "experiment": experiment,
        "reactor": pioreactor,
    }
    client_custom.publish("pioreactor/control", json.dumps(message))
def led1fn(led1, host, port, username, password, pioreactor, experiment):
    """Publish a `set_led_intensity` command for LED "A" on the custom connection.

    ``led1`` is sent as the brightness. The connection settings are used only
    to open the custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="A",
        brightness=led1,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def led2fn(led2, host, port, username, password, pioreactor, experiment):
    """Publish a `set_led_intensity` command for LED "B" on the custom connection.

    ``led2`` is sent as the brightness. The connection settings are used only
    to open the custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="B",
        brightness=led2,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def led3fn(led3, host, port, username, password, pioreactor, experiment):
    """Publish a `set_led_intensity` command for LED "C" on the custom connection.

    ``led3`` is sent as the brightness. The connection settings are used only
    to open the custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="C",
        brightness=led3,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def led4fn(led4, host, port, username, password, pioreactor, experiment):
    """Publish a `set_led_intensity` command for LED "D" on the custom connection.

    ``led4`` is sent as the brightness. The connection settings are used only
    to open the custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=pioreactor,
        led="D",
        brightness=led4,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def add_media(media, host, port, username, password, pioreactor, experiment):
    """Publish a `pump_add_media` command on the custom connection.

    ``media`` is sent as the payload's "volume". The connection settings are
    used only to open the custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="pump_add_media",
        experiment=experiment,
        reactor=pioreactor,
        volume=media,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def remove_waste(waste, host, port, username, password, pioreactor, experiment):
    """Publish a `pump_remove_media` command on the custom connection.

    ``waste`` is sent as the payload's "volume". The connection settings are
    used only to open the custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="pump_remove_media",
        experiment=experiment,
        reactor=pioreactor,
        volume=waste,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def cycle_media(cycle, host, port, username, password, pioreactor, experiment):
    """Publish a `circulate_media` command on the custom connection.

    ``cycle`` is sent as the payload's "duration". The connection settings are
    used only to open the custom connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="circulate_media",
        experiment=experiment,
        reactor=pioreactor,
        duration=cycle,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def void_client():
    """Disconnect and forget the custom connection, if one is open.

    The next custom-tab command then opens a fresh connection with whatever
    settings are passed to it.
    """
    global client_custom
    if client_custom is None:
        return
    client_custom.disconnect()
    client_custom = None
    print("Client disconnected")
def stirring_default(rpm, experiment, state):
    """Publish a stirring command for the default reactor on the default connection.

    ``state`` selects the command: "start", "stop" or "update"; anything else
    prints "Invalid state" and publishes nothing. ``rpm`` is sent with "start"
    and "update" only.
    """
    # state -> (command name, whether the payload carries the rpm)
    commands = {
        "start": ("start_stirring", True),
        "stop": ("stop_stirring", False),
        "update": ("update_stirring_rpm", True),
    }
    if state not in commands:
        print("Invalid state")
        return

    command, with_rpm = commands[state]
    message = {"command": command, "experiment": experiment, "reactor": PIOREACTOR}
    if with_rpm:
        message["rpm"] = rpm
    client.publish("pioreactor/control", json.dumps(message))
def temp_automation_default(temperature, experiment, state, option):
    """Publish a temperature-automation command for the default reactor.

    ``option`` "Heat To Temp" maps to the "thermostat" automation; any other
    value maps to "only_record_temperature". ``state`` is one of "start",
    "stop", "update" or "restart"; anything else prints "Invalid state" and
    publishes nothing.
    """
    automation = "only_record_temperature"
    if option == "Heat To Temp":
        automation = "thermostat"

    target = {"experiment": experiment, "reactor": PIOREACTOR}
    variants = {
        "start": {
            "command": "set_temperature_automation",
            **target,
            "temp": temperature,
            "automation": automation,
        },
        "stop": {
            "command": "temp_update",
            **target,
            "settings": {"$state": "disconnected"},
        },
        "update": {
            "command": "temp_update",
            **target,
            "settings": {"target_temperature": temperature},
        },
        "restart": {
            "command": "temp_restart",
            **target,
            "automation": automation,
            "temp": temperature,
        },
    }
    message = variants.get(state)
    if message is None:
        print("Invalid state")
        return
    client.publish("pioreactor/control", json.dumps(message))
def od_default(experiment, state):
    """Publish a start/stop OD-reading command for the default reactor.

    ``state`` must be "start" or "stop"; anything else prints "Invalid state"
    and publishes nothing.
    """
    command = {"start": "start_od_reading", "stop": "stop_od_reading"}.get(state)
    if command is None:
        print("Invalid state")
        return
    message = {"command": command, "experiment": experiment, "reactor": PIOREACTOR}
    client.publish("pioreactor/control", json.dumps(message))
def grf_default(experiment, state):
    """Publish a start/stop growth-rate command for the default reactor.

    ``state`` must be "start" or "stop"; anything else prints "Invalid state"
    and publishes nothing.
    """
    command_by_state = {
        "start": "start_growth_rate",
        "stop": "stop_growth_rate",
    }
    if state not in command_by_state:
        print("Invalid state")
        return
    message = {
        "command": command_by_state[state],
        "experiment": experiment,
        "reactor": PIOREACTOR,
    }
    client.publish("pioreactor/control", json.dumps(message))
def led1fn_default(led1, experiment):
    """Publish a `set_led_intensity` command for LED "A" on the default reactor.

    ``led1`` is sent as the brightness.
    """
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=PIOREACTOR,
        led="A",
        brightness=led1,
    )
    client.publish("pioreactor/control", json.dumps(message))
def led2fn_default(led2, experiment):
    """Publish a `set_led_intensity` command for LED "B" on the default reactor.

    ``led2`` is sent as the brightness.
    """
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=PIOREACTOR,
        led="B",
        brightness=led2,
    )
    client.publish("pioreactor/control", json.dumps(message))
def led3fn_default(led3, experiment):
    """Publish a `set_led_intensity` command for LED "C" on the default reactor.

    ``led3`` is sent as the brightness.
    """
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=PIOREACTOR,
        led="C",
        brightness=led3,
    )
    client.publish("pioreactor/control", json.dumps(message))
def led4fn_default(led4, experiment):
    """Publish a `set_led_intensity` command for LED "D" on the default reactor.

    ``led4`` is sent as the brightness.
    """
    message = dict(
        command="set_led_intensity",
        experiment=experiment,
        reactor=PIOREACTOR,
        led="D",
        brightness=led4,
    )
    client.publish("pioreactor/control", json.dumps(message))
def add_media_default(media, experiment):
    """Publish a `pump_add_media` command for the default reactor.

    ``media`` is sent as the payload's "volume".
    """
    message = dict(
        command="pump_add_media",
        experiment=experiment,
        reactor=PIOREACTOR,
        volume=media,
    )
    client.publish("pioreactor/control", json.dumps(message))
def remove_waste_default(waste, experiment):
    """Publish a `pump_remove_media` command for the default reactor.

    ``waste`` is sent as the payload's "volume".
    """
    message = dict(
        command="pump_remove_media",
        experiment=experiment,
        reactor=PIOREACTOR,
        volume=waste,
    )
    client.publish("pioreactor/control", json.dumps(message))
def cycle_media_default(cycle, experiment):
    """Publish a `circulate_media` command for the default reactor.

    ``cycle`` is sent as the payload's "duration".
    """
    message = dict(
        command="circulate_media",
        experiment=experiment,
        reactor=PIOREACTOR,
        duration=cycle,
    )
    client.publish("pioreactor/control", json.dumps(message))
def get_status(host, port, username, password, pioreactor, exp):
    """Request the worker status over the custom connection and wait for the reply.

    Publishes a `get_worker` command and polls for up to 10 seconds until the
    message handler has stored a reply.

    Args:
        host, port, username, password: Used only to open the custom
            connection when none exists yet.
        pioreactor: Reactor name used in the reply topic and the payload.
        exp: Experiment name sent with the request.

    Returns:
        Tuple ``(experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt,
        experiments)``. When no reply arrives in time these are the reset
        values (``None`` / empty lists), not data from an earlier request.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
    if client_custom is None:
        custom_client(host, port, username, password)

    # Clear every status field, not just the polled one, so a timeout cannot
    # hand back values left over from a previous call.
    experiment = None
    running = []
    temp_mqtt_auto = None
    rpm_mqtt = None
    led_mqtt = None
    experiments = []

    reply_topic = f"pioreactor/{pioreactor}/worker"
    # Install the handler before subscribing so an early reply is not dropped.
    client_custom.on_message = on_message_worker
    client_custom.subscribe(reply_topic)
    request = {
        "command": "get_worker",
        "experiment": exp,
        "reactor": pioreactor
    }
    try:
        client_custom.publish("pioreactor/control", json.dumps(request))
        deadline = time.monotonic() + 10
        while experiment is None and time.monotonic() < deadline:
            time.sleep(0.1)
    finally:
        # Always drop the subscription, even if publishing failed.
        client_custom.unsubscribe(reply_topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
def get_status_default(exp):
    """Request the default reactor's worker status and wait for the reply.

    Publishes a `get_worker` command on the default connection and polls for
    up to 10 seconds until the message handler has stored a reply.

    Args:
        exp: Experiment name sent with the request.

    Returns:
        Tuple ``(experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt,
        experiments)``. When no reply arrives in time these are the reset
        values (``None`` / empty lists), not data from an earlier request.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments

    # Clear every status field, not just the polled one, so a timeout cannot
    # hand back values left over from a previous call.
    experiment = None
    running = []
    temp_mqtt_auto = None
    rpm_mqtt = None
    led_mqtt = None
    experiments = []

    reply_topic = f"pioreactor/{PIOREACTOR}/worker"
    # Install the handler before subscribing so an early reply is not dropped.
    client.on_message = on_message_worker
    client.subscribe(reply_topic)
    request = {
        "command": "get_worker",
        "experiment": exp,
        "reactor": PIOREACTOR
    }
    try:
        client.publish("pioreactor/control", json.dumps(request))
        deadline = time.monotonic() + 10
        while experiment is None and time.monotonic() < deadline:
            time.sleep(0.1)
    finally:
        # Always drop the subscription, even if publishing failed.
        client.unsubscribe(reply_topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments
def new_experiment_default(new_exp, exp):
    """Publish a `new_experiment` command for the default reactor.

    ``new_exp`` is sent as the experiment name; ``exp`` is accepted but unused.
    """
    message = dict(
        command="new_experiment",
        experiment=new_exp,
        reactor=PIOREACTOR,
    )
    client.publish("pioreactor/control", json.dumps(message))
def remove_experiment_default(rem_exp, exp):
    """Publish a `delete_experiment` command for the default reactor.

    ``rem_exp`` is sent as the experiment name; ``exp`` is accepted but unused.
    """
    message = dict(
        command="delete_experiment",
        experiment=rem_exp,
        reactor=PIOREACTOR,
    )
    client.publish("pioreactor/control", json.dumps(message))
def change_experiment_default(ch_exp, exp):
    """Publish a `change_experiment` command for the default reactor.

    ``exp`` is sent as "experiment" and ``ch_exp`` as "experiment_new".
    """
    message = dict(
        command="change_experiment",
        experiment=exp,
        experiment_new=ch_exp,
        reactor=PIOREACTOR,
    )
    client.publish("pioreactor/control", json.dumps(message))
def new_experiment(new_exp, host, port, username, password, pioreactor, exp):
    """Publish a `new_experiment` command on the custom connection.

    ``new_exp`` is sent as the experiment name; ``exp`` is accepted but
    unused. The connection settings are used only to open the custom
    connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="new_experiment",
        experiment=new_exp,
        reactor=pioreactor,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def remove_experiment(rem_exp, host, port, username, password, pioreactor, exp):
    """Publish a `delete_experiment` command on the custom connection.

    ``rem_exp`` is sent as the experiment name; ``exp`` is accepted but
    unused. The connection settings are used only to open the custom
    connection when none exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="delete_experiment",
        experiment=rem_exp,
        reactor=pioreactor,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def change_experiment(ch_exp, host, port, username, password, pioreactor, exp):
    """Publish a `change_experiment` command on the custom connection.

    ``exp`` is sent as "experiment" and ``ch_exp`` as "experiment_new". The
    connection settings are used only to open the custom connection when none
    exists yet.
    """
    if client_custom is None:
        custom_client(host, port, username, password)
    message = dict(
        command="change_experiment",
        experiment=exp,
        experiment_new=ch_exp,
        reactor=pioreactor,
    )
    client_custom.publish("pioreactor/control", json.dumps(message))
def on_message(client, userdata, message):
    """Store the series of a readings message in the module-level graph globals.

    The payload is expected to be a UTF-8 JSON object; missing keys are stored
    as ``None``. Payloads that cannot be decoded, or that are not JSON
    objects, are reported on stdout and ignored so the MQTT callback never
    raises.
    """
    global temp_graph, od_graph, norm_od_graph, growth_rate_graph
    try:
        data = json.loads(message.payload.decode("utf-8"))
    except ValueError as err:  # UnicodeDecodeError and JSONDecodeError
        print(f"Ignoring malformed readings message: {err}")
        return
    if not isinstance(data, dict):
        print("Ignoring readings message that is not a JSON object")
        return

    od_graph = data.get("od", None)
    norm_od_graph = data.get("normalized_od", None)
    growth_rate_graph = data.get("growth_rate", None)
    # Assigned last: get_data_default polls `temp_graph`, so the other series
    # must already be in place when it becomes non-None.
    temp_graph = data.get("temperature", None)
def _localized_frame(points):
    """Return a DataFrame indexed by New York local-time strings, or None if `points` is empty."""
    if not points:
        return None
    local_tz = pytz.timezone("America/New_York")
    rows = []
    for point in points:
        # `datetime` is imported as a module, so the class must be qualified.
        stamp = datetime.datetime.strptime(point["x"], "%Y-%m-%dT%H:%M:%S.%fZ")
        stamp = stamp.replace(tzinfo=pytz.utc).astimezone(local_tz)
        rows.append({**point, "x": stamp.strftime("%Y-%m-%d %H:%M:%S")})
    return pd.DataFrame(rows).set_index("x")


def _plotly_line(points, column):
    """Return a Plotly line figure of `column` over time, or None if there is no data."""
    frame = _localized_frame(points)
    if frame is None:
        return None
    return go.Figure(data=go.Scatter(x=frame.index, y=frame[column], mode="lines"))


def get_data_default(time_scale, exp):
    """Request readings for the default reactor and turn them into four plots.

    Publishes a `get_readings` command, waits up to 10 seconds for the
    message handler to store the temperature series, then builds one figure
    per series. Timestamps are parsed as UTC and shown in America/New_York
    local time.

    Args:
        time_scale: Forwarded unchanged as the four "amount" fields.
        exp: Experiment name sent with the request.

    Returns:
        Tuple ``(temperature, od, normalized_od, growth_rate)``: a Matplotlib
        figure followed by three Plotly figures. Each entry is ``None`` when
        its series is missing or empty (for example after a timeout).
    """
    global temp_graph, od_graph, norm_od_graph, growth_rate_graph

    # Reset, install the handler and subscribe *before* publishing, so a fast
    # reply is neither missed nor wiped out by the reset.
    temp_graph = None
    od_graph = None
    norm_od_graph = None
    growth_rate_graph = None
    reply_topic = f"pioreactor/{PIOREACTOR}/readings"
    client.on_message = on_message
    client.subscribe(reply_topic)

    request = {
        "command": "get_readings",
        "experiment": exp,
        "filter_mod": 1,
        "lookback": 1000000,
        "filter_mod2": 1,
        "lookback2": 1000000,
        "filter_mod3": 1,
        "lookback3": 1000000,
        "filter_mod4": 1,
        "lookback4": 1000000,
        "amount": time_scale,
        "amount2": time_scale,
        "amount3": time_scale,
        "amount4": time_scale,
    }
    try:
        client.publish("pioreactor/control", json.dumps(request))
        deadline = time.monotonic() + 10
        while temp_graph is None and time.monotonic() < deadline:
            time.sleep(0.1)
    finally:
        # Always drop the subscription, even if publishing failed.
        client.unsubscribe(reply_topic)

    temperature_plot = None
    temperature_frame = _localized_frame(temp_graph)
    if temperature_frame is not None:
        # Use an explicit figure rather than pyplot's implicit "current" one.
        figure, axes = plt.subplots()
        axes.plot(temperature_frame.index, temperature_frame["temperature"])
        axes.set_xlabel("Time")
        axes.set_ylabel("Temperature")
        axes.set_title("Temperature vs Time")
        # Release pyplot's reference; the Figure object itself stays usable.
        plt.close(figure)
        temperature_plot = figure

    return (
        temperature_plot,
        _plotly_line(od_graph, "od"),
        _plotly_line(norm_od_graph, "normalized_od"),
        _plotly_line(growth_rate_graph, "growth_rate"),
    )
# Define the interface components.
# Three tabs: "Default" and "Default Graphs" talk to the broker configured via
# environment variables; "Custom" takes the connection settings from the form.
with gr.Blocks() as demo:
    with gr.Tab("Default"):
        experiment_input = gr.Textbox(label="Experiment", value="Ed")
        experiment_input.input(fn=void_client)
        # Open the default connection once, while the UI is being built.
        client = create_client(HOST, PORT, USERNAME, PASSWORD)
        with gr.Blocks():
            gr.Markdown("# Status")
            get_status_a = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
                experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_a.click(
                fn=get_status_default,
                inputs=[experiment_input],
                outputs=[experiment_output, running_output, temp_output, rpm_output, led_output, experiments_output]
            )
        with gr.Blocks():
            gr.Markdown("# Experiments")
            with gr.Row():
                new_experiment_a = gr.Textbox(label="New Experiment")
                new_experiment_button = gr.Button("Add Experiment")
            with gr.Row():
                remove_experiment_a = gr.Textbox(label="Remove Experiment")
                remove_experiment_button = gr.Button("Remove Experiment")
            with gr.Row():
                change_experiment_a = gr.Textbox(label="Change Experiment")
                change_experiment_button = gr.Button("Change Experiment")
            new_experiment_button.click(
                fn=new_experiment_default,
                inputs=[new_experiment_a, experiment_input]
            )
            remove_experiment_button.click(
                fn=remove_experiment_default,
                inputs=[remove_experiment_a, experiment_input]
            )
            change_experiment_button.click(
                fn=change_experiment_default,
                inputs=[change_experiment_a, experiment_input]
            )
        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring_default,
                inputs=[rpm_input, experiment_input, stir_state]
            )
        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only"
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider
            )
            temp.click(
                fn=temp_automation_default,
                inputs=[temperature_slider, experiment_input, temp_state, temp_option]
            )
        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od_default,
                        inputs=[experiment_input, od_state]
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf_default,
                        inputs=[experiment_input, gr_state]
                    )
        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn_default,
                        inputs=[led1, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn_default,
                        inputs=[led2, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn_default,
                        inputs=[led3, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn_default,
                        inputs=[led4, experiment_input]
                    )
        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media_default,
                        inputs=[add_media_ml, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste_default,
                        inputs=[remove_waste_ml, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media_default,
                        inputs=[cycle_media_sec, experiment_input]
                    )
    with gr.Tab("Default Graphs"):
        with gr.Row():
            time_scale = gr.Radio(label="Time Scale", choices=["1 hour", "24 hour", "All Data"])
            get_graphs = gr.Button("Get Graphs")
        with gr.Row():
            plot1 = gr.Plot()
            plot2 = gr.Plot()
        with gr.Row():
            plot3 = gr.Plot()
            plot4 = gr.Plot()
        # `experiment_input` here is still the Default tab's textbox; the
        # Custom tab rebinds the name only further down.
        get_graphs.click(
            fn=get_data_default,
            inputs=[time_scale, experiment_input],
            outputs=[plot1, plot2, plot3, plot4]
        )
    with gr.Tab("Custom"):
        # Input components
        # Editing any connection field drops the cached custom connection so
        # the next command reconnects with the new settings.
        with gr.Row():
            host_input = gr.Textbox(label="Host")
            port_input = gr.Number(label="Port")
        host_input.input(fn=void_client)
        port_input.input(fn=void_client)
        with gr.Row():
            username_input = gr.Textbox(label="Username")
            # Masked so the broker password is not shown in clear text.
            password_input = gr.Textbox(label="Password", type="password")
            pioreactor_input = gr.Textbox(label="Pioreactor")
            experiment_input = gr.Textbox(label="Experiment")
        username_input.input(fn=void_client)
        password_input.input(fn=void_client)
        pioreactor_input.input(fn=void_client)
        experiment_input.input(fn=void_client)
        with gr.Blocks():
            gr.Markdown("# Status")
            get_status_b = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
                experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_b.click(
                fn=get_status,
                inputs=[host_input, port_input, username_input, password_input, pioreactor_input, experiment_input],
                outputs=[experiment_output, running_output, temp_output, rpm_output, led_output, experiments_output]
            )
        with gr.Blocks():
            gr.Markdown("# Experiments")
            with gr.Row():
                new_experiment_b = gr.Textbox(label="New Experiment")
                new_experiment_button_b = gr.Button("Add Experiment")
            with gr.Row():
                remove_experiment_b = gr.Textbox(label="Remove Experiment")
                remove_experiment_button_b = gr.Button("Remove Experiment")
            with gr.Row():
                change_experiment_b = gr.Textbox(label="Change Experiment")
                change_experiment_button_b = gr.Button("Change Experiment")
            new_experiment_button_b.click(
                fn=new_experiment,
                inputs=[new_experiment_b, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
            )
            remove_experiment_button_b.click(
                fn=remove_experiment,
                inputs=[remove_experiment_b, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
            )
            change_experiment_button_b.click(
                fn=change_experiment,
                inputs=[change_experiment_b, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
            )
        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring,
                inputs=[rpm_input, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, stir_state]
            )
        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only"
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider
            )
            temp.click(
                fn=temp_automation,
                inputs=[temperature_slider, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, temp_state, temp_option]
            )
        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od,
                        inputs=[host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, od_state]
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf,
                        inputs=[host_input, port_input, username_input, password_input, pioreactor_input, experiment_input, gr_state]
                    )
        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():  # Row for all channels
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn,
                        inputs=[led1, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn,
                        inputs=[led2, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn,
                        inputs=[led3, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn,
                        inputs=[led4, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media,
                        inputs=[add_media_ml, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste,
                        inputs=[remove_waste_ml, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media,
                        inputs=[cycle_media_sec, host_input, port_input, username_input, password_input, pioreactor_input, experiment_input]
                    )
# Launch the interface
demo.launch()