Spaces:
Runtime error
Runtime error
import concurrent.futures | |
import io | |
import os | |
import numpy as np | |
import oss2 | |
import requests | |
from PIL import Image, ImageDraw, ImageFont | |
from .log import logger | |
# OSS storage configuration.
# Object key prefixes under which uploads are stored.
oss_path = "nightfury.abc/ImageSynthesizerHF"
oss_path_img_gallery = "nightfury.abc/ImageSynthesizerHF_img_gallery"
# Credentials and bucket location are read from the environment; a missing
# variable yields None here.
access_key_id = os.environ.get("ACCESS_KEY_ID")
access_key_secret = os.environ.get("ACCESS_KEY_SECRET")
bucket_name = os.environ.get("BUCKET_NAME")
endpoint = os.environ.get("ENDPOINT")
# Shared bucket handle used by every upload helper in this module.
bucket = oss2.Bucket(
    oss2.Auth(access_key_id, access_key_secret),
    endpoint,
    bucket_name,
)
def download_img_pil(index, img_url):
    """Download one image and open it with PIL.

    Args:
        index: Caller-supplied tag that is handed back unchanged so results
            gathered out of order can be put back in place.
        img_url: URL of the image to fetch.

    Returns:
        An ``(index, image)`` tuple. ``image`` is ``None`` when the request
        fails or the server answers with a status other than 200, so the
        result can always be unpacked by the caller.
    """
    try:
        # The whole body is read below, so streaming is not requested; that
        # also lets the connection be released on the non-200 path.
        # The timeout keeps a stalled server from blocking a worker forever.
        response = requests.get(img_url, timeout=30)
    except requests.RequestException as err:
        logger.error(f"Fail to download: {img_url} ({err})")
        return (index, None)
    if response.status_code != 200:
        logger.error(f"Fail to download: {img_url}")
        return (index, None)
    img = Image.open(io.BytesIO(response.content))
    return (index, img)
def download_images(img_urls, batch_size):
    """Download several images concurrently, keeping the order of the URLs.

    Args:
        img_urls: Iterable of image URLs; the position of a URL decides the
            slot its image is stored in.
        batch_size: Length of the returned list.

    Returns:
        A list of ``batch_size`` entries. Slot ``i`` holds the image for the
        ``i``-th URL, or ``None`` when that download failed or no URL was
        given for the slot.

    Raises:
        IndexError: If more than ``batch_size`` URLs are supplied.
    """
    imgs_pil = [None] * batch_size
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        pending = [
            executor.submit(download_img_pil, i, url)
            for i, url in enumerate(img_urls)
        ]
        # Results arrive in completion order; the index carried in each
        # result puts the image back at its URL's position.
        for future in concurrent.futures.as_completed(pending):
            ret = future.result()
            if ret is None:
                # The worker reported a failure without a result tuple;
                # leave the slot empty instead of failing on the unpack.
                continue
            index, img_pil = ret
            imgs_pil[index] = img_pil
    return imgs_pil
def upload_np_2_oss(input_image, name="cache.png", gallery=False):
    """Encode an image array as PNG or JPEG, upload it and return a signed URL.

    Args:
        input_image: Array accepted by ``Image.fromarray``.
        name: Object name; must end in ``.png`` or ``.jpg`` (case-insensitive).
            The suffix selects the encoding.
        gallery: When true, store under the gallery prefix instead of the
            default prefix.

    Returns:
        A signed GET URL for the uploaded object, valid for 24 hours.
    """
    assert name.lower().endswith((".png", ".jpg")), name
    pil_image = Image.fromarray(input_image)
    buffer = io.BytesIO()
    if name.lower().endswith(".png"):
        pil_image.save(buffer, format="PNG")
    else:
        # JPEG cannot store an alpha channel and Pillow refuses to write such
        # modes, so drop alpha before encoding.
        if pil_image.mode in ("RGBA", "LA"):
            pil_image = pil_image.convert("RGB")
        pil_image.save(buffer, format="JPEG", quality=95)
    key = (oss_path_img_gallery if gallery else oss_path) + "/" + name
    bucket.put_object(key, buffer.getvalue())
    # Arguments: HTTP method, object key, expiry in seconds.
    return bucket.sign_url('GET', key, 60 * 60 * 24)
