blanchon's picture
🔥 First commit
08d80be
raw
history blame
11.6 kB
import shutil
import tempfile
import traceback
import uuid
from pathlib import Path
from typing import Optional

import gradio as gr
import uvicorn
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from typing_extensions import TypedDict, Tuple
# FastAPI application that hosts both the static file server and the UI.
app = FastAPI()

# Working directory for all sessions, located under the system temp folder.
gs_dir = Path(tempfile.gettempdir()) / "gaussian_splatting_gradio"
gs_dir.mkdir(exist_ok=True, parents=True)

# Everything written below gs_dir is reachable over HTTP under /static.
app.mount("/static", StaticFiles(directory=gs_dir), name="static")
class StateDict(TypedDict):
    """Session state carried between Gradio callbacks.

    ``uuid`` names the session's sub-directory of ``gs_dir``.  It is ``None``
    while no session is active, which is why the field is optional rather
    than a plain ``str``.
    """

    uuid: Optional[str]
def getHTML():
    """Return the viewer page wrapped in an ``<iframe srcdoc=...>`` element.

    The page consists of a progress bar, a message area, a CSS cube spinner,
    a canvas and an FPS read-out, plus a ``<head>`` that loads the viewer
    script.  The ``srcdoc`` attribute is delimited with single quotes, so the
    embedded markup must not contain any single quote itself.
    """
    body_markup = """
<body>
<div id="progress"></div>
<div id="message"></div>
<div class="scene" id="spinner">
<div class="cube-wrapper">
<div class="cube">
<div class="cube-faces">
<div class="cube-face bottom"></div>
<div class="cube-face top"></div>
<div class="cube-face left"></div>
<div class="cube-face right"></div>
<div class="cube-face back"></div>
<div class="cube-face front"></div>
</div>
</div>
</div>
</div>
<canvas id="canvas"></canvas>
<div id="quality">
<span id="fps"></span>
</div>
<style>
.cube-wrapper {
transform-style: preserve-3d;
}
.cube {
transform-style: preserve-3d;
transform: rotateX(45deg) rotateZ(45deg);
animation: rotation 2s infinite;
}
.cube-faces {
transform-style: preserve-3d;
height: 80px;
width: 80px;
position: relative;
transform-origin: 0 0;
transform: translateX(0) translateY(0) translateZ(-40px);
}
.cube-face {
position: absolute;
inset: 0;
background: #0017ff;
border: solid 1px #ffffff;
}
.cube-face.top {
transform: translateZ(80px);
}
.cube-face.front {
transform-origin: 0 50%;
transform: rotateY(-90deg);
}
.cube-face.back {
transform-origin: 0 50%;
transform: rotateY(-90deg) translateZ(-80px);
}
.cube-face.right {
transform-origin: 50% 0;
transform: rotateX(-90deg) translateY(-80px);
}
.cube-face.left {
transform-origin: 50% 0;
transform: rotateX(-90deg) translateY(-80px) translateZ(80px);
}
@keyframes rotation {
0% {
transform: rotateX(45deg) rotateY(0) rotateZ(45deg);
animation-timing-function: cubic-bezier(
0.17,
0.84,
0.44,
1
);
}
50% {
transform: rotateX(45deg) rotateY(0) rotateZ(225deg);
animation-timing-function: cubic-bezier(
0.76,
0.05,
0.86,
0.06
);
}
100% {
transform: rotateX(45deg) rotateY(0) rotateZ(405deg);
animation-timing-function: cubic-bezier(
0.17,
0.84,
0.44,
1
);
}
}
.scene,
#message {
position: absolute;
display: flex;
top: 0;
right: 0;
left: 0;
bottom: 0;
z-index: 2;
height: 100%;
width: 100%;
align-items: center;
justify-content: center;
}
#message {
font-weight: bold;
font-size: large;
color: red;
pointer-events: none;
}
#progress {
position: absolute;
top: 0;
height: 5px;
background: blue;
z-index: 99;
transition: width 0.1s ease-in-out;
}
#quality {
position: absolute;
bottom: 10px;
z-index: 999;
right: 10px;
}
#canvas {
display: block;
position: absolute;
top: 0;
left: 0;
width: 100%;
height: 100%;
touch-action: none;
}
#instructions {
background: rgba(0,0,0,0.6);
white-space: pre-wrap;
padding: 10px;
border-radius: 10px;
font-size: x-small;
}
</style>
</body>
"""
    # Full document: head (title + viewer script) followed by the body above.
    document = f"""
<head>
<title>3D Gaussian Splatting Viewer</title>
<script src="http://zeus.blanchon.cc/dropshare/main.js"></script>
</head>
{body_markup}
"""
    iframe = f"""<iframe style="width: 100%; height: 900px" srcdoc='{document}'></iframe>"""
    return iframe
