|
import numpy as np |
|
import torch |
|
from bayes_opt import BayesianOptimization, SequentialDomainReductionTransformer |
|
from lpips import LPIPS |
|
from scipy.optimize import curve_fit |
|
from scipy.stats import beta as beta_distribution |
|
|
|
from transformers import CLIPImageProcessor, CLIPModel |
|
from utils import compute_lpips, compute_smoothness_and_consistency |
|
|
|
|
|
class BetaPriorPipeline:
    """Perceptually-uniform image interpolation driven by a fitted Beta prior.

    The pipeline repeatedly inserts new interpolation coefficients where the
    perceptual (CLIP-feature) distance between neighbouring frames is largest,
    re-fitting Beta(alpha, beta) so that the Beta CDF maps the coefficients
    onto perceptually even spacing, then extracts a final subsequence whose
    consecutive distances are as uniform as possible.
    """

    def __init__(self, pipe, model_ID="openai/clip-vit-base-patch32"):
        # CLIP model + processor used to embed frames for distance computation.
        self.model = CLIPModel.from_pretrained(model_ID)
        self.preprocess = CLIPImageProcessor.from_pretrained(model_ID)
        # Underlying interpolation pipeline; must expose
        # `interpolate_single(t, ...)` returning an object with `.images`.
        self.pipe = pipe

    def _compute_clip(self, embedding_a, embedding_b):
        """Return a perceptual distance between two CLIP embeddings.

        Despite the name, this is a distance (1 - cosine similarity),
        not a similarity score.
        """
        similarity_score = torch.nn.functional.cosine_similarity(
            embedding_a, embedding_b
        )
        # Embeddings are batched with a single element; take that element.
        return 1 - similarity_score[0]

    def _get_feature(self, image):
        """Embed one image (numpy array or PIL-style image) with CLIP."""
        with torch.no_grad():
            if isinstance(image, np.ndarray):
                # Arrays are passed with do_rescale=False — assumes values are
                # already in [0, 1]; TODO confirm against callers.
                image = self.preprocess(
                    image, return_tensors="pt", do_rescale=False
                ).pixel_values
            else:
                image = self.preprocess(image, return_tensors="pt").pixel_values
            embedding = self.model.get_image_features(image)
        return embedding

    def _update_alpha_beta(self, xs, ds):
        """Fit Beta(alpha, beta) so its CDF maps `xs` onto the normalised
        cumulative perceptual distances.

        Args:
            xs: sorted interpolation coefficients in [0, 1]; len(xs) == len(ds) + 1.
            ds: perceptual distances between consecutive frames.

        Returns:
            (fitted_alpha, fitted_beta) from the least-squares CDF fit.
        """
        uniform_point = []
        ds_sum = sum(ds)
        for i in range(len(ds)):
            uniform_point.append(ds[i] / ds_sum)
        # Prepend 0 and take the cumulative sum: one target CDF value per x,
        # ranging from 0 to 1.
        uniform_point = [0] + uniform_point
        uniform_points = np.cumsum(uniform_point)

        xs = np.asarray(xs)
        uniform_points = np.asarray(uniform_points)

        def beta_cdf(x, alpha, beta_param):
            # Model function for curve_fit: Beta CDF evaluated at x.
            return beta_distribution.cdf(x, alpha, beta_param)

        initial_guess = [1.0, 1.0]
        # Lower bounds keep both shape parameters strictly positive.
        bounds = ([1e-6, 1e-6], [np.inf, np.inf])
        params, covariance = curve_fit(
            beta_cdf, xs, uniform_points, p0=initial_guess, bounds=bounds
        )

        fitted_alpha, fitted_beta = params
        return fitted_alpha, fitted_beta

    def _add_next_point(
        self,
        ds,
        xs,
        images,
        features,
        alpha,
        beta_param,
        prompt_start,
        prompt_end,
        negative_prompt,
        latent_start,
        latent_end,
        num_inference_steps,
        uniform=False,
        **kwargs,
    ):
        """Insert one new interpolation point inside the perceptually worst
        interval.

        `ds`, `xs`, `images` and `features` are all mutated in place.

        Returns:
            (xs, True) on success, or (xs, False) if the candidate coefficient
            fell outside [0, 1] and nothing was inserted.
        """
        # Interval with the largest perceptual distance.
        idx = np.argmax(ds)
        A = xs[idx]
        B = xs[idx + 1]
        F_A = beta_distribution.cdf(A, alpha, beta_param)
        F_B = beta_distribution.cdf(B, alpha, beta_param)

        # Bisect the interval in CDF space ...
        F_t = (F_A + F_B) / 2

        # ... and map back through the inverse CDF to get the new coefficient.
        t = beta_distribution.ppf(F_t, alpha, beta_param)

        if uniform:
            # Uniform mode: ignore the Beta fit and bisect the widest gap in
            # coefficient space. The diff array is [xs[0]-0, xs[1]-xs[0], ...]
            # (xs[0] is 0, so the leading entry is 0); argmax gives the right
            # endpoint of the widest gap, and -1 converts it to the left one.
            idx = np.argmax(np.array(xs) - np.array([0] + xs[:-1])) - 1
            t = (xs[idx] + xs[idx + 1]) / 2

        if t < 0 or t > 1:
            return xs, False

        ims = self.pipe.interpolate_single(
            t,
            prompt_start=prompt_start,
            prompt_end=prompt_end,
            negative_prompt=negative_prompt,
            latent_start=latent_start,
            latent_end=latent_end,
            early="fused_outer",
            num_inference_steps=num_inference_steps,
            **kwargs,
        )

        # The middle image of the returned triplet is the newly generated frame.
        added_image = ims.images[1]
        added_feature = self._get_feature(added_image)
        d1 = self._compute_clip(features[idx], added_feature)
        d2 = self._compute_clip(features[idx + 1], added_feature)

        images.insert(idx + 1, ims.images[1])
        features.insert(idx + 1, added_feature)
        xs.insert(idx + 1, t)
        # Replace the split interval's distance with its two new halves.
        del ds[idx]
        ds.insert(idx, d1)
        ds.insert(idx + 1, d2)
        return xs, True

    def explore_with_beta(
        self,
        progress,
        prompt_start,
        prompt_end,
        negative_prompt,
        latent_start,
        latent_end,
        num_inference_steps=28,
        exploration_size=16,
        init_alpha=3,
        init_beta=3,
        uniform=False,
        **kwargs,
    ):
        """Grow the interpolation sequence up to `exploration_size` frames.

        Starts from coefficients {0, 0.5, 1}, then repeatedly inserts a point
        into the perceptually worst interval (see `_add_next_point`),
        re-fitting the Beta parameters after each insertion.

        Args:
            progress: progress reporter exposing `__call__(n, desc=...)` and
                `.tqdm(iterable)` — presumably a gradio Progress; verify.
            uniform: if True, skip the Beta fit and always bisect the widest
                coefficient gap (alpha = beta forced to 1).

        Returns:
            (images, features, ds, xs, alpha, beta_param).
        """
        xs = [0.0, 0.5, 1.0]
        # One call at t=0.5 yields the two endpoints plus the midpoint frame.
        images = self.pipe.interpolate_single(
            0.5,
            prompt_start=prompt_start,
            prompt_end=prompt_end,
            negative_prompt=negative_prompt,
            latent_start=latent_start,
            latent_end=latent_end,
            early="fused_outer",
            num_inference_steps=num_inference_steps,
            **kwargs,
        )
        images = images.images
        images = [images[0], images[1], images[2]]
        features = [self._get_feature(image) for image in images]
        ds = [
            self._compute_clip(features[0], features[1]),
            self._compute_clip(features[1], features[2]),
        ]
        alpha = init_alpha
        beta_param = init_beta
        print(
            "Alpha:",
            alpha,
            "| Beta:",
            beta_param,
            "| Current Coefs:",
            xs,
            "| Current Distances:",
            ds,
        )
        progress(3, desc="Exploration")
        for i in progress.tqdm(range(3, exploration_size)):
            xs, flag = self._add_next_point(
                ds,
                xs,
                images,
                features,
                alpha,
                beta_param,
                prompt_start,
                prompt_end,
                negative_prompt,
                latent_start,
                latent_end,
                num_inference_steps,
                uniform=uniform,
                **kwargs,
            )
            if not flag:
                # Candidate fell outside [0, 1]; stop exploring.
                break
            alpha, beta_param = self._update_alpha_beta(xs, ds)
            if uniform:
                # Uniform mode pins the prior to Beta(1, 1) (the uniform dist).
                alpha = 1
                beta_param = 1
            print(f"--------Exploration: {len(xs)} / {exploration_size}--------")
            print(
                "Alpha:",
                alpha,
                "| Beta:",
                beta_param,
                "| Current Coefs:",
                xs,
                "| Current Distances:",
                ds,
            )

        return images, features, ds, xs, alpha, beta_param

    def extract_uniform_points(self, ds, interpolation_size):
        """Greedily pick frame indices so that consecutive picks are roughly
        `sum(ds) / (interpolation_size - 1)` apart in perceptual distance.

        NOTE(review): this appends `idx` — the LEFT endpoint of distance
        ds[idx] — rather than `idx + 1`, and the final frame is not guaranteed
        to be included; confirm intent before relying on this method
        (`generate_interpolation` uses `extract_uniform_points_plus` instead).
        """
        expected_dis = sum(ds) / (interpolation_size - 1)
        current_sum = 0
        output_idxs = [0]
        for idx, d in enumerate(ds):
            current_sum += d
            if current_sum >= expected_dis:
                output_idxs.append(idx)
                current_sum = 0
        return output_idxs

    def extract_uniform_points_plus(self, features, interpolation_size):
        """Select `interpolation_size` frame indices whose consecutive
        perceptual distances have minimal spread (max - min)."""
        # Pairwise CLIP distances; -1 marks "no edge" (lower triangle / diag).
        weights = -1 * np.ones((len(features), len(features)))
        for i in range(len(features)):
            for j in range(i + 1, len(features)):
                weights[i][j] = self._compute_clip(features[i], features[j])
        m = len(features)
        n = interpolation_size
        _, best_path = self.find_minimal_spread_and_path(n, m, weights)
        print("Optimal smooth path:", best_path)
        return best_path

    def find_minimal_spread_and_path(self, n, m, weights):
        """Binary-search the smallest spread D for which a forward path of n
        nodes from frame 0 to frame m-1 exists with all edge weights inside a
        window of width D.

        Returns:
            (best_D, best_path); best_path is a list of node indices, or None
            if no feasible path was found at any tested D.
        """
        # Sorted distinct edge weights (upper triangle only).
        W = sorted(
            {
                weights[i][j]
                for i in range(m - 1)
                for j in range(i + 1, m)
                if weights[i][j] != -1
            }
        )
        min_weight = W[0]
        max_weight = W[-1]

        low = 0.0
        high = max_weight - min_weight
        epsilon = 1e-6  # binary-search resolution on the spread

        best_D = None
        best_path = None

        while high - low > epsilon:
            D = (low + high) / 2
            result = self.is_path_possible(D, n, m, weights, W)
            if result is not None:
                # Feasible: try to tighten the window from above.
                high = D
                best_D = D
                best_path = result
            else:
                low = D

        return best_D, best_path

    def is_path_possible(self, D, n, m, weights, W):
        """Check whether a forward path 0 -> ... -> m-1 with exactly n nodes
        exists whose edge weights all lie in some window [w_min, w_min + D].

        Tries each distinct weight as the window's lower edge and runs a DP
        where dp[i][l] = (max_w, min_w, path) is the best known l-node path
        ending at node i. Returns the node path, or None if none fits.
        """
        for w_min in W:
            w_max = w_min + D
            if w_max > W[-1]:
                # Window already extends past the largest weight; raising
                # w_min further cannot admit new paths.
                break

            # dp[i][l]: best (max edge, min edge, node path) using l nodes.
            dp = [[None] * (n + 1) for _ in range(m)]
            dp[0][1] = (
                float("-inf"),
                float("inf"),
                [0],
            )  # the start node alone: no edges yet, so degenerate max/min

            for l in range(1, n):
                for i in range(m):
                    if dp[i][l] is not None:
                        max_w, min_w, path = dp[i][l]
                        for j in range(i + 1, m):
                            w = weights[i][j]
                            if w != -1 and w_min <= w <= w_max:
                                # Extend the path with edge (i, j).
                                new_max_w = max(max_w, w)
                                new_min_w = min(min_w, w)
                                new_diff = new_max_w - new_min_w
                                if new_diff <= D:
                                    dp_j_l_plus_1 = dp[j][l + 1]
                                    # Keep the extension with smaller spread.
                                    if dp_j_l_plus_1 is None or new_diff < (
                                        dp_j_l_plus_1[0] - dp_j_l_plus_1[1]
                                    ):
                                        dp[j][l + 1] = (
                                            new_max_w,
                                            new_min_w,
                                            path + [j],
                                        )

            if dp[m - 1][n] is not None:
                # Found an n-node path ending at the last frame.
                _, _, path = dp[m - 1][n]
                return path

        return None

    def generate_interpolation(
        self,
        progress,
        prompt_start,
        prompt_end,
        negative_prompt,
        latent_start,
        latent_end,
        num_inference_steps=28,
        exploration_size=16,
        init_alpha=3,
        init_beta=3,
        interpolation_size=7,
        uniform=False,
        **kwargs,
    ):
        """Full pipeline: explore up to `exploration_size` candidate frames,
        then return the `interpolation_size` most perceptually uniform ones.

        Side effect: stashes the exploration state (images, ds, xs, alpha,
        beta_param) on `self` for later inspection.
        """
        images, features, ds, xs, alpha, beta_param = self.explore_with_beta(
            progress,
            prompt_start,
            prompt_end,
            negative_prompt,
            latent_start,
            latent_end,
            num_inference_steps,
            exploration_size,
            init_alpha,
            init_beta,
            uniform=uniform,
            **kwargs,
        )

        output_idx = self.extract_uniform_points_plus(features, interpolation_size)
        output_images = []
        for idx in output_idx:
            output_images.append(images[idx])

        # Keep exploration artifacts for debugging / downstream inspection.
        self.images = images
        self.ds = ds
        self.xs = xs
        self.alpha = alpha
        self.beta_param = beta_param

        return output_images
