steveyin's picture
Upload 20 files
8ff3f46 verified
raw
history blame
5.75 kB
from threading import Thread
from datetime import datetime
import cv2
import uvicorn
import contextlib
import threading
import time
import requests
class Server(uvicorn.Server):
'''
Use this Server class to server a uvicorn in a separate thread,
so to proceed to gradio UI launch
https://github.com/encode/uvicorn/issues/742
A couple of other links for reference --
https://stackoverflow.com/questions/61577643/python-how-to-use-fastapi-and-uvicorn-run-without-blocking-the-thread
https://stackoverflow.com/questions/76142431/how-to-run-another-application-within-the-same-running-event-loop/76148361#76148361
'''
def install_signal_handlers(self):
pass
@contextlib.contextmanager
def run_in_thread(self):
thread = threading.Thread(target=self.run)
thread.start()
try:
while not self.started:
time.sleep(1e-3)
yield
finally:
self.should_exit = True
thread.join()
class CountsPerSec:
"""
Class that tracks the number of occurrences ("counts") of an
arbitrary event and returns the frequency in occurrences
(counts) per second. The caller must increment the count.
"""
def __init__(self):
self._start_time = None
self._num_occurrences = 0
def start(self):
self._start_time = datetime.now()
return self
def increment(self):
self._num_occurrences += 1
def countsPerSec(self):
elapsed_time = (datetime.now() - self._start_time).total_seconds()
return self._num_occurrences / elapsed_time if elapsed_time > 0 else 0
class VideoGet:
"""
Class that continuously gets frames from a VideoCapture object
with a dedicated thread.
"""
def __init__(self, src=0):
self.stream = cv2.VideoCapture(src)
(self.grabbed, self.frame) = self.stream.read()
self.tn = Thread(target=self.get, args=())
self.stopped = False
def start(self):
self.tn.start()
return self
def get(self):
while not self.stopped:
if not self.grabbed:
self.stop()
else:
(self.grabbed, self.frame) = self.stream.read()
def stop(self):
self.tn.join()
self.stopped = True
class VideoShow:
"""
Class that continuously shows a frame using a dedicated thread.
"""
def __init__(self, frame=None):
self.frame = frame
self.tn = Thread(target=self.show, args=())
self.stopped = False
def start(self):
self.tn.start()
return self
def show(self):
while not self.stopped:
cv2.imshow("Video", self.frame)
if cv2.waitKey(1) == ord("q"):
self.stopped = True
def stop(self):
self.tn.join()
self.stopped = True
def show_fps(frame, iterations_per_sec):
"""
Add iterations per second text to lower-left corner of a frame.
"""
cv2.putText(
img=frame,
text="{:.0f} fps".format(iterations_per_sec),
org=(1000, 50),
fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=0.8,
color=(0, 255, 255),
thickness=1,
lineType=cv2.LINE_AA
)
cv2.putText(
img=frame, # annotated_frame,
text=datetime.now().strftime("%m/%d/%Y %H:%M:%S"),
org=(500, 50),
fontFace=cv2.FONT_HERSHEY_SIMPLEX,
fontScale=0.8,
color=(0, 255, 255),
thickness=1,
lineType=cv2.LINE_AA
)
return frame
def draw_text(
img,
text,
pos=(0, 0),
font=cv2.FONT_HERSHEY_SIMPLEX,
font_scale=1,
font_thickness=2,
line_type=cv2.LINE_AA,
text_color=(0, 255, 0),
text_color_bg=(0, 0, 0)
) -> None:
"""draw a text with background color on image frame
Args:
img (_type_): _description_
text (_type_): _description_
pos (tuple, optional): _description_. Defaults to (0, 0).
font (_type_, optional): _description_. Defaults to cv2.FONT_HERSHEY_SIMPLEX.
font_scale (int, optional): _description_. Defaults to 1.
font_thickness (int, optional): _description_. Defaults to 2.
line_type (_type_, optional): _description_. Defaults to cv2.LINE_AA.
text_color (tuple, optional): _description_. Defaults to (0, 255, 0).
text_color_bg (tuple, optional): _description_. Defaults to (0, 0, 0).
Returns:
_type_: _description_
"""
x, y = pos
text_size, _ = cv2.getTextSize(text, font, font_scale, font_thickness)
text_w, text_h = text_size
cv2.rectangle(img, (x, y + 10), (x + text_w, max(0, y - text_h - 10)), text_color_bg, -1)
cv2.putText(
img=img,
text=text,
org=pos,
fontFace=font,
fontScale=font_scale,
color=text_color,
thickness=font_thickness,
lineType=line_type
)
return text_size
def try_site(youtube_url: str) -> bool:
"""Check if a youtube url is playable
Args:
youtube_url (str): a given url for testing
Returns:
bool: whether or not that youtube_url is playable
"""
pattern = '"playabilityStatus":{"status":"ERROR","reason":"Video unavailable"'
request = requests.get(youtube_url)
return False if pattern in request.text else True
def make_table_from_dict(obj: dict, selected_key: str) -> list:
table = []
for k, v in obj.items():
if k == selected_key:
# print(k, v, selected_key)
table.append({"name": k, "value": v, "selected": True})
else:
table.append({"name": k, "value": v, "selected": False})
return table