|
|
|
import json |
|
import requests |
|
import io |
|
import base64 |
|
from PIL import Image, PngImagePlugin |
|
import asyncio |
|
import httpx |
|
|
|
|
|
import os |
|
import time |
|
import serverHelper |
|
import prompt_shortcut |
|
import metadata_to_json |
|
import search |
|
# Base URL that the handlers in this file send their backend requests to.
# Taken from the SD_URL environment variable when it is set.
sd_url = os.getenv('SD_URL', 'http://127.0.0.1:7860')
|
|
|
|
|
|
|
|
|
async def txt2ImgRequest(payload):
    """Request images from the txt2img endpoint and save them to disk.

    Args:
        payload: Dict sent as the JSON body to ``{sd_url}/sdapi/v1/txt2img``.
            This function reads ``use_prompt_shortcut``,
            ``prompt_shortcut_ui_dict``, ``prompt``, ``negative_prompt`` and
            ``uniqueDocumentId`` from it. When ``use_prompt_shortcut`` is
            truthy, ``prompt`` and ``negative_prompt`` are rewritten in place
            before the request is sent.

    Returns:
        A ``(dirName, images_info, metadata)`` tuple. ``images_info`` holds
        one ``{"base64": ..., "path": ...}`` dict per generated image and
        ``metadata`` holds the matching converted png-info entries.
    """
    print("payload: ", payload)

    if payload['use_prompt_shortcut']:
        # Stored shortcuts first, then the ones sent by the UI so that the
        # UI entries win on key collisions.
        prompt_shortcut_dict = prompt_shortcut.load()
        prompt_shortcut_dict.update(payload["prompt_shortcut_ui_dict"])
        payload['prompt'] = prompt_shortcut.replaceShortcut(payload['prompt'], prompt_shortcut_dict)
        payload['negative_prompt'] = prompt_shortcut.replaceShortcut(payload['negative_prompt'], prompt_shortcut_dict)

    async with httpx.AsyncClient() as client:
        # timeout=None: this request waits for the response without a timeout.
        response = await client.post(url=f'{sd_url}/sdapi/v1/txt2img', json=payload, timeout=None)
        r = response.json()

        uniqueDocumentId = payload['uniqueDocumentId']
        dir_fullpath, dirName = serverHelper.getUniqueDocumentDirPathName(uniqueDocumentId)
        serverHelper.createFolder(dir_fullpath)

        metadata = []
        images_info = []
        for i in r['images']:
            image = Image.open(io.BytesIO(base64.b64decode(i.split(",", 1)[0])))

            # Ask the backend for the generation parameters of this image.
            png_payload = {
                "image": "data:image/png;base64," + i
            }
            response2 = await client.post(url=f'{sd_url}/sdapi/v1/png-info', json=png_payload)
            # Parse the png-info response once and reuse it for both the PNG
            # text chunk and the JSON metadata.
            metadata_info = response2.json().get("info")

            pnginfo = PngImagePlugin.PngInfo()
            pnginfo.add_text("parameters", metadata_info)

            image_name = f'output- {time.time()}.png'
            image_path = f'output/{dirName}/{image_name}'
            image.save(f'./{image_path}', pnginfo=pnginfo)

            metadata_json = metadata_to_json.convertMetadataToJson(metadata_info)
            metadata.append(metadata_json)
            images_info.append({"base64": i, "path": image_path})
            print("metadata_json: ", metadata_json)

    return dirName, images_info, metadata
|
|
|
import base64 |
|
from io import BytesIO |
|
|
|
|
|
def img_2_b64(image):
    """Return *image* written as PNG and encoded as a base64 text string.

    ``image`` must offer ``save(file_obj, format=...)``; callers in this file
    pass images opened with PIL.
    """
    png_buffer = BytesIO()
    image.save(png_buffer, format="PNG")
    return base64.b64encode(png_buffer.getvalue()).decode("utf-8")
|
|
|
|
|
from typing import Union |
|
|
|
from fastapi import FastAPI |
|
from fastapi import APIRouter, Request |
|
|
|
|
|
|
|
# Router on which the endpoints below are registered; it is attached to the
# FastAPI application at the bottom of the file.
router = APIRouter()
|
|
|
|
|
@router.get("/")
def read_root():
    """Answer requests to the root path with a fixed greeting payload."""
    greeting = {"Hello": "World"}
    return greeting