def createStateSession() -> StateDict:
    """Open a new session and return its state.

    The session is identified by a freshly generated UUID4 string, which is
    also echoed to stdout for debugging.
    """
    print("createStateSession")
    new_session_id = str(uuid.uuid4())
    print(new_session_id)
    return StateDict(uuid=new_session_id)
def removeStateSession(session_state_value: StateDict):
    """Return an empty session state (``uuid`` set to ``None``).

    The incoming state is not inspected and no files are removed; the
    function only produces the value that marks "no active session".
    """
    cleared_state = StateDict(uuid=None)
    return cleared_state
def makeButtonVisible() -> Tuple[gr.Button, gr.Button]:
    """Return the button updates applied once a video has been uploaded.

    The first element shows the process button; the second keeps the reset
    button hidden.
    """
    # TODO: I will bring the reset button back when I figure out how to stop
    # the process
    return gr.Button(visible=True), gr.Button(visible=False)
def resetSession(state: StateDict) -> Tuple[StateDict, gr.Button, gr.Button]:
    """Drop the current session and hide both action buttons.

    Returns the cleared state followed by the updates for the process button
    and the reset button, in that order.
    """
    print("resetSession")
    cleared_state = removeStateSession(state)
    return cleared_state, gr.Button(visible=False), gr.Button(visible=False)
def process(
    session_state_value: StateDict,
    filepath: str,
    ffmpeg_fps: int,
    ffmpeg_qscale: int,
    colmap_camera: str,
):
    """Run the reconstruction pipeline for the uploaded video.

    Inside ``gs_dir/<uuid>`` the function, in order:

    1. opens ``log.txt`` (truncating it) and hands it to every step as
       ``stream_file``;
    2. calls ``ffmpeg_run`` on the video with the requested fps / qscale;
    3. calls ``colmap`` with the selected camera model;
    4. zips the session directory into ``<session dir>/result.zip``;
    5. calls ``gaussian_splatting_cuda`` writing to ``<session dir>/output``.

    Pipeline failures are not propagated to the caller.  Instead of being
    silently dropped, the traceback is printed and, when the log file could
    be opened, appended to it so it is visible wherever the log is shown.

    Args:
        session_state_value: Current session state; nothing happens when its
            ``uuid`` is ``None``.
        filepath: Path of the uploaded video.
        ffmpeg_fps: Frame rate forwarded to ``ffmpeg_run`` (cast to ``int``).
        ffmpeg_qscale: Quality scale forwarded to ``ffmpeg_run`` (cast to
            ``int``).
        colmap_camera: Camera model name forwarded to ``colmap``.

    Returns:
        None.
    """
    if session_state_value["uuid"] is None:
        return
    print("process")
    print(session_state_value)
    print(f"Processing {filepath}")
    if not filepath:
        # Without a video every later step would fail; stop before touching
        # the disk.
        print("No video to process")
        return

    session_dir = gs_dir / str(session_state_value["uuid"])
    try:
        session_dir.mkdir(parents=True, exist_ok=True)
        print('Created temporary directory', session_dir)
        logfile_path = session_dir / "log.txt"
        # Opening in "w" mode creates the file and clears any previous run.
        with logfile_path.open("w") as log_file:
            try:
                # Project-local services are imported lazily so the module
                # can be loaded without them.
                from services.ffmpeg import ffmpeg_run
                ffmpeg_run(
                    video_path=Path(filepath),
                    output_path=session_dir,
                    fps=int(ffmpeg_fps),
                    qscale=int(ffmpeg_qscale),
                    stream_file=log_file,
                )

                from services.colmap import colmap
                colmap(
                    source_path=session_dir,
                    camera=str(colmap_camera),
                    stream_file=log_file,
                )
                print("Done with colmap")

                # Build the archive next to the session directory under a
                # session-specific name: concurrent sessions no longer share
                # "<cwd>/result.zip", and the archive is never created inside
                # the tree that is being zipped.
                print(gs_dir, session_dir)
                print(session_dir.name)
                archive = shutil.make_archive(
                    str(gs_dir / f"{session_dir.name}_result"),
                    "zip",
                    root_dir=gs_dir,
                    base_dir=session_dir.name,
                )
                print('Created zip file', archive)
                # An explicit file destination lets a re-run replace the
                # previous archive; moving into the directory would raise
                # when result.zip already exists.
                shutil.move(archive, str(session_dir / "result.zip"))

                from services.gaussian_splatting_cuda import gaussian_splatting_cuda
                gaussian_splatting_cuda(
                    data_path=session_dir,
                    output_path=session_dir / "output",
                    gs_command=str(Path(__file__).parent.absolute() / "build" / 'gaussian_splatting_cuda'),
                    iterations=100,
                    convergence_rate=0.01,
                    resolution=512,
                    enable_cr_monitoring=False,
                    force=False,
                    empty_gpu_cache=False,
                    stream_file=log_file,
                )
            except Exception:
                # Record the failure in the session log while it is still
                # open, then let the outer handler report it as well.
                log_file.write("\n" + traceback.format_exc())
                raise
    except Exception:
        # Best effort: report the error but keep the callback from raising.
        traceback.print_exc()
