Spaces:
Sleeping
Sleeping
import torch | |
import torch.nn as nn | |
import math | |
# RMSNorm is a normalization technique that normalizes the input by dividing by the square root of the variance plus a small number to prevent division by zero | |
class LlamaRMSNorm(nn.Module): | |
def __init__(self, hidden_size, eps=1e-5): # the number of features/dimensions/embeddings in the input, eps is a small number to prevent division by zero | |
super().__init__() | |
self.weight = nn.Parameter(torch.ones(hidden_size)) # weight is a learnable parameter that scales the input | |
self.eps = eps | |
def forward(self, x): | |
norm = x.pow(2).mean(-1, keepdim=True).sqrt() + self.eps # compute the norm of the input | |
return x / norm * self.weight # normalize the input by dividing by the norm and scale it by the weight parameter | |
# RotaryEmbedding is a technique that rotates the input by a learnable angle | |
class LlamaRotaryEmbedding(nn.Module): | |
def __init__(self, dim, base=10000, device=None): # dim is the number of features/dimensions/embeddings in the input, base is a base number for the frequency, device is the device to store the buffer | |
super().__init__() | |
inv_freq = 1.0 / (base ** (torch.arange(0, dim, 2, device=device).float() / dim)) # compute the inverse frequency | |
self.register_buffer("inv_freq", inv_freq) # register the inverse frequency as a buffer | |
def forward(self, x, seq_len): | |
seq_len = seq_len.to(x.device) # convert seq_len to the device of the input | |
t = torch.arange(seq_len, device=x.device) # create a tensor of the sequence length | |
freqs = torch.einsum("i,j->ij", t, self.inv_freq) # compute the frequency by taking the dot product of the sequence length and the inverse frequency | |
emb = torch.cat((freqs, freqs), dim=-1) # concatenate the frequency with itself | |
return emb | |
class LlamaMLP(nn.Module): | |
def __init__(self, dim, hidden_dim): | |
super().__init__() | |
self.gate_proj = nn.Linear(dim, hidden_dim, bias=False) # create the gate projection layer with the input dimension and the hidden dimension | |
self.up_proj = nn.Linear(dim, hidden_dim, bias=False) # create the up projection layer with the input dimension and the hidden dimension | |
self.down_proj = nn.Linear(hidden_dim, dim, bias=False) # create the down projection layer with the hidden dimension and the output dimension | |
self.act_fn = nn.SiLU() # create the activation function | |
def forward(self, x): | |
gated = self.gate_proj(x) # apply the gate projection to the input | |
hidden = self.up_proj(x) # apply the up projection to the input | |
return self.down_proj(self.act_fn(gated * hidden)) # apply the activation function to the gated and hidden values and then apply the down projection | |
class LlamaAttention(nn.Module): | |
def __init__(self, dim, num_heads=8): | |
super().__init__() | |
self.num_heads = num_heads | |
self.head_dim = dim // num_heads | |
self.q_proj = nn.Linear(dim, dim, bias=False) | |
self.k_proj = nn.Linear(dim, dim, bias=False) | |
self.v_proj = nn.Linear(dim, dim, bias=False) | |
self.o_proj = nn.Linear(dim, dim, bias=False) | |
def forward(self, x): | |
batch_size, seq_len, dim = x.size() # [batch_size, seq_len, dim] -> [4, 128, 576] | |
q = self.q_proj(x) | |
k = self.k_proj(x) | |
v = self.v_proj(x) | |
# Split heads | |
q = q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2) # [batch_size, num_heads, seq_len, head_dim] | |
k = k.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2) | |
v = v.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2) | |
# Scaled dot-product attention | |
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.head_dim) | |
attention = torch.softmax(scores, dim=-1) | |
context = torch.matmul(attention, v) | |
# Combine heads | |
context = context.transpose(1, 2).reshape(batch_size, seq_len, dim) | |
return self.o_proj(context) | |
class LlamaDecoderLayer(nn.Module): | |
def __init__(self, dim, hidden_dim, num_heads): | |
super().__init__() | |
self.self_attn = LlamaAttention(dim, num_heads) | |
self.mlp = LlamaMLP(dim, hidden_dim) | |
self.input_layernorm = LlamaRMSNorm(dim) | |
self.post_attention_layernorm = LlamaRMSNorm(dim) | |
def forward(self, x): | |
residual = x | |
x = self.input_layernorm(x) | |
x = self.self_attn(x) | |
x = x + residual | |
residual = x | |
x = self.post_attention_layernorm(x) | |
x = self.mlp(x) | |
x = x + residual | |
return x | |
class LlamaModel(nn.Module): | |
def __init__(self, vocab_size, dim, num_layers, hidden_dim, num_heads): | |
super().__init__() | |
self.embed_tokens = nn.Embedding(vocab_size, dim) | |
self.layers = nn.ModuleList([ | |
LlamaDecoderLayer(dim, hidden_dim, num_heads) for _ in range(num_layers) | |
]) | |
self.norm = LlamaRMSNorm(dim) | |
self.rotary_emb = LlamaRotaryEmbedding(dim) | |
def forward(self, x): | |
x = self.embed_tokens(x) | |
for layer in self.layers: | |
x = layer(x) | |
return self.norm(x) | |
class LlamaForCausalLM(nn.Module): | |
def __init__(self, vocab_size, dim, num_layers, hidden_dim, num_heads): | |
super().__init__() | |
self.model = LlamaModel(vocab_size, dim, num_layers, hidden_dim, num_heads) | |
self.lm_head = nn.Linear(dim, vocab_size, bias=False) | |
def forward(self, x): | |
x = self.model(x) | |
return self.lm_head(x) | |
def get_model(tokenizer): | |
vocab_size = tokenizer.vocab_size # Use actual tokenizer vocab size | |
return LlamaForCausalLM( | |
vocab_size=vocab_size, | |
dim=576, | |
num_layers=30, | |
hidden_dim=1536, | |
num_heads=8 | |
) | |
# model = get_model() | |
# print(model) |