|
|
|
|
|
@router.get("/version")
def getVersion():
    """Report the version stored in ``manifest.json`` two directories up.

    Returns:
        ``{"version": "v<version>"}``. The version is ``0.0.0`` when the
        manifest cannot be opened or parsed, or has no ``version`` entry.
    """
    # os.path.join yields the same "..\..\manifest.json" on Windows and a
    # valid path elsewhere. It also avoids the invalid "\." and "\m" escape
    # sequences of the former string literal.
    manifest_path = os.path.join(os.pardir, os.pardir, "manifest.json")

    version = "0.0.0"
    try:
        with open(manifest_path, 'r', encoding="utf-8") as f:
            version = json.load(f)['version']
    except (OSError, ValueError, KeyError, TypeError):
        # Missing/unreadable file, invalid JSON, or unexpected structure:
        # keep the fallback version.
        print("couldn't read the manifest.json")
    return {"version": f"v{version}"}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from fastapi import Request, Response |
|
import img2imgapi |
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/sd_url/")
async def changeSdUrl(request: Request):
    """Replace the module-level backend URL with the one sent by the client.

    The request body is expected to be JSON with an ``sd_url`` entry.

    Returns:
        ``{"sd_url": <url>}`` with the URL in effect after the call; this is
        the previous URL when the request could not be applied.
    """
    global sd_url
    try:
        payload = await request.json()
        print("changeSdUrl: payload:", payload)
        print(f"change sd url from {sd_url} to {payload['sd_url']} \n")
        sd_url = payload['sd_url']
    except Exception as e:
        # `Exception` rather than a bare except, so task cancellation and
        # interpreter-exit signals are not swallowed by this handler.
        print("error occurred in changeSdUrl()", repr(e))

    return {"sd_url": sd_url}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/txt2img/")
async def txt2ImgHandle(request: Request):
    """Run a txt2img generation for the JSON body of *request*.

    Returns a dict echoing the payload together with the directory name,
    image info list and metadata list produced by ``txt2ImgRequest``.
    """
    print("txt2ImgHandle: \n")
    payload = await request.json()
    result = await txt2ImgRequest(payload)
    dir_name, images_info, metadata = result

    return {
        "payload": payload,
        "dir_name": dir_name,
        "images_info": images_info,
        "metadata": metadata,
    }
|
|
|
@router.post("/img2img/")
async def img2ImgHandle(request: Request):
    """Run an img2img generation for the JSON body of *request*.

    Returns a dict echoing the payload together with the directory name,
    image info list and metadata list produced by
    ``img2imgapi.img2ImgRequest``.
    """
    print("img2ImgHandle: \n")
    payload = await request.json()
    result = await img2imgapi.img2ImgRequest(sd_url, payload)
    dir_name, images_info, metadata = result

    return {
        "payload": payload,
        "dir_name": dir_name,
        "images_info": images_info,
        "metadata": metadata,
    }
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post("/getInitImage/")
async def getInitImageHandle(request: Request):
    """Return an image from ``./init_images`` as a base64-encoded PNG string.

    The JSON body must contain ``init_image_name``. Reading is attempted up
    to three times, waiting one second after each failed attempt.

    Returns:
        ``{"payload": <body>, "init_image_str": <base64>}``;
        ``init_image_str`` is ``""`` when every attempt failed.
    """
    print("getInitImageHandle: \n")
    payload = await request.json()
    print("payload:", payload)
    init_img_dir = "./init_images"
    # NOTE(review): the name comes straight from the request body and is not
    # checked for path separators or ".." before being used in a path.
    init_img_name = payload["init_image_name"]
    image_path = f"{init_img_dir}/{init_img_name}"

    numOfAttempts = 3
    init_img_str = ""
    for i in range(numOfAttempts):
        try:
            # The context manager closes the file handle once it is encoded.
            with Image.open(image_path) as init_img:
                init_img_str = img_2_b64(init_img)
            # Stop after the first successful read instead of re-reading the
            # same file on every remaining iteration.
            break
        except Exception:
            print(f"exception:fail to read an image file {image_path}, will try again {i} of {numOfAttempts}")
            # asyncio.sleep yields to the event loop; time.sleep would stall
            # every other request for the duration of the wait.
            await asyncio.sleep(1)

    return {"payload": payload, "init_image_str": init_img_str}
