claude-code/docs/conversation/streaming.mdx
2026-04-01 14:44:21 +08:00

183 lines
7.6 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: "流式响应:逐字呈现"
description: "为什么 Claude Code 的回答是'打字机效果'而不是一整块弹出"
---
## 为什么需要流式
想象 AI 需要 30 秒才能生成完整回答——如果等 30 秒后才一次性显示,用户体验是灾难性的。
流式响应让用户**实时看到 AI 的思考过程**
- 文字逐字出现,用户能提前判断方向是否正确
- 工具调用的参数在生成过程中就能预览
- 长时间任务不会让用户觉得"卡死了"
## `BetaRawMessageStreamEvent` 核心事件类型
流式 API 返回的是一系列 `BetaRawMessageStreamEvent`,每种事件类型对应流式响应的不同阶段(`src/services/api/claude.ts`
```
message_start ← 消息开始,包含 model、usage 初始值
├── content_block_start ← 内容块开始text / tool_use / thinking
│ ├── content_block_delta ← 增量数据text_delta / input_json_delta / thinking_delta
│ ├── content_block_delta ← ... 持续到达
│ └── content_block_stop ← 内容块结束yield AssistantMessage
├── content_block_start ← 下一个内容块...
│ └── ...
└── message_delta ← stop_reason + 最终 usage
message_stop ← 消息结束
```
### 事件处理状态机
`src/services/api/claude.ts:1980-2298` 实现了一个基于 `switch(part.type)` 的状态机:
| 事件类型 | 处理逻辑 | 状态变更 |
|----------|----------|----------|
| `message_start` | 初始化 `partialMessage`,记录 TTFT首字节延迟 | `usage` 初始化 |
| `content_block_start` | 按 `part.index` 创建对应类型的内容块 | `contentBlocks[index]` 初始化 |
| `content_block_delta` | 按子类型增量追加数据 | text / thinking / input 累加 |
| `content_block_stop` | 构建完整 `AssistantMessage` 并 yield | 消息推入 `newMessages` |
| `message_delta` | 更新 stop_reason 和最终 usage | 写回最后一条消息 |
| `message_stop` | 无操作(流结束标记) | — |
### 内容块类型及其增量数据
`content_block_start` 中的 `content_block.type` 决定了如何处理后续 delta
| 内容块类型 | Delta 类型 | 累加逻辑 |
|-----------|-----------|----------|
| `text` | `text_delta` | `text += delta.text` |
| `thinking` | `thinking_delta` + `signature_delta` | `thinking += delta.thinking``signature = delta.signature` |
| `tool_use` | `input_json_delta` | `input += delta.partial_json`JSON 字符串增量拼接) |
| `server_tool_use` | `input_json_delta` | 同 tool_use |
| `connector_text` | `connector_text_delta` | 特殊连接器文本feature flag 控制) |
关键设计:`content_block_start` 时所有文本字段初始化为空字符串,只通过 `content_block_delta` 累加。这是因为 SDK 有时在 start 和 delta 中重复发送相同文本。
## 文本 chunk 和 tool_use block 的交织
一次 AI 响应可能包含多个内容块,交替出现:
```
content_block_start (text, index=0) "我来帮你修复这个 bug。"
content_block_delta (text_delta) "首先..."
content_block_stop (index=0)
content_block_start (tool_use, index=1) { name: "Read", input: "..." }
content_block_delta (input_json_delta) '{"file_p' → 'ath":' → '"src/foo.ts"}'
content_block_stop (index=1)
content_block_start (text, index=2) "我已经看到了问题所在..."
content_block_stop (index=2)
```
每个 `content_block_stop` 触发一次 `yield`,将完整的 AssistantMessage 推送给消费者。这意味着一个 AI 响应会产生**多条** `AssistantMessage`——文本消息和工具调用消息交替产出。
`stop_reason` 要等到 `message_delta` 才确定(可能是 `end_turn`、`tool_use`、`max_tokens` 等),所以最后一条消息的 `stop_reason` 是**回写**的:
```typescript
// claude.ts:2246 — 直接属性修改,不用对象替换
// 因为 transcript 写队列持有 message.message 的引用
const lastMsg = newMessages.at(-1)
if (lastMsg) {
lastMsg.message.usage = usage
lastMsg.message.stop_reason = stopReason
}
```
## 流式中的错误处理
### 网络断开
流式连接依赖 SSEServer-Sent Events。当连接中断时
1. **Stream idle watchdog**定时检测事件间隔超过阈值stall触发告警和重试
2. **Stream abort**:如果 watchdog 检测到长时间无事件,抛出错误进入重试流程
3. **非流式降级**:作为最后手段,回退到非流式请求(一次性获取完整响应)
```typescript
// claude.ts:2338-2355 — 检测空流
// 1. 完全没有事件 → 代理返回了非 SSE 响应
// 2. 有 message_start 但没有 content_block_stop → 流被截断
```
### API 限流
当 API 返回限流错误时,系统使用 `withRetry` 包装器进行指数退避重试。重试逻辑考虑了:
- 错误类型429 限流 vs 500 服务器错误)
- 重试次数上限
- 退避间隔
### Token 超限
两种 token 超限场景有不同的处理:
| 场景 | stop_reason | 处理方式 |
|------|------------|----------|
| **输出超限** | `max_tokens` | 生成错误消息,建议设置 `CLAUDE_CODE_MAX_OUTPUT_TOKENS` |
| **上下文窗口超限** | `model_context_window_exceeded` | 触发 compaction 压缩对话历史后重试 |
```typescript
// claude.ts:2267-2293
if (stopReason === 'max_tokens') {
yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
if (stopReason === 'model_context_window_exceeded') {
// 复用 max_output_tokens 的恢复路径
yield createAssistantAPIErrorMessage({ error: 'max_output_tokens', ... })
}
```
### 流式停滞检测
系统持续监控事件到达间隔,检测"停滞"stall
```typescript
// claude.ts:1940-1966
const STALL_THRESHOLD_MS = 10_000 // 10 秒无事件视为停滞
if (timeSinceLastEvent > STALL_THRESHOLD_MS) {
stallCount++
totalStallTime += timeSinceLastEvent
logEvent('tengu_streaming_stall', { stall_duration_ms, stall_count, ... })
}
```
多个 stall 累积后watchdog 可能决定中断流并触发重试。
## 工具执行的流式反馈
BashTool 的命令执行也是流式的——通过 `onProgress` 回调逐行推送输出:
```
BashTool.call() → runShellCommand() → AsyncGenerator
├── 每秒轮询输出文件 → onProgress(lastLines, allLines, ...)
├── yield { type: 'progress', output, fullOutput, elapsedTimeSeconds }
└── return { code, stdout, interrupted, ... }
```
UI 层通过 `useToolCallProgress` hook 实时展示命令输出,而不是等命令完全结束。长时间运行的命令还支持自动后台化(`shouldAutoBackground`)。
## 多 Provider 适配
| Provider | 流式协议 | 特殊处理 |
|----------|----------|----------|
| **Anthropic Direct** | 原生 SSE | 延迟最低TTFT 最快 |
| **AWS Bedrock** | AWS SDK 流式接口 | 需要额外的 beta header 和认证 |
| **Google Vertex** | gRPC → 事件流 | 通过 `getMergedBetas()` 适配 |
| **Azure** | Anthropic 兼容 API | 自定义 base URL |
所有 Provider 通过统一的 `Stream<BetaRawMessageStreamEvent>` 抽象层屏蔽差异。上层代码QueryEngine、REPL不需要关心底层用的是哪个 Provider。
### Provider 选择
`src/utils/model/providers.ts` 中的 `getAPIProvider()` 根据配置决定使用哪个 Provider
```typescript
// 根据 api_provider 配置选择:
// "anthropic" → 直连
// "bedrock" → AWS SDK
// "vertex" → Google SDK
// 第三方 base URL → 自动检测
```
每个 Provider 需要适配的细节包括认证方式、beta header、请求参数格式、错误码映射——但这些差异在 `claude.ts` 的 `queryStream()` 函数中被统一处理。