Spaces:
Sleeping
Sleeping
from pydantic import BaseModel | |
import numpy as np | |
import torch | |
from PIL import Image | |
from transformers import Idefics2ForConditionalGeneration, Idefics2Processor, PreTrainedModel, ProcessorMixin | |
from typing import Optional | |
import re | |
import logging | |
import pickle | |
from pathlib import Path | |
import numpy as np | |
from matplotlib import pyplot as plt | |
from sklearn.cluster import KMeans | |
from huggingface_hub import hf_hub_download | |
class BaseModelYamlJsonMixin:
    """
    Mixin adding YAML/JSON file loading and saving to a pydantic-style model.

    The host class must provide the methods used below (``model_dump``,
    ``model_dump_json`` and ``model_validate_json``) and must be
    constructible from keyword arguments.
    """

    @classmethod
    def from_yaml(cls, path: Path):
        """Build an instance from the YAML mapping stored at ``path``."""
        # PyYAML is not imported at module level in this file, so it is
        # imported where it is needed.
        import yaml

        with open(path, "r", encoding="utf-8") as f:
            return cls(**yaml.safe_load(f))

    def to_yaml(self: "BaseModel", path: Path):
        """Write ``self.model_dump()`` to ``path`` as YAML."""
        import yaml

        with open(path, "w", encoding="utf-8") as f:
            yaml.safe_dump(self.model_dump(), f)

    @classmethod
    def from_json(cls, path: Path):
        """Build an instance by validating the JSON text stored at ``path``."""
        with open(path, "r", encoding="utf-8") as f:
            return cls.model_validate_json(f.read())

    def to_json(self: "BaseModel", path: Path, indent: int = 4, *args, **kwargs):
        """
        Write the model to ``path`` as JSON.

        Args:
            path: Destination file; it is overwritten if it exists.
            indent: Indentation passed to ``model_dump_json``.
            *args, **kwargs: Forwarded unchanged to ``model_dump_json``.
        """
        with open(path, "w", encoding="utf-8") as f:
            f.write(self.model_dump_json(*args, indent=indent, **kwargs))
# Convenience base class: a pydantic BaseModel that also inherits the
# YAML/JSON file helpers defined on BaseModelYamlJsonMixin. It adds no fields
# or behavior of its own.
class BaseModelWithYamlJsonFromTo(BaseModel, BaseModelYamlJsonMixin):
    pass
class Idefics2TrainAdditionalConfig(BaseModel):
    """
    Additional training options stored alongside an Idefics2 pipeline.

    num_action_tokens (`int`, defaults to `-1`):
        Number of action tokens to add to the tokenizer vocabulary. `-1` is a
        placeholder that is meant to be overwritten by processor_config.yml.
    num_actions (`int`, defaults to `-1`):
        Number of actions used by the pipeline (it is passed as
        `max_new_tokens` at generation time). `-1` is a placeholder that is
        meant to be overwritten by processor_config.yml.
    do_image_splitting (`bool`, defaults to `True`):
        Whether to split the image into a sequence of 4 equal sub-images
        concatenated with the original image. That strategy was first
        introduced in https://arxiv.org/abs/2311.06607.
    freeze_original_vocab, freeze_vision_model, freeze_connector (`bool`, default to `False`):
        Freezing switches; presumably consumed by the training code, which is
        not part of this file.
    torch_dtype (`str`, defaults to `"bfloat16"`):
        Name of the torch dtype to use.
    lora_config (`dict` or `None`, defaults to the config recommended in
        https://x.com/danielhanchen/status/1791900967472140583):
        Configuration for the LoRA model. If it is `None`, the model will not
        use LoRA.
    model_name_or_path (`str`, defaults to `"HuggingFaceM4/idefics2-8b"`):
        Base model identifier.
    """

    # must be set to extend vocabulary of model + tokenizer
    num_action_tokens: int = -1  # it will be overwritten by the processor_config.yml
    # must be set to be used in pipeline
    num_actions: int = -1  # it will be overwritten by the processor_config.yml
    do_image_splitting: bool = True
    freeze_original_vocab: bool = False
    freeze_vision_model: bool = False
    freeze_connector: bool = False
    torch_dtype: str = "bfloat16"
    lora_config: dict | None = {
        "r": 256,
        "lora_alpha": 512,
        "lora_dropout": 0.1,
        "target_modules": "all-linear",
        "use_rslora": True,
        "init_lora_weights": "gaussian",
        "modules_to_save": ["lm_head", "embed_tokens"],
    }
    model_name_or_path: str = "HuggingFaceM4/idefics2-8b"
