import torch


def precompute_freqs_cis(head_dim: int, max_seq_len: int, theta: float = 10000.0) -> torch.Tensor:
    """Build the table of complex rotations used by rotary position embeddings.

    Args:
        head_dim: Size of one attention head. Must be even, because the
            rotation acts on consecutive (real, imaginary) pairs.
        max_seq_len: Number of positions (rows) to precompute.
        theta: Base of the geometric frequency progression.

    Returns:
        A complex tensor of shape ``[max_seq_len, head_dim // 2]`` whose entry
        ``[p, j]`` is ``exp(i * p * theta ** (-2j / head_dim))``.

    Raises:
        ValueError: If ``head_dim`` is odd.
    """
    if head_dim % 2 != 0:
        # An odd head_dim cannot be split into pairs by apply_rotary_emb.
        raise ValueError(f"head_dim must be even, got {head_dim}")

    # One frequency per pair of dimensions: theta ** (-2j / head_dim).
    freq_seq = torch.arange(0, head_dim, 2).float() / head_dim
    freqs = 1.0 / (theta ** freq_seq)

    # Outer product with positions gives the rotation angle per (pos, pair).
    t = torch.arange(max_seq_len, dtype=torch.float32)
    angles = torch.outer(t, freqs)

    # Unit-magnitude complex exponential e^{i * angle}.
    return torch.polar(torch.ones_like(angles), angles)


def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor) -> torch.Tensor:
    """Slice and reshape ``freqs_cis`` so it broadcasts against ``x``.

    ``x`` is ``[B, n_heads, seq_len, head_dim_as_complex]``, so ``freqs_cis``
    is cut from ``[max_seq_len, half_dim]`` down to the first ``seq_len`` rows
    and viewed as ``[1, 1, seq_len, half_dim]``.

    Raises:
        ValueError: If ``freqs_cis`` holds fewer than ``seq_len`` positions.
    """
    seq_len = x.shape[2]
    if freqs_cis.shape[0] < seq_len:
        # Without this check the view below either fails with an opaque shape
        # error or, when the element count happens to divide evenly, silently
        # returns a tensor with the wrong trailing dimension.
        raise ValueError(
            f"freqs_cis covers {freqs_cis.shape[0]} positions but seq_len is {seq_len}"
        )
    freqs_cis = freqs_cis[:seq_len]  # slice down to current seq_len
    return freqs_cis.view(1, 1, seq_len, -1)


def apply_rotary_emb(
    xq: torch.Tensor,
    xk: torch.Tensor,
    freqs_cis: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
    """Rotate query and key tensors by their position-dependent angles.

    Args:
        xq: Query tensor, ``[B, n_heads, seq_len, head_dim]``.
        xk: Key tensor, ``[B, n_heads, seq_len, head_dim]``.
        freqs_cis: Complex table from ``precompute_freqs_cis`` with at least
            ``seq_len`` rows.

    Returns:
        ``(xq_out, xk_out)`` with the same shapes and dtypes as the inputs.
        The rotation itself is computed in float32.
    """
    # Group the last dim in pairs and reinterpret each pair as one complex
    # number: [B, n_heads, seq_len, head_dim // 2, 2] -> [..., head_dim // 2].
    xq_complex = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_complex = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))

    # Broadcast the frequencies to match [B, n_heads, seq_len, head_dim // 2].
    freqs_cis = reshape_for_broadcast(freqs_cis, xq_complex)

    # Complex multiplication by a unit-magnitude number is a pure rotation.
    xq_complex = xq_complex * freqs_cis
    xk_complex = xk_complex * freqs_cis

    # Back to real pairs, then flatten to [B, n_heads, seq_len, head_dim].
    xq_out = torch.view_as_real(xq_complex).reshape(*xq.shape)
    xk_out = torch.view_as_real(xk_complex).reshape(*xk.shape)

    return xq_out.type_as(xq), xk_out.type_as(xk)


def main():
    """Run the self-checks for the rotary embedding helpers."""
    import math
    from torch.testing import assert_close

    # Test 1: No rotation at position 0
    dim = 2
    freqs_cis = precompute_freqs_cis(head_dim=dim, max_seq_len=1, theta=1.0)
    xq = torch.tensor([[[[1.0, 0.0]]]])
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    assert_close(xq_out, xq, msg="Test 1 failed")
    print("Test 1 passed.")

    # Test 2: Verify rotation at positions [0..4] in 2D
    seq_len = 5
    freqs_cis = precompute_freqs_cis(head_dim=dim, max_seq_len=seq_len, theta=1.0)
    xq = torch.tensor([[[[1.0, 0.0] for _ in range(seq_len)]]])
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    # With theta=1.0 the single frequency is 1, so position p rotates by p rad.
    expected = torch.tensor([[[[math.cos(p), math.sin(p)] for p in range(seq_len)]]])
    assert_close(xq_out, expected, rtol=1e-6, atol=1e-6, msg="Test 2 failed")
    print("Test 2 passed.")

    # Test 3: Higher dimension at position 0
    xq = torch.tensor([[[[1.0, 0.0, 1.0, 0.0]]]])
    freqs_cis = precompute_freqs_cis(head_dim=4, max_seq_len=1, theta=1.0)
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    assert_close(xq_out, xq, msg="Test 3 failed")
    print("Test 3 passed.")

    # Test 4: Random shape & norm checks
    torch.manual_seed(1337)
    batch, heads, seq_len, head_dim = 2, 3, 5, 8
    xq = torch.randn(batch, heads, seq_len, head_dim)
    xk = torch.randn(batch, heads, seq_len, head_dim)
    freqs_cis = precompute_freqs_cis(head_dim=head_dim, max_seq_len=seq_len, theta=1.0)
    xq_out, xk_out = apply_rotary_emb(xq, xk, freqs_cis)
    assert xq_out.shape == (batch, heads, seq_len, head_dim), "Test 4 Q shape failed"
    assert xk_out.shape == (batch, heads, seq_len, head_dim), "Test 4 K shape failed"

    # A rotation must preserve the norm of every per-position vector.
    for b in range(batch):
        for h in range(heads):
            for pos in range(seq_len):
                assert torch.allclose(
                    xq[b, h, pos].norm(), xq_out[b, h, pos].norm(), atol=1e-5
                ), "Test 4 Q norm failed"
                assert torch.allclose(
                    xk[b, h, pos].norm(), xk_out[b, h, pos].norm(), atol=1e-5
                ), "Test 4 K norm failed"
    print("Test 4 passed.\nAll tests passed successfully!")


if __name__ == "__main__":
    main()