def updateLog(session_state_value: StateDict) -> str:
    """Return the text currently stored in the session's ``log.txt``.

    An empty string is returned when no session is active or when the log
    file does not exist yet.
    """
    session_id = session_state_value["uuid"]
    if session_id is None:
        return ""

    log_path = gs_dir / str(session_id) / "log.txt"
    if not log_path.exists():
        return ""
    return log_path.read_text()
# Gradio UI: upload a video, choose ffmpeg / COLMAP options, run the pipeline
# and watch its log.
# NOTE(review): the nesting of rows/columns below was reconstructed; confirm
# it matches the intended layout.
with gr.Blocks() as demo:
    # No session exists until a video has been uploaded.
    session_state = gr.State({
        "uuid": None,
    })

    with gr.Row():
        with gr.Column():
            video_input = gr.PlayableVideo(
                format="mp4",
                source="upload",
                label="Upload a video",
                include_audio=False
            )
            with gr.Row(variant="panel"):
                # NOTE(review): process() casts this value to int, so the
                # 0.10 step has no effect on the frame rate actually used.
                ffmpeg_fps = gr.Number(
                    label="FFMPEG FPS",
                    value=1,
                    minimum=1,
                    maximum=5,
                    step=0.10,
                )
                ffmpeg_qscale = gr.Number(
                    label="FFMPEG QSCALE",
                    value=1,
                    minimum=1,
                    maximum=5,
                    step=1,
                )
                colmap_camera = gr.Dropdown(
                    label="COLMAP Camera",
                    value="OPENCV",
                    choices=["OPENCV", "SIMPLE_PINHOLE", "PINHOLE", "SIMPLE_RADIAL", "RADIAL"],
                )
            text_log = gr.Textbox(
                label="Logs",
                info="Logs",
                interactive=False,
                show_copy_button=True
            )
            # text_log = gr.Code(
            #     label="Logs",
            #     language=None,
            #     interactive=False,
            # )
            # Both buttons start hidden; the upload handler reveals "Process".
            process_button = gr.Button("Process", visible=False)
            # NOTE(review): `label` may not set the button caption (that is
            # usually `value`) -- confirm against the Gradio version in use.
            reset_button = gr.ClearButton(
                components=[video_input, text_log, ffmpeg_fps, ffmpeg_qscale, colmap_camera],
                label="Reset",
                visible=False,
            )

    # Run the pipeline; the event handle is kept so it can be cancelled.
    process_event = process_button.click(
        fn=process,
        inputs=[session_state, video_input, ffmpeg_fps, ffmpeg_qscale, colmap_camera],
        outputs=[],
    )

    # On upload: show the process button, open a session, then refresh the
    # log box every 2 seconds.
    upload_event = video_input.upload(
        fn=makeButtonVisible,
        inputs=[],
        outputs=[process_button, reset_button]
    ).then(
        fn=createStateSession,
        inputs=[],
        outputs=[session_state],
    ).then(
        fn=updateLog,
        inputs=[session_state],
        outputs=[text_log],
        every=2,
    )

    # Resetting or clearing the video drops the session, hides the buttons
    # and cancels a running process event.
    reset_button.click(
        fn=resetSession,
        inputs=[session_state],
        outputs=[session_state, process_button, reset_button],
        cancels=[process_event]
    )
    video_input.clear(
        fn=resetSession,
        inputs=[session_state],
        outputs=[session_state, process_button, reset_button],
        cancels=[process_event]
    )
# Components noted for later use (none of them is referenced by the code):
# gr.LoginButton, gr.LogoutButton
# gr.HuggingFaceDatasetSaver
# gr.OAuthProfile
# with gr.Tab("jsdn"):
# input_mic = gr.HTML(getHTML())

# Enable Gradio's event queue before the UI is mounted.
demo.queue()
# demo.launch()
# Mount the Gradio app at the root of the FastAPI app; `app` is rebound to
# the value returned by mount_gradio_app.
app = gr.mount_gradio_app(app, demo, path="/")
# When executed as a script, serve on all interfaces on port 7860.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)