class KMeansActionTokenizer(): | |
def __init__(self, action_count: int = 128): | |
self.action_count = action_count | |
self.kmeans = KMeans(n_clusters=self.action_count, random_state=np.random.RandomState(seed=42)) | |
def token_count(self): | |
return self.action_count | |
def from_pretrained(cls, model_path: str | Path): | |
model_path = Path(model_path) | |
self = cls() | |
action_tokenizer_path = hf_hub_download(repo_id=str(model_path), filename="tokenizer.pkl") | |
with open(action_tokenizer_path, "rb") as file: | |
self.kmeans = pickle.load(file) | |
self.action_count = self.kmeans.n_clusters | |
# assert self.action_count == 32 | |
return self | |
def save_pretrained(self, model_path: str | Path): | |
model_path = Path(model_path) | |
model_path.mkdir(exist_ok=True) | |
with open(model_path / "tokenizer.pkl", "wb") as file: | |
pickle.dump(self.kmeans, file) | |
def train(self, actions): | |
self.kmeans.fit(actions) | |
def tokenize(self, action, padding=False, max_length=-1, truncation=False): | |
# action: (K, 3) shape, adjusted delta_position and delta_yaw | |
return [i for i in self.kmeans.predict(action)] | |
def detokenize(self, tokens): | |
# Token Check | |
check = np.asarray(tokens) | |
in_valid_range = (0 <= check) & (check < self.action_count) | |
if not in_valid_range.all(): | |
logging.warning(f"Invalid tokens occur: {tokens}") | |
# If error occurs, return stop action. | |
return np.asarray([[0.0, 0.0, 0.0] for _ in range(len(tokens))]) | |
return np.asarray([self.kmeans.cluster_centers_[t] for t in tokens]) | |
def visualize(self, figset=None): | |
if figset is None: | |
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(12, 16), dpi=300) | |
else: | |
fig, axes = figset | |
FONT = {"fontsize": 20} | |
axes[0].set_title("Center", fontdict=FONT) | |
axes[1].set_title("Center_Rot", fontdict=FONT) | |
labels = self.kmeans.labels_ | |
centers = self.kmeans.cluster_centers_ | |
# plot center. each center is given as (x, y, yaw). plot point (x,y) and arrow from (x,y) to p', with direction of yaw. consider (x, y)'s scale | |
scale_factor = 0.05 | |
for i, center in enumerate(centers): | |
x, y, yaw = center | |
axes[0].plot(x, y, "ro") | |
axes[0].arrow( | |
x, | |
y, | |
np.cos(yaw) * scale_factor, | |
np.sin(yaw) * scale_factor, | |
head_width=scale_factor * 0.3, | |
head_length=scale_factor * 0.3, | |
fc="k", | |
ec="k", | |
) | |
axes[0].text(x, y, f"{i}", fontsize=10) | |
axes[0].axis("equal") | |
axes[0].grid(True) | |
# filter centers that are not far from origin in distance 0.3 | |
_centers = centers[np.linalg.norm(centers[:, :2], axis=1) < 0.05] | |
# print(f"action near zero: {_centers}") | |
scale_factor = 0.1 | |
for center in _centers: | |
x, y, yaw = center | |
axes[1].plot(x, y, "ro") | |
axes[1].arrow( | |
x, | |
y, | |
np.cos(yaw) * scale_factor, | |
np.sin(yaw) * scale_factor, | |
head_width=scale_factor * 0.3, | |
head_length=scale_factor * 0.3, | |
fc="k", | |
ec="k", | |
) | |
axes[1].axis("equal") | |
axes[1].grid(True) | |
return fig, axes | |
# Configuration object for Idefics2Pipeline. The pipeline writes it to
# "pipeline_config.json" on save and validates it from that file on load.
class Idefics2PipelineConfig(BaseModelWithYamlJsonFromTo):
    # Name of the pipeline class this configuration belongs to.
    pipeline_class: str = "Idefics2Pipeline"
    # Training-time settings (required, no default); the pipeline reads
    # `num_actions` from it at generation time.
    train_additional_cfg: Idefics2TrainAdditionalConfig
