File size: 3,202 Bytes
2f4df5a 0a82a06 2f4df5a 36f12a2 2f4df5a 36f12a2 2f4df5a 36f12a2 2f4df5a 36f12a2 2f4df5a 36f12a2 2f4df5a 36f12a2 2f4df5a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
import torch
from transformers import (
AutoTokenizer,
AutoModelForCausalLM,
pipeline,
LogitsProcessor,
LogitsProcessorList
)
from typing import Any, List, Dict
class FixedVocabLogitsProcessor(LogitsProcessor):
    """Logits processor that restricts generation to a fixed set of token IDs.

    Every vocabulary position whose index is not in ``allowed_ids`` has its
    logit overwritten with ``fill_value``; logits of allowed tokens are left
    unchanged.
    """

    def __init__(self, allowed_ids: set[int], fill_value=float('-inf')):
        """Store the whitelist and the masking value.

        Args:
            allowed_ids: Token IDs allowed for generation.
            fill_value: Value written over the logits of disallowed tokens,
                default ``-inf``.
        """
        self.allowed_ids = allowed_ids
        self.fill_value = fill_value

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor) -> torch.FloatTensor:
        """Mask the logits of every token that is not whitelisted.

        Args:
            input_ids: Token IDs generated so far, shape
                (batch_size, sequence_length). Not used by this processor.
            scores: Pre-softmax logits for the next token, shape
                (batch_size, vocab_size). Modified in place.

        Returns:
            The same ``scores`` tensor, with disallowed positions set to
            ``fill_value``.
        """
        vocab_size = scores.size(-1)

        # Only IDs that index into this vocabulary can be kept; anything
        # outside [0, vocab_size) has no logit to preserve.
        in_range = [tid for tid in self.allowed_ids if 0 <= tid < vocab_size]

        # One boolean mask over the vocabulary, applied with a single tensor
        # op, instead of a Python loop over every (batch, token) pair.
        keep = torch.zeros(vocab_size, dtype=torch.bool, device=scores.device)
        if in_range:
            keep[torch.tensor(in_range, dtype=torch.long, device=scores.device)] = True

        # The (vocab_size,) mask broadcasts across the batch dimension.
        scores.masked_fill_(~keep, self.fill_value)
        return scores
class EndpointHandler:
    """Request handler that generates text using only a caller-supplied vocabulary.

    The tokenizer and causal language model are loaded once at construction;
    each call tokenizes the prompt, restricts the next-token logits to the
    tokens derived from the request's ``vocab_list``, and returns the decoded
    output.
    """

    def __init__(self, path=""):
        """Load the tokenizer and the model from ``path``.

        Args:
            path: Location passed to ``from_pretrained`` for both the
                tokenizer and the model.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(path)
        self.model = AutoModelForCausalLM.from_pretrained(
            path, device_map="auto", torch_dtype=torch.float16
        )

    def __call__(self, data: Any) -> List[Dict[str, str]]:
        """Run vocabulary-constrained generation for one request.

        Args:
            data: Request payload. The keys ``"inputs"`` (prompt; falls back
                to the payload itself when absent), ``"parameters"``
                (optional generation settings: ``max_length``, ``num_beams``,
                ``do_sample``, ``no_repeat_ngram_size``) and ``"vocab_list"``
                (words whose tokens may be generated) are popped from it.

        Returns:
            A one-element list ``[{"generated_text": ...}]`` holding the
            decoded first output sequence with special tokens removed.

        Raises:
            ValueError: If ``vocab_list`` is missing or empty, or if none of
                its entries tokenizes to at least one token.
        """
        inputs = data.pop("inputs", data)
        parameters = data.pop("parameters", {})
        vocab_list = data.pop("vocab_list", None)
        if not vocab_list:
            raise ValueError("You must provide a 'vocab_list' to define allowed tokens.")

        # Each word is encoded both bare and with a leading space, so the
        # token pieces of both spellings are allowed.
        allowed_ids = set()
        for word in vocab_list:
            allowed_ids.update(self.tokenizer.encode(word, add_special_tokens=False))
            allowed_ids.update(self.tokenizer.encode(" " + word, add_special_tokens=False))

        # With nothing allowed, every logit would be masked to the fill value
        # and generation could not pick a meaningful token.
        if not allowed_ids:
            raise ValueError("The provided 'vocab_list' did not produce any allowed tokens.")

        logits_processors = LogitsProcessorList(
            [FixedVocabLogitsProcessor(allowed_ids=allowed_ids)]
        )

        encoded = self.tokenizer(inputs, return_tensors="pt")
        input_ids = encoded.input_ids.to(self.model.device)

        # Forward the tokenizer's attention mask when it provides one:
        # pad_token_id is set to the EOS id below, so generate() should not
        # have to infer the mask from the input IDs.
        extra_kwargs = {}
        attention_mask = encoded.get("attention_mask")
        if attention_mask is not None:
            extra_kwargs["attention_mask"] = attention_mask.to(self.model.device)

        output_ids = self.model.generate(
            input_ids=input_ids,
            logits_processor=logits_processors,
            max_length=parameters.get("max_length", 30),
            num_beams=parameters.get("num_beams", 1),
            do_sample=parameters.get("do_sample", False),
            pad_token_id=self.tokenizer.eos_token_id,
            no_repeat_ngram_size=parameters.get("no_repeat_ngram_size", 3),
            **extra_kwargs,
        )

        # Only the first returned sequence is decoded.
        generated_text = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        return [{"generated_text": generated_text}]
|