def upload_json_string_2_oss(jsonStr, name="cache.txt", gallery=False):
    """Upload a string as a UTF-8 encoded object and return a signed URL.

    Args:
        jsonStr: Text to store.
        name: Object name appended to the chosen prefix.
        gallery: When true, store under the gallery prefix instead of the
            default prefix.

    Returns:
        A signed GET URL for the uploaded object, valid for 24 hours.
    """
    prefix = oss_path_img_gallery if gallery else oss_path
    key = prefix + "/" + name
    payload = bytes(jsonStr, "utf-8")
    bucket.put_object(key, payload)
    one_day_in_seconds = 60 * 60 * 24
    return bucket.sign_url('GET', key, one_day_in_seconds)
def upload_preprocess(pil_base_image_rgba, pil_layout_image_dict, pil_style_image_dict, pil_color_image_dict,
                      pil_fg_mask):
    """Render each optional input as an RGB preview with de-emphasised regions tinted blue.

    Args:
        pil_base_image_rgba: Base image or ``None``; only its first three
            channels are used.
        pil_layout_image_dict: ``None`` or a mapping with ``"image"`` and
            ``"mask"`` entries that both provide ``convert``.
        pil_style_image_dict: Same structure as ``pil_layout_image_dict``.
        pil_color_image_dict: Same structure as ``pil_layout_image_dict``.
        pil_fg_mask: 0-255 mask for the base image; read only when
            ``pil_base_image_rgba`` is given.

    Returns:
        A 4-tuple ``(base, layout, style, color)`` of ``uint8`` arrays, with
        ``None`` in the position of every input that was ``None``.
    """
    np_out_base_image = None
    if pil_base_image_rgba is not None:
        np_fg_image = np.array(pil_base_image_rgba)[..., :3]
        # Scale the 0-255 mask to 0-1 and add a channel axis for broadcasting.
        np_fg_keep = np.expand_dims(np.array(pil_fg_mask).astype(float), axis=-1) / 255.
        np_out_base_image = _blend_towards_blue(np_fg_image, np_fg_keep)
    np_out_layout_image = _preprocess_image_dict(pil_layout_image_dict)
    np_out_style_image = _preprocess_image_dict(pil_style_image_dict)
    np_out_color_image = _preprocess_image_dict(pil_color_image_dict)
    return np_out_base_image, np_out_layout_image, np_out_style_image, np_out_color_image


def _preprocess_image_dict(image_dict):
    """Build the blue-tinted preview for one ``{"image", "mask"}`` mapping, or return ``None``."""
    if image_dict is None:
        return None
    np_rgba = np.array(image_dict["image"].convert("RGBA"))
    np_rgb, np_alpha = np_rgba[..., :3], np_rgba[..., 3]
    np_mask = np.array(image_dict["mask"].convert("L"))
    # A pixel is kept at full strength only where alpha > 127 and mask < 127.
    np_keep = ((np_alpha > 127) * (np_mask < 127)).astype(float)[..., None]
    return _blend_towards_blue(np_rgb, np_keep)


def _blend_towards_blue(np_rgb, np_keep):
    """Mix ``np_rgb`` with pure blue using a weight of ``0.5 + 0.5 * np_keep`` for the image."""
    weight = np_keep * 0.5 + 0.5
    blended = np_rgb * weight + (1 - weight) * np.array([0, 0, 255])
    return blended.round().clip(0, 255).astype(np.uint8)
