toto10's picture
Upload folder using huggingface_hub (#1)
34097e9
raw
history blame
14.9 kB
import json
import requests
import io
import base64
from PIL import Image, PngImagePlugin
import asyncio
import httpx
import os
import time
import serverHelper
import prompt_shortcut
import metadata_to_json
import search
# Base URL of the Stable Diffusion backend that requests are forwarded to.
# Read from the SD_URL environment variable, defaulting to the local address;
# changeSdUrl() rebinds this global at runtime.
sd_url = os.environ.get('SD_URL', 'http://127.0.0.1:7860')
async def txt2ImgRequest(payload):
    """Request txt2img generation from the backend and save the results as PNGs.

    When ``payload['use_prompt_shortcut']`` is truthy, ``payload['prompt']``
    and ``payload['negative_prompt']`` are rewritten in place with
    ``prompt_shortcut.replaceShortcut``, using the stored shortcuts overlaid
    with ``payload['prompt_shortcut_ui_dict']``.

    Every image returned by ``/sdapi/v1/txt2img`` is sent to
    ``/sdapi/v1/png-info``; the returned ``info`` string is embedded in the
    saved PNG as its "parameters" text chunk and converted with
    ``metadata_to_json.convertMetadataToJson``.

    Args:
        payload: dict sent as the JSON body of the txt2img request. Must
            contain 'use_prompt_shortcut' and 'uniqueDocumentId' (plus the
            prompt keys listed above when shortcuts are enabled).

    Returns:
        A ``(dirName, images_info, metadata)`` tuple. ``images_info`` is a
        list of ``{"base64": ..., "path": ...}`` dicts, one per image, and
        ``metadata`` is the list of converted metadata values in the same
        order.
    """
    print("payload: ",payload)

    if payload['use_prompt_shortcut']:
        # Expand shortcuts in both prompts; UI-supplied shortcuts override
        # the stored ones because they are applied last.
        prompt_shortcut_dict = prompt_shortcut.load()
        prompt_shortcut_dict.update(payload["prompt_shortcut_ui_dict"])
        payload['prompt'] = prompt_shortcut.replaceShortcut(payload['prompt'],prompt_shortcut_dict)
        payload['negative_prompt'] = prompt_shortcut.replaceShortcut(payload['negative_prompt'],prompt_shortcut_dict)

    # The client must stay open for the whole loop: it is reused for the
    # per-image png-info requests.
    async with httpx.AsyncClient() as client:
        # timeout=None: the request waits for the backend without a deadline.
        response = await client.post(url=f'{sd_url}/sdapi/v1/txt2img', json=payload, timeout=None)
        r = response.json()

        # Directory that receives this document's images.
        # NOTE(review): images are written to ./output/<dirName>/ below;
        # presumably that is the same location as dir_fullpath - confirm
        # against serverHelper.
        uniqueDocumentId = payload['uniqueDocumentId']
        dir_fullpath,dirName = serverHelper.getUniqueDocumentDirPathName(uniqueDocumentId)
        serverHelper.createFolder(dir_fullpath)

        metadata = []
        images_info = []
        for image_b64 in r['images']:
            image = Image.open(io.BytesIO(base64.b64decode(image_b64.split(",",1)[0])))

            # Ask the backend for the generation parameters of this image.
            png_payload = {
                "image": "data:image/png;base64," + image_b64
            }
            response2 = await client.post(url=f'{sd_url}/sdapi/v1/png-info', json=png_payload)
            # Decode the png-info response once and reuse it for both the
            # embedded PNG text chunk and the JSON metadata.
            metadata_info = response2.json().get("info")

            pnginfo = PngImagePlugin.PngInfo()
            pnginfo.add_text("parameters", metadata_info)

            image_name = f'output- {time.time()}.png'
            image_path = f'output/{dirName}/{image_name}'
            image.save(f'./{image_path}', pnginfo=pnginfo)

            metadata_json = metadata_to_json.convertMetadataToJson(metadata_info)
            metadata.append(metadata_json)
            images_info.append({"base64":image_b64,"path":image_path})
            print("metadata_json: ", metadata_json)

    return dirName,images_info,metadata
import base64
from io import BytesIO
def img_2_b64(image):
    """Encode *image* as PNG and return the bytes as a base64 text string.

    Args:
        image: object exposing ``save(fileobj, format=...)``; the callers in
            this file pass images opened with PIL.

    Returns:
        The base64 encoding of the PNG bytes, decoded to ``str`` as UTF-8.
    """
    png_buffer = BytesIO()
    image.save(png_buffer, format="PNG")
    return base64.b64encode(png_buffer.getvalue()).decode("utf-8")
from typing import Union
from fastapi import FastAPI
from fastapi import APIRouter, Request
# Router on which the endpoint functions below are registered through its
# @router.get / @router.post decorators.
router = APIRouter()
@router.get("/")
def read_root():
    """Answer GET / with a fixed greeting object."""
    greeting = {"Hello": "World"}
    return greeting
@router.get("/version")
def getVersion():
    """Report the version stored in manifest.json, two directories up.

    Returns:
        ``{"version": "v<version>"}``. The version falls back to "0.0.0"
        when the manifest cannot be opened, is not valid JSON, or has no
        'version' entry.
    """
    # Built with os.path.join instead of the literal "..\..\manifest.json":
    # the backslashes formed invalid escape sequences and the path only
    # resolved on Windows. On Windows the resulting string is unchanged.
    manifest_path = os.path.join("..", "..", "manifest.json")
    version = "0.0.0"
    try:
        with open(manifest_path, 'r') as f:
            manifest = json.load(f)
        version = manifest['version']
    except (OSError, ValueError, KeyError, TypeError):
        # OSError: file missing/unreadable; ValueError: malformed JSON or
        # undecodable bytes; KeyError/TypeError: no usable 'version' entry.
        print("couldn't read the manifest.json")
    return {"version": f"v{version}"}
# @router.post("/txt2img/")
# async def txt2ImgHandle(payload:Payload):
# print("txt2ImgHandle: \n")
# txt2ImgRequest(payload)
# return {"prompt":payload.prompt,"images": ""}
from fastapi import Request, Response
import img2imgapi
@router.post("/sd_url/")
async def changeSdUrl(request:Request):
    """Replace the module-level backend URL with ``sd_url`` from the JSON body.

    Returns:
        ``{"sd_url": <current value>}``. If the body cannot be parsed or has
        no 'sd_url' key, the previous URL is kept and returned.
    """
    global sd_url
    try:
        payload = await request.json()
        print("changeSdUrl: payload:",payload)
        print(f"change sd url from {sd_url} to {payload['sd_url']} \n")
        sd_url = payload['sd_url']
    except Exception as e:
        # `except Exception` rather than a bare except, so task cancellation
        # and interpreter-exit signals are not swallowed; the cause is
        # printed instead of being discarded.
        print("error occurred in changeSdUrl()", repr(e))
    return {"sd_url":sd_url}
@router.post("/txt2img/")
async def txt2ImgHandle(request:Request):
    """Handle POST /txt2img/: run txt2ImgRequest on the request's JSON body.

    Returns:
        A dict with the payload object that was passed to txt2ImgRequest and
        the directory name, image info list and metadata list it returned.
    """
    print("txt2ImgHandle: \n")
    body = await request.json()
    generation_result = await txt2ImgRequest(body)
    dir_name, images_info, metadata = generation_result
    return {
        "payload": body,
        "dir_name": dir_name,
        "images_info": images_info,
        "metadata": metadata,
    }
@router.post("/img2img/")
async def img2ImgHandle(request:Request):
    """Handle POST /img2img/: pass the JSON body to img2imgapi.img2ImgRequest.

    Returns:
        A dict with the payload object and the directory name, image info
        list and metadata list returned by img2imgapi.img2ImgRequest.
    """
    print("img2ImgHandle: \n")
    body = await request.json()
    generation_result = await img2imgapi.img2ImgRequest(sd_url, body)
    dir_name, images_info, metadata = generation_result
    return {
        "payload": body,
        "dir_name": dir_name,
        "images_info": images_info,
        "metadata": metadata,
    }
@router.post("/getInitImage/")
async def getInitImageHandle(request:Request):
    """Read an image from ./init_images and return it base64-encoded.

    The file named by ``payload['init_image_name']`` is opened up to three
    times, pausing one second after each failed attempt.

    Returns:
        ``{"payload": <request body>, "init_image_str": <base64 PNG>}``;
        ``init_image_str`` is "" when every attempt failed.
    """
    print("getInitImageHandle: \n")
    payload = await request.json()
    print("payload:",payload)

    init_img_dir = "./init_images"
    # TODO: change this key to "image_name"
    # NOTE(review): the name comes straight from the request and is joined
    # into a path without validation - confirm callers are trusted.
    init_img_name = payload["init_image_name"]
    image_path = f"{init_img_dir}/{init_img_name}"

    numOfAttempts = 3
    init_img_str = ""
    for i in range(numOfAttempts):
        try:
            # The context manager closes the file handle after encoding.
            with Image.open(image_path) as init_img:
                init_img_str = img_2_b64(init_img)
            # Stop after the first successful read; previously the loop kept
            # going and re-read the file on every remaining attempt.
            break
        except Exception:
            print(f"exception:fail to read an image file {image_path}, will try again {i} of {numOfAttempts}")
            # Pause before retrying without blocking the event loop, which
            # time.sleep() would do inside an async handler.
            await asyncio.sleep(1)
    return {"payload": payload,"init_image_str":init_img_str}
@router.get('/config')
async def sdapi(request: Request, response: Response):
    """Forward GET /config, with its query parameters, to the backend.

    On success the backend's status code and raw body are copied onto
    *response*. If the request fails, *response* is returned unmodified and
    the failure is printed.
    """
    try:
        resp = requests.get(url=f'{sd_url}/config', params=request.query_params)
        response.status_code = resp.status_code
        response.body = resp.content
    except Exception as e:
        # `except Exception` rather than a bare except, so cancellation and
        # interpreter-exit signals propagate; the cause is printed too.
        print(f'exception: fail to send request to {sd_url}/config', repr(e))
        print(f'{request}')
    return response
@router.get('/sdapi/v1/{path:path}')
async def sdapi(path: str, request: Request, response: Response):
    """Forward GET /sdapi/v1/<path>, with its query parameters, to the backend.

    On success the backend's status code and raw body are copied onto
    *response*. If the request fails, *response* is returned unmodified and
    the failure is printed.
    """
    try:
        resp = requests.get(url=f'{sd_url}/sdapi/v1/{path}', params=request.query_params)
        response.status_code = resp.status_code
        response.body = resp.content
    except Exception as e:
        # `except Exception` rather than a bare except, so cancellation and
        # interpreter-exit signals propagate; the cause is printed too.
        print(f'exception: fail to send request to {sd_url}/sdapi/v1/{path}', repr(e))
        print(f'{request}')
    return response
@router.post('/sdapi/v1/{path:path}')
async def sdapi(path: str, request: Request, response: Response):
    """Forward POST /sdapi/v1/<path> to the backend.

    The request's JSON body (or ``{}`` when it cannot be parsed, e.g. for
    body-less calls) and its query parameters are sent along. On success the
    backend's status code and raw body are copied onto *response*. If the
    request fails, *response* is returned unmodified and the failure is
    printed.
    """
    # Named `body` so the `json` module is not shadowed inside the handler.
    try:
        body = await request.json()
    except Exception:
        body = {}

    try:
        resp = requests.post(url=f'{sd_url}/sdapi/v1/{path}', params=request.query_params, json=body)
        response.status_code = resp.status_code
        response.body = resp.content
    except Exception as e:
        # `except Exception` rather than a bare except, so cancellation and
        # interpreter-exit signals propagate; the cause is printed too.
        print(f'exception: fail to send request to {sd_url}/sdapi/v1/{path}', repr(e))
        print(f'{request}')
    return response
# async def base64ToPng(base64_image,image_path):
# base64_img_bytes = base64_image.encode('utf-8')
# with open(image_path, 'wb') as file_to_save:
# decoded_image_data = base64.decodebytes(base64_img_bytes)
# file_to_save.write(decoded_image_data)
@router.post('/save/png/')
async def savePng(request:Request):
    """Save a base64 image from the request body into ./init_images.

    Reads 'image_name' and 'base64' from the JSON body and hands them to
    ``img2imgapi.base64ToPng``.

    Returns:
        ``{"status": "<image_name> has been saved"}`` on success, otherwise
        ``{"error": ...}``.
    """
    print("savePng()")
    # Named `body` so the `json` module is not shadowed inside the handler.
    try:
        body = await request.json()
    except Exception:
        body = {}
    print("json:",body)

    try:
        folder = './init_images'
        # NOTE(review): image_name is taken from the request and joined into
        # a path without validation - confirm callers are trusted.
        image_path = f"{folder}/{body['image_name']}"
        await img2imgapi.base64ToPng(body['base64'],image_path)
        return {"status":f"{body['image_name']} has been saved"}
    except Exception as e:
        # `except Exception` rather than a bare except, so cancellation and
        # interpreter-exit signals propagate; the cause is printed too.
        print(f'{request}', repr(e))
        return {"error": "error message: could not save the image file"}
@router.post('/search/image/')
async def searchImage(request:Request):
    """Run an image search for the 'keywords' given in the JSON body.

    'keywords' defaults to 'cute dogs' when the body has no such key or
    cannot be parsed.

    Returns:
        ``{"images": <result of search.imageSearch>}`` on success, otherwise
        ``{"error": ...}``.
    """
    # Named `body` so the `json` module is not shadowed inside the handler.
    try:
        body = await request.json()
    except Exception:
        body = {}

    # Bound before the try block so the error branch can always print it;
    # previously a failure in the lookup itself left the name undefined and
    # the handler crashed with a NameError instead of returning the error.
    keywords = None
    try:
        keywords = body.get('keywords','cute dogs')
        images = await search.imageSearch(keywords)
        print(images)
        return {"images":images}
    except Exception as e:
        print("keywords",keywords, repr(e))
        return {"error": "error message: can't preform an image search"}
@router.post('/mask/expansion/')
async def maskExpansionHandler(request:Request):
    """Expand a base64 mask image by the requested amount.

    Reads 'mask' and 'mask_expansion' from the JSON body, runs
    ``img2imgapi.maskExpansion`` on the decoded mask and re-encodes the
    result. Copies of the input and output masks are written to
    "original_mask.png" and "expanded_mask.png" for debugging.

    Returns:
        ``{"mask": <base64 of the expanded mask>}``.

    Raises:
        Exception: if any step fails; the original error is attached as the
            cause.
    """
    # Named `body` so the `json` module is not shadowed inside the handler.
    try:
        body = await request.json()
    except Exception:
        body = {}

    try:
        base64_mask_image = body['mask']
        mask_expansion = body['mask_expansion']

        # Save a copy of the incoming mask for debugging.
        await img2imgapi.base64ToPng(base64_mask_image,"original_mask.png")

        mask_image = img2imgapi.b64_2_img(base64_mask_image)
        expanded_mask_img = img2imgapi.maskExpansion(mask_image,mask_expansion)
        base64_expanded_mask_image = img2imgapi.img_2_b64(expanded_mask_img)

        # Save a copy of the expanded mask for debugging.
        await img2imgapi.base64ToPng(base64_expanded_mask_image,"expanded_mask.png")
        print("successful mask expansion operation")
        return {"mask":base64_expanded_mask_image}
    except Exception as e:
        print("request",request)
        # Chain the cause so the traceback shows what actually failed. The
        # `return {"error": ...}` that used to follow this raise could never
        # execute and has been removed.
        raise Exception("couldn't preform mask expansion") from e
@router.post('/history/load')
async def loadHistory(request: Request):
    """Return the saved PNGs of a document together with their metadata.

    Globs ``./output/<uniqueDocumentId>/*.png`` and, for each file, collects
    its path, the value returned by
    ``metadata_to_json.createMetadataJsonFileIfNotExist`` and its
    base64-encoded content.

    Returns:
        ``{"image_paths": [...], "metadata_jsons": [...],
        "base64_images": [...]}``. The three lists are index-aligned and
        reversed relative to the glob order. They are empty when the body has
        no 'uniqueDocumentId', and contain only the images processed so far
        if an error interrupts the loop.
    """
    import glob

    # Created up front so the reversal and return below never hit a missing
    # key, even when the request body is unusable.
    history = {'image_paths': [], 'metadata_jsons': [], 'base64_images': []}

    # Named `body` so the `json` module is not shadowed inside the handler.
    try:
        body = await request.json()
    except Exception:
        body = {}

    try:
        # NOTE(review): uniqueDocumentId is taken from the request and used
        # in a filesystem pattern without validation - confirm callers are
        # trusted.
        uniqueDocumentId = body['uniqueDocumentId']
        image_paths = glob.glob(f'./output/{uniqueDocumentId}/*.png')
        print("loadHistory: image_paths:", image_paths)

        for image_path in image_paths:
            print("image_path: ", image_path)
            metadata_dict = metadata_to_json.createMetadataJsonFileIfNotExist(image_path)
            # The context manager closes the file handle after encoding.
            with Image.open(image_path) as img:
                base64_image = img_2_b64(img)
            # Append all three together, only after both steps succeeded, so
            # the lists stay index-aligned. (These calls were the corrupted
            # `.routerend(...)`, which always raised AttributeError.)
            history['image_paths'].append(image_path)
            history['metadata_jsons'].append(metadata_dict)
            history['base64_images'].append(base64_image)
    except Exception as e:
        print(f'{request}', repr(e))

    # Reverse the order so that newer generated images are shown first.
    # NOTE(review): glob does not guarantee any particular order - confirm
    # whether the paths should be sorted before reversing.
    history['image_paths'].reverse()
    history['metadata_jsons'].reverse()
    history['base64_images'].reverse()
    return {"image_paths":history['image_paths'], "metadata_jsons":history['metadata_jsons'],"base64_images": history['base64_images']}
@router.post('/prompt_shortcut/load')
async def loadPromptShortcut(request: Request):
    """Return the stored prompt shortcuts.

    The request body is not used.

    Returns:
        ``{"prompt_shortcut": <result of prompt_shortcut.load()>}``, or an
        empty dict as the value when loading fails.
    """
    prompt_shortcut_json = {}
    try:
        prompt_shortcut_json = prompt_shortcut.load()
    except Exception as e:
        # `except Exception` rather than a bare except, so cancellation and
        # interpreter-exit signals propagate; the cause is printed too.
        print(f'{request}', repr(e))
    return {"prompt_shortcut":prompt_shortcut_json}
@router.post('/prompt_shortcut/save')
async def loadPromptShortcut(request: Request):
    """Write the 'prompt_shortcut' value of the JSON body to prompt_shortcut.json.

    NOTE(review): this handler saves shortcuts but carries the same function
    name as the /prompt_shortcut/load handler. The name is kept here to avoid
    changing module-level bindings; consider renaming it.

    Returns:
        ``{"prompt_shortcut": <value received>}``; the value is an empty dict
        when the body could not be parsed or lacks the key.
    """
    prompt_shortcut_json = {}
    # Named `body` so the `json` module is not shadowed inside the handler.
    try:
        body = await request.json()
    except Exception:
        body = {}

    try:
        print("json: ",body)
        print("json['prompt_shortcut']: ",body['prompt_shortcut'])
        prompt_shortcut_json = body['prompt_shortcut']
        prompt_shortcut.writeToJson("prompt_shortcut.json",prompt_shortcut_json)
    except Exception as e:
        # `except Exception` rather than a bare except, so cancellation and
        # interpreter-exit signals propagate; the cause is printed too.
        print(f'error occurred durning reading the request {request}', repr(e))
    return {"prompt_shortcut":prompt_shortcut_json}
@router.post("/swapModel")
async def swapModel(request:Request):
    """Ask the backend to switch checkpoints to the model named in the body.

    Posts ``{"sd_model_checkpoint": payload['title']}`` to the backend's
    ``/sdapi/v1/options`` endpoint. Returns nothing.
    """
    print("swapModel: \n")
    payload = await request.json()
    print("payload:",payload)

    # The parsed JSON body is a dict, so the title is read by key; the
    # previous attribute access (payload.title) raised AttributeError.
    model_title = payload['title']
    option_payload = {
        # e.g. "Anything-V3.0-pruned.ckpt [2700c435]"
        "sd_model_checkpoint": model_title
    }
    requests.post(url=f'{sd_url}/sdapi/v1/options', json=option_payload)
import webbrowser
@router.post("/open/url/")
async def openUrl(request:Request):
    """Open the 'url' given in the JSON body with the ``webbrowser`` module.

    Returns:
        ``{"url": <url>}``; the value is "" when the body could not be parsed
        or has no 'url' key.
    """
    # Named `body` so the `json` module is not shadowed inside the handler.
    try:
        body = await request.json()
    except Exception:
        body = {}

    url = ""
    print("json: ",body)
    try:
        url = body['url']
        # NOTE(review): the URL comes straight from the request and is opened
        # without validation - confirm callers are trusted.
        webbrowser.open(url)
    except Exception as e:
        # `except Exception` rather than a bare except, so cancellation and
        # interpreter-exit signals propagate; the cause is printed too.
        print(f'an error has occurred durning processing the request {request}', repr(e))
    return {"url":url}
@router.get('/lora/list')
async def list_available_loras():
    """List the LoRA files found under ``shared.cmd_opts.lora_dir``.

    Creates the directory if needed, then searches it recursively for
    ``.pt``, ``.safetensors`` and ``.ckpt`` files.

    Returns:
        A dict mapping each file's base name (without extension) to itself.
        Whatever was collected before an error is returned; the error is
        printed.
    """
    lora_dict = {}
    try:
        from modules import shared
        import glob

        os.makedirs(shared.cmd_opts.lora_dir, exist_ok=True)

        # Gather matches pattern by pattern, in the same order as before.
        found = []
        for pattern in ('**/*.pt', '**/*.safetensors', '**/*.ckpt'):
            found += glob.glob(os.path.join(shared.cmd_opts.lora_dir, pattern), recursive=True)

        for lora_path in sorted(found, key=str.lower):
            # Directories can match the patterns too; only files count.
            if not os.path.isdir(lora_path):
                stem, _extension = os.path.splitext(os.path.basename(lora_path))
                print("lora name: ",stem)
                lora_dict[stem] = stem
    except Exception as e:
        print("list_available_loras() error ",repr(e),e)
    return lora_dict
# Application object; the routes registered on `router` are attached to it.
app = FastAPI()
app.include_router(router)