"""Gradio front end that publishes Pioreactor control commands over MQTT.

The "Default" tab uses a broker connection built from environment variables
(``host``, ``port``, ``username``, ``password``, ``pioreactor``).  The
"Custom" tab builds a connection lazily from credentials typed into the UI
and drops it whenever one of those inputs is edited.

Every command is a JSON object published to ``pioreactor/control``.
"""

import json
import os
import ssl
import time

import gradio as gr
import paho.mqtt.client as mqtt

HOST = os.environ.get("host")
PORT = int(os.environ.get("port"))
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
PIOREACTOR = os.environ.get("pioreactor")

# Topic every control command is published to.
CONTROL_TOPIC = "pioreactor/control"

# Connection used by the "Default" tab (assigned while the UI is built).
client = None
# Connection used by the "Custom" tab (created on first command).
client_custom = None


def on_connect(client, userdata, flags, rc):
    """Log the result code paho reports when a connection attempt finishes."""
    print(f"Connected with result code {rc}")


def create_client(host, port, username, password):
    """Create a TLS MQTT client, connect it and start its network loop.

    Args:
        host: Broker host name.
        port: Broker port; coerced to ``int`` because UI number inputs may
            deliver a float.
        username: Broker user name.
        password: Broker password.

    Returns:
        The connected ``mqtt.Client`` with its background loop running.
    """
    # Deliberately do not log the password.
    print(f"Connecting to MQTT broker {host}:{port} as {username}")
    new_client = mqtt.Client()
    new_client.username_pw_set(username, password)
    new_client.tls_set(tls_version=ssl.PROTOCOL_TLS)
    new_client.on_connect = on_connect
    new_client.connect(host, int(port))
    new_client.loop_start()
    return new_client


def on_message_worker(client, userdata, message):
    """Decode a JSON status message and cache its fields in module globals.

    Missing keys fall back to ``None`` (or ``[]`` for the list-valued
    ``running`` and ``experiments`` fields).
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments

    data = json.loads(message.payload.decode("utf-8"))
    experiment = data.get("experiment", None)
    running = data.get("running", [])
    temp_mqtt_auto = data.get("temperature_automation", None)
    rpm_mqtt = data.get("stirring", None)
    led_mqtt = data.get("leds", None)
    experiments = data.get("experiments", [])


def store_client():
    """(Re)create the default client from the environment-derived settings."""
    global client
    client = create_client(HOST, PORT, USERNAME, PASSWORD)


def custom_client(host, port, username, password):
    """Create the "Custom" tab client from user-supplied credentials."""
    global client_custom
    client_custom = create_client(host, port, username, password)


def _get_custom_client(host, port, username, password):
    """Return the custom client, connecting first if none exists yet."""
    if client_custom is None:
        custom_client(host, port, username, password)
    return client_custom


def _publish(mqtt_client, command, experiment, reactor, **fields):
    """Publish one control command as JSON on ``CONTROL_TOPIC``.

    The payload always starts with ``command``, ``experiment`` and
    ``reactor``; ``fields`` are appended in the order given.
    """
    payload = {
        "command": command,
        "experiment": experiment,
        "reactor": reactor,
        **fields,
    }
    mqtt_client.publish(CONTROL_TOPIC, json.dumps(payload))


def _send_stirring(mqtt_client, reactor, rpm, experiment, state):
    """Publish the stirring command matching ``state`` (start/stop/update)."""
    if state == "start":
        _publish(mqtt_client, "start_stirring", experiment, reactor, rpm=rpm)
    elif state == "stop":
        _publish(mqtt_client, "stop_stirring", experiment, reactor)
    elif state == "update":
        _publish(mqtt_client, "update_stirring_rpm", experiment, reactor, rpm=rpm)
    else:
        print("Invalid state")


def _send_temperature(mqtt_client, reactor, temperature, experiment, state, option):
    """Publish the temperature-automation command matching ``state``.

    ``option`` is the dropdown label: "Heat To Temp" selects the
    ``thermostat`` automation, anything else ``only_record_temperature``.
    """
    automation = "thermostat" if option == "Heat To Temp" else "only_record_temperature"
    if state == "start":
        _publish(
            mqtt_client,
            "set_temperature_automation",
            experiment,
            reactor,
            temp=temperature,
            automation=automation,
        )
    elif state == "stop":
        _publish(
            mqtt_client,
            "temp_update",
            experiment,
            reactor,
            settings={"$state": "disconnected"},
        )
    elif state == "update":
        _publish(
            mqtt_client,
            "temp_update",
            experiment,
            reactor,
            settings={"target_temperature": temperature},
        )
    elif state == "restart":
        _publish(
            mqtt_client,
            "temp_restart",
            experiment,
            reactor,
            automation=automation,
            temp=temperature,
        )
    else:
        print("Invalid state")


def _send_start_stop(mqtt_client, reactor, experiment, state, start_command, stop_command):
    """Publish ``start_command`` or ``stop_command`` depending on ``state``."""
    if state == "start":
        _publish(mqtt_client, start_command, experiment, reactor)
    elif state == "stop":
        _publish(mqtt_client, stop_command, experiment, reactor)
    else:
        print("Invalid state")


def _set_led(channel, brightness, host, port, username, password, pioreactor, experiment):
    """Publish ``set_led_intensity`` for one LED channel via the custom client."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(
        mqtt_client,
        "set_led_intensity",
        experiment,
        pioreactor,
        led=channel,
        brightness=brightness,
    )


