jclyo1's picture
updates
6eeaadf
raw
history blame
3.01 kB
from fastapi import FastAPI, UploadFile
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
import subprocess
import os
import json
import uuid
import logging
import torch
from diffusers import (
StableDiffusionPipeline,
DPMSolverMultistepScheduler,
EulerDiscreteScheduler,
)
app = FastAPI()
@app.get("/generate")
def generate_image(prompt, model):
torch.cuda.empty_cache()
modelArray = model.split(",")
modelName = modelArray[0]
modelVersion = modelArray[1]
pipeline = StableDiffusionPipeline.from_pretrained(
str(modelName), torch_dtype=torch.float16
)
pipeline.scheduler = EulerDiscreteScheduler.from_config(pipeline.scheduler.config)
pipeline = pipeline.to("cuda")
image = pipeline(prompt, num_inference_steps=50, height=512, width=512).images[0]
filename = str(uuid.uuid4()) + ".jpg"
image.save(filename)
# assertion = {
# "assertions": [
# {
# "label": "com.truepic.custom.ai",
# "data": {
# "model_name": modelName,
# "model_version": modelVersion,
# "prompt": prompt,
# },
# }
# ]
# }
# json_object = json.dumps(assertion)
subprocess.check_output(
[
"./scripts/sign.sh",
filename,
]
)
subprocess.check_output(
[
"cp",
"output.jpg",
"static/" + filename,
]
)
return {"response": filename}
@app.post("/verify")
def verify_image(fileUpload: UploadFile):
logging.warning("in verify")
logging.warning(fileUpload.filename)
# check if the file has been uploaded
if fileUpload.filename:
# strip the leading path from the file name
fn = os.path.basename(fileUpload.filename)
# open read and write the file into the server
open(fn, "wb").write(fileUpload.file.read())
response = subprocess.check_output(
[
"./scripts/verify.sh",
fileUpload.filename,
]
)
logging.warning(response)
response_list = response.splitlines()
c2pa_string = str(response_list[0])
c2pa = c2pa_string.split(":", 1)
watermark_string = str(response_list[1])
watermark = watermark_string.split(":", 1)
original_media_string = str(response_list[2])
original_media = original_media_string.split(":", 1)
original_media = str(original_media[1])
original_media = original_media.strip()
return {"response": fileUpload.filename, "contains_c2pa" : c2pa[1].strip(), "contains_watermark" : watermark[1].strip(), "original_media" : original_media}
app.mount("/", StaticFiles(directory="static", html=True), name="static")
@app.get("/")
def index() -> FileResponse:
return FileResponse(path="/app/static/index.html", media_type="text/html")