"""Gradio dashboard that sends Pioreactor control commands over MQTT.

Two tabs are built:

* "Default" uses the broker and reactor configured through the environment
  variables ``host``, ``port``, ``username``, ``password`` and ``pioreactor``.
* "Custom" uses the broker credentials and reactor name typed into the page.

Every command is a JSON object published to the ``pioreactor/control`` topic.
Status replies are read from ``pioreactor/<reactor>/worker``.
"""

import json
import os
import time

import gradio as gr
import paho.mqtt.client as mqtt

# Connection settings for the "Default" tab.  The ``port`` variable must be
# set: int(None) raises TypeError while the module is being loaded.
HOST = os.environ.get("host")
PORT = int(os.environ.get("port"))
USERNAME = os.environ.get("username")
PASSWORD = os.environ.get("password")
PIOREACTOR = os.environ.get("pioreactor")

CONTROL_TOPIC = "pioreactor/control"
STATUS_TIMEOUT_S = 10  # how long get_status*/ waits for a worker reply

# MQTT clients: ``client`` serves the Default tab, ``client_custom`` is created
# lazily for the Custom tab and dropped whenever its inputs are edited.
client = None
client_custom = None

# Last worker status received by on_message_worker().
experiment = None
running = []
temp_mqtt_auto = None
rpm_mqtt = None
led_mqtt = None
experiments = []


# --------------------------------------------------------------------------
# MQTT plumbing
# --------------------------------------------------------------------------

def on_connect(client, userdata, flags, rc):
    """Log the broker's connection result code."""
    print("Connected with result code " + str(rc))


def create_client(host, port, username, password):
    """Create a TLS MQTT client, connect it and start its network loop.

    Args:
        host: Broker host name.
        port: Broker port.  Coerced with ``int()`` because ``gr.Number``
            hands floats to callbacks.
        username: Broker user name.
        password: Broker password.

    Returns:
        The connected ``mqtt.Client`` with its background loop running.
    """
    # The password is deliberately not printed so it never reaches the logs.
    print(host, port, username)
    mqtt_client = mqtt.Client()
    mqtt_client.username_pw_set(username, password)
    mqtt_client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
    mqtt_client.on_connect = on_connect
    mqtt_client.connect(host, int(port))
    mqtt_client.loop_start()
    return mqtt_client