|
|
|
@router.get('/config')
async def sdapi(request: Request, response: Response):
    """Forward ``GET /config`` (with its query parameters) to the backend.

    The backend's status code and raw body are copied onto *response*. When
    the backend call fails, the response status is set to 502.

    NOTE: two later handlers in this file reuse the function name ``sdapi``;
    the route registered by the decorator is unaffected by that rebinding.
    """
    target = f'{sd_url}/config'
    params = request.query_params
    try:
        # `requests` is blocking; run it in the default executor so the
        # event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        resp = await loop.run_in_executor(
            None, lambda: requests.get(url=target, params=params)
        )
        response.status_code = resp.status_code
        response.body = resp.content
    except Exception as e:
        print(f'exception: fail to send request to {target}')
        print(f'{request}', repr(e))
        # Report the failure explicitly instead of returning a response on
        # which this handler never set a status.
        response.status_code = 502
    return response
|
|
|
@router.get('/sdapi/v1/{path:path}')
async def sdapi(path: str, request: Request, response: Response):
    """Forward ``GET /sdapi/v1/<path>`` (with query parameters) to the backend.

    The backend's status code and raw body are copied onto *response*. When
    the backend call fails, the response status is set to 502.

    NOTE: other handlers in this file share the function name ``sdapi``; the
    route registered by the decorator is unaffected by that rebinding.
    """
    target = f'{sd_url}/sdapi/v1/{path}'
    params = request.query_params
    try:
        # `requests` is blocking; run it in the default executor so the
        # event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        resp = await loop.run_in_executor(
            None, lambda: requests.get(url=target, params=params)
        )
        response.status_code = resp.status_code
        response.body = resp.content
    except Exception as e:
        print(f'exception: fail to send request to {target}')
        print(f'{request}', repr(e))
        # Report the failure explicitly instead of returning a response on
        # which this handler never set a status.
        response.status_code = 502
    return response
|
|
|
@router.post('/sdapi/v1/{path:path}')
async def sdapi(path: str, request: Request, response: Response):
    """Forward ``POST /sdapi/v1/<path>`` to the backend.

    The request's JSON body (``{}`` when it cannot be parsed) and query
    parameters are passed along. The backend's status code and raw body are
    copied onto *response*; when the backend call fails, the response status
    is set to 502.

    NOTE: other handlers in this file share the function name ``sdapi``; the
    route registered by the decorator is unaffected by that rebinding.
    """
    # Named `body`, not `json`, so the standard `json` module is not shadowed.
    try:
        body = await request.json()
    except Exception:
        body = {}

    target = f'{sd_url}/sdapi/v1/{path}'
    params = request.query_params
    try:
        # `requests` is blocking; run it in the default executor so the
        # event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        resp = await loop.run_in_executor(
            None, lambda: requests.post(url=target, params=params, json=body)
        )
        response.status_code = resp.status_code
        response.body = resp.content
    except Exception as e:
        print(f'exception: fail to send request to {target}')
        print(f'{request}', repr(e))
        # Report the failure explicitly instead of returning a response on
        # which this handler never set a status.
        response.status_code = 502
    return response
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@router.post('/save/png/')
async def savePng(request: Request):
    """Save a base64-encoded image from the request into ``./init_images``.

    The JSON body must contain ``image_name`` and ``base64``.

    Returns:
        ``{"status": "<image_name> has been saved"}`` on success, otherwise
        ``{"error": "error message: could not save the image file"}``.
    """
    print("savePng()")
    # Named `body`, not `json`, so the standard `json` module is not shadowed.
    try:
        body = await request.json()
    except Exception:
        body = {}

    print("json:", body)
    try:
        folder = './init_images'
        # NOTE(review): `image_name` comes straight from the request and is
        # not checked for path separators or ".." before being written to.
        image_path = f"{folder}/{body['image_name']}"
        await img2imgapi.base64ToPng(body['base64'], image_path)
        return {"status": f"{body['image_name']} has been saved"}
    except Exception as e:
        # `Exception` rather than a bare except so cancellation propagates;
        # the cause is printed instead of being discarded.
        print(f'{request}', repr(e))
        return {"error": "error message: could not save the image file"}
|
|
|
|
|
@router.post('/search/image/')
async def searchImage(request: Request):
    """Search for images matching the ``keywords`` entry of the JSON body.

    ``keywords`` defaults to ``'cute dogs'`` when the body has no such entry.

    Returns:
        ``{"images": <result of search.imageSearch>}`` on success, otherwise
        ``{"error": "error message: can't preform an image search"}``.
    """
    # Named `body`, not `json`, so the standard `json` module is not shadowed.
    try:
        body = await request.json()
    except Exception:
        body = {}

    # Bound before the try so the error branch can always print it, even
    # when reading it from the body is what failed.
    keywords = None
    try:
        keywords = body.get('keywords', 'cute dogs')
        images = await search.imageSearch(keywords)
        print(images)
        return {"images": images}
    except Exception as e:
        print("keywords", keywords, repr(e))
        return {"error": "error message: can't preform an image search"}
