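# NOTE(review): the original first lines read "Spaces: Sleeping Sleeping" -- this
# looks like page-capture residue rather than program text; confirm and remove.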
import html
import io
import os
import sys
import uuid

import streamlit as st
def get_user_id():
    """Return the user id for the current Streamlit session.

    The id is looked up in ``st.session_state`` first. When it is not there
    yet, the ``user_id`` query parameter is used; if that is missing or empty,
    a fresh UUID4 string is generated and written back to the query
    parameters. The chosen value is then stored in the session state.
    """
    session = st.session_state
    if "user_id" in session:
        return session.user_id

    candidate = st.query_params.get("user_id", None)
    if not candidate:
        # No usable id in the URL: mint one and publish it in the query string.
        candidate = str(uuid.uuid4())
        st.query_params["user_id"] = candidate

    session.user_id = candidate
    return session.user_id
def get_credentials(use_own_creds: bool):
    """Collect the HIVEMQ connection settings.

    Args:
        use_own_creds: When true, render Streamlit input widgets and return
            what they currently hold; otherwise read the settings from
            environment variables.

    Returns:
        Tuple ``(username, password, host, device_endpoint, port)``. In the
        environment branch each of the first four is ``None`` when its
        variable is unset, and ``port`` is 8883 when ``PORT`` is unset or
        blank.

    Raises:
        ValueError: If the ``PORT`` environment variable holds a non-blank
            value that is not an integer.
    """
    default_port = 8883

    if use_own_creds:
        # Every text field is masked, including host and device id.
        username = st.text_input("HIVEMQ Username", type="password")
        password = st.text_input("HIVEMQ Password", type="password")
        host = st.text_input("HIVEMQ Host", type="password")
        device_endpoint = st.text_input("Device ID", type="password")
        port = st.number_input("Port", min_value=1, step=1, value=default_port)
    else:
        username = os.environ.get("HIVEMQ_USERNAME")
        password = os.environ.get("HIVEMQ_PASSWORD")
        host = os.environ.get("HIVEMQ_HOST")
        device_endpoint = os.environ.get("DEVICE_ID")
        # A variable that is set but empty would make int() raise; treat it
        # the same as an unset variable and use the default port.
        raw_port = os.environ.get("PORT", "")
        port = int(raw_port) if raw_port.strip() else default_port

    return username, password, host, device_endpoint, port
class StdoutRedirector(io.StringIO):
    """File-like object that mirrors written text into a display placeholder.

    The most recent ``max_lines`` non-blank lines are kept in ``self.logs``
    and re-rendered inside a ``<div class='log-window'>`` whenever a write
    adds at least one line. Text is not stored in the underlying ``StringIO``
    buffer, so ``getvalue()`` stays empty.
    """

    def __init__(self, log_area, max_lines=20):
        """Create a redirector.

        Args:
            log_area: Object exposing ``markdown(body, unsafe_allow_html=...)``;
                ``render_log_window`` passes an ``st.empty()`` placeholder.
            max_lines: Maximum number of lines retained and displayed.
        """
        super().__init__()
        self.log_area = log_area
        self.logs = []
        self.max_lines = max_lines

    def write(self, message):
        """Record the non-blank lines of *message* and refresh the display.

        Args:
            message: Text written to the stream (e.g. by ``print``).

        Returns:
            The number of characters in *message*, as file-like ``write``
            methods do.
        """
        # Split so each printed line is one entry and max_lines is honoured
        # even when a single write carries several lines.
        fresh = [line.strip() for line in message.splitlines() if line.strip()]
        if fresh:
            self.logs.extend(fresh)
            overflow = len(self.logs) - self.max_lines
            if overflow > 0:
                del self.logs[:overflow]
            # Raw text stays in self.logs; escaping happens only at render
            # time so printed "<", ">" or "&" cannot inject markup into the
            # HTML rendered with unsafe_allow_html=True.
            body = "<br>".join(html.escape(line) for line in self.logs)
            self.log_area.markdown(
                f"<div class='log-window'>{body}</div>",
                unsafe_allow_html=True,
            )
        return len(message)
def render_log_window():
    """Render the log window and route ``sys.stdout`` into it.

    Injects the CSS for the ``log-window`` class, creates an empty Streamlit
    placeholder, and replaces ``sys.stdout`` with a ``StdoutRedirector`` bound
    to that placeholder. The previous ``sys.stdout`` is not restored here.
    """
    log_window_css = """
    <style>
    .log-window {
        height: 150px;
        overflow-y: auto;
        font-family: monospace;
        background-color: var(--background-color);
        color: var(--text-color);
        padding: 10px;
        border-radius: 5px;
        border: 2px solid white;
    }
    </style>
    """
    st.markdown(log_window_css, unsafe_allow_html=True)

    # The placeholder is created after the style block so it renders below it.
    placeholder = st.empty()
    sys.stdout = StdoutRedirector(placeholder)