import torch
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    pipeline,
    LogitsProcessor,
    LogitsProcessorList,
)
from typing import Any, List, Dict


class FixedVocabLogitsProcessor(LogitsProcessor):
    """Restrict next-token logits to a fixed set of token IDs.

    Every vocabulary position whose ID is not in ``allowed_ids`` is
    overwritten with ``fill_value`` so it cannot be selected.
    """

    def __init__(self, allowed_ids: set[int], fill_value=float('-inf')):
        """Store the allowed IDs and the masking value.

        Args:
            allowed_ids: Token IDs allowed for generation.
            fill_value: Value written over disallowed logits (default -inf).
        """
        self.allowed_ids = allowed_ids
        self.fill_value = fill_value

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        """Mask every logit whose token ID is not allowed.

        Args:
            input_ids: Token IDs generated so far; not used by this processor.
            scores: Pre-softmax logits for the next token; the last dimension
                is the vocabulary axis, e.g. ``(batch_size, vocab_size)``.

        Returns:
            A tensor shaped like ``scores`` in which disallowed positions hold
            ``fill_value``. The input tensor itself is left untouched.
        """
        vocab_size = scores.size(-1)

        # IDs outside [0, vocab_size) have no logit to keep; dropping them
        # also avoids an out-of-range (or wrap-around negative) index below.
        in_range_ids = [tid for tid in self.allowed_ids if 0 <= tid < vocab_size]

        allowed_mask = torch.zeros(vocab_size, dtype=torch.bool, device=scores.device)
        if in_range_ids:
            index = torch.tensor(in_range_ids, dtype=torch.long, device=scores.device)
            allowed_mask[index] = True

        # A single broadcast fill replaces a per-element Python loop over
        # batch_size * vocab_size entries on every decoding step.
        return scores.masked_fill(~allowed_mask, self.fill_value)


class EndpointHandler:
    """Inference handler that generates text restricted to a caller-supplied vocabulary."""

    def __init__(self, path=""):
        """Load the tokenizer and the causal LM found at ``path``.

        Args:
            path: Model directory or identifier handed to ``from_pretrained``.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(path)
        self.model = AutoModelForCausalLM.from_pretrained(
            path,
            device_map="auto",
            torch_dtype=torch.float16,
        )

    def __call__(self, data: Any) -> List[Dict[str, str]]:
        """Generate a continuation using only tokens derived from ``vocab_list``.

        Args:
            data: Request payload. Keys read: ``"inputs"`` (prompt; the payload
                itself is used when the key is absent), ``"parameters"``
                (optional generation settings: ``max_length``, ``num_beams``,
                ``do_sample``, ``no_repeat_ngram_size``) and ``"vocab_list"``
                (words whose tokens may be generated). The keys are popped, so
                the payload is mutated.

        Returns:
            A one-element list ``[{"generated_text": ...}]`` holding the
            decoded first output sequence.

        Raises:
            ValueError: If ``vocab_list`` is missing or empty.
        """
        inputs = data.pop("inputs", data)
        # An explicit ``"parameters": None`` is treated like a missing key.
        parameters = data.pop("parameters", None) or {}
        vocab_list = data.pop("vocab_list", None)

        if not vocab_list:
            raise ValueError("You must provide a 'vocab_list' to define allowed tokens.")

        # Each word is encoded both bare and with a leading space, so the
        # word-initial and mid-sentence token variants are both allowed.
        # NOTE(review): the EOS token is only allowed if one of these encodings
        # happens to produce it, so generation presumably runs to max_length.
        allowed_ids = set()
        for word in vocab_list:
            allowed_ids.update(self.tokenizer.encode(word, add_special_tokens=False))
            allowed_ids.update(self.tokenizer.encode(" " + word, add_special_tokens=False))

        logits_processors = LogitsProcessorList(
            [FixedVocabLogitsProcessor(allowed_ids=allowed_ids)]
        )

        device = self.model.device
        encoded = self.tokenizer(inputs, return_tensors="pt")
        input_ids = encoded.input_ids.to(device)
        # Pass the tokenizer's attention mask explicitly: pad_token_id is set
        # to the EOS id below, so generate() cannot reliably infer the mask.
        attention_mask = encoded.get("attention_mask")
        if attention_mask is not None:
            attention_mask = attention_mask.to(device)

        output_ids = self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            logits_processor=logits_processors,
            max_length=parameters.get("max_length", 30),
            num_beams=parameters.get("num_beams", 1),
            do_sample=parameters.get("do_sample", False),
            pad_token_id=self.tokenizer.eos_token_id,
            no_repeat_ngram_size=parameters.get("no_repeat_ngram_size", 3),
        )

        generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        return [{"generated_text": generated_text}]