|
|
|
|
|
def bayesian_prior_selection(
    interpolation_pipe,
    latent1: torch.FloatTensor,
    latent2: torch.FloatTensor,
    prompt1: str,
    prompt2: str,
    lpips_model: LPIPS,
    guide_prompt: str | None = None,
    negative_prompt: str = "",
    size: int = 3,
    num_inference_steps: int = 25,
    warmup_ratio: float = 1,
    early: str = "vfused",
    late: str = "self",
    target_score: float = 0.9,
    n_iter: int = 15,
    p_min: float | None = None,
    p_max: float | None = None,
) -> tuple:
    """
    Pick the (alpha, beta) interpolation parameters via Bayesian optimization.

    The midpoint of the interpolation is generated once to (a) establish a
    prior on whether alpha or beta should dominate and (b) provide a baseline
    smoothness for the alpha == beta case. A 3x3 grid of anchor points is then
    probed, followed by up to `n_iter` optimizer steps; the search stops as
    soon as any configuration reaches `target_score`.

    Args:
        interpolation_pipe (any): The interpolation pipeline.
        latent1 (torch.FloatTensor): First source latent vector.
        latent2 (torch.FloatTensor): Second source latent vector.
        prompt1 (str): First source prompt.
        prompt2 (str): Second source prompt.
        lpips_model (any): LPIPS model used to compute perceptual distances.
        guide_prompt (str | None, optional): Guide prompt, if any. Defaults to None.
        negative_prompt (str, optional): Negative prompt. Defaults to "".
        size (int, optional): Interpolation sequence length. Defaults to 3.
        num_inference_steps (int, optional): Number of inference steps. Defaults to 25.
        warmup_ratio (float, optional): Warmup ratio. Defaults to 1.
        early (str, optional): Early fusion method. Defaults to "vfused".
        late (str, optional): Late fusion method. Defaults to "self".
        target_score (float, optional): Score at which to stop searching. Defaults to 0.9.
        n_iter (int, optional): Maximum optimizer iterations. Defaults to 15.
        p_min (float, optional): Lower bound for alpha and beta. Defaults to None.
        p_max (float, optional): Upper bound for alpha and beta. Defaults to None.

    Returns:
        tuple: The selected (alpha, beta) parameters.
    """
    # Generate the midpoint once: its LPIPS distances to the two endpoints
    # determine the prior direction, and its smoothness is the baseline score.
    images = interpolation_pipe.interpolate_single(
        0.5,
        latent1,
        latent2,
        prompt1,
        prompt2,
        guide_prompt=guide_prompt,
        negative_prompt=negative_prompt,
        num_inference_steps=num_inference_steps,
        warmup_ratio=warmup_ratio,
        early=early,
        late=late,
    )
    distances = compute_lpips(images, lpips_model)
    init_smoothness, _, _ = compute_smoothness_and_consistency(images, lpips_model)
    # Prior: if the first half is perceptually shorter, favour alpha >= beta.
    large_alpha_prior = distances[0] < distances[1]

    def get_smoothness(alpha, beta):
        """
        Black-box objective for the Bayesian optimizer: smoothness of the
        sequence interpolated with (alpha, beta). Configurations that violate
        the prior are short-circuited to 0; alpha == beta reuses the baseline.
        """
        if alpha < beta and large_alpha_prior:
            return 0
        if alpha > beta and not large_alpha_prior:
            return 0
        if alpha == beta:
            return init_smoothness
        sequence = interpolation_pipe.interpolate_save_gpu(
            latent1,
            latent2,
            prompt1,
            prompt2,
            guide_prompt=guide_prompt,
            negative_prompt=negative_prompt,
            size=size,
            num_inference_steps=num_inference_steps,
            warmup_ratio=warmup_ratio,
            early=early,
            late=late,
            alpha=alpha,
            beta=beta,
        )
        score, _, _ = compute_smoothness_and_consistency(sequence, lpips_model)
        return score

    # Default search bounds: [1, number of warmup steps].
    num_warmup_steps = warmup_ratio * num_inference_steps
    if p_min is None:
        p_min = 1
    if p_max is None:
        p_max = num_warmup_steps
    pbounds = {"alpha": (p_min, p_max), "beta": (p_min, p_max)}
    optimizer = BayesianOptimization(
        f=get_smoothness,
        pbounds=pbounds,
        random_state=1,
        bounds_transformer=SequentialDomainReductionTransformer(minimum_window=0.1),
        allow_duplicate_points=True,
    )

    # Warm-start with a 3x3 grid over the bounds; return immediately if any
    # probed anchor already reaches the target.
    anchors = [p_min, (p_min + p_max) / 2, p_max]
    for cand_alpha in anchors:
        for cand_beta in anchors:
            optimizer.probe(
                params={"alpha": cand_alpha, "beta": cand_beta}, lazy=False
            )
            if optimizer.res[-1]["target"] >= target_score:
                return cand_alpha, cand_beta

    # Refine one step at a time so the loop can stop early at the target.
    for _ in range(n_iter):
        optimizer.maximize(init_points=0, n_iter=1)
        if optimizer.max["target"] >= target_score:
            print(f"Stopping early, target of {target_score} reached.")
            break

    best_params = optimizer.max["params"]
    return best_params["alpha"], best_params["beta"]
