CM2000112 / inference.py
jayparmr's picture
Upload 118 files
19b3da3
raw
history blame
9.77 kB
from typing import List, Optional
import torch
from internals.data.dataAccessor import update_db
from internals.data.task import Task, TaskType
from internals.pipelines.commons import Img2Img, Text2Img
from internals.pipelines.controlnets import ControlNet
from internals.pipelines.img_classifier import ImageClassifier
from internals.pipelines.img_to_text import Image2Text
from internals.pipelines.prompt_modifier import PromptModifier
from internals.pipelines.safety_checker import SafetyChecker
from internals.util.args import apply_style_args
from internals.util.avatar import Avatar
from internals.util.cache import auto_clear_cuda_and_gc
from internals.util.commons import pickPoses, upload_image, upload_images
from internals.util.config import set_configs_from_task, set_root_dir
from internals.util.failure_hander import FailureHandler
from internals.util.lora_style import LoraStyle
from internals.util.slack import Slack
# Global PyTorch backend switches, applied once at import time:
# - cudnn.benchmark lets cuDNN try several convolution algorithms and keep the fastest.
# - allow_tf32 permits TensorFloat-32 for CUDA matrix multiplications.
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
num_return_sequences = 4 # the number of results to generate
# NOTE(review): auto_mode is not referenced anywhere in the visible part of this file.
auto_mode = False
# Module-level singletons shared by the task handlers in this file.
# They are only constructed here; their load()/create() calls are made in model_fn.
prompt_modifier = PromptModifier(num_of_sequences=num_return_sequences)
img2text = Image2Text()
img_classifier = ImageClassifier()
controlnet = ControlNet()
lora_style = LoraStyle()
text2img_pipe = Text2Img()
img2img_pipe = Img2Img()
safety_checker = SafetyChecker()
# Used for alerting: slack.auto_send_alert decorates the handlers, slack.error_alert is called in predict_fn.
slack = Slack()
avatar = Avatar()
def get_patched_prompt(task: Task):
    """Build the prompt lists used by the generation handlers.

    Two lists are produced:

    * ``prompt`` -- the output of ``prompt_modifier.modify`` when the task
      asks for prompt engineering, otherwise the task prompt repeated
      ``num_return_sequences`` times.
    * ``ori_prompt`` -- the raw task prompt repeated
      ``num_return_sequences`` times.

    Every entry of both lists is passed through
    ``avatar.add_code_names`` and ``lora_style.prepend_style_to_prompt``.

    Returns:
        A tuple ``(prompt, ori_prompt)``.
    """

    def _decorate_in_place(prompt: List[str], additional: Optional[str] = None):
        # Rewrites each entry of the list in place; nothing is returned.
        for idx, text in enumerate(prompt):
            text = avatar.add_code_names(text)
            text = lora_style.prepend_style_to_prompt(text, task.get_style())
            if additional:
                text = additional + " " + text
            prompt[idx] = text

    base_prompt = task.get_prompt()
    if task.is_prompt_engineering():
        # presumably returns a list of prompts -- verify against PromptModifier
        prompt = prompt_modifier.modify(base_prompt)
    else:
        prompt = [base_prompt] * num_return_sequences

    ori_prompt = [task.get_prompt()] * num_return_sequences

    # Image classification is currently disabled for this path, so no
    # class-name prefix is prepended to the prompts:
    # if task.get_imageUrl():
    #     class_name = img_classifier.classify(
    #         task.get_imageUrl(), task.get_width(), task.get_height()
    #     )
    class_name = None

    _decorate_in_place(ori_prompt, class_name)
    _decorate_in_place(prompt, class_name)

    print({"prompts": prompt})
    return (prompt, ori_prompt)
