跳到主要内容

思维链与 Thinking 模型

357 × 289 等于多少?人算这道题也要列竖式一步步来。普通 next-token LLM 在没有显式推理过程时,往往要直接预测最终答案。对于多步数学、逻辑、代码题,这会把很多中间计算压在一次回答里,容易出错。

Chain-of-Thought(CoT)让模型把中间步骤写出来再给答案,准确率大幅提升。o1、DeepSeek-R1 这类 thinking 模型走得更远:它们不是推理时临时加思考,而是训练阶段就学会了「先想再答」。这一节拆解这背后的原理。

CoT 的原理可以用一句话概括:在 prompt 里示范或要求「先写中间步骤,再给最终答案」,让中间结果也进入上下文。经典来源是 Chain-of-Thought Prompting

写出中间步骤的好处是每一步都在缩小问题规模,而且中间某步出错时后续步骤还有机会修正。经典 CoT prompting 本身不改变模型参数;但如果模型在预训练、SFT 或 RL 中见过大量推理格式,它也可能已经学会了遇到复杂问题时先展开步骤。

Thinking 模型更进一步——它们在训练阶段就被强化学习塑造成「先产生内部思考链,再输出最终答案」的行为模式。DeepSeek-R1 的重要启发是:在数学、代码等可验证任务上,合适的 RL 奖励可以强化更长、更会检查的推理轨迹;但这依赖基座模型、奖励设计、采样规模和训练稳定性。参考:DeepSeek-R1

1. LLM 的推理缺陷

用户: 357 × 289 = ?

普通 LLM 的内部过程:
输入 token → embedding → transformer × N → 输出 "103173"

这里说“猜”是教学比喻。模型确实是在做 next-token prediction;它也可能学到可泛化的计算模式。问题是:如果不生成中间步骤,复杂计算更容易被压缩成一次直接预测。
如果训练数据里没有 357×289 这个具体组合,它通常会猜错。

根本原因:Transformer 的每一层都是固定计算量。不管问题是「1+1」还是「微积分」, 单次前向传播的层数固定,计算深度有限;生成 CoT 时,模型可以把前面生成的中间结果放回上下文,相当于用更多输出 token 换更多推理步骤。

就像让你心算 357×289,你不能直接给出答案,需要:

  1. 357×200 = 71400
  2. 357×80 = 28560
  3. 357×9 = 3213
  4. 71400 + 28560 + 3213 = 103173

LLM 也需要这个「分步计算」的过程——这就是 CoT。

2. Chain-of-Thought(CoT)

CoT 的核心思想极其简单:在 prompt 里示范「先写推理过程,再写答案」

没有 CoT 的 prompt:
Q: 357 × 289 = ?
A: 103173 ← 模型直接猜

有 CoT 的 prompt (Few-shot):
Q: 123 × 45 = ?
A: 123×40=4920, 123×5=615, 4920+615=5535。答案是 5535。

Q: 357 × 289 = ?
A: ← 模型会模仿上面的格式,先写过程再给答案

为什么有效? 因为模型在生成推理过程时,每个中间步骤的结果都变成了后续步骤的「上下文」。 Transformer 的 attention 能看到前面算出的中间结果,从而基于它们继续推理。

本质上:CoT 把「一次前向传播解决不了的问题」变成了「多次前向传播接力解决」

# 真实计算演示:CoT 为什么有效
import numpy as np
import random

print("=== CoT 为什么有效 ===")
print()

target = 357 * 289

# 没有 CoT:要求一步算出
print("没有 CoT:")
print(f" 模型需要一步算出 357 × 289 = ?")
print(f" 正确答案: {target:,}")
np.random.seed(42)
guesses = np.random.randint(80000, 120000, size=5)
closest = guesses[np.argmin(np.abs(guesses - target))]
print(f" 模型只能「猜」: {closest:,} (差了 {abs(closest - target):,})")
print(f" → 一步到位太难了!")
print()

# 有 CoT:分步计算
print("有 CoT (分步计算):")
result = 0
steps = []
for multiplier, label in [(200, "200"), (80, "80"), (9, "9")]:
partial = 357 * multiplier
result += partial
steps.append(partial)
print(f" 357 × {label} = {partial:,}")

total = sum(steps)
print(f" {' + '.join(f'{s:,}' for s in steps)} = {total:,}")
print()
print(f"✅ CoT 结果: {total:,} == 正确答案 {target:,}")
print()
print("关键:每一步的计算结果都可以被后续步骤引用。")
print("这相当于用「生成 token 的时间」换「计算深度」。")

=== CoT 为什么有效 ===

没有 CoT:
模型需要一步算出 357 × 289 = ?
正确答案: 103,173
模型只能「猜」: 95,795 (差了 7,378)
→ 一步到位太难了!

有 CoT (分步计算):
357 × 200 = 71,400
357 × 80 = 28,560
357 × 9 = 3,213
71,400 + 28,560 + 3,213 = 103,173

✅ CoT 结果: 103,173 == 正确答案 103,173

关键:每一步的计算结果都可以被后续步骤引用。
这相当于用「生成 token 的时间」换「计算深度」。

2.5 Self-Consistency(自一致性):一道题多问几次,投票决定

CoT 有一个问题:模型可能会在某条推理链上犯错。 同样一道数学题,你问模型 5 次(temperature > 0),它可能给出 3 种不同的推理过程和答案。

Self-Consistency 的核心思想

同一道题 → 采样 N 条不同的 CoT 推理链 → 对最终答案投票 → 得票最多的就是最终答案

为什么有效?

直觉:你拿一道难题问 5 个同学——

  • 每个同学都可能在某一步想歪
  • 但不同同学犯不同错误的概率高,犯相同错误的概率低
  • 正确答案只有一个,在多条正确/部分正确的推理链中出现的频率最高

数学上:如果单条推理链的正确率是 p,N 条链中大多数正确的概率随 N 增长:

p=0.6, N=1: 正确率 60%
p=0.6, N=5: 至少 3 条正确的概率 = C(5,3)p^3(1-p)^2 + ... ≈ 68%
p=0.7, N=5: 至少 3 条正确的概率 ≈ 84% ← 提升显著!

不做什么:不需要重新训练模型,不需要改模型架构,只需要在推理时多采样 + 投票

下面用一个模拟演示全过程:

# ============================================================
# Self-Consistency 演示:同一道题,5 条推理链,投票选答案
# ============================================================
import random
random.seed(42)

print("=" * 70)
print("题目:小明有 15 个苹果,给了小红 3 个,又买了 8 个,")
print(" 然后吃掉 2 个,最后把剩下的一半给了小刚。")
print(" 小明现在还有几个苹果?")
print("=" * 70)

# 模拟 5 条不同的推理链(模拟模型在 temperature > 0 时的不同采样)
# 每条链展示「推理过程」+「最终答案」

reasoning_chains = [
{
"id": 1,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 把一半给小刚 → 18 / 2 = 9 个",
],
"answer": 9,
"correct": True
},
{
"id": 2,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 一半给小刚 → 18 / 2 = 9 个",
],
"answer": 9,
"correct": True
},
{
"id": 3,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 把剩下的一半给小刚 → 剩下 18 - 9 = 9 个", # 推理对,答案也对
],
"answer": 9,
"correct": True
},
{
"id": 4,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个 → 15 - 3 = 12 个",
"Step 3: 又买了 8 个 → 12 + 8 = 20 个",
"Step 4: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 5: 忘了除以 2!→ 答案 18 个", # ← 忘了最后一步!
],
"answer": 18,
"correct": False
},
{
"id": 5,
"reasoning": [
"Step 1: 初始有 15 个",
"Step 2: 给小红 3 个,买 8 个 → 总数 15 - 3 + 8 = 20 个",
"Step 3: 吃掉 2 个 → 20 - 2 = 18 个",
"Step 4: 一半给小刚 → 给 18/2 = 9 个,自己剩 9 个",
],
"answer": 9,
"correct": True
},
]

# 打印每条推理链
for chain in reasoning_chains:
print(f"\n--- 推理链 #{chain['id']} ---")
for step in chain['reasoning']:
print(f" {step}")
status = "✓ 正确" if chain['correct'] else "✗ 错误"
print(f" 答案: {chain['answer']} 个苹果 {status}")

# ============================================================
# 多数投票
# ============================================================
print(f"\n{'=' * 70}")
print("投票环节")
print(f"{'=' * 70}")

from collections import Counter
answers = [c['answer'] for c in reasoning_chains]
vote_counts = Counter(answers)

for ans, count in vote_counts.most_common():
correct_mark = "✓" if ans == 9 else "✗"
bar = "█" * count
print(f" 答案 {ans} 个: {count}{bar} {correct_mark}")

winner = vote_counts.most_common(1)[0][0]
winner_is_correct = (winner == 9)

print(f"\n 🏆 最终答案(多数投票): {winner} 个苹果")
print(f" 是否正确: {'✓ 正确!' if winner_is_correct else '✗ 错误'}")

# ============================================================
# 对比:单次 vs Self-Consistency
# ============================================================
print(f"\n{'=' * 70}")
print("对比分析")
print(f"{'=' * 70}")
print(f" 单次采样(随机取一条): 正确率 = 4/5 = 80%")
print(f" Self-Consistency (5 条投票): 正确率 = 100%(这次全都对了!)")
print(f"")
print(f" 关键洞察:")
print(f" - 推理链 #4 在最后一步犯了错(忘了除以 2)")
print(f" - 但其他 4 条链都对了,投票结果 = 9(正确)")
print(f" - Self-Consistency 把那条错误的「吞掉了」!")
print(f"")
print(f" 这就是 Self-Consistency 的力量:")
print(f" 「少数服从多数」——一条链犯错没关系,多条链一起不会犯同样的错。")
======================================================================
题目:小明有 15 个苹果,给了小红 3 个,又买了 8 个,
然后吃掉 2 个,最后把剩下的一半给了小刚。
小明现在还有几个苹果?
======================================================================

--- 推理链 #1 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 把一半给小刚 → 18 / 2 = 9 个
答案: 9 个苹果 ✓ 正确

--- 推理链 #2 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 一半给小刚 → 18 / 2 = 9 个
答案: 9 个苹果 ✓ 正确

--- 推理链 #3 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 把剩下的一半给小刚 → 剩下 18 - 9 = 9 个
答案: 9 个苹果 ✓ 正确

--- 推理链 #4 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个 → 15 - 3 = 12 个
Step 3: 又买了 8 个 → 12 + 8 = 20 个
Step 4: 吃掉 2 个 → 20 - 2 = 18 个
Step 5: 忘了除以 2!→ 答案 18 个
答案: 18 个苹果 ✗ 错误

--- 推理链 #5 ---
Step 1: 初始有 15 个
Step 2: 给小红 3 个,买 8 个 → 总数 15 - 3 + 8 = 20 个
Step 3: 吃掉 2 个 → 20 - 2 = 18 个
Step 4: 一半给小刚 → 给 18/2 = 9 个,自己剩 9 个
答案: 9 个苹果 ✓ 正确

======================================================================
投票环节
======================================================================
答案 9 个: 4 票 ████ ✓
答案 18 个: 1 票 █ ✗

🏆 最终答案(多数投票): 9 个苹果
是否正确: ✓ 正确!

======================================================================
对比分析
======================================================================
单次采样(随机取一条): 正确率 = 4/5 = 80%
Self-Consistency (5 条投票): 正确率 = 100%(这次全都对了!)

关键洞察:
- 推理链 #4 在最后一步犯了错(忘了除以 2)
- 但其他 4 条链都对了,投票结果 = 9(正确)
- Self-Consistency 把那条错误的「吞掉了」!

这就是 Self-Consistency 的力量:
「少数服从多数」——一条链犯错没关系,多条链一起不会犯同样的错。

2.5.1 Self-Consistency vs Thinking 模型的关系

Self-Consistency 和后面要讲的 Thinking 模型(o1、R1)有本质区别:

Self-ConsistencyThinking 模型
怎么做推理时多次采样 + 投票训练时就学会"自己反思"
成本N 倍推理成本(采样 N 次)1 次推理,但内部思考很长
需要训练吗不需要需要(RL 训练)
关键能力多样性采样自我验证、纠错、回溯

本质上

  • Self-Consistency 是外部纠错——靠多次采样来"碰"正确答案
  • Thinking 模型是内部纠错——在一条推理链里自己检查、自己改

后者更难训练,但推理时更高效(不需要重复采样)。 R1 一类模型的 self-verification 更适合理解为“在单条较长推理链中做检查和回溯”。它和 Self-Consistency 都利用了多步/多样信息,但机制不同,不能简单等同。Self-Consistency 参考:Wang et al. 2022

3. 从 CoT 到 Thinking 模型

CoT 的问题是:思考过程暴露给用户了。用户不一定想看你的草稿纸。

Thinking 模型的做法,是把“草稿纸”和“最终答案”分开:

用户看到的:
Q: 357 × 289 = ?
A: 103173

模型内部实际生成的:
<think>
357×200=71400
357×80=28560
357×9=3213
71400+28560+3213=103173
</think>
103173

这里的 <think></think> 就是 thinking 符号。 它们像一对括号:中间是思考草稿,后面才是最终回答。

前端或 API 可以选择只展示 </think> 后面的答案,把中间草稿折叠或隐藏。

本质:Thinking 模型 = CoT 的工程化封装。思考过程还在,只是被特殊符号分区管理。

3.5 Think 符号的底层原理:它是怎么被模型「理解」的

3.5.1 为什么 <think> 不是普通文字?

你可能会想:<think> 不就是几个字符吗?模型输出这几个字符就行了?

不是这么简单。

如果 <think> 只是普通文字,可能出现三个问题:

  1. 容易被 tokenizer 切碎<think> 可能变成 <think> 三个 token。
  2. 结束边界不稳定:模型可能在草稿里意外写出 </think>,导致前端提前结束 thinking。
  3. 训练不好控制:你很难稳定地区分 thinking 区间和 answer 区间。

所以一种更稳的做法是:把 <think></think> 加成 special tokens。但这不是唯一方案,有些模型也会使用普通字符串、chat template、API block 或单独字段来区分 reasoning 和 answer。

也就是说,它们要像 <BOS><EOS> 一样,有自己的独立 ID:

<think>   -> 100
</think> -> 101

这样模型生成 ID 100,就等于进入思考区间;生成 ID 101,就等于退出思考区间。

3.5.2 最新实践:怎么把新的 thinking 符号加入训练?

现代训练里,新增 <think> 这类符号一般分两种情况。

情况 A:从零训练 tokenizer 和模型

一开始就把这些符号放进 tokenizer 的 special token 列表:

<BOS>、<EOS>、<PAD>、<think>、</think>

这样训练 tokenizer 时,它们不会被拆碎;训练模型时,它们也会像普通 token 一样学到 embedding。

情况 B:在已有模型上继续训练

这是更常见的路线。比如你拿一个开源基座模型,想让它学会新的 thinking 格式,通常要做 4 步:

  1. 给 tokenizer 加符号:把 <think></think> 加成 special tokens。
  2. 扩模型 embedding:tokenizer 词表变大,模型的 Embedding 和输出层也要变大。
  3. 准备格式化数据:样本里真的要有 <think>...</think>
  4. 继续 SFT / RL:让模型通过 loss 或 reward 学会什么时候打开、什么时候关闭 thinking。

工程里常见的伪代码是:

new_tokens = {"additional_special_tokens": ["<think>", "</think>"]}
num_added = tokenizer.add_special_tokens(new_tokens)
model.resize_token_embeddings(len(tokenizer))

# 然后用带 <think>...</think> 的数据继续训练

这一步很容易被误解:加 token 只是给模型一支新笔,不等于它会写推理。 真正让它学会 thinking 的,是后面的训练数据、loss mask、reward 设计。

# 用最小例子演示:新增 <think> 符号后的训练样本
vocab = {
"<BOS>": 0,
"<EOS>": 1,
"<PAD>": 2,
"用户": 3,
"助手": 4,
"答案": 5,
"357": 6,
"289": 7,
"103173": 8,
}

new_symbols = ["<think>", "</think>"]
for symbol in new_symbols:
if symbol not in vocab:
vocab[symbol] = len(vocab)

train_tokens = [
"<BOS>",
"用户", "357", "289",
"助手", "<think>", "357", "289", "103173", "</think>",
"答案", "103173",
"<EOS>",
]
train_ids = [vocab[token] for token in train_tokens]

print("新增符号:")
for symbol in new_symbols:
print(f" {symbol} -> ID {vocab[symbol]}")

print()
print("训练样本:")
print(train_tokens)
print()
print("训练 ID:")
print(train_ids)
print()
print("关键观察:模型不是听懂了 '<think>' 这个英文单词,")
print("而是在训练中学会 ID 9 和 ID 10 之间应该写推理草稿。")
新增符号:
<think> -> ID 9
</think> -> ID 10

训练样本:
['<BOS>', '用户', '357', '289', '助手', '<think>', '357', '289', '103173', '</think>', '答案', '103173', '<EOS>']

训练 ID:
[0, 3, 6, 7, 4, 9, 6, 7, 8, 10, 5, 8, 1]

关键观察:模型不是听懂了 '<think>' 这个英文单词,
而是在训练中学会 ID 9 和 ID 10 之间应该写推理草稿。
# ============================================================
# 演示:Special Token 是怎么被分词的
# ============================================================
print("=== Special Token vs 普通文字:分词对比 ===\n")

# 模拟 tokenizer 的行为 — 用一个简化的 BPE 风格 tokenizer
# 核心问题:<think> 如果只是普通文字,它是什么命运?

# 假设这是 tokenizer 的词表(简化版)
vocab = {
"我": 1, "喜欢": 2, "吃": 3, "苹果": 4, "橘子": 5,
"think": 7, "<": 9, ">": 10, "/": 11,
"推理": 13, "过程": 14, "答案": 15, "是": 16,
# ↓ 关键:如果设为 special token,会有唯一 ID
# "<think>": 100,
# "</think>": 101,
}

print("【词表】普通 token(不含 special token):")
for k, v in sorted(vocab.items(), key=lambda x: x[1]):
print(f" '{k}' → {v}")
print()

# ========================================
# 场景 1:没有 special token 时
# ========================================
print("=" * 55)
print("场景 1:<think> 只是普通文字(没有加入词表)")
print("=" * 55)

def tokenize_plain(text, vocab):
"""模拟 BPE/word-level tokenizer,没有 special token 的情况"""
tokens = []
i = 0
while i < len(text):
# 按最长匹配找普通 token
matched = None
for word in sorted(vocab.keys(), key=len, reverse=True):
if text[i:].startswith(word):
matched = (word, vocab[word])
break
if matched:
tokens.append(matched)
i += len(matched[0])
else:
# 逐字符 fallback
ch = text[i]
tid = ord(ch) % 50 + 20 # 模拟 unknown token 的 id
tokens.append((ch if ch != ' ' else '⎵', tid))
i += 1
return tokens

text = "我喜欢<think>推理过程</think>答案是苹果"
tokens_plain = tokenize_plain(text, vocab)

print(f"输入: {text}")
print(f"\n分词结果 ({len(tokens_plain)} 个 token):")
ids = []
for word, tid in tokens_plain:
ids.append(tid)
flag = "⚠️拆碎" if tid in [9,10,11] else ""
print(f" [{word:6s}] → ID {tid:3d} {flag}")

print(f"\nToken ID 序列: {ids}")
print(f"\n关键问题:")
print(f" ❌ '<think>' 被切成 3 个 token: < think > 共 3 个 ID")
print(f" ❌ 如果 thinking 内容里出现 '<'(如 x<5) → 混乱")
print(f" ❌ 训练时无法用 token ID 精确定位 'thinking 从哪开始'")
print(f" ❌ 模型要学 '恰好按顺序输出这 5 个 token' → 极难学会")

# ========================================
# 场景 2:添加 special token 后
# ========================================
print(f"\n{'='*55}")
print("场景 2:<think> 设为 Special Token(更稳的做法之一)")
print("=" * 55)

# 把 special token 加入词表
vocab_with_special = vocab.copy()
vocab_with_special["<think>"] = 100
vocab_with_special["</think>"] = 101

print("词表新增:")
print(" '<think>' → 100 ★ special token")
print(" '</think>' → 101 ★ special token")
print()

def tokenize_with_special(text, vocab):
"""
tokenizer 会先扫描 special token(最长匹配优先),
再扫描普通 token。这是所有真实 tokenizer 的做法。
"""
tokens = []
i = 0
while i < len(text):
# Step 1:先匹配 special token(ID >= 100)
matched = None
for word in sorted(vocab.keys(), key=lambda x: (-len(x), x)):
if text[i:].startswith(word):
matched = (word, vocab[word])
break
if matched:
tokens.append(matched)
i += len(matched[0])
else:
ch = text[i]
tokens.append((ch, ord(ch) % 50 + 20))
i += 1
return tokens

tokens_special = tokenize_with_special(text, vocab_with_special)

print(f"输入: {text}")
print(f"\n分词结果 ({len(tokens_special)} 个 token):")
ids_special = []
for word, tid in tokens_special:
ids_special.append(tid)
flag = "★ 特殊" if tid >= 100 else ""
print(f" [{word:12s}] → ID {tid:3d} {flag}")

print(f"\nToken ID 序列: {ids_special}")
print(f"Token 数量: {len(tokens_plain)}{len(tokens_special)} 个")
print(f"\n关键优势:")
print(f" ✅ '<think>' = 一个 token (ID=100),不会被切碎")
print(f" ✅ thinking 内部出现 '<' '>' 无影响(先匹配 special)")
print(f" ✅ 检查 ID==100/101 即可精确定位 thinking 边界")
print(f" ✅ 模型只需学会 '在合适位置输出 token 100'")

# ========================================
# 用真实 tokenizer 演示(如果 tiktoken 可用)
# ========================================
print(f"\n{'='*55}")
print("真实 tokenizer 的做法")
print("=" * 55)

try:
import tiktoken

# GPT-2 的 tokenizer(没有 think special token)
enc = tiktoken.get_encoding("gpt2")
plain_result = enc.encode("<think>")
print(f"\nGPT-2 tokenizer 编码 '<think>':")
print(f" Token IDs: {plain_result}")
print(f" 每个 token: {[enc.decode([t]) for t in plain_result]}")
print(f" → 被切成 {len(plain_result)} 个 token(因为 GPT-2 词表里没有 '<think>')")

# 说明真实做法
print(f"\nDeepSeek-R1 / Qwen3 的做法:")
print(f" 1. 在 tokenizer 词表中添加 special token:")
print(f" tokenizer.add_special_tokens({{'<think>': ..., '</think>': ...}})")
print(f" 2. 模型 embedding 矩阵扩大一行(为新 token)")
print(f" 3. 现在 '<think>' 就是一个完整的 token 了")

except ImportError:
print("\n(未安装 tiktoken,跳过真实 tokenizer 演示)")
print("pip install tiktoken 即可运行上述演示")
print()
print("以 GPT-4 的 cl100k_base 为例:")
print(" '<think>' 如果没加入词表 → 可能被切成多个 BPE token")
print(" 加入词表后 → 1 个 token → 模型学习成本降低 5 倍")

print(f"\n总结: add_special_tokens() 是训练 thinking 模型的第一步。")
=== Special Token vs 普通文字:分词对比 ===

【词表】普通 token(不含 special token):
'我' → 1
'喜欢' → 2
'吃' → 3
'苹果' → 4
'橘子' → 5
'think' → 7
'<' → 9
'>' → 10
'/' → 11
'推理' → 13
'过程' → 14
'答案' → 15
'是' → 16

=======================================================
场景 1:<think> 只是普通文字(没有加入词表)
=======================================================
输入: 我喜欢<think>推理过程</think>答案是苹果