def pad_image(image, target_size):
    """Scale an image to fit inside ``target_size`` and centre it on a white canvas.

    The aspect ratio is preserved; the uncovered margin stays white.

    Args:
        image: PIL image to fit.
        target_size: ``(width, height)`` of the returned image.

    Returns:
        A new RGB image of exactly ``target_size``.
    """
    iw, ih = image.size
    w, h = target_size
    # The smaller ratio guarantees both dimensions fit inside the target.
    scale = min(w / iw, h / ih)
    # Adding 0.5 before truncating rounds to the nearest pixel. Clamp to one
    # pixel so an extreme aspect ratio cannot produce a zero-sized resize.
    nw = max(1, int(iw * scale + 0.5))
    nh = max(1, int(ih * scale + 0.5))
    resized = image.resize((nw, nh), Image.BICUBIC)
    canvas = Image.new('RGB', target_size, (255, 255, 255))
    canvas.paste(resized, ((w - nw) // 2, (h - nh) // 2))
    return canvas
def add_text(image, text):
    """Stamp a label in the top-left corner of an image.

    A white box sized from the text's bounding box is blended over the image
    at 50%, then the text is drawn in black on top of the blended result.

    Args:
        image: PIL image to label; it is not modified.
        text: Label to draw.

    Returns:
        A new RGB image carrying the label.
    """
    _, height = image.size
    # Font size follows the image height.
    font = ImageFont.truetype("assets/ttf/FreeMonoBold.ttf", int(height / 10))
    left, top, right, bottom = font.getbbox(text)

    # Paint the white box on a copy so it can be blended in half-transparently.
    backdrop = image.copy()
    ImageDraw.Draw(backdrop).rectangle((0, 0, right + left, bottom + top), fill=(255, 255, 255))
    labelled = Image.blend(image, backdrop, 0.5)

    # Black text anchored at the top-left corner.
    ImageDraw.Draw(labelled).text((0, 0), text, font=font, fill=(0, 0, 0, 255))
    return labelled.convert("RGB")
def compose_image(image_list, text_list, pil_size, nrow, ncol):
    """Tile images into a grid, optionally labelling each tile.

    ``None`` images are dropped together with their labels, and the remaining
    images fill the grid row by row. When exactly one image remains the grid
    collapses to a single cell.

    Args:
        image_list: PIL images or ``None`` placeholders.
        text_list: One label (or ``None`` for no label) per entry of
            ``image_list``.
        pil_size: ``(width, height)`` of a single tile.
        nrow: Number of grid rows.
        ncol: Number of grid columns.

    Returns:
        An RGB image of size ``(ncol * width, nrow * height)`` on a white
        background.

    Raises:
        ValueError: If there are more entries than grid cells.
    """
    w, h = pil_size
    if len(image_list) > nrow * ncol:
        raise ValueError("The parameters of the composite image and the required quantity do not match!")
    assert len(image_list) == len(text_list)

    # Keep only the slots that actually hold an image, paired with their label.
    present = [(img, label) for img, label in zip(image_list, text_list) if img is not None]
    if len(present) == 1:
        ncol = nrow = 1

    to_image = Image.new('RGB', (ncol * w, nrow * h), (255, 255, 255))
    for slot, (img, label) in enumerate(present[:nrow * ncol]):
        row, col = divmod(slot, ncol)
        tile = img.resize((w, h), Image.BICUBIC)
        if label is not None:
            tile = add_text(tile, label)
        to_image.paste(tile, (col * w, row * h))
    return to_image
def split_text_lines(text, max_w, ttf):
    """Greedily break text into lines whose measured width stays below ``max_w``.

    Args:
        text: Text to wrap; an empty string yields no lines.
        max_w: Width limit in the units reported by ``ttf.getbbox``.
        ttf: Font-like object whose ``getbbox(str)`` returns
            ``(left, top, right, bottom)``.

    Returns:
        ``(lines, total_height)`` where ``lines`` is the list of line strings
        and ``total_height`` is the sum of the ``bottom`` value of the last
        measurement taken for each line.
    """
    text_split_lines = []
    text_h = 0
    text_len = len(text)
    line_start = 0
    while line_start < text_len:
        line_count = 0
        _, _, right, bottom = ttf.getbbox(text[line_start:line_start + 1])
        # Grow the line while the next-longer candidate still fits and there
        # is text left to take.
        while right < max_w and line_start + line_count < text_len:
            line_count += 1
            _, _, right, bottom = ttf.getbbox(text[line_start:line_start + line_count + 1])
        # Always consume at least one character: when a single glyph is
        # already too wide, the scan would otherwise never advance.
        line_count = max(line_count, 1)
        text_split_lines.append(text[line_start:line_start + line_count])
        text_h += bottom
        line_start += line_count
    return text_split_lines, text_h
def add_prompt(image, prompt, negative_prompt):
    """Append the prompt texts as a caption strip below an image.

    Args:
        image: PIL image to caption.
        prompt: Prompt text; skipped when empty.
        negative_prompt: Negative prompt text; skipped when empty.

    Returns:
        ``image`` itself when both texts are empty; otherwise a new image of
        the same mode and width with a white caption strip added underneath.
    """
    if prompt == negative_prompt == "":
        return image

    prompt_text = "Prompt: " + prompt if prompt != "" else prompt
    negative_text = "Negative prompt: " + negative_prompt if negative_prompt != "" else negative_prompt

    w, h = image.size
    # Font size follows the image height.
    font = ImageFont.truetype("assets/ttf/AlibabaPuHuiTi-2-55-Regular.ttf", int(h / 20))
    prompt_lines, prompt_h = split_text_lines(prompt_text, w, font)
    negative_lines, negative_h = split_text_lines(negative_text, w, font)
    caption_h = prompt_h + negative_h

    # Render the wrapped text, black on white, on its own strip.
    caption = Image.new(image.mode, (w, caption_h), color=(255, 255, 255))
    ImageDraw.Draw(caption).text(
        (0, 0), "\n".join(prompt_lines + negative_lines), font=font, fill=(0, 0, 0)
    )

    # Stack the original image on top of the caption strip.
    framed = Image.new(image.mode, (w, h + caption_h), color=(255, 255, 255))
    framed.paste(image, (0, 0))
    framed.paste(caption, (0, h))
    return framed
def merge_images(np_fg_image, np_layout_image, np_style_image, np_color_image, np_res_image, prompt, negative_prompt):
    """Build a side-by-side summary of the inputs and the result, captioned with the prompts.

    The available inputs are tiled into a labelled grid of up to 2x2 cells,
    shrunk to the result's size and placed to the left of the result. The
    prompt texts are then appended underneath.

    Args:
        np_fg_image: Subject image array or ``None``.
        np_layout_image: Layout image array or ``None``.
        np_style_image: Style image array or ``None``.
        np_color_image: Color image array or ``None``.
        np_res_image: Result image array; its size sets the tile size.
        prompt: Prompt text for the caption.
        negative_prompt: Negative prompt text for the caption.

    Returns:
        The composed image as a numpy array.
    """
    pil_res_image = Image.fromarray(np_res_image)
    tile_size = pil_res_image.size

    def to_padded(np_image):
        # Missing inputs stay None so the grid can skip them.
        if np_image is None:
            return None
        return pad_image(Image.fromarray(np_image), tile_size)

    pil_fg, pil_layout, pil_style, pil_color = (
        to_padded(arr) for arr in (np_fg_image, np_layout_image, np_style_image, np_color_image)
    )

    inputs_grid = compose_image(
        [pil_layout, pil_style, pil_color, pil_fg],
        ['Layout', 'Style', 'Color', 'Subject'],
        tile_size,
        nrow=2,
        ncol=2,
    )
    # Shrink the grid to one tile so it sits next to the result at equal size.
    inputs_grid = inputs_grid.resize(tile_size, Image.BICUBIC)

    side_by_side = compose_image([inputs_grid, pil_res_image], [None, None], tile_size, nrow=1, ncol=2)
    return np.array(add_prompt(side_by_side, prompt, negative_prompt))