|
import time |
|
import torch |
|
import lightning as L |
|
from torch.utils.data import DataLoader |
|
from lightning.fabric.loggers import CSVLogger |
|
from lightning.fabric.strategies import FSDPStrategy |
|
from tsai_gpt.model import GPT, Block, Config |
|
from tsai_gpt.tokenizer import Tokenizer |
|
from tsai_gpt.packed_dataset import CombinedDataset, PackedDataset |
|
from tsai_gpt.speed_monitor import SpeedMonitorBase, estimate_flops, measure_flops |
|
from tsai_gpt.speed_monitor import SpeedMonitorFabric as SpeedMonitor |
|
from tsai_gpt.utils import chunked_cross_entropy, get_default_supported_precision, num_parameters, load_checkpoint, gptq_quantization |
|
import torch.nn as nn |
|
from pathlib import Path |
|
import sys |
|
import random |
|
from torch import nn |
|
import lightning.pytorch as pl |
|
from torch.nn import functional as F |
|
|
|
|
|
|
|
model_name = "pythia-160m" |
|
name = "redpajama" |
|
|
|
def _init_weights(module: nn.Module) -> None: |
|
"""Meant to be used with `gpt.apply(gpt._init_weights)`.""" |
|
if isinstance(module, nn.Linear): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
if module.bias is not None: |
|
torch.nn.init.zeros_(module.bias) |
|
elif isinstance(module, nn.Embedding): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
|
|
config = Config.from_name(model_name) |
|
model = GPT(config) |
|
|
|
next(model.parameters()).sum() |
|
|
|
model.apply(_init_weights) |
|
model.load_state_dict |
|
|
|
|
|
checkpoint_dir = Path("final-gpt-model-ckpt.pth") |
|
strategy = "auto" |
|
quantize = None |
|
devices = 1 |
|
precision = None |
|
|
|
|
|
precision = get_default_supported_precision(training=False) |
|
plugins = None |
|
fabric = L.Fabric(devices=devices, precision=precision, strategy=strategy, plugins=plugins) |
|
fabric.launch() |
|
fabric.print(f"Loading model {str(checkpoint_dir)!r} with {config.__dict__}", file=sys.stderr) |
|
|
|
with fabric.init_module(empty_init=True), gptq_quantization(quantize=="gptq.int4"): |
|
model = GPT(config) |
|
|
|
model.eval() |
|
model = fabric.setup_module(model) |
|
load_checkpoint(fabric, model, checkpoint_dir) |
|
|
|
tokenizer = Tokenizer(Path('tokenizer_config')) |
|
|
|
@torch.inference_mode() |
|
def generate( |
|
model: GPT, |
|
idx: torch.Tensor, |
|
max_returned_tokens: int, |
|
*, |
|
temperature: float = 1.0, |
|
top_k:int = None, |
|
eos_id:int = None, |
|
) -> torch.Tensor: |
|
"""Takes a conditioning sequence (prompt) as input and continues to generate as many tokens as requested. |
|
The implementation of this function is modified from A. Karpathy's nanoGPT. |
|
Args: |
|
model: The model to use. |
|
idx: Tensor of shape (T) with indices of the prompt sequence. |
|
max_returned_tokens: The maximum number of tokens to return (given plus generated). |
|
temperature: Scales the predicted logits by 1 / temperature. |
|
top_k: If specified, only sample among the tokens with the k highest probabilities. |
|
eos_id: If specified, stop generating any more token once the <eos> token is triggered. |
|
""" |
|
T = idx.size(0) |
|
assert max_returned_tokens > T |
|
if model.max_seq_length < max_returned_tokens - 1: |
|
|
|
|
|
|
|
raise NotImplementedError(f"max_seq_length {model.max_seq_length} needs to be >= {max_returned_tokens - 1}") |
|
|
|
device, dtype = idx.device, idx.dtype |
|
|
|
empty = torch.empty(max_returned_tokens, dtype=dtype, device=device) |
|
empty[:T] = idx |
|
idx = empty |
|
input_pos = torch.arange(0, T, device=device) |
|
|
|
|
|
for _ in range(max_returned_tokens - T): |
|
x = idx.index_select(0, input_pos).view(1, -1) |
|
|
|
|
|
logits = model(x, input_pos) |
|
logits = logits[0, -1] / temperature |
|
|
|
|
|
if top_k is not None: |
|
v, _ = torch.topk(logits, min(top_k, logits.size(-1))) |
|
logits = torch.where(logits < v[[-1]], -float("Inf"), logits) |
|
|
|
probs = torch.nn.functional.softmax(logits, dim=-1) |
|
idx_next = torch.multinomial(probs, num_samples=1).to(dtype=dtype) |
|
|
|
|
|
input_pos = input_pos[-1:] + 1 |
|
|
|
|
|
idx = idx.index_copy(0, input_pos, idx_next) |
|
|
|
|
|
if idx_next == eos_id: |
|
return idx[:input_pos] |
|
|
|
return idx |
|
|
|
|
|
device = 'cuda' if torch.cuda.is_available() else 'cpu' |
|
|
|
def generate_text(input_text, temperature=0.8, max_tokens=200, top_k=None): |
|
encoded = tokenizer.encode(input_text, device=fabric.device) |
|
max_returned_tokens = encoded.size(0) + max_tokens |
|
|
|
|
|
with fabric.init_tensor(): |
|
|
|
model.max_seq_length = max_returned_tokens |
|
|
|
|
|
with fabric.init_tensor(): |
|
model.set_kv_cache(batch_size=1) |
|
|
|
y = generate(model, encoded, max_returned_tokens, temperature=temperature, top_k=top_k) |
|
|
|
return(tokenizer.decode(y)) |
|
|
|
import gradio as gr |
|
|
|
title = "GPT from scratch" |
|
|
|
description1 = "GPT implementation taken from <a href='https://github.com/Lightning-AI/lit-gpt'>Lit-GPT</a>. It is trained on a samples of the <a href='https://huggingface.co/datasets/togethercomputer/RedPajama-Data-1T-Sample'>RedPajama 1 trillion dataset</a> to understand how GPT's are trained and built. The github link can be found <a href='https://github.com/mkthoma/gpt_from_scratch'>here.</a>" |
|
|
|
demo = gr.Interface(generate_text, |
|
inputs=[gr.Textbox(label="Enter any prompt ", type="text", value="Once upon a time,"), |
|
gr.Slider(minimum=0, maximum=1, step=0.1, value=0.8, label="Temperature"), |
|
gr.Slider(minimum=200, maximum=1000, step=50, value=300, label="Max Tokens"), |
|
gr.Slider(minimum=10, maximum=100, step=5, value=20, label="Top K")], |
|
outputs=gr.Textbox(label="Text generated", type="text"), title=title, description=description1) |
|
|
|
|
|
demo.launch() |
|
|