def get_patched_prompt_tile_upscale(task: Task):
    """Build the single prompt string used by the tile upscaler.

    The task prompt is used when it is truthy; otherwise a prompt is
    obtained from ``img2text.process`` on the task image. The text is then
    run through ``avatar.add_code_names`` and
    ``lora_style.prepend_style_to_prompt``, and finally prefixed with the
    result of ``img_classifier.classify`` for the task image.

    Returns:
        The final prompt string.
    """
    base_prompt = (
        task.get_prompt()
        if task.get_prompt()
        else img2text.process(task.get_imageUrl())
    )

    styled_prompt = lora_style.prepend_style_to_prompt(
        avatar.add_code_names(base_prompt), task.get_style()
    )

    class_name = img_classifier.classify(
        task.get_imageUrl(), task.get_width(), task.get_height()
    )
    prompt = class_name + " " + styled_prompt

    print({"prompt": prompt})
    return prompt
@update_db
@auto_clear_cuda_and_gc(controlnet)
@slack.auto_send_alert
def canny(task: Task):
    """Run the canny ControlNet pipeline for ``task`` and upload the results.

    Args:
        task: The task describing prompt, source image, seed, steps,
            dimensions, guidance scale, style and negative prompt.

    Returns:
        A dict with keys ``modified_prompts`` (the prompts that were used),
        ``generated_image_urls`` (result of ``upload_images``) and
        ``has_nsfw`` (as returned by ``controlnet.process_canny``).

    Raises:
        Whatever the pipeline or upload raises; the LoRA patch and the
        controlnet are cleaned up before the exception propagates.
    """
    prompt, _ = get_patched_prompt(task)

    controlnet.load_canny()

    # pipe2 is used for canny and pose
    lora_patcher = lora_style.get_patcher(controlnet.pipe2, task.get_style())
    lora_patcher.patch()

    try:
        images, has_nsfw = controlnet.process_canny(
            prompt=prompt,
            imageUrl=task.get_imageUrl(),
            seed=task.get_seed(),
            steps=task.get_steps(),
            width=task.get_width(),
            height=task.get_height(),
            guidance_scale=task.get_cy_guidance_scale(),
            negative_prompt=[
                f"monochrome, neon, x-ray, negative image, oversaturated, {task.get_negative_prompt()}"
            ]
            * num_return_sequences,
            **lora_patcher.kwargs(),
        )

        generated_image_urls = upload_images(images, "_canny", task.get_taskId())
    finally:
        # The pipeline objects are module-level and reused across requests,
        # so the patch must be undone even when generation or upload fails.
        lora_patcher.cleanup()
        controlnet.cleanup()

    return {
        "modified_prompts": prompt,
        "generated_image_urls": generated_image_urls,
        "has_nsfw": has_nsfw,
    }
@update_db
@auto_clear_cuda_and_gc(controlnet)
@slack.auto_send_alert
def tile_upscale(task: Task):
    """Run the tile-upscaler ControlNet pipeline for ``task`` and upload the result.

    Args:
        task: The task describing the source image, prompt, seed, steps,
            dimensions, resize dimension, guidance scale and style.

    Returns:
        A dict with keys ``modified_prompts`` (the single prompt string),
        ``generated_image_url`` (result of ``upload_image`` for the first
        generated image) and ``has_nsfw``.

    Raises:
        Whatever the pipeline or upload raises; the LoRA patch and the
        controlnet are cleaned up before the exception propagates.
    """
    output_key = "crecoAI/{}_tile_upscaler.png".format(task.get_taskId())

    prompt = get_patched_prompt_tile_upscale(task)

    controlnet.load_tile_upscaler()

    lora_patcher = lora_style.get_patcher(controlnet.pipe, task.get_style())
    lora_patcher.patch()

    try:
        # NOTE(review): unlike the other handlers in this file, the result of
        # lora_patcher.kwargs() is not forwarded here -- confirm this is intended.
        images, has_nsfw = controlnet.process_tile_upscaler(
            imageUrl=task.get_imageUrl(),
            seed=task.get_seed(),
            steps=task.get_steps(),
            width=task.get_width(),
            height=task.get_height(),
            prompt=prompt,
            resize_dimension=task.get_resize_dimension(),
            negative_prompt=task.get_negative_prompt(),
            guidance_scale=task.get_ti_guidance_scale(),
        )

        # Only the first generated image is uploaded.
        generated_image_url = upload_image(images[0], output_key)
    finally:
        # The pipeline objects are module-level and reused across requests,
        # so the patch must be undone even when generation or upload fails.
        lora_patcher.cleanup()
        controlnet.cleanup()

    return {
        "modified_prompts": prompt,
        "generated_image_url": generated_image_url,
        "has_nsfw": has_nsfw,
    }