|
|
|
|
|
def generate_beta_tensor(
    size: int, alpha: float = 3, beta: float = 3
) -> torch.FloatTensor:
    """
    Assume size as n.
    Generate a tensor [x0, x1, ..., xn-1] where each xi satisfies
    F(xi) = i / (n - 1) for the CDF F of the Beta(alpha, beta) distribution,
    i.e. the values are inverse-CDF (quantile) samples at evenly spaced
    probabilities, so x0 == 0 and xn-1 == 1.

    Args:
        size (int): The number of values to generate; must be at least 2.
        alpha (float): The alpha parameter of the Beta distribution.
        beta (float): The beta parameter of the Beta distribution.

    Returns:
        torch.Tensor: A float32 tensor of the inverse CDF values.

    Raises:
        ValueError: If ``size`` is smaller than 2 — the spacing i/(n-1) is
            undefined for fewer than two points.
    """
    if size < 2:
        # The spacing formula divides by (size - 1); fail loudly with a clear
        # message instead of an opaque ZeroDivisionError.
        raise ValueError(f"size must be >= 2 to space points by i/(n-1), got {size}")

    # Evenly spaced probabilities 0, 1/(n-1), ..., 1 in one vectorized call.
    prob_values = np.linspace(0.0, 1.0, size)
    inverse_cdf_values = beta_distribution.ppf(prob_values, alpha, beta)

    return torch.tensor(inverse_cdf_values, dtype=torch.float32)
|
|