|
|
|
@router.post('/mask/expansion/')
async def maskExpansionHandler(request: Request):
    """Expand the mask image sent in the request and return it as base64.

    The JSON body must contain ``mask`` (base64 image) and
    ``mask_expansion``. The original and the expanded mask are also written
    to ``original_mask.png`` and ``expanded_mask.png``.

    Returns:
        ``{"mask": <base64 of the expanded mask>}``.

    Raises:
        Exception: when any step fails; the underlying error is attached as
            the cause.
    """
    # Named `body`, not `json`, so the standard `json` module is not shadowed.
    try:
        body = await request.json()
    except Exception:
        body = {}

    try:
        base64_mask_image = body['mask']
        mask_expansion = body['mask_expansion']

        await img2imgapi.base64ToPng(base64_mask_image, "original_mask.png")

        mask_image = img2imgapi.b64_2_img(base64_mask_image)
        expanded_mask_img = img2imgapi.maskExpansion(mask_image, mask_expansion)
        base64_expanded_mask_image = img2imgapi.img_2_b64(expanded_mask_img)
        await img2imgapi.base64ToPng(base64_expanded_mask_image, "expanded_mask.png")

        print("successful mask expansion operation")
        return {"mask": base64_expanded_mask_image}
    except Exception as e:
        print("request", request)
        # Chain the original error so the real cause shows in the traceback.
        # The `return` that used to follow this raise was unreachable.
        raise Exception("couldn't preform mask expansion") from e
|
|
|
|
|
@router.post('/history/load')
async def loadHistory(request: Request):
    """Load the stored PNG images of one document, newest-listed first.

    The JSON body must contain ``uniqueDocumentId``; images are looked up
    with the glob ``./output/<uniqueDocumentId>/*.png``.

    Returns:
        ``{"image_paths": [...], "metadata_jsons": [...],
        "base64_images": [...]}``, each list reversed relative to the glob
        order. All three lists are empty when the id is missing.
    """
    import glob

    # Pre-populate every key so the reversal below cannot hit a KeyError
    # when the try block fails before the lists are filled.
    history = {'image_paths': [], 'metadata_jsons': [], 'base64_images': []}

    # Named `body`, not `json`, so the standard `json` module is not shadowed.
    try:
        body = await request.json()
    except Exception:
        body = {}

    try:
        # NOTE(review): the id comes straight from the request and is
        # interpolated into the glob pattern without validation.
        uniqueDocumentId = body['uniqueDocumentId']
        image_paths = glob.glob(f'./output/{uniqueDocumentId}/*.png')
        print("loadHistory: image_paths:", image_paths)

        history['image_paths'] = image_paths
        # NOTE(review): if one file fails mid-loop, `image_paths` stays
        # complete while the two lists below end up shorter.
        for image_path in image_paths:
            print("image_path: ", image_path)
            metadata_dict = metadata_to_json.createMetadataJsonFileIfNotExist(image_path)
            # Was `.routerend(...)`, which is not a list method and raised
            # AttributeError on every image.
            history['metadata_jsons'].append(metadata_dict)

            with Image.open(image_path) as img:
                base64_image = img_2_b64(img)
            history['base64_images'].append(base64_image)
    except Exception as e:
        print(f'{request}', repr(e))

    history['image_paths'].reverse()
    history['metadata_jsons'].reverse()
    history['base64_images'].reverse()
    return {
        "image_paths": history['image_paths'],
        "metadata_jsons": history['metadata_jsons'],
        "base64_images": history['base64_images'],
    }
|
|
|
|
|
@router.post('/prompt_shortcut/load')
async def loadPromptShortcut(request: Request):
    """Return the stored prompt shortcuts.

    The request body is not used, so it is no longer parsed.

    Returns:
        ``{"prompt_shortcut": <result of prompt_shortcut.load()>}``; the
        value is ``{}`` when loading fails.
    """
    prompt_shortcut_json = {}
    try:
        prompt_shortcut_json = prompt_shortcut.load()
    except Exception as e:
        # `Exception` rather than a bare except so cancellation propagates;
        # the cause is printed instead of being discarded.
        print(f'{request}', repr(e))

    return {"prompt_shortcut": prompt_shortcut_json}
