思维链与 Thinking 模型
357 × 289 等于多少?人算这道题也要列竖式一步步来。普通 next-token LLM 在没有显式推理过程时,往往要直接预测最终答案。对于多步数学、逻辑、代码题,这会把很多中间计算压在一次回答里,容易出错。
Chain-of-Thought(CoT)让模型把中间步骤写出来再给答案,准确率大幅提升。o1、DeepSeek-R1 这类 thinking 模型走得更远:它们不是推理时临时加思考,而是训练阶段就学会了「先想再答」。这一节拆解这背后的原理。
CoT 的原理可以用一句话概括:在 prompt 里示范或要求「先写中间步骤,再给最终答案」,让中间结果也进入上下文。经典来源是 Chain-of-Thought Prompting。
写出中间步骤的好处是每一步都在缩小问题规模,而且中间某步出错时后续步骤还有机会修正。经典 CoT prompting 本身不改变模型参数;但如果模型在预训练、SFT 或 RL 中见过大量推理格式,它也可能已经学会了遇到复杂问题时先展开步骤。
Thinking 模型更进一步——它们在训练阶段就被强化学习塑造成「先产生内部思考链,再输出最终答案」的行为模式。DeepSeek-R1 的重要启发是:在数学、代码等可验证任务上,合适的 RL 奖励可以强化更长、更会检查的推理轨迹;但这依赖基座模型、奖励设计、采样规模和训练稳定性。参考:DeepSeek-R1。
1. LLM 的推理缺陷
用户: 357 × 289 = ?
普通 LLM 的内部过程:
输入 token → embedding → transformer × N → 输出 "103173"
这里说“猜”是教学比喻。模型确实是在做 next-token prediction;它也可能学到可泛化的计算模式。问题是:如果不生成中间步骤,复杂计算更容易被压缩成一次直接预测。
如果训练数据里没有 357×289 这个具体组合,它通常会猜错。
根本原因:Transformer 的每一层都是固定计算量。不管问题是「1+1」还是「微积分」, 单次前向传播的层数固定,计算深度有限;生成 CoT 时,模型可以把前面生成的中间结果放回上下文,相当于用更多输出 token 换更多推理步骤。
就像让你心算 357×289,你不能直接给出答案,需要:
- 357×200 = 71400
- 357×80 = 28560
- 357×9 = 3213
- 71400 + 28560 + 3213 = 103173
LLM 也需要这个「分步计算」的过程——这就是 CoT。
2. Chain-of-Thought(CoT)
CoT 的核心思想极其简单:在 prompt 里示范「先写推理过程,再写答案」。
没有 CoT 的 prompt:
Q: 357 × 289 = ?
A: 103173 ← 模型直接猜
有 CoT 的 prompt (Few-shot):
Q: 123 × 45 = ?
A: 123×40=4920, 123×5=615, 4920+615=5535。答案是 5535。
Q: 357 × 289 = ?
A: ← 模型会模仿上面的格式,先写过程再给答案
为什么有效? 因为模型在生成推理过程时,每个中间步骤的结果都变成了后续步骤的「上下文」。 Transformer 的 attention 能看到前面算出的中间结果,从而基于它们继续推理。
本质上:CoT 把「一次前向传播解决不了的问题」变成了「多次前向传播接力解决」。
# 真实计算演示:CoT 为什么有效
import numpy as np
import random
print("=== CoT 为什么有效 ===")
print()
target = 357 * 289
# 没有 CoT:要求一步算出
print("没有 CoT:")
print(f" 模型需要一步算出 357 × 289 = ?")
print(f" 正确答案: {target:,}")
np.random.seed(42)
guesses = np.random.randint(80000, 120000, size=5)
closest = guesses[np.argmin(np.abs(guesses - target))]
print(f" 模型只能「猜」: {closest:,} (差了 {abs(closest - target):,})")
print(f" → 一步到位太难了!")
print()
# 有 CoT:分步计算
print("有 CoT (分步计算):")
result = 0
steps = []
for multiplier, label in [(200, "200"), (80, "80"), (9, "9")]:
partial = 357 * multiplier
result += partial
steps.append(partial)
print(f" 357 × {label} = {partial:,}")
total = sum(steps)
print(f" {' + '.join(f'{s:,}' for s in steps)} = {total:,}")
print()
print(f"✅ CoT 结果: {total:,} == 正确答案 {target:,}")
print()
print("关键:每一步的计算结果都可以被后续步骤引用。")
print("这相当于用「生成 token 的时间」换「计算深度」。")
=== CoT 为什么有效 ===
没有 CoT:
模型需要一步算出 357 × 289 = ?
正确答案: 103,173
模型只能「猜」: 95,795 (差了 7,378)
→ 一步到位太难了!
有 CoT (分步计算):
357 × 200 = 71,400
357 × 80 = 28,560
357 × 9 = 3,213
71,400 + 28,560 + 3,213 = 103,173
✅ CoT 结果: 103,173 == 正确答案 103,173
关键:每一步的计算结果都可以被后续步骤引用。
这相当于用「生成 token 的时间」换「计算深度」。
2.5 Self-Consistency(自一致性):一道题多问几次,投票决定
CoT 有一个问题:模型可能会在某条推理链上犯错。 同样一道数学题,你问模型 5 次(temperature > 0),它可能给出 3 种不同的推理过程和答案。
Self-Consistency 的核心思想:
同一道题 → 采样 N 条不同的 CoT 推理链 → 对最终答案投票 → 得票最多的就是最终答案
为什么有效?
直觉:你拿一道难题问 5 个同学——
- 每个同学都可能在某一步想歪
- 但不同同学犯不同错误的概率高,犯相同错误的概率低
- 正确答案只有一个,在多条正确/部分正确的推理链中出现的频率最高
数学上:如果单条推理链的正确率是 p,N 条链中大多数正确的概率随 N 增长:
p=0.6, N=1: 正确率 60%
p=0.6, N=5: 至少 3 条正确的概率 = C(5,3)p^3(1-p)^2 + ... ≈ 68%
p=0.7, N=5: 至少 3 条正确的概率 ≈ 84% ← 提升显著!
不做什么:不需要重新训练模型,不需要改模型架构,只需要在推理时多采样 + 投票。
下面用一个模拟演示全过程:
# ============================================================
# Self-Consistency 演示:同一道题,5 条推理链,投票选答案
# ============================================================
import random
random.seed(42)
print("=" * 70)
print("题目:小明有 15 个苹果, 给了小红 3 个,又买了 8 个,")
print(" 然后吃掉 2 个,最后把剩下的一半给了小刚。")
print(" 小明现在还有几个苹果?")
print("=" * 70)
# 模拟 5 条不同的推理链(模拟模型在 temperature > 0 时的不同采样)
# 每条链展示「推理过程」+「最终答案」
reasoning_chains = [
{
"id": 1,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 把一半给小刚 → 18 / 2 = 9 个",
],
"answer": 9,
"correct": True
},
{
"id": 2,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 一半给小刚 → 18 / 2 = 9 个",
],
"answer": 9,
"correct": True
},
{
"id": 3,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 把剩下的一半给小刚 → 剩下 18 - 9 = 9 个", # 推理对,答案也对
],
"answer": 9,
"correct": True
},
{
"id": 4,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 忘了除以 2!→ 答案 18 个", # ← 忘了最后一步!
],
"answer": 18,
"correct": False
},
{
"id": 5,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个,买 8 个 → 总数 15 - 3 + 8 = 20 个",
"Step 3: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 4: 一半给小刚 → 给 18/2 = 9 个,自己剩 9 个",
],
"answer": 9,
"correct": True
},
]
# 打印每条推理链
for chain in reasoning_chains:
print(f"\n--- 推理链 #{chain['id']} ---")
for step in chain['reasoning']:
print(f" {step}")
status = "✓ 正确" if chain['correct'] else "✗ 错误"
print(f" 答案: {chain['answer']} 个苹果 {status}")
# ============================================================
# 多数投票
# ============================================================
print(f"\n{'=' * 70}")
print("投票环节")
print(f"{'=' * 70}")
from collections import Counter
answers = [c['answer'] for c in reasoning_chains]
vote_counts = Counter(answers)
for ans, count in vote_counts.most_common():
correct_mark = "✓" if ans == 9 else "✗"
bar = "█" * count
print(f" 答案 {ans} 个: {count} 票 {bar} {correct_mark}")
winner = vote_counts.most_common(1)[0][0]
winner_is_correct = (winner == 9)
print(f"\n 🏆 最终答案(多数投票): {winner} 个苹果")
print(f" 是否正确: {'✓ 正确!' if winner_is_correct else '✗ 错误'}")
# ============================================================
# 对比:单次 vs Self-Consistency
# ============================================================
print(f"\n{'=' * 70}")
print("对比分析")
print(f"{'=' * 70}")
print(f" 单次采样(随机取一条): 正确率 = 4/5 = 80%")
print(f" Self-Consistency (5 条投票): 正确率 = 100%(这次全都对了!)")
print(f"")
print(f" 关键洞察:")
print(f" - 推理链 #4 在最后一步犯了错(忘了除以 2)")
print(f" - 但其他 4 条链都对了,投票结果 = 9(正确)")
print(f" - Self-Consistency 把那条错误的「吞掉了」!")
print(f"")
print(f" 这就是 Self-Consistency 的力量:")
print(f" 「少数服从多数」——一条链犯错没关系,多条链一起不会犯同样的错。")
======================================================================
题目:小明有 15 个苹果,给了小红 3 个,又买了 8 个,
然后吃掉 2 个,最后把剩下的一半给了小刚。
小明现在还有几个苹果?
======================================================================
--- 推理链 #1 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 把一半给小刚 → 18 / 2 = 9 个
答案: 9 个苹果 ✓ 正确
--- 推理链 #2 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 一半给小刚 → 18 / 2 = 9 个
答案: 9 个苹果 ✓ 正确
--- 推理链 #3 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 把剩下的一半给小刚 → 剩下 18 - 9 = 9 个
答案: 9 个苹果 ✓ 正确
--- 推理链 #4 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 忘了除以 2!→ 答案 18 个
答案: 18 个苹果 ✗ 错误
--- 推理链 #5 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个,买 8 个 → 总数 15 - 3 + 8 = 20 个
Step 3: 吃掉 2 个 → 20 - 2 = 18 个
Step 4: 一半给小刚 → 给 18/2 = 9 个,自己剩 9 个
答案: 9 个苹果 ✓ 正确
======================================================================
投票环节
======================================================================
答案 9 个: 4 票 ████ ✓
答案 18 个: 1 票 █ ✗
🏆 最终答案(多数投票): 9 个苹果
是否正确: ✓ 正确!
======================================================================
对比分析
======================================================================
单次采样(随机取一条): 正确率 = 4/5 = 80%
Self-Consistency (5 条投票): 正确率 = 100%(这次全都对了!)
关键洞察:
- 推理链 #4 在最后一步犯了错(忘了除以 2)
- 但其他 4 条链都对了,投票结果 = 9(正确)
- Self-Consistency 把那条错误的「吞掉了」!
这就是 Self-Consistency 的力量:
「少数服从多数」——一条链犯错没关系,多条链一起不会犯同样的错。
2.5.1 Self-Consistency vs Thinking 模型的关系
Self-Consistency 和后面要讲的 Thinking 模型(o1、R1)有本质区别:
| Self-Consistency | Thinking 模型 | |
|---|---|---|
| 怎么做 | 推理时多次采样 + 投票 | 训练时就学会"自己反思" |
| 成本 | N 倍推理成本(采样 N 次) | 1 次推理,但内部思考很长 |
| 需要训练吗 | 不需要 | 需要(RL 训练) |
| 关键能力 | 多样性采样 | 自我验证、纠错、回溯 |
本质上:
- Self-Consistency 是外部纠错——靠多次采样来"碰"正确答案
- Thinking 模型是内部纠错——在一条推理链里自己检查、自己改
后者更难训练,但推理时更高效(不需要重复采样)。 R1 一类模型的 self-verification 更适合理解为“在单条较长推理链中做检查和回溯”。它和 Self-Consistency 都利用了多步/多样信息,但机制不同,不能简单等同。Self-Consistency 参考:Wang et al. 2022。
3. 从 CoT 到 Thinking 模型
CoT 的问题是:思考过程暴露给用户了。用户不一定想看你的草稿纸。
Thinking 模型的做法,是把“草稿纸”和“最终答案”分开:
用户看到的:
Q: 357 × 289 = ?
A: 103173
模型内部实际生成的:
<think>
357×200=71400
357×80=28560
357×9=3213
71400+28560+3213=103173
</think>
103173
这里的 <think> 和 </think> 就是 thinking 符号。
它们像一对括号:中间是思考草稿,后面才是最终回答。
前端或 API 可以选择只展示 </think> 后面的答案,把中间草稿折叠或隐藏。
本质:Thinking 模型 = CoT 的工程化封装。思考过程还在,只是被特殊符号分区管理。
3.5 Think 符号的底层原理:它是怎么被模型「理解」的
3.5.1 为什么 <think> 不是普通文字?
你可能会想:<think> 不就是几个字符吗?模型输出这几个字符就行了?
不是这么简单。
如果 <think> 只是普通文字,可能出现三个问题:
- 容易被 tokenizer 切碎:
<think>可能变成<、think、>三个 token。 - 结束边界不稳定:模型可能在草稿里意外写出
</think>,导致前端提前结束 thinking。 - 训练不好控制:你很难稳定地区分 thinking 区间和 answer 区间。
所以一种更稳的做法是:把 <think> 和 </think> 加成 special tokens。但这不是唯一方案,有些模型也会使用普通字符串、chat template、API block 或单独字段来区分 reasoning 和 answer。
也就是说,它们要像 <BOS>、<EOS> 一样,有自己的独立 ID:
<think> -> 100
</think> -> 101
这样模型生成 ID 100,就等于进入思考区间;生成 ID 101,就等于退出思考区间。
3.5.2 最新实践:怎么把新的 thinking 符号加入训练?
现代训练里,新增 <think> 这类符号一般分两种情况。
情况 A:从零训练 tokenizer 和模型
一开始就把这些符号放进 tokenizer 的 special token 列表:
<BOS>、<EOS>、<PAD>、<think>、</think>
这样训练 tokenizer 时,它们不会被拆碎;训练模型时,它们也会像普通 token 一样学到 embedding。
情况 B:在已有模型上继续训练
这是更常见的路线。比如你拿一个开源基座模型,想让它学会新的 thinking 格式,通常要做 4 步:
- 给 tokenizer 加符号:把
<think>、</think>加成 special tokens。 - 扩模型 embedding:tokenizer 词表变大,模型的
Embedding和输出层也要变大。 - 准备格式化数据:样本里真的要有
<think>...</think>。 - 继续 SFT / RL:让模型通过 loss 或 reward 学会什么时候打开、什么时候关闭 thinking。
工程里常见的伪代码是:
new_tokens = {"additional_special_tokens": ["<think>", "</think>"]}
num_added = tokenizer.add_special_tokens(new_tokens)
model.resize_token_embeddings(len(tokenizer))
# 然后用带 <think>...</think> 的数据继续训练
这一步很容易被误解:加 token 只是给模型一支新笔,不等于它会写推理。 真正让它学会 thinking 的,是后面的训练数据、loss mask、reward 设计。
# 用最小例子演示:新增 <think> 符号后的训练样本
vocab = {
"<BOS>": 0,
"<EOS>": 1,
"<PAD>": 2,
"用户": 3,
"助手": 4,
"答案": 5,
"357": 6,
"289": 7,
"103173": 8,
}
new_symbols = ["<think>", "</think>"]
for symbol in new_symbols:
if symbol not in vocab:
vocab[symbol] = len(vocab)
train_tokens = [
"<BOS>",
"用户", "357", "289",
"助手", "<think>", "357", "289", "103173", "</think>",
"答案", "103173",
"<EOS>",
]
train_ids = [vocab[token] for token in train_tokens]
print("新增符号:")
for symbol in new_symbols:
print(f" {symbol} -> ID {vocab[symbol]}")
print()
print("训练样本:")
print(train_tokens)
print()
print("训练 ID:")
print(train_ids)
print()
print("关键观察:模型不是听懂了 '<think>' 这个英文单词,")
print("而是在训练中学会 ID 9 和 ID 10 之间应该写推理草稿。")
新增符号:
<think> -> ID 9
</think> -> ID 10
训练样本:
['<BOS>', '用户', '357', '289', '助手', '<think>', '357', '289', '103173', '</think>', '答案', '103173', '<EOS>']
训练 ID:
[0, 3, 6, 7, 4, 9, 6, 7, 8, 10, 5, 8, 1]
关键观察:模型不是听懂了 '<think>' 这个英文单词,
而是在训练中学会 ID 9 和 ID 10 之间应该写推理草稿。
# ============================================================
# 演示:Special Token 是怎么被分词的
# ============================================================
print("=== Special Token vs 普通文字:分词对比 ===\n")
# 模拟 tokenizer 的行为 — 用一个简化的 BPE 风格 tokenizer
# 核心问题:<think> 如果只是普通文字,它是什么命运?
# 假设这是 tokenizer 的词表(简化版)
vocab = {
"我": 1, "喜欢": 2, "吃": 3, "苹果": 4, "橘子": 5,
"think": 7, "<": 9, ">": 10, "/": 11,
"推理": 13, "过程": 14, "答案": 15, "是": 16,
# ↓ 关键:如果设为 special token,会有唯一 ID
# "<think>": 100,
# "</think>": 101,
}
print("【词表】普通 token(不含 special token):")
for k, v in sorted(vocab.items(), key=lambda x: x[1]):
print(f" '{k}' → {v}")
print()
# ========================================
# 场景 1:没有 special token 时
# ========================================
print("=" * 55)
print("场景 1:<think> 只是普通文字(没有加入词表)")
print("=" * 55)
def tokenize_plain(text, vocab):
"""模拟 BPE/word-level tokenizer,没有 special token 的情况"""
tokens = []
i = 0
while i < len(text):
# 按最长匹配找普通 token
matched = None
for word in sorted(vocab.keys(), key=len, reverse=True):
if text[i:].startswith(word):
matched = (word, vocab[word])
break
if matched:
tokens.append(matched)
i += len(matched[0])
else:
# 逐字符 fallback
ch = text[i]
tid = ord(ch) % 50 + 20 # 模拟 unknown token 的 id
tokens.append((ch if ch != ' ' else '⎵', tid))
i += 1
return tokens
text = "我喜欢<think>推理过程</think>答案是苹果"
tokens_plain = tokenize_plain(text, vocab)
print(f"输入: {text}")
print(f"\n分词结果 ({len(tokens_plain)} 个 token):")
ids = []
for word, tid in tokens_plain:
ids.append(tid)
flag = "⚠️拆碎" if tid in [9,10,11] else ""
print(f" [{word:6s}] → ID {tid:3d} {flag}")
print(f"\nToken ID 序列: {ids}")
print(f"\n关键问题:")
print(f" ❌ '<think>' 被切成 3 个 token: < think > 共 3 个 ID")
print(f" ❌ 如果 thinking 内容里出现 '<'(如 x<5) → 混乱")
print(f" ❌ 训练时无法用 token ID 精确定位 'thinking 从哪开始'")
print(f" ❌ 模型要学 '恰好按顺序输出这 5 个 token' → 极难学会")
# ========================================
# 场景 2:添加 special token 后
# ========================================
print(f"\n{'='*55}")
print("场景 2:<think> 设为 Special Token(更稳的做法之一)")
print("=" * 55)
# 把 special token 加入词表
vocab_with_special = vocab.copy()
vocab_with_special["<think>"] = 100
vocab_with_special["</think>"] = 101
print("词表新增:")
print(" '<think>' → 100 ★ special token")
print(" '</think>' → 101 ★ special token")
print()
def tokenize_with_special(text, vocab):
"""
tokenizer 会先扫描 special token(最长匹配优先),
再扫描普通 token。这是所有真实 tokenizer 的做法。
"""
tokens = []
i = 0
while i < len(text):
# Step 1:先匹配 special token(ID >= 100)
matched = None
for word in sorted(vocab.keys(), key=lambda x: (-len(x), x)):
if text[i:].startswith(word):
matched = (word, vocab[word])
break
if matched:
tokens.append(matched)
i += len(matched[0])
else:
ch = text[i]
tokens.append((ch, ord(ch) % 50 + 20))
i += 1
return tokens
tokens_special = tokenize_with_special(text, vocab_with_special)
print(f"输入: {text}")
print(f"\n分词结果 ({len(tokens_special)} 个 token):")
ids_special = []
for word, tid in tokens_special:
ids_special.append(tid)
flag = "★ 特殊" if tid >= 100 else ""
print(f" [{word:12s}] → ID {tid:3d} {flag}")
print(f"\nToken ID 序列: {ids_special}")
print(f"Token 数量: {len(tokens_plain)} → {len(tokens_special)} 个")
print(f"\n关键优势:")
print(f" ✅ '<think>' = 一个 token (ID=100),不会被切碎")
print(f" ✅ thinking 内部出现 '<' '>' 无影响(先匹配 special)")
print(f" ✅ 检查 ID==100/101 即可精确定位 thinking 边界")
print(f" ✅ 模型只需学会 '在合适位置输出 token 100'")
# ========================================
# 用真实 tokenizer 演示(如果 tiktoken 可用)
# ========================================
print(f"\n{'='*55}")
print("真实 tokenizer 的做法")
print("=" * 55)
try:
import tiktoken
# GPT-2 的 tokenizer(没有 think special token)
enc = tiktoken.get_encoding("gpt2")
plain_result = enc.encode("<think>")
print(f"\nGPT-2 tokenizer 编码 '<think>':")
print(f" Token IDs: {plain_result}")
print(f" 每个 token: {[enc.decode([t]) for t in plain_result]}")
print(f" → 被切成 {len(plain_result)} 个 token(因为 GPT-2 词表里没有 '<think>')")
# 说明真实做法
print(f"\nDeepSeek-R1 / Qwen3 的做法:")
print(f" 1. 在 tokenizer 词表中添加 special token:")
print(f" tokenizer.add_special_tokens({{'<think>': ..., '</think>': ...}})")
print(f" 2. 模型 embedding 矩阵扩大一行(为新 token)")
print(f" 3. 现在 '<think>' 就是一个完整的 token 了")
except ImportError:
print("\n(未安装 tiktoken,跳过真实 tokenizer 演示)")
print("pip install tiktoken 即可运行上述演示")
print()
print("以 GPT-4 的 cl100k_base 为例:")
print(" '<think>' 如果没加入词表 → 可能被切成多个 BPE token")
print(" 加入词表后 → 1 个 token → 模型学习成本降低 5 倍")
print(f"\n总结: add_special_tokens() 是训练 thinking 模型的第一步。")
=== Special Token vs 普通文字:分词对比 ===
【词表】普通 token(不含 special token):
'我' → 1
'喜欢' → 2
'吃' → 3
'苹果' → 4
'橘子' → 5
'think' → 7
'<' → 9
'>' → 10
'/' → 11
'推理' → 13
'过程' → 14
'答案' → 15
'是' → 16
=======================================================
场景 1:<think> 只是普通文字(没有加入词表)
=======================================================
输入: 我喜欢<think>推理过程</think>答案是苹果
分词结果 (14 个 token):
[我 ] → ID 1
[喜欢 ] → ID 2
[< ] → ID 9 ⚠️拆碎
[think ] → ID 7
[> ] → ID 10 ⚠️拆碎
[推理 ] → ID 13
[过程 ] → ID 14
[< ] → ID 9 ⚠️拆碎
[/ ] → ID 11 ⚠️拆碎
[think ] → ID 7
[> ] → ID 10 ⚠️拆碎
[答案 ] → ID 15
[是 ] → ID 16
[苹果 ] → ID 4
Token ID 序列: [1, 2, 9, 7, 10, 13, 14, 9, 11, 7, 10, 15, 16, 4]
关键问题:
❌ '<think>' 被切成 3 个 token: < think > 共 3 个 ID
❌ 如果 thinking 内容里出现 '<'(如 x<5) → 混乱
❌ 训练时无法用 token ID 精确定位 'thinking 从哪开始'
❌ 模型要学 '恰好按顺序输出这 5 个 token' → 极难学会
=======================================================
场景 2:<think> 设为 Special Token(正确做法)
=======================================================
词表新增:
'<think>' → 100 ★ special token
'</think>' → 101 ★ special token
输入: 我喜欢<think>推理过程</think>答案是苹果
分词结果 (9 个 token):
[我 ] → ID 1
[喜欢 ] → ID 2
[<think> ] → ID 100 ★ 特殊
[推理 ] → ID 13
[过程 ] → ID 14
[</think> ] → ID 101 ★ 特殊
[答案 ] → ID 15
[是 ] → ID 16
[苹果 ] → ID 4
Token ID 序列: [1, 2, 100, 13, 14, 101, 15, 16, 4]
Token 数量: 14 → 9 个
关键优势:
✅ '<think>' = 一个 token (ID=100),不会被切碎
✅ thinking 内部出现 '<' '>' 无影响(先匹配 special)
✅ 检查 ID==100/101 即可精确定位 thinking 边界
✅ 模型只需学会 '在合适位置输出 token 100'
=======================================================
真实 tokenizer 的做法
=======================================================
GPT-2 tokenizer 编码 '<think>':
Token IDs: [27, 14925, 29]
每个 token: ['<', 'think', '>']
→ 被切成 3 个 token(因为 GPT-2 词表里没有 '<think>')
DeepSeek-R1 / Qwen3 的做法:
1. 在 tokenizer 词表中添加 special token:
tokenizer.add_special_tokens({'<think>': ..., '</think>': ...})
2. 模型 embedding 矩阵扩大一行(为新 token)
3. 现在 '<think>' 就是一个完整的 token 了
总结: add_special_tokens() 是训练 thinking 模型的第一步。
3.6 Thinking 模型训练时的 Loss 怎么算
有了 <think> 这种 special token,一个重要的问题出现了:
训练时,thinking 部分的 token 要算 loss 吗?
这有三种策略,各有优劣。我们先看一个具体例子:
完整的 assistant 输出 token 序列:
[<think>, 3, 1, 2, ×, 2, 0, 0, =, 7, 1, 4, 0, 0, </think>, 1, 0, 3, 1, 7, 3]
│←──────── thinking tokens ────────────→│←─ answer tokens ─→│
14 个 token 6 个 token
策略对比
┌─────────────────────┬──────────────────┬──────────────────┬──────────────────┐
│ │ 策略 A: Full Loss│ 策略 B: Answer │ 策略 C: Selective│
│ │ (全算 loss) │ Only (只算答案) │ Weighting(加权) │
├─────────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ thinking token 的 │ ✅ 算 │ ❌ 不算 │ 🔶 算但降权 │
│ loss │ │ │ (×0.1) │
│ answer token 的 │ ✅ 算 │ ✅ 算 │ ✅ 算 │
│ loss │ │ │ │
│ special token 的 │ ✅ 算 │ ❌ 不算 │ ❌ 不算 │
│ loss (<think>等) │ │ │ │
├─────────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 优点 │ 简单直接 │ 只关心最终答案 │ 平衡两者 │
│ │ 思考质量也被优化 │ 思考自由度高 │ 思考有引导但不 │
│ │ │ │ 主导训练 │
├─── ──────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 缺点 │ 思考可能变成 │ 思考过程可能 │ 需要调权重超参 │
│ │ 「表演」而非推理 │ 退化或乱写 │ │
├─────────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 代表模型/场景 │ DeepSeek-R1 │ 早期 thinking │ 部分 RL 实验 │
│ │ (RL 阶段全算) │ 实验 │ │
└─────────────────────┴──────────────────┴──────────────────┴──────────────────┘
SFT 和 RL 阶段要分开理解
SFT 阶段可以讨论 full loss / answer-only / selective weighting:如果完全不监督 thinking 区间,模型可能只学到最终答案格式,thinking 行为不稳定。
RL 阶段不是普通 CE loss “全算 token”。模型先采样回答,再根据结果 reward 优化策略;更长推理是否被强化,取决于 reward、格式约束、采样长度和训练稳定性。
但全算 loss 也有代价:模型可能学会「表演式思考」—— 写一堆看起来像推理但实际无用的 token,只是因为这些 token 的 loss 低(模型很确定)。 RL 的结果奖励可以抑制一部分无用推理,但不能保证所有 thinking 都真实有效;仍需要格式检查、长度控制和任务评测。
实现上怎么做?
核心是构造一个 loss_mask——一个和 labels 同样形状的 0/1 数组:
# labels: [100, 3, 1, 2, 11, ..., 101, 1, 0, 3, 1, 7, 3]
# loss_mask:[ 0, 1, 1, 1, 1, ..., 0, 1, 1, 1, 1, 1, 1]
# ↑ special token 不算 ↑ special token 不算
#
# 最终 loss = cross_entropy(logits, labels) * loss_mask
# → special token 位置 loss=0 → 不产生梯度
下面用代码完整实现这三种策略。
# ============================================================
# 完整实现:三种 Loss 策略的 loss_mask 构造
# ============================================================
import torch
import torch.nn.functional as F
import torch.nn as nn
print("=== Thinking 模型 Loss Mask 完整实现 ===\n")
# 模拟一个 batch 的 token 序列
# Special token ID: <think>=100, </think>=101
# 普通 token: 1-50
# PAD=0, IGNORE=-100(PyTorch 标准忽略值)
THINK_START = 100
THINK_END = 101
IGNORE = -100
# 模拟训练数据:两个样本的 labels
# 样本 1: <think> 3,1,2,11,2,0,0 </think> 1,0,3,1,7,3 ← 正常的 thinking→answer
# 样本 2: <think> 5,×,6,=,3,0 </think> 3,0 ← 简短的 thinking→answer
batch_labels = torch.tensor([
[100, 3, 1, 2, 11, 2, 0, 0, 101, 1, 0, 3, 1, 7, 3, 0],
[100, 5, 12, 6, 13, 3, 0, 101, 3, 0, 0, 0, 0, 0, 0, 0],
]) # shape: [2, 16]
# 模拟模型输出的 logits
VOCAB_SIZE = 200
logits = torch.randn(2, 16, VOCAB_SIZE)
print("Batch labels:")
print(batch_labels)
print()
# ============================================================
# 策略 A: Full Loss — 所有 token 都算 loss
# ============================================================
print("=" * 60)
print("策略 A: Full Loss(全算)")
print("=" * 60)
def make_full_loss_mask(labels, ignore_id=0):
"""最简单的 mask:只忽略 PAD token"""
return (labels != ignore_id).float()
mask_full = make_full_loss_mask(batch_labels)
print("loss_mask (0=PAD不算, 1=算):")
print(mask_full.int())
print()
# 计算 loss
loss_full = F.cross_entropy(
logits.view(-1, VOCAB_SIZE),
batch_labels.view(-1),
ignore_index=0, # PAD 不参与
reduction='none'
).view(2, 16)
print("每个位置的 loss:")
print(loss_full)
print()
total_full = loss_full.sum() / mask_full.sum()
print(f"Average loss (full): {total_full:.4f}")
print("→ thinking 和 answer 的每个 token 都优化")
print()
# ============================================================
# 策略 B: Answer Only — 只算 </think> 之后的 loss
# ============================================================
print("=" * 60)
print("策略 B: Answer Only(只算答案部分)")
print("=" * 60)
def make_answer_only_mask(labels, think_start=100, think_end=101, ignore_id=0):
"""
只让 </think> 之后的 token 参与 loss 计算。
逻辑:从每个序列中找到 think_end 的位置,之后的 token 才参与 loss。
"""
mask = torch.zeros_like(labels, dtype=torch.float)
for b in range(labels.shape[0]):
# 找 </think> 的位置
end_positions = (labels[b] == think_end).nonzero(as_tuple=True)[0]
if len(end_positions) > 0:
end_pos = end_positions[0].item()
# </think> 之后(不含)到 EOS/PAD 之前
for pos in range(end_pos + 1, labels.shape[1]):
if labels[b, pos] == ignore_id:
break
mask[b, pos] = 1.0
# 如果没找到 think_end → 整个序列都可能是非 thinking 模式 → 全算
return mask
mask_answer_only = make_answer_only_mask(batch_labels)
print("loss_mask (0=不算, 1=算):")
print(mask_answer_only.int())
print()
# 标注哪些 token 属于 thinking,哪些属于 answer
print("Token 标注 (T=thinking, A=answer, S=special, P=PAD):")
for b in range(2):
row = ""
for pos in range(16):
tid = batch_labels[b, pos].item()
if tid == 0:
row += " P "
elif tid == 100:
row += "[S "
elif tid == 101:
row += " S]"
elif mask_answer_only[b, pos] == 1:
row += " A "
else:
row += " T "
print(f" 样本{b}: {row}")
print()
# 构造 masked labels:不参与 loss 的位置设为 IGNORE
labels_masked_B = batch_labels.clone()
labels_masked_B[mask_answer_only == 0] = IGNORE
loss_answer_only = F.cross_entropy(
logits.view(-1, VOCAB_SIZE),
labels_masked_B.view(-1),
ignore_index=IGNORE,
)
print(f"Average loss (answer only): {loss_answer_only:.4f}")
print("→ 只有 answer token 被优化,thinking 自由发挥")
print()
# ============================================================
# 策略 C: Selective Weighting — thinking 降权
# ============================================================
print("=" * 60)
print("策略 C: Selective Weighting(thinking 降权 ×0.1)")
print("=" * 60)
def make_selective_weight_mask(labels, think_start=100, think_end=101,
thinking_weight=0.1, ignore_id=0):
"""
thinking token 和 answer token 都算 loss,但权重不同:
- thinking token: weight = 0.1
- answer token: weight = 1.0
- special token: 不算
"""
weight_mask = torch.zeros_like(labels, dtype=torch.float)
for b in range(labels.shape[0]):
in_thinking = False
for pos in range(labels.shape[1]):
tid = labels[b, pos].item()
if tid == think_start:
in_thinking = True
continue # special token 本身不算
elif tid == think_end:
in_thinking = False
continue # special token 本身不算
elif tid == ignore_id:
break # PAD 不算
if in_thinking:
weight_mask[b, pos] = thinking_weight # thinking: 0.1
else:
weight_mask[b, pos] = 1.0 # answer: 1.0
return weight_mask
weight_mask = make_selective_weight_mask(batch_labels)
print("权重矩阵:")
for b in range(2):
print(f" 样本{b}: {[f'{w:.1f}' for w in weight_mask[b].tolist()]}")
print()
# 加权 loss
loss_per_token = F.cross_entropy(
logits.view(-1, VOCAB_SIZE),
batch_labels.view(-1),
ignore_index=0,
reduction='none'
).view(2, 16)
weighted_loss = (loss_per_token * weight_mask).sum() / weight_mask.sum()
print(f"Average loss (selective weighting): {weighted_loss:.4f}")
print("→ thinking 有 10% 的优化信号,answer 有 100%")
# ============================================================
# 三种策略对比
# ============================================================
print(f"\n{'='*60}")
print("三种策略对比总结")
print("=" * 60)
print(f"""
┌──────────────────┬──────────┬──────────┬──────────┐
│ │ Full Loss│Answer Only│Selective │
├──────────────────┼──────────┼──────────┼──────────┤
│ Avg Loss │ {total_full:.4f} │ {loss_answer_only:.4f} │ {weighted_loss:.4f} │
│ thinking 被优化 │ ✅ │ ❌ │ 🔶0.1 │
│ answer 被优化 │ ✅ │ ✅ │ ✅ │
│ 思考不会退化 │ ✅ │ ❌ │ ✅ │
│ 实现复杂度 │ 低 │ 中 │ 中 │
└──────────────────┴──────────┴──────────┴──────────┘
""")
print("实际训练中怎么选?")
print(" SFT 阶段 → 通常用 Answer Only(让模型先学会格式)")
print(" RL 阶段 → 通常用 Full Loss(DeepSeek-R1 的做法)")
print(" 混合阶段 → 用 Selective Weighting 做平滑过渡")
print()
print("但所有策略的共同点:special token 本身(<think>/</think>)不算 loss。")
print("因为这些 token 只是标记,它们的「预测」没有意义。")
=== Thinking 模型 Loss Mask 完整实现 ===
Batch labels:
tensor([[100, 3, 1, 2, 11, 2, 0, 0, 101, 1, 0, 3, 1, 7,
3, 0],
[100, 5, 12, 6, 13, 3, 0, 101, 3, 0, 0, 0, 0, 0,
0, 0]])
============================================================
策略 A: Full Loss(全算)
============================================================
loss_mask (0=PAD不算, 1=算):
tensor([[1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0],
[1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]], dtype=torch.int32)
每个位置的 loss:
tensor([[4.8708, 6.5770, 6.5824, 5.1351, 6.3231, 4.8046, 0.0000, 0.0000, 6.8438,
5.2807, 0.0000, 6.0612, 5.2646, 4.5894, 7.0696, 0.0000],
[4.7409, 5.9732, 6.1340, 6.0056, 6.7161, 6.5794, 0.0000, 5.6591, 2.5543,
0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000]])
Average loss (full): 5.6882
→ thinking 和 answer 的每个 token 都优化
============================================================
策略 B: Answer Only(只算答案部分)
============================================================
loss_mask (0=不算, 1=算):
tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]], dtype=torch.int32)
Token 标注 (T=thinking, A=answer, S=special, P=PAD):
样本0: [S T T T T T P P S] A P T T T T P
样本1: [S T T T T T P S] A P P P P P P P
Average loss (answer only): 3.9175
→ 只有 answer token 被优化,thinking 自由发挥
============================================================
策略 C: Selective Weighting(thinking 降权 ×0.1)
============================================================
权重矩阵:
样本0: ['0.0', '0.1', '0.1', '0.1', '0.1', '0.1', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
样本1: ['0.0', '0.1', '0.1', '0.1', '0.1', '0.1', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
Average loss (selective weighting): 6.0830
→ thinking 有 10% 的优化信号,answer 有 100%
============================================================
三种策略对比总结
============================================================
┌──────────────────┬──────────┬──────────┬──────────┐
│ │ Full Loss│Answer Only│Selective │
├──────────────────┼──────────┼──────────┼──────────┤
│ Avg Loss │ 5.6882 │ 3.9175 │ 6.0830 │
│ thinking 被优化 │ ✅ │ ❌ │ 🔶0.1 │
│ answer 被优化 │ ✅ │ ✅ │ ✅ │
│ 思考不会退化 │ ✅ │ ❌ │ ✅ │
│ 实现复杂度 │ 低 │ 中 │ 中 │
└──────────────────┴──────────┴──────────┴──────────┘