def stirring(rpm, host, port, username, password, pioreactor, experiment, state):
    """Custom tab: start, stop or update stirring on ``pioreactor``."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _send_stirring(mqtt_client, pioreactor, rpm, experiment, state)


def toggle_temperature_input(selected_option):
    """Show the temperature slider only if "Heat To Temp" is selected."""
    return gr.update(visible=selected_option == "Heat To Temp")


def temp_automation(temperature, host, port, username, password, pioreactor, experiment, state, option):
    """Custom tab: start, stop, update or restart temperature automation."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _send_temperature(mqtt_client, pioreactor, temperature, experiment, state, option)


def od(host, port, username, password, pioreactor, experiment, state):
    """Custom tab: start or stop OD reading."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _send_start_stop(
        mqtt_client, pioreactor, experiment, state, "start_od_reading", "stop_od_reading"
    )


def grf(host, port, username, password, pioreactor, experiment, state):
    """Custom tab: start or stop growth-rate calculation."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _send_start_stop(
        mqtt_client, pioreactor, experiment, state, "start_growth_rate", "stop_growth_rate"
    )


def led1fn(led1, host, port, username, password, pioreactor, experiment):
    """Custom tab: set the brightness of LED channel A."""
    _set_led("A", led1, host, port, username, password, pioreactor, experiment)


def led2fn(led2, host, port, username, password, pioreactor, experiment):
    """Custom tab: set the brightness of LED channel B."""
    _set_led("B", led2, host, port, username, password, pioreactor, experiment)


def led3fn(led3, host, port, username, password, pioreactor, experiment):
    """Custom tab: set the brightness of LED channel C."""
    _set_led("C", led3, host, port, username, password, pioreactor, experiment)


def led4fn(led4, host, port, username, password, pioreactor, experiment):
    """Custom tab: set the brightness of LED channel D."""
    _set_led("D", led4, host, port, username, password, pioreactor, experiment)


def add_media(media, host, port, username, password, pioreactor, experiment):
    """Custom tab: publish ``pump_add_media`` with ``media`` as the volume."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, "pump_add_media", experiment, pioreactor, volume=media)


def remove_waste(waste, host, port, username, password, pioreactor, experiment):
    """Custom tab: publish ``pump_remove_media`` with ``waste`` as the volume."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, "pump_remove_media", experiment, pioreactor, volume=waste)