分词结果 (14 个 token):
[我 ] → ID 1
[喜欢 ] → ID 2
[< ] → ID 9 ⚠️拆碎
[think ] → ID 7
[> ] → ID 10 ⚠️拆碎
[推理 ] → ID 13
[过程 ] → ID 14
[< ] → ID 9 ⚠️拆碎
[/ ] → ID 11 ⚠️拆碎
[think ] → ID 7
[> ] → ID 10 ⚠️拆碎
[答案 ] → ID 15
[是 ] → ID 16
[苹果 ] → ID 4

Token ID 序列: [1, 2, 9, 7, 10, 13, 14, 9, 11, 7, 10, 15, 16, 4]

关键问题:
❌ '<think>' 被切成 3 个 token: < think > 共 3 个 ID
❌ 如果 thinking 内容里出现 '<'(如 x<5) → 混乱
❌ 训练时无法用 token ID 精确定位 'thinking 从哪开始'
❌ 模型要学 '恰好按顺序输出这 5 个 token' → 极难学会

=======================================================
场景 2:<think> 设为 Special Token(正确做法)
=======================================================
词表新增:
'<think>' → 100 ★ special token
'</think>' → 101 ★ special token

输入: 我喜欢<think>推理过程</think>答案是苹果

分词结果 (9 个 token):
[我 ] → ID 1
[喜欢 ] → ID 2
[<think> ] → ID 100 ★ 特殊
[推理 ] → ID 13
[过程 ] → ID 14
[</think> ] → ID 101 ★ 特殊
[答案 ] → ID 15
[是 ] → ID 16
[苹果 ] → ID 4

Token ID 序列: [1, 2, 100, 13, 14, 101, 15, 16, 4]
Token 数量: 14 → 9 个

关键优势:
✅ '<think>' = 一个 token (ID=100),不会被切碎
✅ thinking 内部出现 '<' '>' 无影响(先匹配 special)
✅ 检查 ID==100/101 即可精确定位 thinking 边界
✅ 模型只需学会 '在合适位置输出 token 100'

=======================================================
真实 tokenizer 的做法
=======================================================

GPT-2 tokenizer 编码 '<think>':
Token IDs: [27, 14925, 29]
每个 token: ['<', 'think', '>']
→ 被切成 3 个 token(因为 GPT-2 词表里没有 '<think>')

DeepSeek-R1 / Qwen3 的做法:
1. 在 tokenizer 词表中添加 special token:
tokenizer.add_special_tokens({'<think>': ..., '</think>': ...})
2. 模型 embedding 矩阵扩大一行(为新 token)
3. 现在 '<think>' 就是一个完整的 token 了

总结: add_special_tokens() 是训练 thinking 模型的第一步。

3.6 Thinking 模型训练时的 Loss 怎么算

有了 <think> 这种 special token,一个重要的问题出现了:

训练时,thinking 部分的 token 要算 loss 吗?

这有三种策略,各有优劣。我们先看一个具体例子:

完整的 assistant 输出 token 序列:
[<think>, 3, 1, 2, ×, 2, 0, 0, =, 7, 1, 4, 0, 0, </think>, 1, 0, 3, 1, 7, 3]
│←──────── thinking tokens ────────────→│←─ answer tokens ─→│
14 个 token 6 个 token

策略对比

┌─────────────────────┬──────────────────┬──────────────────┬──────────────────┐
│ │ 策略 A: Full Loss│ 策略 B: Answer │ 策略 C: Selective│
│ │ (全算 loss) │ Only (只算答案) │ Weighting(加权) │
├─────────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ thinking token 的 │ ✅ 算 │ ❌ 不算 │ 🔶 算但降权 │
│ loss │ │ │ (×0.1) │
│ answer token 的 │ ✅ 算 │ ✅ 算 │ ✅ 算 │
│ loss │ │ │ │
│ special token 的 │ ✅ 算 │ ❌ 不算 │ ❌ 不算 │
│ loss (<think>等) │ │ │ │
├─────────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 优点 │ 简单直接 │ 只关心最终答案 │ 平衡两者 │
│ │ 思考质量也被优化 │ 思考自由度高 │ 思考有引导但不 │
│ │ │ │ 主导训练 │
├─────────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 缺点 │ 思考可能变成 │ 思考过程可能 │ 需要调权重超参 │
│ │ 「表演」而非推理 │ 退化或乱写 │ │
├─────────────────────┼──────────────────┼──────────────────┼──────────────────┤
│ 代表模型/场景 │ DeepSeek-R1 │ 早期 thinking │ 部分 RL 实验 │
│ │ (RL 阶段全算) │ 实验 │ │
└─────────────────────┴──────────────────┴──────────────────┴──────────────────┘

SFT 和 RL 阶段要分开理解

SFT 阶段可以讨论 full loss / answer-only / selective weighting:如果完全不监督 thinking 区间,模型可能只学到最终答案格式,thinking 行为不稳定。

RL 阶段不是普通 CE loss “全算 token”。模型先采样回答,再根据结果 reward 优化策略;更长推理是否被强化,取决于 reward、格式约束、采样长度和训练稳定性。

但全算 loss 也有代价:模型可能学会「表演式思考」—— 写一堆看起来像推理但实际无用的 token,只是因为这些 token 的 loss 低(模型很确定)。 RL 的结果奖励可以抑制一部分无用推理,但不能保证所有 thinking 都真实有效;仍需要格式检查、长度控制和任务评测。

实现上怎么做?

核心是构造一个 loss_mask——一个和 labels 同样形状的 0/1 数组:

# labels:   [100, 3, 1, 2, 11, ..., 101, 1, 0, 3, 1, 7, 3]
# loss_mask:[ 0, 1, 1, 1, 1, ..., 0, 1, 1, 1, 1, 1, 1]
# ↑ special token 不算 ↑ special token 不算
#
# 最终 loss = cross_entropy(logits, labels) * loss_mask
# → special token 位置 loss=0 → 不产生梯度

下面用代码完整实现这三种策略。

# ============================================================
# 完整实现:三种 Loss 策略的 loss_mask 构造
# ============================================================
import torch
import torch.nn.functional as F

import torch.nn as nn

print("=== Thinking 模型 Loss Mask 完整实现 ===\n")

# 模拟一个 batch 的 token 序列
# Special token ID: <think>=100, </think>=101
# 普通 token: 1-50
# PAD=0, IGNORE=-100(PyTorch 标准忽略值)
THINK_START = 100
THINK_END = 101
IGNORE = -100

# 模拟训练数据:两个样本的 labels
# 样本 1: <think> 3,1,2,11,2,0,0 </think> 1,0,3,1,7,3 ← 正常的 thinking→answer
# 样本 2: <think> 5,×,6,=,3,0 </think> 3,0 ← 简短的 thinking→answer
batch_labels = torch.tensor([
[100, 3, 1, 2, 11, 2, 0, 0, 101, 1, 0, 3, 1, 7, 3, 0],
[100, 5, 12, 6, 13, 3, 0, 101, 3, 0, 0, 0, 0, 0, 0, 0],
]) # shape: [2, 16]

# 模拟模型输出的 logits
VOCAB_SIZE = 200
logits = torch.randn(2, 16, VOCAB_SIZE)

print("Batch labels:")
print(batch_labels)
print()

# ============================================================
# 策略 A: Full Loss — 所有 token 都算 loss
# ============================================================
print("=" * 60)
print("策略 A: Full Loss(全算)")
print("=" * 60)

def make_full_loss_mask(labels, ignore_id=0):
"""最简单的 mask:只忽略 PAD token"""
return (labels != ignore_id).float()

mask_full = make_full_loss_mask(batch_labels)
print("loss_mask (0=PAD不算, 1=算):")
print(mask_full.int())
print()

# 计算 loss
loss_full = F.cross_entropy(
logits.view(-1, VOCAB_SIZE),
batch_labels.view(-1),
ignore_index=0, # PAD 不参与
reduction='none'
).view(2, 16)

print("每个位置的 loss:")
print(loss_full)
print()

total_full = loss_full.sum() / mask_full.sum()
print(f"Average loss (full): {total_full:.4f}")
print("→ thinking 和 answer 的每个 token 都优化")
print()

# ============================================================
# 策略 B: Answer Only — 只算 </think> 之后的 loss
# ============================================================
print("=" * 60)
print("策略 B: Answer Only(只算答案部分)")
print("=" * 60)

def make_answer_only_mask(labels, think_start=100, think_end=101, ignore_id=0):
"""
只让 </think> 之后的 token 参与 loss 计算。
逻辑:从每个序列中找到 think_end 的位置,之后的 token 才参与 loss。
"""
mask = torch.zeros_like(labels, dtype=torch.float)

for b in range(labels.shape[0]):
# 找 </think> 的位置
end_positions = (labels[b] == think_end).nonzero(as_tuple=True)[0]
if len(end_positions) > 0:
end_pos = end_positions[0].item()
# </think> 之后(不含)到 EOS/PAD 之前
for pos in range(end_pos + 1, labels.shape[1]):
if labels[b, pos] == ignore_id:
break
mask[b, pos] = 1.0
# 如果没找到 think_end → 整个序列都可能是非 thinking 模式 → 全算

return mask

mask_answer_only = make_answer_only_mask(batch_labels)
print("loss_mask (0=不算, 1=算):")
print(mask_answer_only.int())
print()

# 标注哪些 token 属于 thinking,哪些属于 answer
print("Token 标注 (T=thinking, A=answer, S=special, P=PAD):")
for b in range(2):
row = ""
for pos in range(16):
tid = batch_labels[b, pos].item()
if tid == 0:
row += " P "
elif tid == 100:
row += "[S "
elif tid == 101:
row += " S]"
elif mask_answer_only[b, pos] == 1:
row += " A "
else:
row += " T "
print(f" 样本{b}: {row}")
print()

# 构造 masked labels:不参与 loss 的位置设为 IGNORE
labels_masked_B = batch_labels.clone()
labels_masked_B[mask_answer_only == 0] = IGNORE

loss_answer_only = F.cross_entropy(
logits.view(-1, VOCAB_SIZE),
labels_masked_B.view(-1),
ignore_index=IGNORE,
)
print(f"Average loss (answer only): {loss_answer_only:.4f}")
print("→ 只有 answer token 被优化,thinking 自由发挥")
print()

# ============================================================
# 策略 C: Selective Weighting — thinking 降权
# ============================================================
print("=" * 60)
print("策略 C: Selective Weighting(thinking 降权 ×0.1)")
print("=" * 60)

def make_selective_weight_mask(labels, think_start=100, think_end=101,
thinking_weight=0.1, ignore_id=0):
"""
thinking token 和 answer token 都算 loss,但权重不同:
- thinking token: weight = 0.1
- answer token: weight = 1.0
- special token: 不算
"""
weight_mask = torch.zeros_like(labels, dtype=torch.float)

for b in range(labels.shape[0]):
in_thinking = False
for pos in range(labels.shape[1]):
tid = labels[b, pos].item()

if tid == think_start:
in_thinking = True
continue # special token 本身不算
elif tid == think_end:
in_thinking = False
continue # special token 本身不算
elif tid == ignore_id:
break # PAD 不算

if in_thinking:
weight_mask[b, pos] = thinking_weight # thinking: 0.1
else:
weight_mask[b, pos] = 1.0 # answer: 1.0

return weight_mask

weight_mask = make_selective_weight_mask(batch_labels)
print("权重矩阵:")
for b in range(2):
print(f" 样本{b}: {[f'{w:.1f}' for w in weight_mask[b].tolist()]}")
print()

# 加权 loss
loss_per_token = F.cross_entropy(
logits.view(-1, VOCAB_SIZE),
batch_labels.view(-1),
ignore_index=0,
reduction='none'
).view(2, 16)

weighted_loss = (loss_per_token * weight_mask).sum() / weight_mask.sum()
print(f"Average loss (selective weighting): {weighted_loss:.4f}")
print("→ thinking 有 10% 的优化信号,answer 有 100%")