class Idefics2Pipeline:
    """
    Bundles an Idefics2 model, its processor, a KMeans action tokenizer and
    the pipeline configuration behind a single callable.
    """

    def __init__(
        self,
        model: PreTrainedModel,
        processor: ProcessorMixin,
        action_tokenizer: KMeansActionTokenizer,
        config: Idefics2PipelineConfig,
    ):
        self.model = model
        self.processor = processor
        self.action_tokenizer = action_tokenizer
        self.config = config

    def save_pretrained(
        self,
        save_directory: str,
    ):
        """
        Save model, processor, action tokenizer and pipeline config into
        ``save_directory`` (the config is written as ``pipeline_config.json``).
        """
        save_directory = Path(save_directory)
        self.model.save_pretrained(save_directory)
        self.processor.save_pretrained(save_directory)
        self.action_tokenizer.save_pretrained(save_directory)
        self.config.to_json(save_directory / "pipeline_config.json")

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str):
        """
        Build a pipeline from a Hugging Face Hub repository.

        Args:
            pretrained_model_name_or_path: Repo id that hosts
                ``pipeline_config.json`` together with the model, processor
                and action tokenizer files.

        Returns:
            A pipeline whose model has been switched to eval mode.
        """
        # Keep the identifier as a forward-slash string: converting a repo id
        # to Path and back yields backslashes on Windows.
        if isinstance(pretrained_model_name_or_path, Path):
            repo_id = pretrained_model_name_or_path.as_posix()
        else:
            repo_id = str(pretrained_model_name_or_path)

        pipeline_config_path = hf_hub_download(repo_id=repo_id, filename="pipeline_config.json")
        # The config is written as UTF-8 by to_json, so read it back as UTF-8.
        config = Idefics2PipelineConfig.model_validate_json(
            Path(pipeline_config_path).read_text(encoding="utf-8")
        )

        model = Idefics2ForConditionalGeneration.from_pretrained(repo_id)
        processor = Idefics2Processor.from_pretrained(repo_id)
        model.eval()
        action_tokenizer = KMeansActionTokenizer.from_pretrained(repo_id)
        return cls(model, processor, action_tokenizer, config)

    def to(self, device):
        """Move the underlying model to ``device``; returns the result of ``model.to``."""
        return self.model.to(device)

    # NOTE: an earlier `__call__(self, examples, return_traj=False)` overload
    # used to be defined here. It only raised NotImplementedError and was
    # shadowed by the definition below, so it was never reachable and has
    # been removed.
    def __call__(
        self,
        message_list: list[list[dict]],
        images_list: list[list[Image.Image]],
        return_traj: Optional[bool] = False,
    ):
        """
        call model with message and images

        Args:
            message_list: list of messages, [B, messages]; B must be 1
            images_list: list of images, [B, images]
            return_traj: return trajectory if True

        Returns:
            If ``return_traj`` is true, the detokenized actions (as nested
            lists) for every ``<ACTION_n>`` found in the first decoded text;
            otherwise the list of decoded texts.

        Raises:
            ValueError: if ``message_list`` does not contain exactly one item.
        """
        # we don't use batch inference for run model worker
        if len(message_list) != 1:
            raise ValueError("No batch api call allowed for Idefics2Pipeline")
        message = message_list[0]
        images = images_list[0]

        prompt = self.processor.apply_chat_template(message, add_generation_prompt=True)
        # NOTE(review): the original code called
        # `prompt.replace("<end_of_utterance>", "")` here without using the
        # result, so it never changed the prompt. The no-op is dropped and the
        # prompt is deliberately left untouched so that the model input stays
        # exactly what it has always been - confirm before stripping the token.
        # add space to match the training data
        prompt = prompt + " "

        inputs = self.processor(text=prompt, images=images, return_tensors="pt", padding=True)
        device = self.model.device
        inputs = {k: v.to(device) for k, v in inputs.items()}
        generate_ids = self.model.generate(
            **inputs, max_new_tokens=self.config.train_additional_cfg.num_actions, top_k=1
        )
        generated_texts = self.processor.batch_decode(generate_ids, skip_special_tokens=True)

        if not return_traj:
            return generated_texts

        # Extract the numeric ids of all "<ACTION_n>" tokens in the output.
        pred_action = re.findall(r"<ACTION_(\d+)>", generated_texts[0])
        pred_action = np.array(pred_action, dtype=np.int64)
        return self.action_tokenizer.detokenize(pred_action).tolist()