|
@router.post('/prompt_shortcut/save')
async def loadPromptShortcut(request: Request):
    """Write the ``prompt_shortcut`` entry of the JSON body to disk.

    The shortcuts are written to ``prompt_shortcut.json`` through
    ``prompt_shortcut.writeToJson``.

    NOTE: despite its name this is the *save* handler; the name is kept
    unchanged so the registered route name stays the same.

    Returns:
        ``{"prompt_shortcut": <shortcuts from the body>}``; the value is
        ``{}`` when the body has no usable ``prompt_shortcut`` entry.
    """
    prompt_shortcut_json = {}
    # Named `body`, not `json`, so the standard `json` module is not shadowed.
    try:
        body = await request.json()
    except Exception:
        body = {}

    try:
        print("json: ", body)
        print("json['prompt_shortcut']: ", body['prompt_shortcut'])

        prompt_shortcut_json = body['prompt_shortcut']
        prompt_shortcut.writeToJson("prompt_shortcut.json", prompt_shortcut_json)
    except Exception as e:
        # `Exception` rather than a bare except so cancellation propagates;
        # the cause is printed instead of being discarded.
        print(f'error occurred durning reading the request {request}', repr(e))

    return {"prompt_shortcut": prompt_shortcut_json}
|
|
|
@router.post("/swapModel")
async def swapModel(request: Request):
    """Ask the backend to switch to the model named in the request.

    The JSON body must contain ``title``; it is posted to
    ``{sd_url}/sdapi/v1/options`` as ``sd_model_checkpoint``. Nothing is
    returned to the caller.
    """
    print("swapModel: \n")
    payload = await request.json()
    print("payload:", payload)
    # The parsed JSON body is indexed by key; `payload.title` was attribute
    # access and never produced the "title" entry.
    model_title = payload['title']
    option_payload = {
        "sd_model_checkpoint": model_title
    }
    # `requests` is blocking; run it in the default executor so the event
    # loop keeps serving other requests while the backend responds.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: requests.post(url=f'{sd_url}/sdapi/v1/options', json=option_payload),
    )
|
|
|
|
|
import webbrowser |
|
@router.post("/open/url/")
async def openUrl(request: Request):
    """Open the ``url`` entry of the JSON body with ``webbrowser.open``.

    Returns:
        ``{"url": <url>}``; the value is ``""`` when the body has no ``url``
        entry.
    """
    # Named `body`, not `json`, so the standard `json` module is not shadowed.
    try:
        body = await request.json()
    except Exception:
        body = {}

    url = ""
    print("json: ", body)
    try:
        url = body['url']
        # NOTE(review): the URL is taken from the request as-is; nothing
        # here restricts its scheme before it is handed to the OS.
        webbrowser.open(url)
    except Exception as e:
        # `Exception` rather than a bare except so cancellation propagates;
        # the cause is printed instead of being discarded.
        print(f'an error has occurred durning processing the request {request}', repr(e))

    return {"url": url}
|
|
|
|
|
@router.get('/lora/list')
async def list_available_loras():
    """List the ``.pt``, ``.safetensors`` and ``.ckpt`` files in the lora dir.

    The directory is ``shared.cmd_opts.lora_dir`` (created if missing) and is
    searched recursively.

    Returns:
        A dict mapping each file's base name (extension removed) to itself;
        ``{}`` or a partial dict when an error occurs along the way.
    """
    lora_dict = {}
    try:
        from modules import shared
        import glob

        lora_dir = shared.cmd_opts.lora_dir
        os.makedirs(lora_dir, exist_ok=True)

        # Same pattern order as before so ties in the sort keep their order.
        candidates = []
        for pattern in ('**/*.pt', '**/*.safetensors', '**/*.ckpt'):
            candidates += glob.glob(os.path.join(lora_dir, pattern), recursive=True)

        for filename in sorted(candidates, key=str.lower):
            if not os.path.isdir(filename):
                base_name = os.path.basename(filename)
                name = os.path.splitext(base_name)[0]
                print("lora name: ", name)
                lora_dict[name] = name
    except Exception as e:
        print("list_available_loras() error ", repr(e), e)
    return lora_dict
|
|
|
# Application object exposing the routes registered on `router` above.
app = FastAPI()
app.include_router(router)