# ============================================================
# 三种策略对比
# ============================================================
print(f"\n{'='*60}")
print("三种策略对比总结")
print("=" * 60)
print(f"""
┌──────────────────┬──────────┬──────────┬──────────┐
│ │ Full Loss│Answer Only│Selective │
├──────────────────┼──────────┼──────────┼──────────┤
│ Avg Loss │ {total_full:.4f}{loss_answer_only:.4f}{weighted_loss:.4f}
│ thinking 被优化 │ ✅ │ ❌ │ 🔶0.1 │
│ answer 被优化 │ ✅ │ ✅ │ ✅ │
│ 思考不会退化 │ ✅ │ ❌ │ ✅ │
│ 实现复杂度 │ 低 │ 中 │ 中 │
└──────────────────┴──────────┴──────────┴──────────┘
""")

print("实际训练中怎么选?")
print(" SFT 阶段 → 通常用 Answer Only(让模型先学会格式)")
print(" RL 阶段 → 通常用 Full Loss(DeepSeek-R1 的做法)")
print(" 混合阶段 → 用 Selective Weighting 做平滑过渡")
print()
print("但所有策略的共同点:special token 本身(<think>/</think>)不算 loss。")
print("因为这些 token 只是标记,它们的「预测」没有意义。")

=== Thinking 模型 Loss Mask 完整实现 ===

Batch labels:
tensor([[100, 3, 1, 2, 11, 2, 0, 0, 101, 1, 0, 3, 1, 7,
3, 0],
[100, 5, 12, 6, 13, 3, 0, 101, 3, 0, 0, 0, 0, 0,
0, 0]])

============================================================
策略 A: Full Loss(全算)
============================================================
loss_mask (0=PAD不算, 1=算):
tensor([[1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 1, 1, 0],
[1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]], dtype=torch.int32)

每个位置的 loss:
tensor([[4.8708, 6.5770, 6.5824, 5.1351, 6.3231, 4.8046, 0.0000, 0.0000, 6.8438,
5.2807, 0.0000, 6.0612, 5.2646, 4.5894, 7.0696, 0.0000],
[4.7409, 5.9732, 6.1340, 6.0056, 6.7161, 6.5794, 0.0000, 5.6591, 2.5543,
0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000]])

Average loss (full): 5.6882
→ thinking 和 answer 的每个 token 都优化

============================================================
策略 B: Answer Only(只算答案部分)
============================================================
loss_mask (0=不算, 1=算):
tensor([[0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]], dtype=torch.int32)

Token 标注 (T=thinking, A=answer, S=special, P=PAD):
样本0: [S T T T T T P P S] A P T T T T P
样本1: [S T T T T T P S] A P P P P P P P

Average loss (answer only): 3.9175
→ 只有 answer token 被优化,thinking 自由发挥

