|
import torch |
|
from transformers import ( |
|
AutoTokenizer, |
|
AutoModelForCausalLM, |
|
pipeline, |
|
LogitsProcessor, |
|
LogitsProcessorList |
|
) |
|
from typing import Any, List, Dict |
|
|
|
|
|
class FixedVocabLogitsProcessor(LogitsProcessor): |
|
""" |
|
A custom LogitsProcessor that restricts the vocabulary |
|
to a fixed set of token IDs, masking out everything else. |
|
""" |
|
|
|
def __init__(self, allowed_ids: set[int], fill_value=float('-inf')): |
|
""" |
|
Args: |
|
allowed_ids (set[int]): Token IDs allowed for generation. |
|
fill_value (float): Value used to mask disallowed tokens, default -inf. |
|
""" |
|
self.allowed_ids = allowed_ids |
|
self.fill_value = fill_value |
|
|
|
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor: |
|
""" |
|
Args: |
|
input_ids: shape (batch_size, sequence_length) |
|
scores: shape (batch_size, vocab_size) - pre-softmax logits for the next token |
|
Returns: |
|
scores: shape (batch_size, vocab_size) with masked logits |
|
""" |
|
batch_size, vocab_size = scores.size() |
|
for b in range(batch_size): |
|
for token_id in range(vocab_size): |
|
if token_id not in self.allowed_ids: |
|
scores[b, token_id] = self.fill_value |
|
return scores |
|
|
|
|
|
class EndpointHandler: |
|
def __init__(self, path=""): |
|
|
|
self.tokenizer = AutoTokenizer.from_pretrained(path) |
|
self.model = AutoModelForCausalLM.from_pretrained(path, device_map="auto", torch_dtype=torch.float16) |
|
|
|
def __call__(self, data: Any) -> List[Dict[str, str]]: |
|
|
|
inputs = data.pop("inputs", data) |
|
parameters = data.pop("parameters", {}) |
|
vocab_list = data.pop("vocab_list", None) |
|
|
|
if not vocab_list: |
|
raise ValueError("You must provide a 'vocab_list' to define allowed tokens.") |
|
|
|
|
|
allowed_ids = set() |
|
for word in vocab_list: |
|
for tid in self.tokenizer.encode(word, add_special_tokens=False): |
|
allowed_ids.add(tid) |
|
for tid in self.tokenizer.encode(" " + word, add_special_tokens=False): |
|
allowed_ids.add(tid) |
|
|
|
|
|
logits_processors = LogitsProcessorList([FixedVocabLogitsProcessor(allowed_ids=allowed_ids)]) |
|
|
|
|
|
input_ids = self.tokenizer(inputs, return_tensors="pt").input_ids.to(self.model.device) |
|
|
|
|
|
output_ids = self.model.generate( |
|
input_ids=input_ids, |
|
logits_processor=logits_processors, |
|
max_length=parameters.get("max_length", 30), |
|
num_beams=parameters.get("num_beams", 1), |
|
do_sample=parameters.get("do_sample", False), |
|
pad_token_id=self.tokenizer.eos_token_id, |
|
no_repeat_ngram_size=parameters.get("no_repeat_ngram_size", 3) |
|
) |
|
|
|
|
|
generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True) |
|
|
|
return [{"generated_text": generated_text}] |
|
|