|
import torch |
|
|
|
def precompute_freqs_cis(head_dim: int, max_seq_len: int, theta: float = 10000.0): |
|
|
|
freq_seq = torch.arange(0, head_dim, 2).float() / head_dim |
|
freqs = 1.0 / (theta ** freq_seq) |
|
|
|
|
|
t = torch.arange(max_seq_len, dtype=torch.float32) |
|
angles = torch.outer(t, freqs) |
|
|
|
|
|
freqs_cis = torch.polar( |
|
torch.ones_like(angles), |
|
angles |
|
) |
|
return freqs_cis |
|
|
|
|
|
def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor): |
|
""" |
|
x is [B, n_heads, seq_len, head_dim_as_complex], |
|
so we want to broadcast freqs_cis from [max_seq_len, half_dim] |
|
to [1, 1, seq_len, half_dim]. |
|
""" |
|
seq_len = x.shape[2] |
|
freqs_cis = freqs_cis[:seq_len] |
|
return freqs_cis.view(1, 1, seq_len, -1) |
|
|
|
|
|
def apply_rotary_emb( |
|
xq: torch.Tensor, |
|
xk: torch.Tensor, |
|
freqs_cis: torch.Tensor, |
|
) -> tuple[torch.Tensor, torch.Tensor]: |
|
|
|
|
|
xq_complex = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2)) |
|
xk_complex = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2)) |
|
|
|
|
|
freqs_cis = reshape_for_broadcast(freqs_cis, xq_complex) |
|
|
|
|
|
xq_complex = xq_complex * freqs_cis |
|
xk_complex = xk_complex * freqs_cis |
|
|
|
|
|
xq_out = torch.view_as_real(xq_complex).reshape(*xq.shape) |
|
xk_out = torch.view_as_real(xk_complex).reshape(*xk.shape) |
|
return xq_out.type_as(xq), xk_out.type_as(xk) |
|
|
|
|
|
def main(): |
|
import math |
|
from torch.testing import assert_close |
|
|
|
|
|
dim = 2 |
|
freqs_cis = precompute_freqs_cis(dim=dim, max_seq_len=1, theta=1.0) |
|
xq = torch.tensor([[[[1.0, 0.0]]]]) |
|
xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis) |
|
assert_close(xq_out, xq, msg="Test 1 failed") |
|
print("Test 1 passed.") |
|
|
|
|
|
L = 5 |
|
freqs_cis = precompute_freqs_cis(dim=dim, max_seq_len=L, theta=1.0) |
|
xq = torch.tensor([[[[1.0, 0.0] for _ in range(L)]]]) |
|
xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis) |
|
expected = torch.tensor([[[[math.cos(p), math.sin(p)] for p in range(L)]]]) |
|
assert_close(xq_out, expected, rtol=1e-6, atol=1e-6, msg="Test 2 failed") |
|
print("Test 2 passed.") |
|
|
|
|
|
xq = torch.tensor([[[[1.0, 0.0, 1.0, 0.0]]]]) |
|
freqs_cis = precompute_freqs_cis(dim=4, max_seq_len=1, theta=1.0) |
|
xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis) |
|
assert_close(xq_out, xq, msg="Test 3 failed") |
|
print("Test 3 passed.") |
|
|
|
|
|
torch.manual_seed(1337) |
|
B, H, L, D = 2, 3, 5, 8 |
|
xq = torch.randn(B, H, L, D) |
|
xk = torch.randn(B, H, L, D) |
|
freqs_cis = precompute_freqs_cis(dim=D, max_seq_len=L, theta=1.0) |
|
xq_out, xk_out = apply_rotary_emb(xq, xk, freqs_cis) |
|
assert xq_out.shape == (B, H, L, D), "Test 4 Q shape failed" |
|
assert xk_out.shape == (B, H, L, D), "Test 4 K shape failed" |
|
for b in range(B): |
|
for h in range(H): |
|
for l in range(L): |
|
assert torch.allclose(xq[b,h,l].norm(), xq_out[b,h,l].norm(), atol=1e-5), "Test 4 Q norm failed" |
|
assert torch.allclose(xk[b,h,l].norm(), xk_out[b,h,l].norm(), atol=1e-5), "Test 4 K norm failed" |
|
print("Test 4 passed.\nAll tests passed successfully!") |
|
|
|
if __name__ == "__main__": |
|
main() |