============================================================
策略 C: Selective Weighting(thinking 降权 ×0.1)
============================================================
权重矩阵:
样本0: ['0.0', '0.1', '0.1', '0.1', '0.1', '0.1', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']
样本1: ['0.0', '0.1', '0.1', '0.1', '0.1', '0.1', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0', '0.0']

Average loss (selective weighting): 6.0830
→ thinking 有 10% 的优化信号,answer 有 100%

============================================================
三种策略对比总结
============================================================

┌──────────────────┬──────────┬──────────┬──────────┐
│ │ Full Loss│Answer Only│Selective │
├──────────────────┼──────────┼──────────┼──────────┤
│ Avg Loss │ 5.6882 │ 3.9175 │ 6.0830 │
│ thinking 被优化 │ ✅ │ ❌ │ 🔶0.1 │
│ answer 被优化 │ ✅ │ ✅ │ ✅ │
│ 思考不会退化 │ ✅ │ ❌ │ ✅ │
│ 实现复杂度 │ 低 │ 中 │ 中 │
└──────────────────┴──────────┴──────────┴──────────┘

实际训练中怎么选?
SFT 阶段 → 通常用 Answer Only(让模型先学会格式)
RL 阶段 → 通常用 Full Loss(DeepSeek-R1 的做法)
混合阶段 → 用 Selective Weighting 做平滑过渡

但所有策略的共同点:special token 本身(<think>/</think>)不算 loss。
因为这些 token 只是标记,它们的「预测」没有意义。

4. Thinking 模型的训练方法

训练一个 thinking 模型分四步:

Step 1: 冷启动 SFT
收集几千条「带思考过程」的高质量数据
格式: Q → thinking... → A
用这些数据做监督微调,让模型学会「先想再说」的格式

Step 2: RL 推理训练(核心!)
用强化学习训练模型的推理能力
奖励信号:
- 答案正确 → +1
- 答案错误 → -1
- 格式正确(有 thinking 标签)→ +0.1
- 语言一致(thinking 和 answer 同语言)→ +0.1
模型自己探索更好的推理路径

Step 3: 拒绝采样 + SFT
用训练好的模型生成大量「问题→思考→答案」数据
只保留答案正确的样本
用这些高质量数据再做一轮 SFT

Step 4: 全场景 RL
在更多类型的数据上做 RL(有用性、安全性等)
让模型不仅会推理,还能好好聊天

最关键的是 Step 2:RL 让模型自己探索推理策略,而不是死记硬背人类的推理过程。

# 模拟 RL 训练中「奖励信号」的作用
print("=== RL 推理训练模拟 ===")
print()

print("问题: 15 + 27 = ?")
print()

# 模拟模型尝试不同的推理路径
attempts = [
("15+20=35, 35+7=42", 42, True),
("15+27=42", 42, True),
("15+30=45, 45-3=42", 42, True),
("直接猜: 41", 41, False),
("10+20=30, 5+7=12, 30+12=42", 42, True),
]

for i, (reasoning, answer, correct) in enumerate(attempts):
reward = 1 if correct else -1
# 额外奖励:推理步骤详细
steps = len(reasoning.split(','))
detail_bonus = min(steps * 0.05, 0.2)
total_reward = reward + detail_bonus

status = '✅' if correct else '❌'
print(f"尝试{i+1}: {status} {reasoning}")
print(f" 答案={answer}, 正确={correct}, 基础奖励={reward}, 详细奖励={detail_bonus:.2f}")
print(f" 总奖励={total_reward:+.2f}")
print()

print("RL 会强化高奖励的推理模式,抑制低奖励的。")
print("模型逐渐学会:推理越详细、越正确 → 奖励越高。")
=== RL 推理训练模拟 ===

问题: 15 + 27 = ?

尝试1: ✅ 15+20=35, 35+7=42
答案=42, 正确=True, 基础奖励=1, 详细奖励=0.10
总奖励=+1.10

尝试2: ✅ 15+27=42
答案=42, 正确=True, 基础奖励=1, 详细奖励=0.05
总奖励=+1.05

尝试3: ✅ 15+30=45, 45-3=42
答案=42, 正确=True, 基础奖励=1, 详细奖励=0.10
总奖励=+1.10

尝试4: ❌ 直接猜: 41
答案=41, 正确=False, 基础奖励=-1, 详细奖励=0.05
总奖励=-0.95

尝试5: ✅ 10+20=30, 5+7=12, 30+12=42
答案=42, 正确=True, 基础奖励=1, 详细奖励=0.15
总奖励=+1.15

RL 会强化高奖励的推理模式,抑制低奖励的。
模型逐渐学会:推理越详细、越正确 → 奖励越高。

5. 训练自己的 Thinking 模型

不需要从零开始!基于开源模型微调即可。完整流程:

1. 选基座模型
→ Qwen2.5-7B / DeepSeek-V3 / Llama-3 等
→ 要求:基座模型本身推理能力不能太差

2. 准备冷启动数据(几千条可做教学实验,真实效果要看质量和覆盖面)
→ 用强模型生成可检查的「问题→步骤→答案」数据;注意 reasoning API 可能不允许导出隐藏推理
→ 或者从 GSM8K、MATH 等数据集中提取
→ 格式:
<|user|>357 × 289 = ?
<|assistant|><think>
357×200=71400
357×80=28560
357×9=3213
71400+28560+3213=103173
</think>
103173

3. 冷启动 SFT(教学实验可用 LoRA 小规模跑通;耗时取决于模型、序列长度和硬件)
→ 用 LLaMA-Factory / Axolotl 等工具
→ 让模型学会 <think>/答案 格式

4. RL 训练(R1 类路线的重要环节;耗时取决于 rollout 数、题目数、平均生成长度和硬件)
→ 用 verl / OpenRLHF 等框架
→ 奖励函数:答案正确 + 格式正确
→ 数学题用规则验证(答案对不对一目了然)
→ 代码题用测试用例验证

5. 拒绝采样 + 第二轮 SFT
→ 用训练好的模型生成更多数据
→ 过滤掉错误的,保留正确的
→ 再做一轮 SFT 巩固

成本估算:以 Qwen2.5-7B 这类规模做参考,完整成本不能用固定价格概括;要先确定题目数、每题采样次数、平均生成长度、训练 epoch 和 GPU/API 单价。

# 模拟冷启动数据的格式
print("=== 冷启动数据格式示例 ===")
print()

training_examples = [
{
"question": "一个长方形的长是 12cm,宽是 8cm,求面积。",
"thinking": "长方形面积 = 长 × 宽\n面积 = 12 × 8 = 96\n所以面积是 96 平方厘米。",
"answer": "96 平方厘米"
},
{
"question": "小明有 15 个苹果,吃了 3 个,又买了 7 个,现在有几个?",
"thinking": "初始: 15 个\n吃了 3 个: 15 - 3 = 12 个\n又买了 7 个: 12 + 7 = 19 个\n所以现在有 19 个苹果。",
"answer": "19 个"
},
{
"question": "357 × 289 = ?",
"thinking": "357 × 200 = 71400\n357 × 80 = 28560\n357 × 9 = 3213\n71400 + 28560 + 3213 = 103173",
"answer": "103173"
}
]

for i, ex in enumerate(training_examples):
print(f"--- 样本 {i+1} ---")
print(f"<|user|>\n{ex['question']}\n")
print(f"<|assistant|><think>\n{ex['thinking']}\n</think>\n{ex['answer']}")
print()

print("模型在 SFT 阶段学习的就是这个格式。")
print("RL 阶段则让模型自己探索更好的 thinking 内容。")
=== 冷启动数据格式示例 ===

--- 样本 1 ---
<|user|>
一个长方形的长是 12cm,宽是 8cm,求面积。

<|assistant|><think>
长方形面积 = 长 × 宽
面积 = 12 × 8 = 96
所以面积是 96 平方厘米。
</think>
96 平方厘米

--- 样本 2 ---
<|user|>
小明有 15 个苹果,吃了 3 个,又买了 7 个,现在有几个?

<|assistant|><think>
初始: 15 个
吃了 3 个: 15 - 3 = 12 个
又买了 7 个: 12 + 7 = 19 个
所以现在有 19 个苹果。
</think>
19 个

--- 样本 3 ---
<|user|>
357 × 289 = ?

<|assistant|><think>
357 × 200 = 71400
357 × 80 = 28560
357 × 9 = 3213
71400 + 28560 + 3213 = 103173
</think>
103173

模型在 SFT 阶段学习的就是这个格式。
RL 阶段则让模型自己探索更好的 thinking 内容。

6. Thinking 模型的「啊哈时刻」

DeepSeek-R1 论文里提到一个有趣的现象:模型在 RL 训练中自己学会了「反思」。

模型的 thinking 过程:
第一步: 我觉得答案是 42...
第二步: 等等,让我重新检查一下...
第三步: 哦不对,15+27 应该是 42,但我刚才的推理有问题...
第四步: 重新算: 15+20=35, 35+7=42。确认,答案是 42。

DeepSeek-R1 论文观察到 R1-Zero 在 RL 中出现了类似“重新检查”的 aha moment。更稳妥的理解是:在可验证任务上,奖励信号可能鼓励模型探索出更长、更会检查的推理轨迹;这不是保证,也不等于模型像人一样理解自己。

# 模拟 RL 训练中「反思行为」的涌现
print("=== RL 训练中的反思行为涌现 ===")
print()

def evaluate_reasoning(reasoning, correct_answer):
"""评估推理过程,返回 (答案, 是否正确, 奖励)"""
import re
numbers = re.findall(r'[\d.]+', reasoning)
answer = float(numbers[-1]) if numbers else None
correct = (answer == correct_answer)

reward = 1.0 if correct else -1.0
steps = reasoning.count('+') + reasoning.count('-') + reasoning.count('×') + reasoning.count('=')
reward += min(steps * 0.05, 0.2)
has_check = any(w in reasoning for w in ['验证', '检查', '确认', '重新', '不对'])
if has_check and correct:
reward += 0.15
return answer, correct, reward

correct_answer = 42.0
stages = [
("训练初期", [
"15+27=41",
"15+27=44",
"15+27=39",
]),
("训练中期(开始反思)", [
"15+27=41... 不对,重新算。15+20=35, 35+7=42。验证:42-27=15 ✓",
"15+20=35, 35+7=42",
"10+20=30, 5+7=12, 30+12=42。确认正确。",
]),
("训练后期(稳定推理)", [
"15+20=35, 35+7=42。验证:42-27=15 ✓",
"先算 15+20=35,再加 7=42。检查:42-15=27 ✓",
"15+27: 拆分为 15+20=35, 35+7=42。验证 42-27=15, 正确。",
]),
]

for stage_name, outputs in stages:
print(f"📋 {stage_name}:")
total_reward = 0
for reasoning in outputs:
answer, correct, reward = evaluate_reasoning(reasoning, correct_answer)
status = '✅' if correct else '❌'
total_reward += reward
print(f" {status} {reasoning[:60]}...")
print(f" 答案={answer}, 奖励={reward:+.2f}")
avg_reward = total_reward / len(outputs)
print(f" 平均奖励: {avg_reward:+.2f}")
print()

print("趋势: 初期猜测为主 → 中期偶然反思(奖励高) → 后期反思成为习惯")
print("这就是涌现行为:没有人教模型要反思,是 RL 奖励让它自己发现的。")
=== RL 训练中的反思行为涌现 ===

📋 训练初期:
❌ 15+27=41...
答案=41.0, 奖励=-0.90
❌ 15+27=44...
答案=44.0, 奖励=-0.90
❌ 15+27=39...
答案=39.0, 奖励=-0.90
平均奖励: -0.90

📋 训练中期(开始反思):
❌ 15+27=41... 不对,重新算。15+20=35, 35+7=42。验证:42-27=15 ✓...
答案=15.0, 奖励=-0.80
✅ 15+20=35, 35+7=42...
答案=42.0, 奖励=+1.20
✅ 10+20=30, 5+7=12, 30+12=42。确认正确。...
答案=42.0, 奖励=+1.35
平均奖励: +0.58

📋 训练后期(稳定推理):
❌ 15+20=35, 35+7=42。验证:42-27=15 ✓...
答案=15.0, 奖励=-0.80
❌ 先算 15+20=35,再加 7=42。检查:42-15=27 ✓...
答案=27.0, 奖励=-0.80
❌ 15+27: 拆分为 15+20=35, 35+7=42。验证 42-27=15, 正确。...
答案=15.0, 奖励=-0.80
平均奖励: -0.80

趋势: 初期猜测为主 → 中期偶然反思(奖励高) → 后期反思成为习惯
这就是涌现行为:没有人教模型要反思,是 RL 奖励让它自己发现的。

7. Thinking 模型的局限性

局限说明
思考过程可能很长(几百到几千 token),用户要等
思考 token 也要算钱(API 按 token 计费)
过度思考简单问题也想半天——「1+1=?」也写 500 字推理
语言混合思考时可能中英文混杂,影响可读性
不可控RL 训练出的思考策略是黑盒,不一定符合人类期望

更适合:数学、编程、逻辑推理、多约束规划、科学推导。 不一定适合:闲聊、短问答、风格写作、情感陪伴、低延迟场景。原因不是 thinking 没用,而是额外 reasoning token 会增加延迟、成本,并可能导致 overthinking。

8. Function Calling 与 Tool Use

前面讨论的 CoT 和 Thinking 让模型「更会推理」。但有些问题光靠推理不够——模型不知道今天的天气,不会执行代码,也不能直接查数据库。

Function Calling 的做法是:让模型输出结构化的「调用请求」,由外部程序执行后把结果喂回模型。模型本身不执行任何操作,它只是生成「我想调用什么函数、传什么参数」的 JSON。

# Function Calling 的完整流程演示

# Step 1: 定义可用工具
tools = [
{
"name": "get_weather",
"description": "获取指定城市的天气信息",
"parameters": {
"city": {"type": "string", "description": "城市名"},
"unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
}
},
{
"name": "calculate",
"description": "执行数学计算",
"parameters": {
"expression": {"type": "string", "description": "数学表达式,如 '2+3*4'"}
}
}
]

print("=== 定义的工具 ===")
for tool in tools:
print(f" {tool['name']}: {tool['description']}")
print(f" 参数: {list(tool['parameters'].keys())}")
print()
# Step 2: 模拟模型的 Function Calling 输出
# 实际中,模型经过训练后会输出结构化 JSON,而不是自然语言

import json

# 用户提问
user_query = "北京今天多少度?另外帮我算一下 357 * 289"

# 模型「决定」要调用两个函数(实际是模型生成的 JSON 输出)
model_output = json.dumps({
"tool_calls": [
{"name": "get_weather", "arguments": {"city": "北京", "unit": "celsius"}},
{"name": "calculate", "arguments": {"expression": "357*289"}}
]
}, ensure_ascii=False, indent=2)

print(f"用户: {user_query}")
print(f"\n模型输出(不是自然语言,而是 JSON):")
print(model_output)
# Step 3: 外部程序执行函数调用,把结果喂回模型

import random
import json

random.seed(42)

# 模拟工具执行
def execute_tool(name, arguments):
if name == "get_weather":
return {"temperature": 22, "condition": "晴", "humidity": 45}
elif name == "calculate":
return {"result": eval(arguments["expression"])}
return {"error": "unknown tool"}

# 执行并收集结果
tool_calls = json.loads(model_output)["tool_calls"]
results = []
for call in tool_calls:
result = execute_tool(call["name"], call["arguments"])
results.append({"name": call["name"], "result": result})
print(f"调用 {call['name']}({call['arguments']})")
print(f" → 返回: {result}")

print(f"\n这些结果会被拼入对话历史,模型再基于它们生成最终自然语言回复")
print(f"\n最终回复示例: \"北京今天 22°C,晴天。357×289 = 103,173。\"")

Function Calling 的本质

从技术角度看,Function Calling 就是结构化输出(JSON mode)+ prompt 引导

  1. 把工具描述(name、parameters、description)拼入 system prompt
  2. 模型经过微调,学会了输出 JSON 格式的调用请求,而不是自然语言
  3. 外部程序解析 JSON、执行函数、把结果拼回对话
  4. 模型看到结果后,生成最终的自然语言回复
用户提问 → [模型输出 JSON] → 外部执行 → 结果拼入对话 → [模型输出自然语言]

实际使用中的注意事项:

注意点说明
工具描述要清晰模型根据 description 决定是否调用,描述不清会调错
参数类型要严格用 JSON Schema 约束类型,模型可能生成非法参数
并行调用一个问题可能需要调多个工具(上面演示了这种情况)
错误处理工具执行可能失败,模型需要能处理错误结果
不要用来做计算简单计算让模型自己算就行,工具适合需要外部数据的场景

9. 主流 Thinking 模型对比

模型训练方式特点
OpenAI o 系列训练细节未公开闭源 reasoning models,API 提供 reasoning effort / reasoning 配置
DeepSeek-R1论文公开较详细公开论文描述了 cold-start、RL、rejection sampling、SFT 等多阶段流程
Qwen3官方文档支持 thinking/non-thinking支持通过 chat template / 参数切换 thinking 行为
Kimi / 其他推理模型公开信息随版本变化只在有官方技术报告或模型卡时写具体训练方式

共同点:都在产品或模型行为上区分“思考/推理预算”和“最终回答”,但闭源模型的训练细节不能写成确定事实。

10. 实操:启动与切换 Thinking 模式

每个平台启动 thinking 的方式都不一样,而且这些 API 会随着模型版本变化。下面讲的是截至当前资料更稳妥的用法:记住“看模型文档”,不要死记某一个参数永远可用。


10.1 OpenAI reasoning models(o 系列、GPT-5 系列)

OpenAI 的 reasoning models 通常通过 reasoning_effort 或 Responses API 里的 reasoning 配置控制思考强度:

常见取值: "low" | "medium" | "high"
部分新模型还支持 "minimal" 或 "none",以官方模型文档为准。

关键点

  • 支持 reasoning 的是 OpenAI reasoning models,例如 o3、o4-mini、GPT-5 系列等。
  • GPT-4o 这类普通模型不要默认当成 reasoning model 使用。
  • API 通常不会返回完整隐藏思考过程,但可能返回 reasoning token 的用量。
  • 计费和上下文占用要把 reasoning token 算进去。

参考:OpenAI reasoning guideOpenAI models


10.2 DeepSeek-R1 / DeepSeek thinking mode

DeepSeek-R1 这一类开源 thinking 模型,常用 Chat Template 把思考区间和最终答案分开。有些前端会把模型生成的 <think>...</think> 区间折叠起来:

模型输出格式:
<think>
先分析题目条件...
检查计算是否正确...
</think>
42

前端处理:
- 默认折叠 `<think>...</think>` 之间的内容
- 只显示 `</think>` 之后的最终答案

容易误解的一点:不能简单说“DeepSeek 永远不能关闭 thinking”。本地 R1 / R1-distill 模型通常会自然输出 thinking 格式;但 DeepSeek 官方 API 的新 thinking mode 已经支持通过参数打开或关闭 thinking。

常见情况:

  • deepseek-reasoner 会返回 reasoning_content 和最终 content
  • 新版 DeepSeek Chat/Reasoner API 支持 thinking.enabled 这类开关,具体以官方文档为准。

参考:DeepSeek reasoning modelDeepSeek thinking mode


10.3 Qwen3(明确支持 Thinking / Non-Thinking 双模式)

Qwen3 是少数在官方 chat template 中明确支持 thinking/non-thinking 切换的主流开源模型之一:

方式一: 通过 chat template 参数
enable_thinking=True → 进入 thinking 模式
enable_thinking=False → 关闭 thinking,直接回答

方式二: 通过在对话中发指令
用户: /think → 开启 thinking
用户: /no_think → 关闭 thinking

原理:Qwen3 训练时同时学了 thinking 和 non-thinking 两种模式的数据,所以它能根据 enable_thinking 开关来切换行为。

实际用法(HuggingFace Transformers):

# 开启 thinking
messages = [{"role": "user", "content": "357×289=?"}]
text = tokenizer.apply_chat_template(
messages, tokenize=False,
enable_thinking=True
)

# 关闭 thinking
text = tokenizer.apply_chat_template(
messages, tokenize=False,
enable_thinking=False
)

参考:Qwen3-8B model cardQwen quickstart


10.4 Anthropic Claude(Extended Thinking)

Claude 的 Extended Thinking 也有版本差异。Claude 3.7/4.x 曾支持手动设置 budget_tokens,但较新的 Opus 4.7/4.8 文档推荐 adaptive thinking,不再支持手动 budget_tokens。所以写代码时要先确认模型版本。

旧式手动 budget 的形态大致如下:

response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=20000,
thinking={
"type": "enabled",
"budget_tokens": 4000
},
messages=[{"role": "user", "content": "证明根号2是无理数"}]
)

新版模型可能改用 effort 或 adaptive thinking。参考:Anthropic extended thinking docs


10.5 本地推理(vLLM / SGLang / Ollama)

如果用 vLLM 或 SGLang 部署 R1 蒸馏模型,thinking 行为通常由 chat template 和推理框架的 parser 控制:

# vLLM 部署 DeepSeek-R1-Distill-Qwen-7B
vllm serve deepseek-ai/DeepSeek-R1-Distill-Qwen-7B \
--enable-reasoning \
--reasoning-parser deepseek_r1

前端可以选择显示、折叠或过滤 <think>...</think>。这不是模型“没有思考”,而是展示层决定是否把 thinking 内容给用户看。


总结:各平台开关 Thinking 方式对比

平台/模型启动方式能否关闭思考内容可见
OpenAI reasoning modelsreasoning_effort / reasoning 配置可调力度,取值随模型变化通常不可见,只看 token 用量
DeepSeek APIdeepseek-reasonerthinking.enabled新版 API 可配置;本地 R1 行为看模板API 可返回 reasoning_content
Qwen3enable_thinking=True/False/think/no_think可切换模板控制显隐
ClaudeExtended Thinking / adaptive thinking取决于模型版本分 block 返回或按版本策略返回
本地 R1 蒸馏Chat template + parser多数情况模型行为固定,前端可过滤可显示或折叠
# ============================================================
# 实操:各平台 API 调用示例(真实可运行代码)
# 注意:需要设置环境变量 API key 才能真实调用
# 如果没有 key,会打印 curl 等效命令并跳过
# ============================================================

import os

# ----------------------------------------------------------
# 1. OpenAI o3 API 调用
# ----------------------------------------------------------
openai_key = os.environ.get("OPENAI_API_KEY")
if openai_key:
from openai import OpenAI
client = OpenAI(api_key=openai_key)

response = client.chat.completions.create(
model="o3-mini",
reasoning_effort="medium", # low | medium | high
messages=[
{"role": "user", "content": "357 x 289 = ?"}
]
)

print("=== OpenAI o3-mini 结果 ===")
print(f"答案: {response.choices[0].message.content}")
print(f"Token 用量: {response.usage}")
else:
print("=== OpenAI o3 API (未设置 OPENAI_API_KEY,跳过) ===")
print("等效 curl:")
print('curl https://api.openai.com/v1/chat/completions \\')
print(' -H "Content-Type: application/json" \\')
print(' -H "Authorization: Bearer $OPENAI_API_KEY" \\')
print(" -d '{\"model\":\"o3-mini\",\"reasoning_effort\":\"medium\",\"messages\":[{\"role\":\"user\",\"content\":\"357x289=?\"}]}'")

# ----------------------------------------------------------
# 2. DeepSeek-R1 API 调用
# ----------------------------------------------------------
deepseek_key = os.environ.get("DEEPSEEK_API_KEY")
if deepseek_key:
from openai import OpenAI
client = OpenAI(
api_key=deepseek_key,
base_url="https://api.deepseek.com"
)

response = client.chat.completions.create(
model="deepseek-reasoner",
messages=[
{"role": "user", "content": "357 x 289 = ?"}
]
)

print("\n=== DeepSeek-R1 结果 ===")
rc = response.choices[0].message.reasoning_content
print(f"思考过程: {rc[:200]}...")
print(f"最终答案: {response.choices[0].message.content}")
else:
print("\n=== DeepSeek-R1 API (未设置 DEEPSEEK_API_KEY,跳过) ===")
print("等效 curl:")
print('curl https://api.deepseek.com/chat/completions \\')
print(' -H "Content-Type: application/json" \\')
print(' -H "Authorization: Bearer $DEEPSEEK_API_KEY" \\')
print(" -d '{\"model\":\"deepseek-reasoner\",\"messages\":[{\"role\":\"user\",\"content\":\"357x289=?\"}]}'")

# ----------------------------------------------------------
# 3. Qwen3 本地推理(HuggingFace Transformers + thinking 开关)
# ----------------------------------------------------------
QWEN3_MODEL = "Qwen/Qwen3-8B"
try:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

print("\n=== Qwen3 本地推理 ===")
tokenizer = AutoTokenizer.from_pretrained(QWEN3_MODEL)
model = AutoModelForCausalLM.from_pretrained(
QWEN3_MODEL,
torch_dtype="auto",
device_map="auto"
)

messages = [{"role": "user", "content": "357 x 289 = ?"}]

# --- 方式 A: 开启 thinking ---
print("\n>>> enable_thinking=True (开启思考模式)")
text_on = tokenizer.apply_chat_template(
messages,
tokenize=False,
enable_thinking=True
)
inputs = tokenizer(text_on, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(**inputs, max_new_tokens=1024)
result = tokenizer.decode(outputs[0], skip_special_tokens=False)
print(f"输出(含 thinking 标签): {result[:300]}...")

# --- 方式 B: 关闭 thinking ---
print("\n>>> enable_thinking=False (关闭思考,直接回答)")
text_off = tokenizer.apply_chat_template(
messages,
tokenize=False,
enable_thinking=False
)
inputs = tokenizer(text_off, return_tensors="pt").to(model.device)
with torch.no_grad():
outputs = model.generate(**inputs, max_new_tokens=512)
result = tokenizer.decode(outputs[0], skip_special_tokens=False)
print(f"输出: {result[:300]}...")

except Exception as e:
print(f"\n=== Qwen3 本地推理 (跳过: {type(e).__name__}) ===")
print(f"原因: {e}")
print()
print("如果要在本地运行 Qwen3,执行:")
print(" pip install transformers torch accelerate")
print(f' model = AutoModelForCausalLM.from_pretrained("{QWEN3_MODEL}", device_map="auto")')
print(" tokenizer.apply_chat_template(messages, enable_thinking=True)")

# ----------------------------------------------------------
# 4. Claude Extended Thinking API 调用
# ----------------------------------------------------------
anthropic_key = os.environ.get("ANTHROPIC_API_KEY")
if anthropic_key:
import anthropic
client = anthropic.Anthropic(api_key=anthropic_key)

response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=20000,
thinking={
"type": "enabled",
"budget_tokens": 4000
},
messages=[
{"role": "user", "content": "证明根号 2 是无理数"}
]
)

print("\n=== Claude Extended Thinking 结果 ===")
for block in response.content:
if block.type == "thinking":
print(f"思考过程: {block.thinking[:200]}...")
elif block.type == "text":
print(f"最终回答: {block.text[:200]}...")
else:
print("\n=== Claude API (未设置 ANTHROPIC_API_KEY,跳过) ===")
print("等效 curl:")
print('curl https://api.anthropic.com/v1/messages \\')
print(' -H "x-api-key: $ANTHROPIC_API_KEY" \\')
print(' -H "anthropic-version: 2023-06-01" \\')
print(' -H "content-type: application/json" \\')
print(" -d '{\"model\":\"claude-sonnet-4-20250514\",\"max_tokens\":20000,\"thinking\":{\"type\":\"enabled\",\"budget_tokens\":4000},\"messages\":[{\"role\":\"user\",\"content\":\"证明根号2是无理数\"}]}'")

# ----------------------------------------------------------
# 5. 通过 OpenRouter 调用(OpenAI 兼容 API,支持多模型)
# ----------------------------------------------------------
openrouter_key = os.environ.get("OPENROUTER_API_KEY")
if openrouter_key:
from openai import OpenAI
client = OpenAI(
api_key=openrouter_key,
base_url="https://openrouter.ai/api/v1"
)

response = client.chat.completions.create(
model="deepseek/deepseek-r1",
messages=[
{"role": "user", "content": "什么是熵?用一句话解释。"}
]
)

print("\n=== OpenRouter (DeepSeek-R1) 结果 ===")
msg = response.choices[0].message
reasoning = getattr(msg, "reasoning", None)
if reasoning:
print(f"思考过程: {reasoning[:200]}...")
print(f"最终答案: {msg.content[:200]}...")
else:
print("\n=== OpenRouter API (未设置 OPENROUTER_API_KEY,跳过) ===")
print("注册获取 key: https://openrouter.ai/keys")
print("支持模型: deepseek/deepseek-r1, openai/o1, anthropic/claude-sonnet-4 等")
print()
print("等效 curl:")
print('curl https://openrouter.ai/api/v1/chat/completions \\')
print(' -H "Authorization: Bearer $OPENROUTER_API_KEY" \\')
print(' -H "Content-Type: application/json" \\')
print(" -d '{\"model\":\"deepseek/deepseek-r1\",\"messages\":[{\"role\":\"user\",\"content\":\"什么是熵?\"}]}'")

=== OpenAI o3 API (未设置 OPENAI_API_KEY,跳过) ===
等效 curl:
curl https://api.openai.com/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $OPENAI_API_KEY" \
-d '{"model":"o3-mini","reasoning_effort":"medium","messages":[{"role":"user","content":"357x289=?"}]}'

=== DeepSeek-R1 API (未设置 DEEPSEEK_API_KEY,跳过) ===
等效 curl:
curl https://api.deepseek.com/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $DEEPSEEK_API_KEY" \
-d '{"model":"deepseek-reasoner","messages":[{"role":"user","content":"357x289=?"}]}'

=== Qwen3 本地推理 (跳过: ModuleNotFoundError) ===
原因: No module named 'transformers'

如果要在本地运行 Qwen3,执行:
pip install transformers torch accelerate
model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen3-8B", device_map="auto")
tokenizer.apply_chat_template(messages, enable_thinking=True)

=== Claude API (未设置 ANTHROPIC_API_KEY,跳过) ===
等效 curl:
curl https://api.anthropic.com/v1/messages \
-H "x-api-key: $ANTHROPIC_API_KEY" \
-H "anthropic-version: 2023-06-01" \
-H "content-type: application/json" \
-d '{"model":"claude-sonnet-4-20250514","max_tokens":20000,"thinking":{"type":"enabled","budget_tokens":4000},"messages":[{"role":"user","content":"证明根号2是无理数"}]}'

=== OpenRouter API (未设置 OPENROUTER_API_KEY,跳过) ===
注册获取 key: https://openrouter.ai/keys
支持模型: deepseek/deepseek-r1, openai/o1, anthropic/claude-sonnet-4 等

等效 curl:
curl https://openrouter.ai/api/v1/chat/completions \
-H "Authorization: Bearer $OPENROUTER_API_KEY" \
-H "Content-Type: application/json" \
-d '{"model":"deepseek/deepseek-r1","messages":[{"role":"user","content":"什么是熵?"}]}'

11. 实战:训练 Thinking 模型

下面是一个教学版完整流程,基于 Qwen2.5-7B,目标是理解“先想再说”的推理模型怎么训练。它适合做小规模实验;如果想训出接近 DeepSeek-R1/Qwen3 级别的能力,数据规模、RL 采样量、奖励设计和算力都会高很多。


Step 1: 环境准备

# 创建虚拟环境
conda create -n thinking-train python=3.10 -y
conda activate thinking-train

# 安装核心依赖
pip install torch==2.4.0 transformers datasets accelerate
pip install vllm # 用于高效推理和生成训练数据

# 安装训练框架(选一个)
pip install llama-factory # 推荐新手用,有 Web UI
# 或者
pip install axolotl # 更灵活,适合进阶

Step 2: 准备冷启动数据(~3000-5000 条)

冷启动数据的核心是 「问题 → 思考过程 → 答案」 三元组。

数据来源

  1. 用强模型生成可检查的解题步骤;如果使用 reasoning API,要遵守对应平台对 hidden reasoning 的输出策略
  2. 从公开数据集提取:GSM8K(小学数学)、MATH(竞赛数学)、APPS(编程)
  3. 混合中英文数据,让模型不会偏向某种语言

数据格式(Qwen3 / DeepSeek 风格):

[
{
"messages": [
{"role": "user", "content": "一个水箱,进水管 3 小时注满,出水管 5 小时排空。两管同开,几小时注满?"},
{"role": "assistant", "content": "<think>\n进水速率 = 1/3 箱/小时\n出水速率 = 1/5 箱/小时\n净速率 = 1/3 - 1/5 = 2/15 箱/小时\n注满需要 1/(2/15) = 15/2 = 7.5 小时\n</think>\n7.5 小时"}
]
}
]

重要:思考过程要详细但不过度。对简单题写 50-100 字,对难题写 200-500 字。 如果每道「1+1=?」都写 500 字推理,训练出来的模型会有严重的 overthinking。


Step 3: 冷启动 SFT(让模型学会格式)

这是让模型学会 <think> / 答案格式的关键一步。

用 LLaMA-Factory(推荐,有 GUI 不容易出错):

# 启动 LLaMA-Factory Web UI
llamafactory-cli webui

# 在 Web UI 中:
# 1. 选择模型: Qwen/Qwen2.5-7B-Instruct
# 2. 上传你的冷启动数据集
# 3. 训练类型: Supervised Fine-Tuning
# 4. LoRA: rank=64, alpha=128
# 5. 学习率: 5e-5, epochs: 3
# 6. 序列长度: 4096
# 7. batch_size: 4, gradient_accumulation: 4
# 8. 开始训练!

用命令行

llamafactory-cli train \
--model_name_or_path Qwen/Qwen2.5-7B-Instruct \
--dataset my_cot_coldstart \
--template qwen \
--finetuning_type lora \
--lora_rank 64 \
--lora_alpha 128 \
--output_dir ./output/qwen-sft-cot \
--per_device_train_batch_size 4 \
--gradient_accumulation_steps 4 \
--lr_scheduler_type cosine \
--logging_steps 10 \
--save_steps 500 \
--learning_rate 5e-5 \
--num_train_epochs 3 \
--bf16

预期效果:训练完后模型会给每个问题写 thinking,但质量取决于冷启动数据的质量。


Step 4: RL 推理训练(核心,让模型变聪明)

SFT 主要让模型学会格式和基础解题风格;RL 是 R1 类路线中的关键环节之一,但数据、基座模型、奖励函数、采样规模同样关键。

用 verl(常用于 RLHF/RL 训练实验的开源框架之一)

# 安装 verl
git clone https://github.com/volcengine/verl.git
cd verl
pip install -e .

# 准备奖励函数数据集:只需要 (问题, 正确答案)
# 格式: {"prompt": "...", "answer": "..."}

verl 配置文件 (r1_train.yaml):

# 模型配置
actor_rollout_ref:
model:
path: ./output/qwen-sft-cot # 冷启动后的模型
use_fused_kernels: true
rollout:
n: 4 # 每个 prompt 生成 4 个候选回答
temperature: 1.0

# 奖励函数
reward_fn:
# 数学题:比较模型输出和标准答案
- name: math_verify
weight: 0.7
# 格式检查:确保有 rubber 标签
- name: format_check
weight: 0.2
# 语言一致性
- name: language_consistency
weight: 0.1

# 训练配置
trainer:
n_gpus_per_node: 4
nnodes: 1
total_epochs: 1
project_name: "my-thinking-model"
# 启动 RL 训练
python -m verl.trainer.main_ppo \
--config-name r1_train.yaml

用 OpenRLHF(备选,更容易上手):

# 安装
git clone https://github.com/OpenRLHF/OpenRLHF.git
cd OpenRLHF
pip install -e .

# Ray 集群模式(4 卡)
ray start --head --num-gpus=4

python -m openrlhf.cli.train_ppo_ray \
--ref_num_nodes 1 \
--ref_num_gpus_per_node 2 \
--reward_num_nodes 1 \
--reward_num_gpus_per_node 1 \
--actor_num_nodes 1 \
--actor_num_gpus_per_node 1 \
--pretrain ./output/qwen-sft-cot \
--reward_fn math_verify \
--save_path ./output/qwen-rl \
--prompt_data ./data/math_prompts.jsonl

RL 训练的几个关键经验

要点说明
数学题是 RL 最好的起点数学题答案对错一目了然,不需要人工标注奖励
格式奖励不能太高太高会让模型只关注格式,不关注正确性
温度要高(0.7-1.0)RL 需要探索,低温会抑制新策略的发现
每个 prompt 生成 4-8 个候选足够的采样才能发现好的推理路径
奖励函数就是你的「教学大纲」你奖励什么,模型就学什么

Step 5: 拒绝采样 + 第二轮 SFT

# 用训练好的模型给 5000 个新问题生成回答
# 只保留答案正确的样本(「拒绝」错误的)
python scripts/rejection_sampling.py \
--model ./output/qwen-rl \
--prompts ./data/new_prompts.jsonl \
--output ./data/rl_filtered.jsonl \
--num_samples 4 \
--keep_correct_only

# 用过滤后的高质量数据再做一轮 SFT
llamafactory-cli train \
--model_name_or_path ./output/qwen-rl \
--dataset rl_filtered \
--template qwen \
--finetuning_type lora \
--output_dir ./output/qwen-final \
--learning_rate 1e-5 \
--num_train_epochs 2

成本与时间估算

步骤硬件时间费用(云 GPU)
冷启动 SFT1×A100 (80G)小时级取决于云厂商单价
RL 推理训练通常需要多卡或高吞吐推理资源可能从数小时到多天与 rollout 数、题目数、采样长度、并行策略强相关
拒绝采样 + SFT1×A100 (80G)小时级与生成样本量强相关

这里不要死记一个价格。小规模数学题实验可能几百美元跑通流程;想得到稳定、有泛化能力的 thinking 模型,成本通常会明显更高。真正要估算费用,应先确定题目数、每题采样次数、平均生成长度、训练 epoch 和 GPU 单价。


新手最容易踩的坑

  1. 冷启动数据里 thinking 太短 → 模型学会了格式但不会推理 → 解决:确保 thinking 至少 50+ 字
  2. 奖励函数只奖励正确性 → 模型学会了作弊(比如直接输出答案不思考) → 解决:加格式奖励
  3. 数学题数据集太简单 → RL 很快收敛但推理能力没提升 → 解决:混合难题比例至少 30%
  4. 训练集和测试集相同 → 模型死记硬背而不是学会推理 → 解决:严格按题目 ID 划分 train/test
  5. 用错了 chat template → thinking 标签渲染错误,模型行为异常 → 解决:确认模型官方文档的 template

懒人方案:不训练直接用 R1 蒸馏模型

如果你只想 thinking 模型而不想训练,直接用 DeepSeek 官方发布的蒸馏模型:

# DeepSeek-R1 蒸馏版(已具备 thinking 能力,无需训练)
# 1.5B 版:适合玩具项目
# 7B 版: 适合研究和实验
# 14B 版: 推理能力明显提升
# 32B 版: 接近 R1 原版的推理水平
# 70B 版: 最强蒸馏版

# 下载直接用
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")
tokenizer = AutoTokenizer.from_pretrained(model_name)

# 这个模型已经会 thinking 了!格式和 R1 完全一样。
# 如果你想做「关闭 thinking」,可以基于这个做非 thinking 数据来微调

蒸馏模型本质上就是 R1 的「学生模型」——用 R1 的 thinking 数据训练的。 它们的推理能力远强于同级别的普通模型,但不如原版 R1。

# ============================================================
# 实操:训练脚本(所有函数均为完整可运行实现)
# ============================================================

import json
import os

import re

# ----------------------------------------------------------
# A. 冷启动数据生成(用 DeepSeek API 生成训练数据)
# ----------------------------------------------------------
def generate_coldstart_data(questions, output_path="coldstart_data.json"):
"""用 DeepSeek-R1 API 批量生成 问题-思考-答案 训练数据"""
deepseek_key = os.environ.get("DEEPSEEK_API_KEY")
if not deepseek_key:
print("SKIP: 需要设置 DEEPSEEK_API_KEY 环境变量")
return []

from openai import OpenAI
client = OpenAI(api_key=deepseek_key, base_url="https://api.deepseek.com")

SYSTEM_PROMPT = """你是一个数学推理助手。对于每个问题请:
1. 先写出详细的逐步推理过程
2. 每步说清楚计算逻辑
3. 最后给出简洁的答案

重要:输出格式必须是:
<think>
(推理过程)
</think>
(最终答案)
"""

dataset = []
for i, question in enumerate(questions):
print(f"[{i+1}/{len(questions)}] {question[:50]}...")
try:
response = client.chat.completions.create(
model="deepseek-reasoner",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": question}
]
)
thinking = response.choices[0].message.reasoning_content or ""
answer = response.choices[0].message.content or ""