def cycle_media(cycle, host, port, username, password, pioreactor, experiment):
    """Custom tab: publish ``circulate_media`` with ``cycle`` as the duration."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, "circulate_media", experiment, pioreactor, duration=cycle)


def void_client():
    """Disconnect and forget the custom client so the next command reconnects.

    Bound to the ``input`` event of the connection fields, so editing any of
    them invalidates the cached connection.
    """
    global client_custom
    if client_custom is not None:
        client_custom.disconnect()
        # Also stop the background network loop started by loop_start().
        client_custom.loop_stop()
        client_custom = None
        print("Client disconnected")


def stirring_default(rpm, experiment, state):
    """Default tab: start, stop or update stirring on ``PIOREACTOR``."""
    _send_stirring(client, PIOREACTOR, rpm, experiment, state)


def temp_automation_default(temperature, experiment, state, option):
    """Default tab: start, stop, update or restart temperature automation."""
    _send_temperature(client, PIOREACTOR, temperature, experiment, state, option)


# Define the interface components.
# NOTE(review): the container nesting below was reconstructed from the
# statement order and the original comments -- confirm the layout visually.
with gr.Blocks() as demo:
    with gr.Tab("Default"):
        experiment_input = gr.Textbox(label="Experiment", value="Ed")
        experiment_input.input(fn=void_client)

        # The default connection is opened while the UI is being built.
        client = create_client(HOST, PORT, USERNAME, PASSWORD)

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring_default,
                inputs=[rpm, experiment_input, stir_state],
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")

            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )

            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(
                minimum=0, maximum=60, step=1, label="Temperature", visible=False
            )

            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(
                    choices=["start", "stop", "update", "restart"], label="State"
                )
                temp = gr.Button("Send Command")

            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )

            temp.click(
                fn=temp_automation_default,
                inputs=[temperature_slider, experiment_input, temp_state, temp_option],
            )

    with gr.Tab("Custom"):
        # Input components
        with gr.Row():
            host_input = gr.Textbox(label="Host")
            port_input = gr.Number(label="Port")

        host_input.input(fn=void_client)
        port_input.input(fn=void_client)

        with gr.Row():
            username_input = gr.Textbox(label="Username")
            password_input = gr.Textbox(label="Password")
            pioreactor_input = gr.Textbox(label="Pioreactor")
            experiment_input = gr.Textbox(label="Experiment")

        username_input.input(fn=void_client)
        password_input.input(fn=void_client)
        pioreactor_input.input(fn=void_client)
        experiment_input.input(fn=void_client)

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring,
                inputs=[
                    rpm_input,
                    host_input,
                    port_input,
                    username_input,
                    password_input,
                    pioreactor_input,
                    experiment_input,
                    stir_state,
                ],
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")

            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )

            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(
                minimum=0, maximum=60, step=1, label="Temperature", visible=False
            )

            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(
                    choices=["start", "stop", "update", "restart"], label="State"
                )
                temp = gr.Button("Send Command")

            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )

            temp.click(
                fn=temp_automation,
                inputs=[
                    temperature_slider,
                    host_input,
                    port_input,
                    username_input,
                    password_input,
                    pioreactor_input,
                    experiment_input,
                    temp_state,
                    temp_option,
                ],
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od,
                        inputs=[
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                            od_state,
                        ],
                    )

                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf,
                        inputs=[
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                            gr_state,
                        ],
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():  # Row for all channels
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn,
                        inputs=[
                            led1,
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                        ],
                    )

                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn,
                        inputs=[
                            led2,
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                        ],
                    )

                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn,
                        inputs=[
                            led3,
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                        ],
                    )

                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn,
                        inputs=[
                            led4,
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                        ],
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media,
                        inputs=[
                            add_media_ml,
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                        ],
                    )

                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste,
                        inputs=[
                            remove_waste_ml,
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                        ],
                    )

                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media,
                        inputs=[
                            cycle_media_sec,
                            host_input,
                            port_input,
                            username_input,
                            password_input,
                            pioreactor_input,
                            experiment_input,
                        ],
                    )

# Launch the interface
demo.launch()