File size: 3,829 Bytes
365f542
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import torch

def precompute_freqs_cis(head_dim: int, max_seq_len: int, theta: float = 10000.0):    
    # For half the dimensions, build the scale factor:
    freq_seq = torch.arange(0, head_dim, 2).float() / head_dim
    freqs = 1.0 / (theta ** freq_seq)

    # Outer product with positions
    t = torch.arange(max_seq_len, dtype=torch.float32)
    angles = torch.outer(t, freqs)
    
    # Build a complex exponential e^{i * theta}
    freqs_cis = torch.polar(
        torch.ones_like(angles),
        angles
    )
    return freqs_cis


def reshape_for_broadcast(freqs_cis: torch.Tensor, x: torch.Tensor):
    """
    x is [B, n_heads, seq_len, head_dim_as_complex],
    so we want to broadcast freqs_cis from [max_seq_len, half_dim]
    to [1, 1, seq_len, half_dim].
    """
    seq_len = x.shape[2]
    freqs_cis = freqs_cis[:seq_len]  # slice down to current seq_len
    return freqs_cis.view(1, 1, seq_len, -1)


def apply_rotary_emb(
    xq: torch.Tensor,
    xk: torch.Tensor,
    freqs_cis: torch.Tensor,
) -> tuple[torch.Tensor, torch.Tensor]:
    # Convert real -> complex by grouping last dim in pairs
    # shape => [B, n_heads, seq_len, head_dim//2, 2] => complex => [B, n_heads, seq_len, head_dim//2]
    xq_complex = torch.view_as_complex(xq.float().reshape(*xq.shape[:-1], -1, 2))
    xk_complex = torch.view_as_complex(xk.float().reshape(*xk.shape[:-1], -1, 2))

    # Broadcast the frequencies to match [B, n_heads, seq_len, head_dim//2]
    freqs_cis = reshape_for_broadcast(freqs_cis, xq_complex)

    # Multiply => apply rotation
    xq_complex = xq_complex * freqs_cis
    xk_complex = xk_complex * freqs_cis

    # Convert back to real => shape [B, n_heads, seq_len, head_dim]
    xq_out = torch.view_as_real(xq_complex).reshape(*xq.shape)
    xk_out = torch.view_as_real(xk_complex).reshape(*xk.shape)
    return xq_out.type_as(xq), xk_out.type_as(xk)


def main():
    import math
    from torch.testing import assert_close

    # Test 1: No rotation at position 0
    dim = 2
    freqs_cis = precompute_freqs_cis(dim=dim, max_seq_len=1, theta=1.0)
    xq = torch.tensor([[[[1.0, 0.0]]]])
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    assert_close(xq_out, xq, msg="Test 1 failed")
    print("Test 1 passed.")

    # Test 2: Verify rotation at positions [0..4] in 2D
    L = 5
    freqs_cis = precompute_freqs_cis(dim=dim, max_seq_len=L, theta=1.0)
    xq = torch.tensor([[[[1.0, 0.0] for _ in range(L)]]])
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    expected = torch.tensor([[[[math.cos(p), math.sin(p)] for p in range(L)]]])
    assert_close(xq_out, expected, rtol=1e-6, atol=1e-6, msg="Test 2 failed")
    print("Test 2 passed.")

    # Test 3: Higher dimension at position 0
    xq = torch.tensor([[[[1.0, 0.0, 1.0, 0.0]]]])
    freqs_cis = precompute_freqs_cis(dim=4, max_seq_len=1, theta=1.0)
    xq_out, _ = apply_rotary_emb(xq, xq.clone(), freqs_cis)
    assert_close(xq_out, xq, msg="Test 3 failed")
    print("Test 3 passed.")

    # Test 4: Random shape & norm checks
    torch.manual_seed(1337)
    B, H, L, D = 2, 3, 5, 8
    xq = torch.randn(B, H, L, D)
    xk = torch.randn(B, H, L, D)
    freqs_cis = precompute_freqs_cis(dim=D, max_seq_len=L, theta=1.0)
    xq_out, xk_out = apply_rotary_emb(xq, xk, freqs_cis)
    assert xq_out.shape == (B, H, L, D), "Test 4 Q shape failed"
    assert xk_out.shape == (B, H, L, D), "Test 4 K shape failed"
    for b in range(B):
        for h in range(H):
            for l in range(L):
                assert torch.allclose(xq[b,h,l].norm(), xq_out[b,h,l].norm(), atol=1e-5), "Test 4 Q norm failed"
                assert torch.allclose(xk[b,h,l].norm(), xk_out[b,h,l].norm(), atol=1e-5), "Test 4 K norm failed"
    print("Test 4 passed.\nAll tests passed successfully!")

if __name__ == "__main__":
    main()