|
|
|
|
|
|
|
|
|
|
|
from logging import PlaceHolder
|
|
import math
|
|
import os
|
|
import sys
|
|
import traceback
|
|
import copy
|
|
import numpy as np
|
|
import modules.scripts as scripts
|
|
import gradio as gr
|
|
|
|
|
|
from modules import images,processing
|
|
from modules.processing import process_images, Processed
|
|
from modules.processing import Processed
|
|
from modules.shared import opts, cmd_opts, state
|
|
class Script(scripts.Script):
|
|
|
|
|
|
|
|
def run(self,p,cfg,eta,dns ,loops,nSingle):
|
|
return self.runAdvanced(p,cfg,eta,dns ,loops,nSingle)
|
|
|
|
def show(self, is_img2img):
|
|
self.isAdvanced=True
|
|
return True
|
|
def title(self):
|
|
return "CFG Scheduling" if (self.isAdvanced) else "CFG Auto"
|
|
|
|
def uiAdvanced(self, is_img2img):
|
|
|
|
placeholder="The steps on which to modify, in format step:value - example: 0:10 ; 10:15"
|
|
n0 = gr.Textbox(label="CFG",placeholder=placeholder)
|
|
placeholder="You can also use functions like: 0: math.fabs(-t) ; 1: (1-t/T) ; 2:=e ;3:t*d"
|
|
n1 = gr.Textbox(label="ETA",placeholder=placeholder)
|
|
|
|
|
|
n2 = gr.Slider(minimum=0, maximum=1, step=0.01, label='Target Denoising : Decay per Batch', value=0.5)
|
|
with gr.Row():
|
|
loops=gr.Number(value=1,precision=0,label="loops")
|
|
nSingle= gr.Checkbox(label="Loop returns one")
|
|
|
|
return [n0,n1,n2 ,loops,nSingle]
|
|
|
|
def uiAuto(self, is_img2img):
|
|
self.autoOptions={"b1":"Blur First V1","b2":"Blur Last","f1":"Force at Start V1","f2":"Force Allover"}
|
|
with gr.Row():
|
|
dns = gr.Slider(minimum=0, maximum=1, step=0.01, label='Target Denoising : Decay per Batch', value=0.25)
|
|
n0=gr.Dropdown(list(self.autoOptions.values()),value=self.autoOptions["b1"],label="Scheduler")
|
|
with gr.Row():
|
|
n1 = gr.Slider(minimum=0, maximum=100, step=1, label='Main Strength', value=10)
|
|
n2 = gr.Slider(minimum=0, maximum=100, step=1, label='Sub- Strength', value=10)
|
|
with gr.Row():
|
|
n3 = gr.Slider(minimum=0, maximum=100, step=1, label='Main Range', value=10)
|
|
n4 = gr.Slider(minimum=0, maximum=100, step=1, label='Sub- Range', value=10)
|
|
with gr.Row():
|
|
loops=gr.Number(value=1,precision=0,label="loops")
|
|
nSingle= gr.Checkbox(label="Loop returns one")
|
|
return [n0,dns, n1,n2,n3,n4 ,loops,nSingle]
|
|
|
|
def ui(self, is_img2img):
|
|
return self.uiAdvanced(is_img2img) if (self.isAdvanced) else self.uiAuto(is_img2img)
|
|
|
|
def prepare(self,p,cfg,eta):
|
|
sampler_name=p.sampler_name
|
|
if not sampler_name:
|
|
print("Warning: sampler not specified. Using Euler a")
|
|
sampler_name="Euler a"
|
|
|
|
if sampler_name in ('Euler a','Euler','LMS','DPM++ 2M','DPM fast','LMS Karras','DPM++ 2M Karras'):
|
|
max_mul_count = p.steps * p.batch_size
|
|
steps_per_mul = p.batch_size
|
|
|
|
elif sampler_name in ('Heun','DPM2','DPM2 a','DPM++ 2S a','DPM2 Karras','DPM2 a Karras','DPM++ 2S a Karras'):
|
|
max_mul_count = ((p.steps*2)-1) * p.batch_size
|
|
steps_per_mul = 2 * p.batch_size
|
|
|
|
elif sampler_name=='DDIM':
|
|
max_mul_count = fix_ddim_step_count(p.steps)
|
|
steps_per_mul = 1
|
|
|
|
elif sampler_name=='PLMS':
|
|
max_mul_count = fix_ddim_step_count(p.steps)+1
|
|
steps_per_mul = 1
|
|
else:
|
|
print("Not supported sampler", p.sampler_name, p.sampler_index)
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
self.p=p
|
|
cfg=cfg.strip()
|
|
eta=eta.strip()
|
|
if cfg:
|
|
p.cfg_scale=Fake_float(p.cfg_scale,self.split(cfg,str(p.cfg_scale)) , max_mul_count, steps_per_mul)
|
|
|
|
if eta:
|
|
if (eta.find("@")==-1):
|
|
p.s_churn=p.eta =Fake_float(p.eta or 1,self.split(eta,str(p.eta)) , max_mul_count, steps_per_mul)
|
|
|
|
|
|
|
|
else:
|
|
eta=eta.split("@")
|
|
if eta[0].strip()!="":
|
|
p.s_churn=Fake_float(p.s_churn or 1,self.split(eta[0],str(p.s_churn)), max_mul_count, steps_per_mul)
|
|
if len(eta)>1 and eta[1].strip()!="":
|
|
p.s_noise=Fake_float(p.s_noise or 1,self.split(eta[1],str(p.s_noise)), max_mul_count, steps_per_mul)
|
|
if len(eta)>2 and eta[2].strip()!="":
|
|
p.s_tmin=Fake_float(p.s_tmin or 1,self.split(eta[2],str(p.s_tmin)), max_mul_count, steps_per_mul)
|
|
if len(eta)>3 and eta[3].strip()!="":
|
|
p.s_tmax=Fake_float(p.s_tmax or 1,self.split(eta[2],str(p.s_tmax)), max_mul_count, steps_per_mul)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def runBasic(self,p,n0,dns,ns1,ns2,nr1,nr2 ,loops,nSingle):
|
|
if(n0==self.autoOptions["b1"]):
|
|
cfg=f"""0:{ns2}/2 if (t<T* (({nr1}/100)**2)) else cfg"""
|
|
eta=f"""0:{ns1}+1 if (t<T*(({nr1}/100)**2) ) else e*({nr2}/50)"""
|
|
elif(n0==self.autoOptions["f1"]):
|
|
cfg=f"""0:({ns1}*4)*((1-d**0.5)**1.5)/(t*(30-cfg)/30+1)/(l*2+1) if (t<T*{nr1}/100) else 0.1 if (t<T*({nr1}+{nr2}-{nr1}*{nr2})/100) else 7-d*7"""
|
|
eta=f"""0:0.8+{ns2}/25-min(t*0.1, 0.8+{ns2}/25 -0.01) if (t<T*{nr1}/100) else {ns2}/(10*(1+l*0.5)) if (t<T*({nr1}+{nr2}-{nr1}*{nr2})/100) else 1+e"""
|
|
elif(n0==self.autoOptions["b2"]):
|
|
cfg=f"""0:cfg if (e>{nr1}/100 or e<(1-({nr1}+{nr2}*(100-{nr1})/100)/100)) else {ns2}/10"""
|
|
eta=f"""0:e if (e>{nr1}/100 or e<(1-({nr1}+{nr2}*(100-{nr1})/100)/100)) else {ns1}/10"""
|
|
elif(n0==self.autoOptions["f2"]):
|
|
cfg=f"""= min(40,max(0,cfg+x(t)*({ns2}-50)/2 )) """
|
|
eta=f"""0:(1-(t%(2+ 10-.1*{nr1} ))/ (2+10-.1*{nr1}) )*{ns1}*.1 * (e*(100-{nr2})+{nr2})*.01 """
|
|
self.cfgsib={"Scheduler":n0,'Main Strength':ns1,'Sub- Strength':ns2,'Main Range':nr1,'Sub- Range':nr2}
|
|
self.cfgsib={"Scheduler":n0,'Main Strength':ns1,'Sub- Strength':ns2,'Main Range':nr1,'Sub- Range':nr2}
|
|
return self.runAdvanced(p,cfg,eta,dns ,loops,nSingle)
|
|
|
|
|
|
def runAdvanced(self, p, cfg,eta,dns ,loops,nSingle):
|
|
self.initSeed=p.seed
|
|
|
|
loops = loops if (loops>0) else 1
|
|
|
|
batch_count=p.n_iter
|
|
state.job_count = loops*p.n_iter
|
|
p.denoising_strength=p.denoising_strength or (1 if (self.isAdvanced) else 0.2)
|
|
initial_denoising_strength=p.denoising_strength
|
|
p.do_not_save_grid = True
|
|
if hasattr(p,"init_images"):
|
|
original_init_image = p.init_images
|
|
initial_color_corrections = [processing.setup_color_correction(p.init_images[0])]
|
|
else:
|
|
original_init_image=None
|
|
|
|
all_images = []
|
|
cfgsi=" loops:"+str(loops)+" terget denoising: "+str(dns)+"\nCFG: "+cfg+"\nETA: "+eta+"\n"
|
|
|
|
p.extra_generation_params = {
|
|
"CFG Scheduler Info":cfgsi,
|
|
}
|
|
|
|
|
|
|
|
if (self.isAdvanced==False):
|
|
self.cfgsib.update(p.extra_generation_params)
|
|
p.extra_generation_params=self.cfgsib
|
|
|
|
if loops>1:
|
|
processing.fix_seed(p)
|
|
|
|
|
|
for n in range(batch_count):
|
|
proc=None
|
|
history = []
|
|
p.denoising_strength=initial_denoising_strength
|
|
if (original_init_image!=None):
|
|
p.init_images=original_init_image
|
|
for loop in range(loops):
|
|
if opts.img2img_color_correction and original_init_image!=None:
|
|
p.color_corrections = initial_color_corrections
|
|
|
|
p.batch_size = 1
|
|
p.n_iter = 1
|
|
self.loop=loop
|
|
self.prepare(p, cfg,eta)
|
|
proc = process_images(p)
|
|
if loop==0:
|
|
self.initInfo=proc.info
|
|
self.initSeed=proc.seed
|
|
if len(proc.images)>0:
|
|
history.append(proc.images[0])
|
|
p.seed+=1
|
|
p.init_images=[proc.images[0]]
|
|
|
|
p.denoising_strength=initial_denoising_strength+(dns-initial_denoising_strength)*(loop+1)/(loops)
|
|
else:
|
|
break
|
|
|
|
all_images += history
|
|
if loops>0:
|
|
p.seed=self.initSeed
|
|
|
|
return proc if(nSingle) else Processed(p, all_images, self.initSeed, self.initInfo)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def peek(self,val):
|
|
print(val)
|
|
return val
|
|
|
|
def split(self,src,default='0'):
|
|
p=self.p
|
|
self.P=copy.copy({
|
|
'cfg':float(str(p.cfg_scale)),
|
|
'd':p.denoising_strength or 1,
|
|
'l':self.loop,
|
|
'min':min,
|
|
'max':max,
|
|
'abs':abs,
|
|
'pow':pow,
|
|
'pi':math.pi,
|
|
'x':self._interpolate,
|
|
'int':int,
|
|
'floor':math.floor,
|
|
'peek':self.peek,
|
|
})
|
|
|
|
if src[0:4]=="eval":
|
|
src="0:"+src[4:]
|
|
if src[0]=="=":
|
|
src="0:"+src[1:]
|
|
|
|
|
|
while src[len(src)-1] in [";"," "]:
|
|
src=src[0:len(src)-1]
|
|
while src[0] in [";"," "]:
|
|
src=src[1:]
|
|
|
|
arr0 = src.split(';')
|
|
|
|
|
|
arr=[]
|
|
for j in arr0:
|
|
|
|
v=j.split(":")
|
|
q=v[0].split(",")
|
|
|
|
for i in q:
|
|
arr.append(i+":"+v[1])
|
|
|
|
|
|
|
|
|
|
arr.sort(key=self._sort)
|
|
s=[]
|
|
val=default
|
|
for j in range(p.steps+1):
|
|
i=0
|
|
while i<len(arr) and i<=j:
|
|
v=arr[i].split(":")
|
|
|
|
if math.floor(int(v[0]) if v[0].isnumeric() else float(v[0])*p.steps)==j:
|
|
val=v[1].strip()
|
|
break
|
|
i=i+1
|
|
|
|
|
|
if val[0]=="=":
|
|
val=val[1:]
|
|
|
|
_eta=1-j/p.steps
|
|
params={'t':j,'T':p.steps,'math':math,'p':p,'e':float(str(_eta))}
|
|
|
|
params.update(copy.copy(self.P))
|
|
|
|
s.append(float(eval(val,params)))
|
|
|
|
|
|
|
|
print(np.round(s,1),"\n")
|
|
return s
|
|
|
|
def _interpolate(self,v,start=0,end=None,m=1):
|
|
end=end or self.p.steps
|
|
v=min(max(v,start),end)-start
|
|
return v*m/(end-start)+(1 if m<0 else 0)
|
|
|
|
def _sort(self,a):
|
|
_=a.split(":")[0]
|
|
return math.floor(int(_) if (_.isnumeric()) else float(_)*self.p.steps)
|
|
|
|
def evaluate (self,src):
|
|
s=[]
|
|
p=self.p
|
|
T=self.p.steps
|
|
for j in range(T+1):
|
|
_eta=1-j/p.steps
|
|
params={'t':j,'T':p.steps,'math':math,'p':p,'e':_eta}
|
|
params.update(self.P)
|
|
s.append(float(eval(src,params)))
|
|
return s
|
|
|
|
class Fake_float(float):
|
|
def __new__(self, value, arr, max_mul_count, steps_per_mul):
|
|
return float.__new__(self, value)
|
|
|
|
def __init__(self, value, arr, max_mul_count, steps_per_mul):
|
|
float.__init__(value)
|
|
self.arr = arr
|
|
self.curstep = 0
|
|
|
|
|
|
|
|
|
|
self.max_mul_count = max_mul_count
|
|
self.current_mul = 0
|
|
self.steps_per_mul = steps_per_mul
|
|
self.current_step = 0
|
|
self.max_step_count = (max_mul_count // steps_per_mul) + (max_mul_count % steps_per_mul > 0)
|
|
|
|
|
|
def __mul__(self,other):
|
|
return self.fake_mul(other)
|
|
|
|
def __rmul__(self,other):
|
|
return self.fake_mul(other)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fake_mul(self,other):
|
|
|
|
return self.get_fake_value(other) * other
|
|
|
|
|
|
def get_fake_value(self,other):
|
|
if (self.max_step_count==1):
|
|
fake_value = self.arr[0]
|
|
else:
|
|
|
|
fake_value = self.arr[self.curstep]
|
|
self.current_mul = (self.current_mul+1) % self.max_mul_count
|
|
self.curstep = (self.current_mul) // self.steps_per_mul
|
|
self.current_step+=1
|
|
return fake_value
|
|
|
|
|
|
|
|
|
|
def fix_ddim_step_count(steps):
|
|
valid_step = 999 / (1000 // steps)
|
|
if valid_step == int(valid_step): steps=int(valid_step)+1
|
|
if ((1000 % steps)!=0): steps +=1
|
|
return steps
|
|
|