content = f"<think>\n{thinking}\n</think>\n{answer}"
dataset.append({
"messages": [
{"role": "user", "content": question},
{"role": "assistant", "content": content}
]
})
except Exception as e:
print(f" 失败: {e}")

with open(output_path, "w", encoding="utf-8") as f:
json.dump(dataset, f, ensure_ascii=False, indent=2)
print(f"已保存 {len(dataset)} 条到 {output_path}")
return dataset

# 示例问题集
SAMPLE_QUESTIONS = [
"一个长方形的长是 12cm,宽是 8cm,求面积。",
"小明有 15 个苹果,吃了 3 个,又买了 7 个,现在有几个?",
"357 x 289 = ?",
"一个水箱,进水管 3 小时注满,出水管 5 小时排空。两管同开,几小时注满?",
"等差数列 3, 7, 11, ... 的第 20 项是多少?",
]

if os.environ.get("DEEPSEEK_API_KEY"):
generate_coldstart_data(SAMPLE_QUESTIONS[:2]) # 演示 2 条
else:
print("=== generate_coldstart_data 函数已定义 ===")
print("设置 DEEPSEEK_API_KEY 后调用 generate_coldstart_data(questions) 即可")
print("\n示例数据结构:")
example = {
"messages": [
{"role": "user", "content": "357 x 289 = ?"},
{
"role": "assistant",
"content": (
"<think>\n"
"357 x 200 = 71400\n"
"357 x 80 = 28560\n"
"357 x 9 = 3213\n"
"71400 + 28560 + 3213 = 103173\n"
"</think>\n"
"103173"
)
}
]
}
print(json.dumps(example, ensure_ascii=False, indent=2))