def on_message_worker(client, userdata, message):
    """Store the fields of a worker status message in the module globals.

    Messages that are not UTF-8 JSON objects are reported and ignored rather
    than letting the exception escape into the MQTT network thread.
    """
    global experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments

    try:
        data = json.loads(message.payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as err:
        print(f"Ignoring malformed worker message: {err}")
        return
    if not isinstance(data, dict):
        print("Ignoring worker message that is not a JSON object")
        return

    experiment = data.get("experiment", None)
    running = data.get("running", [])
    temp_mqtt_auto = data.get("temperature_automation", None)
    rpm_mqtt = data.get("stirring", None)
    led_mqtt = data.get("leds", None)
    experiments = data.get("experiments", [])


def store_client():
    """(Re)create the Default-tab client from the environment settings."""
    global client
    client = create_client(HOST, PORT, USERNAME, PASSWORD)


def custom_client(host, port, username, password):
    """Create the Custom-tab client from user-supplied credentials."""
    global client_custom
    client_custom = create_client(host, port, username, password)


def void_client():
    """Disconnect and forget the Custom-tab client, if there is one.

    Wired to the input fields so that the next command reconnects with the
    edited values.
    """
    global client_custom
    if client_custom is not None:
        client_custom.disconnect()
        # Stop the thread started by loop_start(); otherwise one network
        # thread is leaked for every edit of the connection fields.
        client_custom.loop_stop()
        client_custom = None
        print("Client disconnected")


def _get_custom_client(host, port, username, password):
    """Return the Custom-tab client, connecting first when none exists."""
    if client_custom is None:
        custom_client(host, port, username, password)
    return client_custom


def _publish(mqtt_client, payload):
    """Publish *payload* as JSON on the control topic.

    A ``None`` payload means the caller was given an unknown state; it is
    reported and nothing is sent.
    """
    if payload is None:
        print("Invalid state")
        return
    mqtt_client.publish(CONTROL_TOPIC, json.dumps(payload))


# --------------------------------------------------------------------------
# Payload builders (shared by the Default and Custom tabs)
# --------------------------------------------------------------------------

def _stirring_payload(state, rpm, experiment, reactor):
    """Build the stirring command for *state*, or None if it is unknown."""
    if state == "start":
        return {"command": "start_stirring", "experiment": experiment,
                "reactor": reactor, "rpm": rpm}
    if state == "stop":
        return {"command": "stop_stirring", "experiment": experiment,
                "reactor": reactor}
    if state == "update":
        return {"command": "update_stirring_rpm", "experiment": experiment,
                "reactor": reactor, "rpm": rpm}
    return None


def _temperature_payload(state, option, temperature, experiment, reactor):
    """Build the temperature-automation command, or None for unknown state."""
    automation = "thermostat" if option == "Heat To Temp" else "only_record_temperature"
    if state == "start":
        return {"command": "set_temperature_automation", "experiment": experiment,
                "reactor": reactor, "temp": temperature, "automation": automation}
    if state == "stop":
        return {"command": "temp_update", "experiment": experiment,
                "reactor": reactor, "settings": {"$state": "disconnected"}}
    if state == "update":
        return {"command": "temp_update", "experiment": experiment,
                "reactor": reactor, "settings": {"target_temperature": temperature}}
    if state == "restart":
        return {"command": "temp_restart", "experiment": experiment,
                "reactor": reactor, "automation": automation, "temp": temperature}
    return None


def _start_stop_payload(state, start_command, stop_command, experiment, reactor):
    """Build a start/stop job command, or None if *state* is neither."""
    commands = {"start": start_command, "stop": stop_command}
    if state not in commands:
        return None
    return {"command": commands[state], "experiment": experiment, "reactor": reactor}


def _led_payload(channel, brightness, experiment, reactor):
    """Build the command that sets one LED channel's intensity."""
    return {"command": "set_led_intensity", "experiment": experiment,
            "reactor": reactor, "led": channel, "brightness": brightness}


def _pump_payload(command, field, amount, experiment, reactor):
    """Build a dosing command whose quantity is stored under *field*."""
    return {"command": command, "experiment": experiment,
            "reactor": reactor, field: amount}


def _request_status(mqtt_client, reactor, exp):
    """Ask a worker for its status and wait up to STATUS_TIMEOUT_S for it.

    Returns:
        Tuple ``(experiment, running, temperature_automation, rpm, leds,
        experiments)`` taken from the module globals.  When no reply arrives
        in time ``experiment`` is None and the remaining fields hold whatever
        was received previously.
    """
    global experiment

    topic = f"pioreactor/{reactor}/worker"
    # Install the handler before subscribing so a fast reply is not missed.
    mqtt_client.on_message = on_message_worker
    mqtt_client.subscribe(topic)

    experiment = None  # sentinel: set again by on_message_worker on reply
    _publish(mqtt_client, {"command": "get_worker", "experiment": exp, "reactor": reactor})

    deadline = time.monotonic() + STATUS_TIMEOUT_S
    while experiment is None and time.monotonic() < deadline:
        time.sleep(0.1)

    mqtt_client.unsubscribe(topic)
    return experiment, running, temp_mqtt_auto, rpm_mqtt, led_mqtt, experiments


# --------------------------------------------------------------------------
# Custom-tab callbacks (credentials come from the page)
# --------------------------------------------------------------------------

def stirring(rpm, host, port, username, password, pioreactor, experiment, state):
    """Start, stop or update stirring; *state* is "start"/"stop"/"update"."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _stirring_payload(state, rpm, experiment, pioreactor))


def toggle_temperature_input(selected_option):
    """Show the temperature slider only when "Heat To Temp" is selected."""
    return gr.update(visible=selected_option == "Heat To Temp")


def temp_automation(temperature, host, port, username, password, pioreactor, experiment, state, option):
    """Send a temperature-automation command.

    *state* is "start", "stop", "update" or "restart"; *option* selects the
    thermostat automation ("Heat To Temp") or record-only (anything else).
    """
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _temperature_payload(state, option, temperature, experiment, pioreactor))


def od(host, port, username, password, pioreactor, experiment, state):
    """Start or stop OD reading."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _start_stop_payload(
        state, "start_od_reading", "stop_od_reading", experiment, pioreactor))


def grf(host, port, username, password, pioreactor, experiment, state):
    """Start or stop growth-rate calculation."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _start_stop_payload(
        state, "start_growth_rate", "stop_growth_rate", experiment, pioreactor))


def led1fn(led1, host, port, username, password, pioreactor, experiment):
    """Set the intensity of LED channel A."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _led_payload("A", led1, experiment, pioreactor))