@update_db
@auto_clear_cuda_and_gc(controlnet)
@slack.auto_send_alert
def pose(task: Task, s3_outkey: str = "_pose", poses: Optional[list] = None):
    """Run the pose ControlNet pipeline for ``task`` and upload the results.

    Args:
        task: The task describing prompt, source image, seed, steps,
            dimensions, guidance scale, style and negative prompt.
        s3_outkey: Key suffix passed to ``upload_images``.
        poses: Pose images to condition on. When ``None``, a pose is
            detected from the task image and repeated
            ``num_return_sequences`` times.

    Returns:
        A dict with keys ``modified_prompts``, ``generated_image_urls`` and
        ``has_nsfw``.

    Raises:
        Whatever pose detection, the pipeline or upload raises; the LoRA
        patch and the controlnet are cleaned up before the exception
        propagates.
    """
    prompt, _ = get_patched_prompt(task)

    controlnet.load_pose()

    # pipe2 is used for canny and pose
    lora_patcher = lora_style.get_patcher(controlnet.pipe2, task.get_style())
    lora_patcher.patch()

    try:
        if poses is None:
            poses = [controlnet.detect_pose(task.get_imageUrl())] * num_return_sequences

        images, has_nsfw = controlnet.process_pose(
            prompt=prompt,
            image=poses,
            seed=task.get_seed(),
            steps=task.get_steps(),
            negative_prompt=[task.get_negative_prompt()] * num_return_sequences,
            width=task.get_width(),
            height=task.get_height(),
            guidance_scale=task.get_po_guidance_scale(),
            **lora_patcher.kwargs(),
        )

        generated_image_urls = upload_images(images, s3_outkey, task.get_taskId())
    finally:
        # The pipeline objects are module-level and reused across requests,
        # so the patch must be undone even when detection, generation or
        # upload fails.
        lora_patcher.cleanup()
        controlnet.cleanup()

    return {
        "modified_prompts": prompt,
        "generated_image_urls": generated_image_urls,
        "has_nsfw": has_nsfw,
    }
@update_db
@auto_clear_cuda_and_gc(controlnet)
@slack.auto_send_alert
def text2img(task: Task):
    """Run the text-to-image pipeline for ``task`` and upload the results.

    Args:
        task: The task describing prompt, seed, steps, dimensions,
            iteration, style and negative prompt.

    Returns:
        A dict with keys ``modified_prompts`` (the possibly engineered
        prompts), ``generated_image_urls`` and ``has_nsfw``.

    Raises:
        Whatever the pipeline or upload raises; the LoRA patch is cleaned
        up before the exception propagates.
    """
    prompt, ori_prompt = get_patched_prompt(task)

    lora_patcher = lora_style.get_patcher(text2img_pipe.pipe, task.get_style())
    lora_patcher.patch()

    try:
        # Seed the global torch RNG from the task seed before generating.
        torch.manual_seed(task.get_seed())

        images, has_nsfw = text2img_pipe.process(
            prompt=ori_prompt,
            modified_prompts=prompt,
            num_inference_steps=task.get_steps(),
            guidance_scale=7.5,
            height=task.get_height(),
            width=task.get_width(),
            negative_prompt=[task.get_negative_prompt()] * num_return_sequences,
            iteration=task.get_iteration(),
            **lora_patcher.kwargs(),
        )

        generated_image_urls = upload_images(images, "", task.get_taskId())
    finally:
        # The pipeline object is module-level and reused across requests,
        # so the patch must be undone even when generation or upload fails.
        lora_patcher.cleanup()

    return {
        "modified_prompts": prompt,
        "generated_image_urls": generated_image_urls,
        "has_nsfw": has_nsfw,
    }
