Transformer_500M / rotary_emb.py
yagizdevre's picture
transformer new
365f542
import torch
def precompute_freqs_cis(head_dim: int, max_seq_len: int, theta: float = 10000.0):
# For half the dimensions, build the scale factor:
freq_seq = torch.arange(0, head_dim, 2).float() / head_dim
freqs = 1.0 / (theta ** freq_seq)
# Outer product with positions
t = torch.arange(max_seq_len, dtype=torch.float32)
angles = torch.outer(t, freqs)
# Build a complex exponential e^{i * theta}
freqs_cis = torch.polar(
torch.ones_like(angles),
angles
)
return freqs_cis
def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
"""
x is [B, n_heads, seq_len, head_dim_as_complex],
so we want to broadcast freqs_cis from [max_seq_len, half_dim]
to [1, 1, seq_len, half_dim].
"""
seq_len = x.shape[2]
freqs_cis = freqs_cis[:seq_len] # slice down to current seq_len
return freqs_cis.view(1, 1, seq_len, -1)
def apply_rotary_emb(
xq: torch.Tensor,
xk: torch.Tensor,
freqs_cis: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
# Convert real -> complex by grouping last dim in pairs
# shape => [B, n_heads, seq_len, head_dim//2, 2] => complex => [B, n_heads, seq_len, head_dim//2]
xq_complex = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
xk_complex = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))
# Broadcast the frequencies to match [B, n_heads, seq_len, head_dim//2]
freqs_cis = reshape_for_broadcast(freqs_cis, xq_complex)
# Multiply => apply rotation
xq_complex = xq_complex * freqs_cis
xk_complex = xk_complex * freqs_cis
# Convert back to real => shape [B, n_heads, seq_len, head_dim]
xq_out = torch.view_as_real(xq_complex).reshape(*xq.shape)
xk_out = torch.view_as_real(xk_complex).reshape(*xk.shape)
return xq_out.type_as(xq), xk_out.type_as(xk)
def main():
import math
from torch.testing import assert_close
# Test 1: No rotation at position 0
dim = 2
freqs_cis = precompute_freqs_cis(dim=dim, max_seq_len=1, theta=1.0)
xq = torch.tensor([[[[1.0, 0.0]]]])
xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
assert_close(xq_out, xq, msg="Test 1 failed")
print("Test 1 passed.")
# Test 2: Verify rotation at positions [0..4] in 2D
L = 5
freqs_cis = precompute_freqs_cis(dim=dim, max_seq_len=L, theta=1.0)
xq = torch.tensor([[[[1.0, 0.0] for _ in range(L)]]])
xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
expected = torch.tensor([[[[math.cos(p), math.sin(p)] for p in range(L)]]])
assert_close(xq_out, expected, rtol=1e-6, atol=1e-6, msg="Test 2 failed")
print("Test 2 passed.")
# Test 3: Higher dimension at position 0
xq = torch.tensor([[[[1.0, 0.0, 1.0, 0.0]]]])
freqs_cis = precompute_freqs_cis(dim=4, max_seq_len=1, theta=1.0)
xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
assert_close(xq_out, xq, msg="Test 3 failed")
print("Test 3 passed.")
# Test 4: Random shape & norm checks
torch.manual_seed(1337)
B, H, L, D = 2, 3, 5, 8
xq = torch.randn(B, H, L, D)
xk = torch.randn(B, H, L, D)
freqs_cis = precompute_freqs_cis(dim=D, max_seq_len=L, theta=1.0)
xq_out, xk_out = apply_rotary_emb(xq, xk, freqs_cis)
assert xq_out.shape == (B, H, L, D), "Test 4 Q shape failed"
assert xk_out.shape == (B, H, L, D), "Test 4 K shape failed"
for b in range(B):
for h in range(H):
for l in range(L):
assert torch.allclose(xq[b,h,l].norm(), xq_out[b,h,l].norm(), atol=1e-5), "Test 4 Q norm failed"
assert torch.allclose(xk[b,h,l].norm(), xk_out[b,h,l].norm(), atol=1e-5), "Test 4 K norm failed"
print("Test 4 passed.\nAll tests passed successfully!")
if __name__ == "__main__":
main()