两类题:A 类是 AI/ML 专属(Attention、LayerNorm 反传、NMS、KMeans、LoRA、GRPO advantage 等),B 类是 AI 面试官偏爱的 LeetCode 子集(LRU、滑窗、Top-K、蓄水池、字典树等)。所有代码都可跑,shape 都标清楚。
A. AI/ML 专属手撕
B. LeetCode AI 子集
这类题是 AI 岗区别于普通 SDE 的核心。面试官要看的是:你能不能用 numpy/pytorch 把模型里最常见的那几块 block 徒手复现,并且在被追问"为什么减 max"、"为什么要 sqrt(d)"、"LayerNorm 反传里那个 mean 梯度项怎么来的"时能答上来。
给定 Q、K、V 和可选 mask,用 numpy 实现 softmax(QK^T/√d) V。
输入:Q shape [B, Lq, d],K shape [B, Lk, d],V shape [B, Lk, dv],mask shape [B, Lq, Lk](True 处被屏蔽)。输出 shape [B, Lq, dv]。
-inf,softmax 之后自然变 0import numpy as np
def softmax(x, axis=-1):
x = x - np.max(x, axis=axis, keepdims=True)
e = np.exp(x)
return e / np.sum(e, axis=axis, keepdims=True)
def scaled_dot_product_attention(Q, K, V, mask=None):
"""
Q: [B, Lq, d]
K: [B, Lk, d]
V: [B, Lk, dv]
mask: [B, Lq, Lk] bool, True 处屏蔽 (例如 padding / causal)
return: [B, Lq, dv], attn [B, Lq, Lk]
"""
d = Q.shape[-1]
scores = np.matmul(Q, np.swapaxes(K, -1, -2)) / np.sqrt(d) # [B, Lq, Lk]
if mask is not None:
scores = np.where(mask, -1e9, scores)
attn = softmax(scores, axis=-1) # [B, Lq, Lk]
out = np.matmul(attn, V) # [B, Lq, dv]
return out, attn
if __name__ == "__main__":
B, Lq, Lk, d, dv = 2, 3, 4, 8, 8
Q = np.random.randn(B, Lq, d)
K = np.random.randn(B, Lk, d)
V = np.random.randn(B, Lk, dv)
causal = np.triu(np.ones((Lq, Lk), dtype=bool), k=1)[None].repeat(B, axis=0)
out, attn = scaled_dot_product_attention(Q, K, V, mask=causal)
print(out.shape, attn.shape, attn[0].sum(-1)) # 每行应为 1
时间 O(B · Lq · Lk · max(d, dv)),空间 O(B · Lq · Lk)(attn 矩阵)。FlashAttention 的核心就是不显式建出这个 Lq × Lk 矩阵。
-1e9 还是 -inf?—— fp32 下都行,fp16 下用 -1e4 之类,否则会上溢 NaN。[B, 1, Lk](逐 batch),causal 是 [1, Lq, Lk](逐 query)。两者 OR 起来广播。用 pytorch 写一个 nn.Module 版的 MHA,支持 causal mask。
输入 x shape [B, L, D],num_heads = h,head_dim = D / h。实现投影、split head、attention、concat、output projection 的整套流程。
import torch
import torch.nn as nn
import torch.nn.functional as F
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, num_heads, dropout=0.0):
super().__init__()
assert d_model % num_heads == 0
self.h = num_heads
self.d_h = d_model // num_heads
self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
self.o = nn.Linear(d_model, d_model, bias=False)
self.drop = nn.Dropout(dropout)
def forward(self, x, mask=None):
# x: [B, L, D]
B, L, D = x.shape
qkv = self.qkv(x) # [B, L, 3D]
qkv = qkv.reshape(B, L, 3, self.h, self.d_h)
qkv = qkv.permute(2, 0, 3, 1, 4) # [3, B, h, L, d_h]
q, k, v = qkv[0], qkv[1], qkv[2]
scores = (q @ k.transpose(-1, -2)) / (self.d_h ** 0.5) # [B, h, L, L]
if mask is not None:
# mask: [B, 1, L, L] or [1, 1, L, L], True = 屏蔽
scores = scores.masked_fill(mask, float('-inf'))
attn = F.softmax(scores, dim=-1)
attn = self.drop(attn)
out = attn @ v # [B, h, L, d_h]
out = out.transpose(1, 2).reshape(B, L, D) # [B, L, D]
return self.o(out)
def causal_mask(L, device):
return torch.triu(torch.ones(L, L, dtype=torch.bool, device=device), diagonal=1)
if __name__ == "__main__":
mha = MultiHeadAttention(d_model=64, num_heads=8)
x = torch.randn(2, 10, 64)
m = causal_mask(10, x.device)[None, None] # [1, 1, L, L]
y = mha(x, mask=m)
print(y.shape) # torch.Size([2, 10, 64])
时间 O(B · h · L^2 · d_h) = O(B · L^2 · D),空间同量级。KV cache 推理时把 K/V 缓存下来,每步只算一行 Q。
nn.Linear(D, 3D) 一次算完,计算利用率更高。[B, h, L, d_h] 而不是 [B, L, h, d_h]?—— attention 沿 L 维做,batched matmul 要求 batch 维在前;h 维和 B 一起看作 batch。k, v append 到历史 cache 后面,q 只用当前 token;mask 自然退化成全 1。h_kv < h,attention 前把 K、V 沿 head 维 repeat。只用 numpy 实现 LayerNorm,包括反向传播对 x、gamma、beta 的梯度。最容易翻车的题之一。
输入 x shape [N, D],沿最后一维做归一化。输出 y = gamma * (x - μ) / sqrt(σ² + ε) + beta。给定 dy,手算 dx, dgamma, dbeta。
x_hat, mu, vardμ/dx 的链、dσ/dx 的链,不能只写直接项,这是面试官最爱挖的坑dx = (gamma/σ) · (dy - mean(dy) - x_hat · mean(dy·x_hat))(沿归一维求均值)import numpy as np
def layernorm_forward(x, gamma, beta, eps=1e-5):
"""
x: [N, D]
gamma: [D]
beta: [D]
"""
mu = x.mean(axis=-1, keepdims=True) # [N, 1]
var = x.var(axis=-1, keepdims=True) # [N, 1]
std = np.sqrt(var + eps) # [N, 1]
x_hat = (x - mu) / std # [N, D]
y = gamma * x_hat + beta # [N, D]
cache = (x_hat, gamma, std)
return y, cache
def layernorm_backward(dy, cache):
"""
dy: [N, D]
return dx [N,D], dgamma [D], dbeta [D]
"""
x_hat, gamma, std = cache
N, D = dy.shape
dbeta = dy.sum(axis=0) # [D]
dgamma = (dy * x_hat).sum(axis=0) # [D]
# dL/d x_hat
dx_hat = dy * gamma # [N, D]
# 沿 D 维的两个均值项 —— 面试重点
mean_dxh = dx_hat.mean(axis=-1, keepdims=True) # [N,1]
mean_dxh_xhat = (dx_hat * x_hat).mean(axis=-1, keepdims=True) # [N,1]
dx = (dx_hat - mean_dxh - x_hat * mean_dxh_xhat) / std
return dx, dgamma, dbeta
# 数值梯度检验
if __name__ == "__main__":
np.random.seed(0)
N, D = 4, 5
x = np.random.randn(N, D)
gamma = np.random.randn(D)
beta = np.random.randn(D)
y, cache = layernorm_forward(x, gamma, beta)
dy = np.random.randn(*y.shape)
dx, dgamma, dbeta = layernorm_backward(dy, cache)
# 数值梯度
def f(xx):
yy, _ = layernorm_forward(xx, gamma, beta)
return (yy * dy).sum()
h = 1e-5
dx_num = np.zeros_like(x)
for i in range(N):
for j in range(D):
x[i,j] += h; fp = f(x)
x[i,j] -= 2*h; fm = f(x)
x[i,j] += h
dx_num[i,j] = (fp - fm) / (2*h)
print("dx rel err:", np.abs(dx - dx_num).max() / (np.abs(dx_num).max() + 1e-9))
forward / backward 都是 O(N · D)。
mean_dxh、mean_dxh_xhat 两项都要减?—— 因为 μ 和 σ 都是 x 的函数,x 动一下整行的 μ, σ 都动,反向传播链规则会带出这两项。N 求;LN 沿特征维归一,均值项沿 D 求。公式形式一样,只是哪一维做 mean 换了。BN 的 forward 区分训练和推理两种模式,推理时用 running stats。
输入 x shape [N, D]。训练阶段用当前 batch 的均值方差并更新 running stats;推理阶段直接用 running stats。
import numpy as np
class BatchNorm1d:
def __init__(self, D, momentum=0.1, eps=1e-5):
self.gamma = np.ones(D)
self.beta = np.zeros(D)
self.running_mean = np.zeros(D)
self.running_var = np.ones(D)
self.momentum = momentum
self.eps = eps
def __call__(self, x, training=True):
# x: [N, D]
if training:
mu = x.mean(axis=0) # [D]
var = x.var(axis=0) # [D]
x_hat = (x - mu) / np.sqrt(var + self.eps)
# EMA 更新 (pytorch 风格 momentum: new = (1-m)*old + m*batch)
self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * mu
self.running_var = (1 - self.momentum) * self.running_var + self.momentum * var
else:
x_hat = (x - self.running_mean) / np.sqrt(self.running_var + self.eps)
return self.gamma * x_hat + self.beta
if __name__ == "__main__":
bn = BatchNorm1d(5)
for _ in range(100):
x = np.random.randn(32, 5) * 2 + 3
bn(x, training=True)
x_test = np.random.randn(4, 5) * 2 + 3
y = bn(x_test, training=False)
print("running_mean:", bn.running_mean) # 应接近 3
print("running_var:", bn.running_var) # 应接近 4
O(N · D)。
[N, C, H, W],沿 N, H, W 求 mean/var,每个通道一个 (γ, β)。目标检测后处理。给一堆框和分数,去掉和高分框 IoU 过大的低分框。
输入 boxes shape [N, 4](x1, y1, x2, y2),scores shape [N],阈值 iou_thresh。输出保留的下标列表。
import numpy as np
def nms(boxes, scores, iou_thresh=0.5):
"""
boxes: [N, 4] (x1, y1, x2, y2)
scores: [N]
return: 保留的 indices (python list)
"""
x1, y1, x2, y2 = boxes[:,0], boxes[:,1], boxes[:,2], boxes[:,3]
areas = (x2 - x1) * (y2 - y1) # [N]
order = scores.argsort()[::-1] # 分数从大到小
keep = []
while order.size > 0:
i = order[0]
keep.append(int(i))
if order.size == 1:
break
rest = order[1:]
# 相交矩形
xx1 = np.maximum(x1[i], x1[rest])
yy1 = np.maximum(y1[i], y1[rest])
xx2 = np.minimum(x2[i], x2[rest])
yy2 = np.minimum(y2[i], y2[rest])
w = np.maximum(0.0, xx2 - xx1)
h = np.maximum(0.0, yy2 - yy1)
inter = w * h
iou = inter / (areas[i] + areas[rest] - inter + 1e-9)
order = rest[iou <= iou_thresh] # 留下 IoU 不高的
return keep
if __name__ == "__main__":
boxes = np.array([
[10, 10, 50, 50],
[12, 12, 52, 52], # 和第 0 个高度重合
[100, 100, 150, 150],
[105, 105, 155, 155], # 和第 2 个高度重合
], dtype=np.float32)
scores = np.array([0.9, 0.8, 0.95, 0.7])
print(nms(boxes, scores, 0.5)) # [2, 0]
最坏 O(N^2)。GPU 上有 soft-nms、matrix nms 的并行版本。
十行内的题,但要能说清"为什么减 max"。
输入任意 shape 的 x 和 axis,返回 softmax,不能因 exp 上溢出 inf。
c = max(x),所有 exp 的参数都 ≤ 0,不会上溢import numpy as np
def softmax(x, axis=-1):
x_max = np.max(x, axis=axis, keepdims=True)
e = np.exp(x - x_max)
return e / np.sum(e, axis=axis, keepdims=True)
def log_softmax(x, axis=-1):
x_max = np.max(x, axis=axis, keepdims=True)
shifted = x - x_max
return shifted - np.log(np.sum(np.exp(shifted), axis=axis, keepdims=True))
if __name__ == "__main__":
x = np.array([1000., 1001., 1002.])
print(np.exp(x) / np.exp(x).sum()) # nan
print(softmax(x)) # [0.09, 0.244, 0.665]
print(log_softmax(x)) # finite
O(n)。
log(softmax(x))?—— softmax 算完再 log 会丢精度,log(sum(exp)) 用 LSE 合并,既稳又准。log(sum(exp)) 来说不重要(最大那一项恒等于 0,不会下溢到 0)。手写一版可选 label smoothing 的 CE,再把对 logits 的梯度一起给出来。
输入 logits shape [N, C]、targets shape [N](类别下标)。smoothing = ε:真实类概率变成 1 - ε,其他类平均分 ε/(C-1)。
import numpy as np
def log_softmax(x, axis=-1):
m = np.max(x, axis=axis, keepdims=True)
s = x - m
return s - np.log(np.exp(s).sum(axis=axis, keepdims=True))
def cross_entropy(logits, targets, smoothing=0.0):
"""
logits: [N, C]
targets: [N] int
return: (loss: scalar, dlogits: [N, C])
"""
N, C = logits.shape
logp = log_softmax(logits, axis=-1) # [N, C]
p = np.exp(logp)
# 构造平滑 target 分布
if smoothing > 0:
q = np.full((N, C), smoothing / (C - 1))
q[np.arange(N), targets] = 1.0 - smoothing
else:
q = np.zeros((N, C))
q[np.arange(N), targets] = 1.0
loss = -(q * logp).sum() / N
dlogits = (p - q) / N # 对 logits 的梯度
return loss, dlogits
if __name__ == "__main__":
np.random.seed(0)
logits = np.random.randn(4, 5)
targets = np.array([0, 2, 1, 4])
loss, g = cross_entropy(logits, targets, smoothing=0.1)
print(loss, g.shape)
O(N · C)。
CE(logits, target) = NLL(log_softmax(logits), target)。pytorch 的 CrossEntropyLoss 吃 logits,NLLLoss 吃 log_probs,一步步来的。完整的 forward、loss、grad、update 四件套,是所有手撕训练循环的 hello world。
给 X shape [N, D],y shape [N] ∈ {0, 1},用 SGD + mini-batch 训一个二分类 LR。
np.logaddexp(0, -z),z 是 logitimport numpy as np
def sigmoid(z):
# 稳定版
return np.where(z >= 0, 1/(1+np.exp(-z)), np.exp(z)/(1+np.exp(z)))
def bce_with_logits(z, y):
# numerically stable: log(1+exp(-|z|)) + max(z,0) - z*y
return np.mean(np.logaddexp(0, -np.abs(z)) + np.maximum(z, 0) - z * y)
def fit_logreg(X, y, lr=0.1, epochs=50, batch_size=32, seed=0):
rng = np.random.default_rng(seed)
N, D = X.shape
w = np.zeros(D)
b = 0.0
for ep in range(epochs):
idx = rng.permutation(N)
for s in range(0, N, batch_size):
bi = idx[s:s+batch_size]
xb, yb = X[bi], y[bi]
z = xb @ w + b # [B]
p = sigmoid(z)
grad_w = xb.T @ (p - yb) / len(bi) # [D]
grad_b = (p - yb).mean()
w -= lr * grad_w
b -= lr * grad_b
if ep % 10 == 0:
z_all = X @ w + b
print(f"epoch {ep} loss={bce_with_logits(z_all, y):.4f}")
return w, b
if __name__ == "__main__":
rng = np.random.default_rng(0)
X = rng.standard_normal((200, 3))
true_w = np.array([1.5, -2.0, 0.7])
y = (X @ true_w + 0.3 + rng.standard_normal(200) * 0.1 > 0).astype(np.float64)
w, b = fit_logreg(X, y, lr=0.2, epochs=30)
print("w =", w, " b =", b)
每个 epoch O(N · D)。
grad_w += lam * w(偏置一般不正则)。无监督聚类的入门题,向量化实现。
给 X shape [N, D]、K,返回聚类中心和每个样本的标签。
||x - c||^2 = ||x||^2 + ||c||^2 - 2 x·cimport numpy as np
def kmeans(X, K, max_iter=100, tol=1e-4, seed=0):
rng = np.random.default_rng(seed)
N, D = X.shape
# kmeans++ 初始化
centers = np.empty((K, D))
centers[0] = X[rng.integers(N)]
for k in range(1, K):
d2 = ((X[:, None, :] - centers[:k][None]) ** 2).sum(-1).min(1) # [N]
probs = d2 / d2.sum()
centers[k] = X[rng.choice(N, p=probs)]
for it in range(max_iter):
# E 步:距离 [N, K]
d2 = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(-1)
labels = d2.argmin(axis=1)
# M 步
new_centers = np.empty_like(centers)
for k in range(K):
mask = labels == k
if mask.any():
new_centers[k] = X[mask].mean(axis=0)
else:
new_centers[k] = X[rng.integers(N)] # 空簇重采样
shift = np.linalg.norm(new_centers - centers)
centers = new_centers
if shift < tol:
break
return centers, labels
if __name__ == "__main__":
rng = np.random.default_rng(0)
X = np.vstack([
rng.standard_normal((50, 2)) + np.array([0, 0]),
rng.standard_normal((50, 2)) + np.array([5, 5]),
rng.standard_normal((50, 2)) + np.array([0, 5]),
])
c, lab = kmeans(X, 3)
print(c)
每次迭代 O(N · K · D),总共跑 T 次迭代。
给一个冻结的 nn.Linear,在外面套一对低秩矩阵 A、B,实现 y = W x + (α/r) · B A x。
包装一个 base: nn.Linear(in, out),加入可训练参数 A: [r, in]、B: [out, r]。A 用 Kaiming 初始化,B 初始化为 0(保证训练开始时 LoRA 分支输出 0,不破坏原模型)。
base.weight 冻结,A, B 参与训练scaling = α / r 缩放W + (α/r) B A 合并到 base.weight,零额外延迟import math
import torch
import torch.nn as nn
import torch.nn.functional as F
class LoRALinear(nn.Module):
def __init__(self, base: nn.Linear, r: int = 8, alpha: float = 16.0, dropout: float = 0.0):
super().__init__()
self.base = base
for p in self.base.parameters():
p.requires_grad = False
in_f, out_f = base.in_features, base.out_features
self.r = r
self.scaling = alpha / r
self.A = nn.Parameter(torch.empty(r, in_f))
self.B = nn.Parameter(torch.zeros(out_f, r))
nn.init.kaiming_uniform_(self.A, a=math.sqrt(5))
self.drop = nn.Dropout(dropout) if dropout > 0 else nn.Identity()
def forward(self, x):
# x: [..., in_f]
y = self.base(x) # 冻结主干
lora = F.linear(self.drop(x), self.A) # [..., r]
lora = F.linear(lora, self.B) # [..., out_f]
return y + self.scaling * lora
@torch.no_grad()
def merge_(self):
"""把 LoRA 合进 base.weight,之后可以当普通 Linear 用。"""
delta = self.scaling * (self.B @ self.A) # [out, in]
self.base.weight.add_(delta)
if __name__ == "__main__":
base = nn.Linear(16, 32)
lora = LoRALinear(base, r=4, alpha=8)
x = torch.randn(2, 16)
# 初始化时 B=0,输出应等于 base(x)
assert torch.allclose(lora(x), base(x))
# 训几下 A/B 之后就不等了
lora.A.data.normal_(); lora.B.data.normal_()
y1 = lora(x)
lora.merge_()
y2 = base(x)
print((y1 - y2).abs().max()) # ~0
训练额外参数量 r · (in + out),对于 in = out = 4096, r = 8:65k vs 原 16M,节省 ~250×。
q_proj, v_proj 最常见,k_proj/o_proj、MLP 也可以,效果和参数量权衡。embedding + N×(pre-norm → attention → residual → pre-norm → mlp → residual) → final norm → lm_head。跑得通。
实现一个极简 GPT:vocab_size、n_layer、n_head、d_model、max_len,输入 token_ids shape [B, L],输出 logits shape [B, L, vocab]。causal self-attention。
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
class CausalSelfAttention(nn.Module):
def __init__(self, d_model, n_head):
super().__init__()
assert d_model % n_head == 0
self.h = n_head
self.d_h = d_model // n_head
self.qkv = nn.Linear(d_model, 3 * d_model, bias=False)
self.o = nn.Linear(d_model, d_model, bias=False)
def forward(self, x):
B, L, D = x.shape
qkv = self.qkv(x).reshape(B, L, 3, self.h, self.d_h).permute(2, 0, 3, 1, 4)
q, k, v = qkv[0], qkv[1], qkv[2] # [B, h, L, d_h]
att = (q @ k.transpose(-1, -2)) / math.sqrt(self.d_h) # [B, h, L, L]
mask = torch.triu(torch.ones(L, L, device=x.device, dtype=torch.bool), diagonal=1)
att = att.masked_fill(mask, float('-inf'))
att = F.softmax(att, dim=-1)
out = (att @ v).transpose(1, 2).reshape(B, L, D)
return self.o(out)
class MLP(nn.Module):
def __init__(self, d_model, mult=4):
super().__init__()
self.fc1 = nn.Linear(d_model, mult * d_model)
self.fc2 = nn.Linear(mult * d_model, d_model)
def forward(self, x):
return self.fc2(F.gelu(self.fc1(x)))
class Block(nn.Module):
def __init__(self, d_model, n_head):
super().__init__()
self.ln1 = nn.LayerNorm(d_model)
self.attn = CausalSelfAttention(d_model, n_head)
self.ln2 = nn.LayerNorm(d_model)
self.mlp = MLP(d_model)
def forward(self, x):
x = x + self.attn(self.ln1(x))
x = x + self.mlp(self.ln2(x))
return x
class MiniGPT(nn.Module):
def __init__(self, vocab, d_model=128, n_layer=4, n_head=4, max_len=256):
super().__init__()
self.tok = nn.Embedding(vocab, d_model)
self.pos = nn.Embedding(max_len, d_model)
self.blocks = nn.ModuleList([Block(d_model, n_head) for _ in range(n_layer)])
self.ln_f = nn.LayerNorm(d_model)
self.lm_head = nn.Linear(d_model, vocab, bias=False)
# weight tying
self.lm_head.weight = self.tok.weight
def forward(self, ids):
# ids: [B, L]
B, L = ids.shape
pos = torch.arange(L, device=ids.device)
x = self.tok(ids) + self.pos(pos)[None] # [B, L, D]
for blk in self.blocks:
x = blk(x)
x = self.ln_f(x)
return self.lm_head(x) # [B, L, vocab]
if __name__ == "__main__":
m = MiniGPT(vocab=100, d_model=64, n_layer=2, n_head=4, max_len=16)
ids = torch.randint(0, 100, (2, 12))
logits = m(ids)
print(logits.shape) # torch.Size([2, 12, 100])
forward O(L^2 · D · n_layer),MLP 项 O(L · D^2)。两者中哪个大取决于 L 和 D 的比。
lm_head.weight = tok_emb.weight,输入 embedding 和输出映射共享,省参数也略涨点。给 Q、K,应用 RoPE 旋转(不改 V)。LLaMA / Qwen / DeepSeek 全系都用。
Q、K shape [B, h, L, d_h](d_h 必须是偶数)。对每两个相邻维度当作复数,按位置 m 的角度 θ_i = 10000^(-2i/d_h) 旋转。
[..., d_h/2, 2],看成复平面import torch
def build_rope_cache(seq_len, d_h, base=10000.0, device='cpu'):
assert d_h % 2 == 0
# [d_h/2]
inv_freq = 1.0 / (base ** (torch.arange(0, d_h, 2, device=device).float() / d_h))
t = torch.arange(seq_len, device=device).float() # [L]
freqs = torch.outer(t, inv_freq) # [L, d_h/2]
cos = freqs.cos()[None, None] # [1, 1, L, d_h/2]
sin = freqs.sin()[None, None]
return cos, sin
def apply_rope(x, cos, sin):
"""
x: [B, h, L, d_h]
cos, sin: [1, 1, L, d_h/2]
"""
B, h, L, d = x.shape
x = x.reshape(B, h, L, d // 2, 2)
x1, x2 = x[..., 0], x[..., 1] # [B, h, L, d/2]
rot1 = x1 * cos - x2 * sin
rot2 = x1 * sin + x2 * cos
out = torch.stack([rot1, rot2], dim=-1).reshape(B, h, L, d)
return out
if __name__ == "__main__":
B, h, L, d_h = 2, 4, 8, 16
q = torch.randn(B, h, L, d_h)
k = torch.randn(B, h, L, d_h)
cos, sin = build_rope_cache(L, d_h)
q_rot = apply_rope(q, cos, sin)
k_rot = apply_rope(k, cos, sin)
print(q_rot.shape, k_rot.shape)
O(B · h · L · d_h),cos/sin 可以预计算缓存。
base 或按频段插值,使得高频项外推不混叠。推理 sampling 三件套(temperature / top-k / top-p),写一个函数输入 logits 输出一个 token。
输入 logits shape [B, V],参数 T, top_k, top_p,返回采样出的 ids shape [B]。
torch.multinomial 采一个import torch
import torch.nn.functional as F
def sample(logits, temperature=1.0, top_k=0, top_p=0.0):
"""
logits: [B, V]
return ids: [B]
"""
logits = logits / max(temperature, 1e-6)
# Top-K
if top_k > 0:
kth = torch.topk(logits, top_k, dim=-1).values[:, -1, None] # [B, 1]
logits = torch.where(logits < kth, torch.full_like(logits, float('-inf')), logits)
# Top-P
if 0.0 < top_p < 1.0:
sorted_logits, sorted_idx = torch.sort(logits, descending=True, dim=-1)
sorted_probs = F.softmax(sorted_logits, dim=-1)
cum = sorted_probs.cumsum(dim=-1)
# 要移除的 token:累积概率已超 p 的"之后"的位置
remove_sorted = cum > top_p
# 保证至少留一个(把最前面那个标为 False)
remove_sorted[..., 0] = False
# 把 sorted 空间的 mask 散回原始位置
remove = torch.zeros_like(remove_sorted)
remove.scatter_(dim=-1, index=sorted_idx, src=remove_sorted)
logits = logits.masked_fill(remove, float('-inf'))
probs = F.softmax(logits, dim=-1)
ids = torch.multinomial(probs, num_samples=1).squeeze(-1) # [B]
return ids
if __name__ == "__main__":
torch.manual_seed(0)
logits = torch.randn(2, 10)
print(sample(logits, temperature=0.8, top_k=5, top_p=0.9))
top-k 用 partial sort O(V log k),top-p 需要完整排序 O(V log V)。大词表时 top-p 比 top-k 贵不少。
DeepSeek 提出、现在 R1 系列在用的 GRPO。核心:一个 prompt 采 G 个 rollout,组内用奖励均值做 baseline,不需要 critic。
给定一个 batch:prompts B 条,每条采 G 个 rollout;rewards shape [B, G],logp(新策略)和 old_logp、ref_logp 都是 shape [B, G, T](T 是 response token 数),mask 同 shape。实现 advantage 和 GRPO loss(含 KL)。
import torch
def grpo_loss(logp, old_logp, ref_logp, rewards, mask,
clip_eps=0.2, kl_coef=0.04):
"""
logp, old_logp, ref_logp, mask: [B, G, T]
rewards: [B, G] (trajectory-level outcome reward)
返回 scalar loss 和一些 logging dict。
"""
B, G, T = logp.shape
# 1) 组内标准化 advantage
r_mean = rewards.mean(dim=1, keepdim=True) # [B, 1]
r_std = rewards.std(dim=1, keepdim=True) + 1e-8 # [B, 1]
adv = (rewards - r_mean) / r_std # [B, G]
adv = adv.unsqueeze(-1).expand(-1, -1, T) # [B, G, T]
adv = adv.detach() # stop grad
# 2) PPO clip surrogate
ratio = torch.exp(logp - old_logp) # [B, G, T]
unclipped = ratio * adv
clipped = torch.clamp(ratio, 1 - clip_eps, 1 + clip_eps) * adv
pg_per_tok = -torch.minimum(unclipped, clipped) # 最小化 -min
# 3) KL (ref || policy), Schulman k3 无偏估计
diff = ref_logp - logp
kl_per_tok = torch.exp(diff) - diff - 1.0 # >= 0
# 4) mask 平均(沿 T 有效 token,再 batch 平均)
loss_per_tok = pg_per_tok + kl_coef * kl_per_tok
denom = mask.sum().clamp(min=1.0)
loss = (loss_per_tok * mask).sum() / denom
with torch.no_grad():
stats = {
'pg_loss': (pg_per_tok * mask).sum() / denom,
'kl': (kl_per_tok * mask).sum() / denom,
'adv_abs': adv.abs().mean(),
'ratio': (ratio * mask).sum() / denom,
}
return loss, stats
if __name__ == "__main__":
torch.manual_seed(0)
B, G, T = 2, 4, 6
logp = torch.randn(B, G, T, requires_grad=True)
old_logp = logp.detach() + 0.01 * torch.randn_like(logp)
ref_logp = logp.detach() + 0.05 * torch.randn_like(logp)
rewards = torch.randn(B, G)
mask = torch.ones(B, G, T); mask[:, :, -1] = 0 # 最后一个 token 被屏蔽
loss, stats = grpo_loss(logp, old_logp, ref_logp, rewards, mask)
loss.backward()
print("loss:", loss.item(), "kl:", stats['kl'].item())
O(B · G · T)。真正贵的是前面的 rollout + logp 计算,这部分 loss 本身是小钱。
不是所有 LC 题 AI 面试都爱考。下面这 8 道是面到吐的 —— 每道都能和 AI 系统里的真实场景挂上钩(KV cache、流式数据、tokenizer、检索等),这也是面试官喜欢它们的原因。
get / put 均 O(1) 的定容缓存。
KV cache 管理、embedding cache、推理时 prefix cache 的淘汰策略本质都是 LRU 或其变种(LFU、ARC)。vLLM 的 block manager 就是这套思路。
class Node:
__slots__ = ('key', 'val', 'prev', 'next')
def __init__(self, k=0, v=0):
self.key = k; self.val = v
self.prev = None; self.next = None
class LRUCache:
def __init__(self, capacity: int):
self.cap = capacity
self.map = {}
self.head = Node(); self.tail = Node() # 哨兵
self.head.next = self.tail
self.tail.prev = self.head
def _remove(self, n):
n.prev.next = n.next
n.next.prev = n.prev
def _insert_front(self, n):
n.next = self.head.next
n.prev = self.head
self.head.next.prev = n
self.head.next = n
def get(self, key):
if key not in self.map:
return -1
n = self.map[key]
self._remove(n); self._insert_front(n)
return n.val
def put(self, key, val):
if key in self.map:
n = self.map[key]
n.val = val
self._remove(n); self._insert_front(n)
return
if len(self.map) >= self.cap:
lru = self.tail.prev
self._remove(lru)
del self.map[lru.key]
n = Node(key, val)
self._insert_front(n)
self.map[key] = n
if __name__ == "__main__":
c = LRUCache(2)
c.put(1, 1); c.put(2, 2)
print(c.get(1)) # 1
c.put(3, 3) # 淘汰 2
print(c.get(2)) # -1
get / put 均 O(1),空间 O(capacity)。
OrderedDict?—— 能,move_to_end + popitem(last=False),但面试官一般希望你写双向链表版。给数组和窗口大小 k,输出每个窗口的最大值。
和流式数据、实时监控、moving max/min 特征工程直接相关;也是"在线算法 O(1) 均摊"的标志题。
i - k + 1 的下标从头部弹出from collections import deque
def max_sliding_window(nums, k):
dq = deque() # 存 index, 对应 nums[index] 单调递减
res = []
for i, x in enumerate(nums):
while dq and nums[dq[-1]] <= x:
dq.pop()
dq.append(i)
# 移除越界的队首
if dq[0] <= i - k:
dq.popleft()
if i >= k - 1:
res.append(nums[dq[0]])
return res
if __name__ == "__main__":
print(max_sliding_window([1,3,-1,-3,5,3,6,7], 3))
# [3,3,5,5,6,7]
时间 O(n)(每个下标进队、出队各一次),空间 O(k)。
数组里出现次数最多的 k 个元素。
推荐系统、召回 top-k、向量检索打分排序的基础;也能测你对堆的理解(小顶堆维护 top-k)。
from collections import Counter
import heapq
def top_k_heap(nums, k):
cnt = Counter(nums)
# nlargest 内部就是小顶堆 size=k
return [x for x, _ in heapq.nlargest(k, cnt.items(), key=lambda it: it[1])]
def top_k_bucket(nums, k):
cnt = Counter(nums)
n = len(nums)
buckets = [[] for _ in range(n + 1)]
for x, c in cnt.items():
buckets[c].append(x)
res = []
for c in range(n, 0, -1):
res.extend(buckets[c])
if len(res) >= k:
return res[:k]
return res
if __name__ == "__main__":
print(top_k_heap([1,1,1,2,2,3], 2)) # [1, 2]
print(top_k_bucket([1,1,1,2,2,3], 2)) # [1, 2]
堆法 O(n log k);桶排序 O(n) 时间和空间。
从一个长度未知的数据流里均匀随机采 k 个。
大数据训练集采样、日志抽样、评估集构造全是这套;而且一问 probability 证明,考的就是你推导概率的基本功。
import random
def reservoir_sample(stream, k, seed=0):
rng = random.Random(seed)
pool = []
for i, x in enumerate(stream):
if i < k:
pool.append(x)
else:
j = rng.randint(0, i) # [0, i]
if j < k:
pool[j] = x
return pool
if __name__ == "__main__":
# 验证均匀性:采 1 个,重复 100k 次
from collections import Counter
c = Counter()
for seed in range(100_000):
out = reservoir_sample(range(10), 1, seed=seed)
c[out[0]] += 1
print(c) # 每个数应接近 10000
O(n) 时间,O(k) 空间。
有序数组在某处旋转过,找目标值。O(log n)。
二分的"变体意识" —— 很多 AI 系统题本质是"单调性 + 二分"(最长序列 / 学习率搜索 / context window 判定),这题是变体二分的入门。
mid 和 left:若 nums[left] <= nums[mid],说明左半有序;否则右半有序<= 的边界,有相等元素时才不会出错def search(nums, target):
lo, hi = 0, len(nums) - 1
while lo <= hi:
mid = (lo + hi) // 2
if nums[mid] == target:
return mid
if nums[lo] <= nums[mid]: # 左半有序
if nums[lo] <= target < nums[mid]:
hi = mid - 1
else:
lo = mid + 1
else: # 右半有序
if nums[mid] < target <= nums[hi]:
lo = mid + 1
else:
hi = mid - 1
return -1
if __name__ == "__main__":
print(search([4,5,6,7,0,1,2], 0)) # 4
print(search([4,5,6,7,0,1,2], 3)) # -1
O(log n)。
nums[lo] == nums[mid] == nums[hi] 时无法判断哪半有序,只能 lo++, hi--,最坏退化 O(n)。按层输出二叉树节点值。
BFS / DFS 的代表题,改几下就成 zigzag、右视图、最深层等一串变体。图的遍历是 GNN、依赖分析、计算图调度的基础。
sz,连续 pop sz 个就是一层from collections import deque
class TreeNode:
def __init__(self, val=0, left=None, right=None):
self.val = val; self.left = left; self.right = right
def level_order(root):
if not root: return []
res = []
q = deque([root])
while q:
sz = len(q)
level = []
for _ in range(sz):
n = q.popleft()
level.append(n.val)
if n.left: q.append(n.left)
if n.right: q.append(n.right)
res.append(level)
return res
if __name__ == "__main__":
# 3
# / \
# 9 20
# / \
# 15 7
root = TreeNode(3, TreeNode(9), TreeNode(20, TreeNode(15), TreeNode(7)))
print(level_order(root)) # [[3], [9, 20], [15, 7]]
时间空间均 O(n)。
res[depth].append(val)。在文本中找模式串第一次出现位置,O(n + m)。
tokenizer 的字典匹配、stopword 过滤、敏感词检测,很多时候绕不开 KMP/AC 自动机。字节那几个团队(搜索、风控、广告)反复考。
f[i]:表示 pattern[0..i] 的最长真前后缀长度i 扫 text,j 扫 pattern,不匹配就回退到 f[j-1],文本指针不回退def build_fail(p):
m = len(p)
f = [0] * m
k = 0
for i in range(1, m):
while k > 0 and p[k] != p[i]:
k = f[k-1]
if p[k] == p[i]:
k += 1
f[i] = k
return f
def kmp_search(text, pattern):
if not pattern:
return 0
f = build_fail(pattern)
j = 0
for i, ch in enumerate(text):
while j > 0 and pattern[j] != ch:
j = f[j-1]
if pattern[j] == ch:
j += 1
if j == len(pattern):
return i - j + 1 # 首次匹配起点
return -1
if __name__ == "__main__":
print(kmp_search("ababcabcabababd", "ababd")) # 10
print(build_fail("ababd")) # [0,0,1,2,0]
O(n + m)。
f[i] 是 p[0..i] 的"最长真前缀 = 真后缀"的长度。当 p[j] 失配时,前 j 个字符已经匹配了 text 的某段,把 j 跳到 f[j-1] 相当于复用了前缀。支持 insert / search / startsWith。
BPE / WordPiece 的 vocab 匹配、n-gram LM、自动补全、路由前缀匹配。tokenizer 里"最长前缀匹配"直接跑 trie。
children: dict 和 end: boolclass TrieNode:
__slots__ = ('children', 'end')
def __init__(self):
self.children = {}
self.end = False
class Trie:
def __init__(self):
self.root = TrieNode()
def insert(self, word: str):
node = self.root
for ch in word:
if ch not in node.children:
node.children[ch] = TrieNode()
node = node.children[ch]
node.end = True
def _walk(self, prefix):
node = self.root
for ch in prefix:
if ch not in node.children:
return None
node = node.children[ch]
return node
def search(self, word: str) -> bool:
n = self._walk(word)
return n is not None and n.end
def starts_with(self, prefix: str) -> bool:
return self._walk(prefix) is not None
def longest_prefix_match(self, s: str):
"""tokenizer 常见用法:返回 s 从头开始最长的在 trie 中的完整词的长度。"""
node = self.root
best = 0
for i, ch in enumerate(s):
if ch not in node.children:
break
node = node.children[ch]
if node.end:
best = i + 1
return best
if __name__ == "__main__":
t = Trie()
for w in ["cat", "car", "care", "dog"]:
t.insert(w)
print(t.search("car")) # True
print(t.search("ca")) # False
print(t.starts_with("ca")) # True
print(t.longest_prefix_match("careful")) # 4 ("care")
insert / search / startsWith 均 O(len(word))。空间和所有 word 的总字符数成正比,可用 double array trie 压缩。
[None]*26 快;字符集大(Unicode、BPE)用 dict。# Q: [B, L, d], K: [B, L, d], return: [B, L, d],这逼着你把 shape 先想清楚;面试官看到这个注释也会认为你"训练有素"。
# TODO: 确认 1/N 还是 1/(N-1) 先过,继续往下写。最后回来处理。卡住不动是手撕最大的死穴。
if __name__ == "__main__":,构造小 shape 跑一下。能跑通给面试官看到 shape 对,是大大的加分项。跑不通也没事,面试官会和你一起 debug,这反而是互动的好机会。