@update_db
@auto_clear_cuda_and_gc(controlnet)
@slack.auto_send_alert
def img2img(task: Task):
    """Run the image-to-image pipeline for ``task`` and upload the results.

    Args:
        task: The task describing prompt, source image, seed, steps,
            dimensions, strength, guidance scale, style and negative prompt.

    Returns:
        A dict with keys ``modified_prompts``, ``generated_image_urls`` and
        ``has_nsfw``.

    Raises:
        Whatever the pipeline or upload raises; the LoRA patch is cleaned
        up before the exception propagates.
    """
    prompt, _ = get_patched_prompt(task)

    lora_patcher = lora_style.get_patcher(img2img_pipe.pipe, task.get_style())
    lora_patcher.patch()

    try:
        # Seed the global torch RNG from the task seed before generating.
        torch.manual_seed(task.get_seed())

        images, has_nsfw = img2img_pipe.process(
            prompt=prompt,
            imageUrl=task.get_imageUrl(),
            negative_prompt=[task.get_negative_prompt()] * num_return_sequences,
            steps=task.get_steps(),
            width=task.get_width(),
            height=task.get_height(),
            strength=task.get_i2i_strength(),
            guidance_scale=task.get_i2i_guidance_scale(),
            **lora_patcher.kwargs(),
        )

        generated_image_urls = upload_images(images, "_imgtoimg", task.get_taskId())
    finally:
        # The pipeline object is module-level and reused across requests,
        # so the patch must be undone even when generation or upload fails.
        lora_patcher.cleanup()

    return {
        "modified_prompts": prompt,
        "generated_image_urls": generated_image_urls,
        "has_nsfw": has_nsfw,
    }
def model_fn(model_dir):
    """Load every module-level component needed for inference.

    Sets the root directory from this file's path, registers the failure
    handler, then loads the avatar data, prompt modifier, image-to-text
    model, image classifier, LoRA styles, safety checker, controlnet and
    the text-to-image pipeline. The image-to-image pipeline is created
    from the text-to-image one, and the safety checker is applied to all
    three pipelines.

    Args:
        model_dir: Directory passed on to the ``load`` calls of
            ``lora_style``, ``controlnet`` and ``text2img_pipe``.

    Returns:
        None. The loaded components live in module-level globals.
    """
    print("Logs: model loaded .... starts")

    set_root_dir(__file__)
    FailureHandler.register()

    avatar.load_local()

    # Components whose load() takes no arguments, in their original order.
    for component in (prompt_modifier, img2text, img_classifier):
        component.load()

    lora_style.load(model_dir)
    safety_checker.load()
    controlnet.load(model_dir)
    text2img_pipe.load(model_dir)
    img2img_pipe.create(text2img_pipe)

    for pipeline in (text2img_pipe, img2img_pipe, controlnet):
        safety_checker.apply(pipeline)

    print("Logs: model loaded ....")
    return None
@FailureHandler.clear
def predict_fn(data, pipe):
    """Dispatch one inference request to the matching task handler.

    Args:
        data: Raw request payload; wrapped in ``Task`` and also passed to
            ``apply_style_args``.
        pipe: Not used inside this function.

    Returns:
        The selected handler's return value, or ``None`` when any
        exception is raised during setup or handling (the error is printed
        and reported through ``slack.error_alert``).
    """
    task = Task(data)
    print("task is ", data)

    FailureHandler.handle(task)

    try:
        # Per-request setup: configs, style arguments, styles, avatars.
        set_configs_from_task(task)
        apply_style_args(data)
        lora_style.fetch_styles()
        avatar.fetch_from_network(task.get_model_id())

        task_type = task.get_type()

        if task_type == TaskType.TEXT_TO_IMAGE:
            # Prompts mentioning a character sheet are routed to the pose
            # handler with the poses returned by pickPoses().
            wants_character_sheet = "character sheet" in task.get_prompt().lower()
            if wants_character_sheet:
                return pose(task, s3_outkey="", poses=pickPoses())
            return text2img(task)
        if task_type == TaskType.IMAGE_TO_IMAGE:
            return img2img(task)
        if task_type == TaskType.CANNY:
            return canny(task)
        if task_type == TaskType.POSE:
            return pose(task)
        if task_type == TaskType.TILE_UPSCALE:
            return tile_upscale(task)

        raise Exception("Invalid task type")
    except Exception as err:
        print(f"Error: {err}")
        slack.error_alert(task, err)
        return None