def led2fn(led2, host, port, username, password, pioreactor, experiment):
    """Set the intensity of LED channel B."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _led_payload("B", led2, experiment, pioreactor))


def led3fn(led3, host, port, username, password, pioreactor, experiment):
    """Set the intensity of LED channel C."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _led_payload("C", led3, experiment, pioreactor))


def led4fn(led4, host, port, username, password, pioreactor, experiment):
    """Set the intensity of LED channel D."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _led_payload("D", led4, experiment, pioreactor))


def add_media(media, host, port, username, password, pioreactor, experiment):
    """Pump *media* (the UI labels it mL) of fresh media into the reactor."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _pump_payload("pump_add_media", "volume", media, experiment, pioreactor))


def remove_waste(waste, host, port, username, password, pioreactor, experiment):
    """Pump *waste* (the UI labels it mL) out of the reactor."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _pump_payload("pump_remove_media", "volume", waste, experiment, pioreactor))


def cycle_media(cycle, host, port, username, password, pioreactor, experiment):
    """Circulate media for *cycle* (the UI labels it seconds)."""
    mqtt_client = _get_custom_client(host, port, username, password)
    _publish(mqtt_client, _pump_payload("circulate_media", "duration", cycle, experiment, pioreactor))


def get_status(host, port, username, password, pioreactor, exp):
    """Query the worker status through the Custom-tab client.

    Returns the same six-tuple as ``get_status_default``.
    """
    mqtt_client = _get_custom_client(host, port, username, password)
    return _request_status(mqtt_client, pioreactor, exp)


# --------------------------------------------------------------------------
# Default-tab callbacks (environment-configured broker and reactor)
# --------------------------------------------------------------------------

def stirring_default(rpm, experiment, state):
    """Start, stop or update stirring on the default reactor."""
    _publish(client, _stirring_payload(state, rpm, experiment, PIOREACTOR))


def temp_automation_default(temperature, experiment, state, option):
    """Send a temperature-automation command to the default reactor."""
    _publish(client, _temperature_payload(state, option, temperature, experiment, PIOREACTOR))


def od_default(experiment, state):
    """Start or stop OD reading on the default reactor."""
    _publish(client, _start_stop_payload(
        state, "start_od_reading", "stop_od_reading", experiment, PIOREACTOR))


def grf_default(experiment, state):
    """Start or stop growth-rate calculation on the default reactor."""
    _publish(client, _start_stop_payload(
        state, "start_growth_rate", "stop_growth_rate", experiment, PIOREACTOR))


def led1fn_default(led1, experiment):
    """Set the intensity of LED channel A on the default reactor."""
    _publish(client, _led_payload("A", led1, experiment, PIOREACTOR))


def led2fn_default(led2, experiment):
    """Set the intensity of LED channel B on the default reactor."""
    _publish(client, _led_payload("B", led2, experiment, PIOREACTOR))


def led3fn_default(led3, experiment):
    """Set the intensity of LED channel C on the default reactor."""
    _publish(client, _led_payload("C", led3, experiment, PIOREACTOR))


def led4fn_default(led4, experiment):
    """Set the intensity of LED channel D on the default reactor."""
    _publish(client, _led_payload("D", led4, experiment, PIOREACTOR))


def add_media_default(media, experiment):
    """Pump fresh media into the default reactor."""
    _publish(client, _pump_payload("pump_add_media", "volume", media, experiment, PIOREACTOR))


def remove_waste_default(waste, experiment):
    """Pump waste out of the default reactor."""
    _publish(client, _pump_payload("pump_remove_media", "volume", waste, experiment, PIOREACTOR))


def cycle_media_default(cycle, experiment):
    """Circulate media on the default reactor."""
    _publish(client, _pump_payload("circulate_media", "duration", cycle, experiment, PIOREACTOR))


def get_status_default(exp):
    """Query the default reactor's worker status.

    Returns:
        Tuple ``(experiment, running, temperature_automation, rpm, leds,
        experiments)``; ``experiment`` is None when no reply arrived within
        the timeout.
    """
    return _request_status(client, PIOREACTOR, exp)


def new_experiment_default(new_exp, exp):
    """Create experiment *new_exp*; *exp* is accepted but not used."""
    _publish(client, {
        "command": "new_experiment",
        "experiment": new_exp,
        "reactor": PIOREACTOR,
    })


def remove_experiment_default(rem_exp, exp):
    """Delete experiment *rem_exp*; *exp* is accepted but not used."""
    _publish(client, {
        "command": "delete_experiment",
        "experiment": rem_exp,
        "reactor": PIOREACTOR,
    })


def change_experiment_default(ch_exp, exp):
    """Switch the reactor from experiment *exp* to *ch_exp*."""
    _publish(client, {
        "command": "change_experiment",
        "experiment": exp,
        "experiment_new": ch_exp,
        "reactor": PIOREACTOR,
    })


# --------------------------------------------------------------------------
# Interface
# --------------------------------------------------------------------------
# NOTE(review): the source file's indentation was lost, so the Row/Column
# nesting below is reconstructed from the order of the calls -- confirm the
# layout against the running app.

with gr.Blocks() as demo:
    with gr.Tab("Default"):
        experiment_input = gr.Textbox(label="Experiment", value="Ed")
        experiment_input.input(fn=void_client)

        # The Default tab's client is connected once, while the UI is built.
        client = create_client(HOST, PORT, USERNAME, PASSWORD)

        with gr.Blocks():
            gr.Markdown("# Status")
            # Named *_button so the get_status() function is not shadowed.
            get_status_button = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            get_status_button.click(
                fn=get_status_default,
                inputs=[experiment_input],
                outputs=[experiment_output, running_output, temp_output,
                         rpm_output, led_output, experiments_output],
            )

        with gr.Blocks():
            gr.Markdown("# Experiments")
            with gr.Row():
                new_experiment = gr.Textbox(label="New Experiment")
                new_experiment_button = gr.Button("Add Experiment")
            with gr.Row():
                remove_experiment = gr.Textbox(label="Remove Experiment")
                remove_experiment_button = gr.Button("Remove Experiment")
            with gr.Row():
                change_experiment = gr.Textbox(label="Change Experiment")
                change_experiment_button = gr.Button("Change Experiment")
            new_experiment_button.click(
                fn=new_experiment_default,
                inputs=[new_experiment, experiment_input],
            )
            remove_experiment_button.click(
                fn=remove_experiment_default,
                inputs=[remove_experiment, experiment_input],
            )
            change_experiment_button.click(
                fn=change_experiment_default,
                inputs=[change_experiment, experiment_input],
            )

        # NOTE(review): stirring_default() is defined but this tab has no
        # stirring controls -- confirm whether that omission is intended.

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(
                minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(
                    choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )
            temp.click(
                fn=temp_automation_default,
                inputs=[temperature_slider, experiment_input, temp_state, temp_option],
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od_default,
                        inputs=[experiment_input, od_state],
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf_default,
                        inputs=[experiment_input, gr_state],
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn_default,
                        inputs=[led1, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn_default,
                        inputs=[led2, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn_default,
                        inputs=[led3, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn_default,
                        inputs=[led4, experiment_input],
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media_default,
                        inputs=[add_media_ml, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste_default,
                        inputs=[remove_waste_ml, experiment_input],
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media_default,
                        inputs=[cycle_media_sec, experiment_input],
                    )

    with gr.Tab("Custom"):
        # Input components
        with gr.Row():
            host_input = gr.Textbox(label="Host")
            # precision=0 makes the component deliver an int port number.
            port_input = gr.Number(label="Port", precision=0)
        host_input.input(fn=void_client)
        port_input.input(fn=void_client)

        with gr.Row():
            username_input = gr.Textbox(label="Username")
            # Masked so the broker password is not shown on screen.
            password_input = gr.Textbox(label="Password", type="password")
            pioreactor_input = gr.Textbox(label="Pioreactor")
            experiment_input = gr.Textbox(label="Experiment")
        # Any edit drops the cached client so the next command reconnects.
        username_input.input(fn=void_client)
        password_input.input(fn=void_client)
        pioreactor_input.input(fn=void_client)
        experiment_input.input(fn=void_client)

        # All Custom-tab callbacks take these six values after their own.
        connection_inputs = [host_input, port_input, username_input,
                             password_input, pioreactor_input, experiment_input]

        with gr.Blocks():
            gr.Markdown("# Status")
            get_status_button = gr.Button("Get Status")
            with gr.Row():
                experiment_output = gr.Textbox(label="Experiment", interactive=False)
                running_output = gr.Textbox(label="Running", interactive=False)
            with gr.Row():
                temp_output = gr.Textbox(label="Temperature Automation", interactive=False)
                rpm_output = gr.Textbox(label="RPM", interactive=False)
                led_output = gr.Textbox(label="LEDs", interactive=False)
            experiments_output = gr.Textbox(label="Experiments", interactive=False)
            # Query the broker/reactor entered on this tab, not the
            # environment-configured default one.
            get_status_button.click(
                fn=get_status,
                inputs=connection_inputs,
                outputs=[experiment_output, running_output, temp_output,
                         rpm_output, led_output, experiments_output],
            )

        with gr.Blocks():
            gr.Markdown("# Stirring")
            rpm_input = gr.Slider(minimum=0, maximum=2000, step=50, label="RPM")
            with gr.Row():
                stir_state = gr.Radio(choices=["start", "stop", "update"], label="State")
                st = gr.Button("Send Command")
            st.click(
                fn=stirring,
                inputs=[rpm_input, *connection_inputs, stir_state],
            )

        with gr.Blocks():
            gr.Markdown("# Temperature Automation")
            # Dropdown for selecting automation type
            temp_option = gr.Dropdown(
                choices=["Record Temp Only", "Heat To Temp"],
                label="Temperature Automation",
                value="Record Temp Only",
            )
            # Slider for temperature (initially hidden)
            temperature_slider = gr.Slider(
                minimum=0, maximum=60, step=1, label="Temperature", visible=False)
            # Button to start automation
            with gr.Row():
                temp_state = gr.Radio(
                    choices=["start", "stop", "update", "restart"], label="State")
                temp = gr.Button("Send Command")
            # Update visibility of the slider based on dropdown selection
            temp_option.change(
                fn=toggle_temperature_input,
                inputs=temp_option,
                outputs=temperature_slider,
            )
            temp.click(
                fn=temp_automation,
                inputs=[temperature_slider, *connection_inputs, temp_state, temp_option],
            )

        with gr.Blocks():
            with gr.Row():
                with gr.Column():
                    gr.Markdown("# OD Reading")
                    od_state = gr.Radio(choices=["start", "stop"], label="State")
                    od_button = gr.Button("Send Command")
                    od_button.click(
                        fn=od,
                        inputs=[*connection_inputs, od_state],
                    )
                with gr.Column():
                    gr.Markdown("# Growth Rate")
                    gr_state = gr.Radio(choices=["start", "stop"], label="State")
                    gr_button = gr.Button("Send Command")
                    gr_button.click(
                        fn=grf,
                        inputs=[*connection_inputs, gr_state],
                    )

        with gr.Blocks():
            gr.Markdown("# LEDS")
            with gr.Row():  # Row for all channels
                with gr.Column():
                    gr.Markdown("### Channel A")
                    led1 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led1_button = gr.Button("Send Command")
                    led1_button.click(
                        fn=led1fn,
                        inputs=[led1, *connection_inputs],
                    )
                with gr.Column():
                    gr.Markdown("### Channel B")
                    led2 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led2_button = gr.Button("Send Command")
                    led2_button.click(
                        fn=led2fn,
                        inputs=[led2, *connection_inputs],
                    )
                with gr.Column():
                    gr.Markdown("### Channel C")
                    led3 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led3_button = gr.Button("Send Command")
                    led3_button.click(
                        fn=led3fn,
                        inputs=[led3, *connection_inputs],
                    )
                with gr.Column():
                    gr.Markdown("### Channel D")
                    led4 = gr.Slider(minimum=0, maximum=100, step=1, label="Intensity")
                    led4_button = gr.Button("Send Command")
                    led4_button.click(
                        fn=led4fn,
                        inputs=[led4, *connection_inputs],
                    )

        with gr.Blocks():
            gr.Markdown("# Dosing")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Add Media")
                    add_media_ml = gr.Number(label="Media (mL)", step=1, minimum=0, maximum=14)
                    add_media_button = gr.Button("Send Command")
                    add_media_button.click(
                        fn=add_media,
                        inputs=[add_media_ml, *connection_inputs],
                    )
                with gr.Column():
                    gr.Markdown("### Remove Waste")
                    remove_waste_ml = gr.Number(label="Waste (mL)", step=1, minimum=0, maximum=20)
                    remove_waste_button = gr.Button("Send Command")
                    remove_waste_button.click(
                        fn=remove_waste,
                        inputs=[remove_waste_ml, *connection_inputs],
                    )
                with gr.Column():
                    gr.Markdown("### Cycle Media")
                    cycle_media_sec = gr.Number(label="Cycle (s)", step=1, minimum=0, maximum=60)
                    cycle_media_button = gr.Button("Send Command")
                    cycle_media_button.click(
                        fn=cycle_media,
                        inputs=[cycle_media_sec, *connection_inputs],
                    )

# Launch the interface
demo.launch()