"""Implementation of the paper: |
|
|
|
LLaMA-Adapter: Efficient Fine-tuning of Language Models with Zero-init Attention |
|
https://arxiv.org/abs/2303.16199 |
|
""" |
|
|
|
import math
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.nn import functional as F

import lit_llama.model as llama
from lit_llama.model import build_rope_cache, apply_rope, RMSNorm, MLP


@dataclass
class LLaMAConfig(llama.LLaMAConfig):
    adapter_prompt_length: int = 10  # number of learnable prompt tokens per adapted layer
    adapter_start_layer: int = 2  # index of the first transformer block that gets an adaption prompt


class CausalSelfAttention(nn.Module):
    """A modification of `lit_llama.model.CausalSelfAttention` that adds the attention
    over the adaption prompt."""

    def __init__(self, config: LLaMAConfig, block_idx: int) -> None:
        super().__init__()
        assert config.n_embd % config.n_head == 0

        # key, query, value projections for all heads, but in a batch
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=False)
        # output projection
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=False)

        if block_idx >= config.adapter_start_layer:
            # embedding table holding the learnable adaption prompt for this block
            self.adapter_wte = nn.Embedding(config.adapter_prompt_length, config.n_embd)
            # zero-init attention gate: starts at 0 so the adapter initially leaves the
            # pretrained model's behaviour unchanged
            self.gating_factor = torch.nn.Parameter(torch.zeros(1))

        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.block_size = config.block_size
        self.block_idx = block_idx
        self.adapter_prompt_length = config.adapter_prompt_length
        self.adapter_start_layer = config.adapter_start_layer
        self.rope_cache = None

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        B, T, C = x.size()  # batch size, sequence length, embedding dimensionality (n_embd)

        # calculate query, key, values for all heads in batch
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)

        # move the head dimension forward to be the batch dimension: (B, nh, T, hs)
        head_size = C // self.n_head
        k = k.view(B, T, self.n_head, head_size).transpose(1, 2)
        q = q.view(B, T, self.n_head, head_size).transpose(1, 2)
        v = v.view(B, T, self.n_head, head_size).transpose(1, 2)

        if self.rope_cache is None:
            # cache the rotary embeddings for future forward calls
            self.rope_cache = build_rope_cache(
                seq_len=self.block_size,
                n_elem=self.n_embd // self.n_head,
                dtype=x.dtype,
                device=x.device,
            )

        q = apply_rope(q, self.rope_cache)
        k = apply_rope(k, self.rope_cache)

        # causal self-attention, using the efficient fused kernel when available
        y = F.scaled_dot_product_attention(q, k, v, attn_mask=None, dropout_p=0.0, is_causal=True)

        if self.block_idx >= self.adapter_start_layer:
            # attend over the adaption prompt and add the gated result to the attention output
            prefix = self.adapter_wte.weight.reshape(1, self.adapter_prompt_length, self.n_embd)

            aT = prefix.size(1)
            _, ak, av = self.c_attn(prefix).split(self.n_embd, dim=2)
            ak = ak.view(1, aT, self.n_head, head_size).repeat(B, 1, 1, 1).transpose(1, 2)
            av = av.view(1, aT, self.n_head, head_size).repeat(B, 1, 1, 1).transpose(1, 2)

            # the prompt tokens are visible to every position, so the mask is all True (not causal)
            amask = torch.ones(q.shape[-2], ak.shape[-2], dtype=torch.bool, device=x.device)
            ay = F.scaled_dot_product_attention(q, ak, av, attn_mask=amask, dropout_p=0.0, is_causal=False)
            y = y + self.gating_factor * ay

        # re-assemble all head outputs side by side
        y = y.transpose(1, 2).contiguous().view(B, T, C)

        # output projection
        y = self.c_proj(y)

        return y
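

# A minimal shape check for the adapted attention, as a sketch (not part of the
# original module). It assumes the inherited `llama.LLaMAConfig` accepts the keyword
# fields read above (`block_size`, `vocab_size`, `n_layer`, `n_head`, `n_embd`):
#
#     cfg = LLaMAConfig(block_size=16, vocab_size=100, n_layer=4, n_head=4, n_embd=32)
#     attn = CausalSelfAttention(cfg, block_idx=cfg.adapter_start_layer)
#     out = attn(torch.randn(2, 16, cfg.n_embd))
#     assert out.shape == (2, 16, cfg.n_embd)
#     # with `gating_factor` at its zero init, `out` equals plain causal self-attention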


class Block(nn.Module):
    """The implementation is identical to `lit_llama.model.Block` with the exception that
    we replace the attention layer where adaption is implemented."""

    def __init__(self, config: LLaMAConfig, block_idx: int) -> None:
        super().__init__()
        self.rms_1 = RMSNorm(config.n_embd)
        self.attn = CausalSelfAttention(config, block_idx)
        self.rms_2 = RMSNorm(config.n_embd)
        self.mlp = MLP(config)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.attn(self.rms_1(x))
        x = x + self.mlp(self.rms_2(x))
        return x


class LLaMA(llama.LLaMA):
    """The implementation is identical to `lit_llama.model.LLaMA` with the exception that
    the `Block` saves the layer index and passes it down to the attention layer."""

    def __init__(self, config: LLaMAConfig) -> None:
        nn.Module.__init__(self)
        assert config.vocab_size is not None
        assert config.block_size is not None
        self.config = config

        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
        self.transformer = nn.ModuleDict(
            dict(
                wte=nn.Embedding(config.vocab_size, config.n_embd),
                h=nn.ModuleList([Block(config, i) for i in range(config.n_layer)]),
                ln_f=RMSNorm(config.n_embd),
            )
        )

    @classmethod
    def from_name(cls, name: str):
        return cls(LLaMAConfig.from_name(name))


def mark_only_adapter_as_trainable(model: LLaMA) -> None:
    """Sets `requires_grad=False` for all non-adapter weights."""
    for name, param in model.named_parameters():
        param.requires_grad = "adapter_wte" in name or "gating_factor" in name


def adapter_state_from_state_dict(state_dict: dict) -> dict:
    """Returns the model state dict with only the adapter weights for saving."""
    return {name: param for name, param in state_dict.items() if "adapter_wte" in name or "gating_factor" in name}
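

# The block below is not part of the original module; it is a rough sketch of the
# intended fine-tuning workflow. It assumes the inherited `llama.LLaMAConfig` accepts
# `block_size`, `vocab_size`, `n_layer`, `n_head` and `n_embd` as keyword arguments
# (the fields read by the code above); adjust to the real config of your checkpoint.
if __name__ == "__main__":
    config = LLaMAConfig(
        block_size=32, vocab_size=100, n_layer=4, n_head=4, n_embd=64,
        adapter_prompt_length=10, adapter_start_layer=2,
    )
    model = LLaMA(config)

    # freeze every pretrained weight; only `adapter_wte` and `gating_factor` stay trainable
    mark_only_adapter_as_trainable(model)
    trainable = [name for name, p in model.named_parameters() if p.requires_grad]
    print(f"{len(trainable)} trainable adapter tensors")

    # after fine-tuning, persist only the (small) adapter state
    adapter_state = adapter_state_from_state_dict(model.state_dict())
    torch.save(adapter_state, "adapter_weights.pth")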