# ----------------------------------------------------------
# B. RL 奖励函数(完整实现,可直接用于 verl/OpenRLHF)
# ----------------------------------------------------------
def extract_thinking_and_answer(completion):
"""从模型输出中提取 thinking 内容和最终答案"""
thinking = ""
answer = completion.strip()

think_match = re.search(r"<think>(.*?)</think>", completion, re.DOTALL)
if think_match:
thinking = think_match.group(1).strip()
answer = completion[think_match.end():].strip()

return thinking, answer

def normalize_number(text):
"""从文本中提取数字并标准化,用于答案比对"""
text = text.strip().replace(" ", "").replace(",", "")
match = re.search(r"-?[\d.]+(?:/-?[\d.]+)?", text)
if match:
num_str = match.group()
if "/" in num_str:
parts = num_str.split("/")
try:
return str(float(parts[0]) / float(parts[1]))
except (ValueError, ZeroDivisionError):
return num_str
try:
return str(float(num_str))
except ValueError:
return num_str
return text

def compute_reward(completion, ground_truth):
"""
计算一条模型输出的奖励值。
奖励组成:格式正确 +0.15 | 答案正确 +1.0 | thinking 长度 +0.05
惩罚:格式严重不对 -1.0 | 答案错误 -0.5
"""
reward = 0.0
thinking, answer = extract_thinking_and_answer(completion)

has_thinking = "<think>" in completion
has_answer_tag = "</think>" in completion

if not has_answer_tag:
return -1.0

if has_thinking and has_answer_tag:
reward += 0.15

pred_norm = normalize_number(answer)
gt_norm = normalize_number(ground_truth)

if pred_norm == gt_norm:
reward += 1.0
else:
try:
pv = float(pred_norm)
gv = float(gt_norm)
if abs(pv - gv) / max(abs(gv), 1e-8) < 0.001:
reward += 1.0
else:
reward -= 0.5
except ValueError:
reward -= 0.5

if len(thinking) >= 30:
reward += 0.05

return reward

# 测试奖励函数
print("=== RL 奖励函数测试 ===\n")
test_cases = [
(
"<think>15+20=35, 35+7=42</think>\n42",
"42",
),
(
"<think>15+27=41</think>\n41",
"42",
),
(
"没有思考过程\n答案是42",
"42",
),
(
"<think>算一下</think>\n7.5",
"7.5",
),
]

for completion, gt in test_cases:
reward = compute_reward(completion, gt)
print(f"奖励: {reward:+.2f} | 输出片段: {completion[:50]}...")
print()

# ----------------------------------------------------------
# C. 拒绝采样(完整实现,可直接用于第二轮 SFT 数据准备)
# ----------------------------------------------------------
def rejection_sampling(model, tokenizer, prompts, num_samples=4):
"""
对每个 prompt 生成 num_samples 个候选回答,
只保留答案正确的,选 thinking 最详细的。

Args:
model: HuggingFace 模型实例
tokenizer: HuggingFace tokenizer 实例
prompts: [{"question": "...", "answer": "..."}, ...]
num_samples: 每个问题生成几个候选
Returns:
过滤后的训练数据列表
"""
import torch
device = next(model.parameters()).device
filtered_data = []
total_candidates = 0
total_correct = 0

for i, prompt in enumerate(prompts):
question = prompt["question"]
ground_truth = prompt["answer"]
candidates = []

for _ in range(num_samples):
messages = [{"role": "user", "content": question}]
text = tokenizer.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
inputs = tokenizer(text, return_tensors="pt").to(device)

with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=1024,
temperature=0.8,
do_sample=True,
)

completion = tokenizer.decode(
outputs[0][inputs["input_ids"].shape[1]:],
skip_special_tokens=True
)

thinking, answer = extract_thinking_and_answer(completion)
is_correct = compute_reward(completion, ground_truth) > 0.5
candidates.append((thinking, answer, is_correct))
total_candidates += 1
if is_correct:
total_correct += 1

correct = [(t, a) for t, a, ok in candidates if ok]
if correct:
best_t, best_a = max(correct, key=lambda x: len(x[0]))
content = f"<think>\n{best_t}\n</think>\n{best_a}"
filtered_data.append({
"messages": [
{"role": "user", "content": question},
{"role": "assistant", "content": content}
]
})

if (i + 1) % 10 == 0:
print(f"进度: {i+1}/{len(prompts)} | "
f"候选={total_candidates} 正确={total_correct} "
f"保留={len(filtered_data)}")

return filtered_data

print("=== rejection_sampling 函数已定义 ===")
print()
print("使用方法:")
print(" from transformers import AutoModelForCausalLM, AutoTokenizer")
print(' model = AutoModelForCausalLM.from_pretrained("./output/qwen-rl")')
print(" prompts = [{'question': '...', 'answer': '...'}, ...]")
print(" new_data = rejection_sampling(model, tokenizer, prompts, num_samples=4)")
print()

# ----------------------------------------------------------
# D. 完整目录结构
# ----------------------------------------------------------
print("=== 推荐目录结构 ===")
print(r"""
my-thinking-model/
+-- data/
| +-- coldstart.jsonl # 冷启动 SFT 数据(~3000 条)
| +-- rl_prompts.jsonl # RL 训练的问题集(~1000 条)
| +-- rl_filtered.jsonl # 拒绝采样后的数据
+-- configs/
| +-- sft_coldstart.yaml # LLaMA-Factory 配置
| +-- rl_verl.yaml # verl 配置
+-- output/
| +-- qwen-sft-cot/ # 冷启动后模型
| +-- qwen-rl/ # RL 训练后模型
| +-- qwen-final/ # 最终模型
+-- scripts/
| +-- gen_coldstart.py # 生成冷启动数据(用上方函数)
| +-- rejection_sampling.py # 拒绝采样(用上方函数)
| +-- eval_thinking.py # 评估脚本
+-- README.md
""")
print()
print("以上所有函数均为完整可执行的实现(非伪代码)。")
print("核心思路:数学题答案可以自动验证,是最适合 RL 起点的数据类型。")

=== generate_coldstart_data 函数已定义 ===
设置 DEEPSEEK_API_KEY 后调用 generate_coldstart_data(questions) 即可

示例数据结构:
{
"messages": [
{
"role": "user",
"content": "357 x 289 = ?"
},
{
"role": "assistant",
"content": "<think>\n357 x 200 = 71400\n357 x 80 = 28560\n357 x 9 = 3213\n71400 + 28560 + 3213 = 103173\n</think>\n103173"
}
]
}
=== RL 奖励函数测试 ===

奖励: +1.15 | 输出片段: <think>15+20=35, 35+7=42</think>
42...
奖励: -0.35 | 输出片段: <think>15+27=41</think>
41...
奖励: -1.00 | 输出片段: 没有思考过程
答案是42...
奖励: +1.15 | 输出片段: <think>算一下</think>
7.5...

=== rejection_sampling 函数已定义 ===

使用方法:
from transformers import AutoModelForCausalLM, AutoTokenizer
model = AutoModelForCausalLM.from_pretrained("./output/qwen-rl")
prompts = [{'question': '...', 'answer': '...'}, ...]
new_data = rejection_sampling(model, tokenizer, prompts, num_samples=4)

=== 推荐目录结构 ===

my-thinking-model/
+-- data/
| +-- coldstart.jsonl # 冷启动 SFT 数据(~3000 条)
| +-- rl_prompts.jsonl # RL 训练的问题集(~1000 条)
| +-- rl_filtered.jsonl # 拒绝采样后的数据
+-- configs/
| +-- sft_coldstart.yaml # LLaMA-Factory 配置
| +-- rl_verl.yaml # verl 配置
+-- output/
| +-- qwen-sft-cot/ # 冷启动后模型
| +-- qwen-rl/ # RL 训练后模型
| +-- qwen-final/ # 最终模型
+-- scripts/
| +-- gen_coldstart.py # 生成冷启动数据(用上方函数)
| +-- rejection_sampling.py # 拒绝采样(用上方函数)
| +-- eval_thinking.py # 评估脚本
+-- README.md


以上所有函数均为完整可执行的实现(非伪代码)。
核心思路:数学题答案可以自动验证,是最适合 RL 起点的数据类型。

小结

  1. CoT = 让模型把推理过程写出来,用生成的中间结果辅助后续推理
  2. Thinking 模型 = CoT 的工程化封装,思考过程可能被特殊 token 或 API block 隔开
  3. 训练流程:冷启动 SFT → RL 推理训练 → 拒绝采样 → 全场景 RL
  4. RL 是关键:不给模型规定「怎么想」,只奖励「想对了」
  5. 反思是涌现的:模型在 RL 中可能自己学会检查和修正
  6. 各平台启动方式不同:OpenAI、DeepSeek、Qwen3、Claude 都要看对应模型版本的官方文档
  7. Qwen3 是少数明确支持 thinking/non-thinking 运行时切换的主流开源模型之一
  8. 自己训练:教学版流程可以跑通概念;真实高质量 thinking 模型的成本不能用一个固定价格概括
  9. 懒人方案:直接用 DeepSeek-R1 蒸馏模型或 Qwen3 thinking 模型,开箱即用
  10. Function Calling = 模型输出结构化 JSON 描述「要调什么函数」,外部程序执行后把结果喂回模型

一句话总结:普通模型 = 直接给答案;Thinking 模型 = 先打草稿再回答。 打草稿行为可以来自 prompt、SFT 数据,也可以在可验证任务的 RL 中被强化出来。 现在你知道怎么查 API 开关、也知道训练 thinking 模型为什么不只是加 <think> 标签。

作业> 可以让 AI 帮忙解释思路,但不建议直接让 AI "做完这道题"。

作业 1:CoT 推理步数与准确率的关系Chain-of-Thought 通过让模型「一步一步思考」来提升推理准确率。但 CoT 不是越长越好。| 推理步数 | 准确率 ||:---|:---|| 0(直接给答案) | 45% || 1-3 步 | 62% || 4-8 步 | 78% || 9-15 步 | 80% || 16+ 步 | 77% |分析:为什么推理步数超过一定范围后准确率反而下降?小提示:长链条推理中错误会累积——如果中间某一步算错了,后续所有步骤都会被带偏。

# 作业 1:CoT 推理步数与准确率的关系answer = "在这里填你的答案"# A) 步数越多越好,77% 只是实验波动# B) 长链推理中错误累积,且长序列让注意力分散,反而不利于准确推理# C) 模型能力有限,超过 15 步的推理超出模型能力范围# D) 数据集太简单,不需要那么多步推理assert not answer.startswith("在这里"), "请先填入你的答案"assert answer in "ABCD", "请填入 A/B/C/D 中的一个字母"if answer == "B":    print("✅ 作业 1 通过:")    print("   CoT 提升准确率的核心是让模型有空间展示中间推理。")    print("   但步数过长时:")    print("   1. 中间步骤的错误会沿推理链累积")    print("   2. Attention 在超长上下文中效率下降")    print("   3. 生成 token 越多,推理成本越高")    print("   因此 CoT 需要在「推理充分」和「链式错误风险」间找平衡。")else:    print(f"你选了 {answer}。")    print("提示:思考长链推理的两面性——既给了更多推理空间,也带来了错误累积。")

作业 2:Thinking 模型的训练阶段排序Thinking 模型(如 DeepSeek-R1)的训练通常分为四个阶段:1. RL 推理训练2. 冷启动 SFT3. 全场景 RL4. 拒绝采样 + SFT请将这四个阶段按正确的训练顺序排列。小提示:先 SFT 打底(学会基本格式),再 RL 提升推理,然后拒绝采样提炼,最后全场景泛化。

# 作业 2:Thinking 模型的训练阶段排序# 填入阶段编号的列表,如 [2, 1, 4, 3]# 1 = RL 推理训练# 2 = 冷启动 SFT# 3 = 全场景 RL# 4 = 拒绝采样 + SFTorder = None  # 在这里填入正确顺序的列表assert order is not None, "请填入正确顺序"assert len(order) == 4, "需要排列 4 个阶段"assert set(order) == {1, 2, 3, 4}, "必须包含 1, 2, 3, 4 四个阶段"correct_order = [2, 1, 4, 3]if order == correct_order:    print("✅ 作业 2 通过:")    print("   1. 冷启动 SFT → 学会基本 CoT 格式")    print("   2. RL 推理训练 → 在可验证任务上强化推理能力")    print("   3. 拒绝采样 + SFT → 从 RL 模型中筛选高质量推理数据")    print("   4. 全场景 RL → 泛化到对话、写作等通用场景")else:    print(f"你的排序: {order},正确排序: {correct_order}")    print("提示:SFT 总是最先做的(打底),全场景 RL 是最后的(泛化)。")

作业 3:Reward 函数设计训练 Thinking 模型时的 reward 方案:| 情况 | Reward ||:---|:---|| 答案正确 | +1.0 || 答案错误 | -1.0 || 格式正确 | +0.1 || 语言不一致 | -0.1 |计算以下 5 个样本的总 reward:1. 答案正确,格式正确,语言一致2. 答案错误,格式正确,语言不一致3. 答案正确,格式错误,语言一致4. 答案错误,格式错误,语言不一致5. 答案正确,格式正确,语言不一致小提示:总 reward = 各项相加。如情况 1 = +1.0 + 0.1 = +1.1。

# 作业 3:Reward 函数设计samples = [    {"answer": "correct", "format": True,  "lang": "consistent"},    {"answer": "wrong",   "format": True,  "lang": "inconsistent"},    {"answer": "correct", "format": False, "lang": "consistent"},    {"answer": "wrong",   "format": False, "lang": "inconsistent"},    {"answer": "correct", "format": True,  "lang": "inconsistent"},]def compute_reward(s):    r = 1.0 if s["answer"] == "correct" else -1.0    if s["format"]: r += 0.1    if s["lang"] == "inconsistent": r += -0.1    return r# TODO: 计算每个样本的总 rewardrewards = None  # 5 个 reward 值的列表assert rewards is not None, "请先计算 reward"assert len(rewards) == 5, "需要 5 个 reward 值"expected = [compute_reward(s) for s in samples]for i, (r, e) in enumerate(zip(rewards, expected)):    assert abs(r - e) < 0.01, f"样本 {i+1} reward 应为 {e:.1f},你得到 {r:.1f}"print("✅ 作业 3 通过:")for i, r in enumerate(rewards):    print(f"   样本 {i+1}: reward = {r:+.1f}")print()print("   Reward 函数引导模型:优先答对(±1.0),同时注意格式和语言。")