M01 Go 语言 AI 开发基础
这一章是整套课程的工程基础。
后面的 LLM API 接入、工具调用、记忆系统、RAG、多智能体协作,虽然是在讲不同主题,但最终落到代码层面,都要求具备以下 Go 语言工程能力:
- 项目结构怎么组织
- HTTP 请求怎么发
- 流式输出怎么处理
- 取消信号怎么传播
- 接口怎么抽象
- 代码怎么测试
M01 的重点通过「AI 开发场景驱动」的方式讲解 Go 特性,每个知识点都对应后续模块的具体需求。 学完这一章之后,你应该能搭出一个结构清楚、接口明确、便于扩展和测试的 Go 项目骨架,为后续章节打好基础。
学习目标
完成这一章后,你应该能够:
- 理解 Go 模块系统和 AI Agent 项目的基本目录结构
- 掌握使用 Goroutine 和 Channel 处理流式输出的基本模式
- 理解
context.Context在超时控制和请求取消中的作用 - 能够使用接口和泛型组织可扩展的 Agent 组件
- 能够配置适合 LLM 调用场景的 HTTP 客户端
- 掌握常见 JSON 响应处理方式和基础测试方法
本章内容
- 为什么 AI Agent 开发需要扎实的 Go 工程基础
- 如何组织一个适合 Agent 项目的目录结构
- 如何用 Goroutine 和 Channel 处理流式输出
- 如何使用
context.Context传递超时与取消信号 - 如何用接口和泛型组织 Provider 与工具系统
- 如何实现可复用的 HTTP 客户端、重试和限流
- 如何处理常见的 JSON 响应结构
- 如何为 AI 组件编写单元测试和 Mock
一、Go 在 AI Agent 系统中的作用
AI Agent 开发不是单纯“调一下模型接口”。一个完整的 Agent 系统通常至少包含下面这些部分:
- 接收用户请求
- 组织 Prompt 和上下文
- 调用模型
- 执行工具
- 聚合结果
- 控制超时、重试、限流与错误处理
- 记录日志、指标和测试结果
这些工作里,真正“由模型完成”的部分其实只占一部分。Go 更重要的角色,是把模型能力组织成一个稳定、可维护、可测试的工程系统。

从工程视角看,Go 在这类项目里通常有几个明显优势:
- 并发处理简单,适合流式响应和多工具调用
- 标准库完整,HTTP、JSON、Context、测试都能直接用
- 接口和包管理清楚,适合把系统拆成稳定的层次
- 部署简单,适合作为服务端或命令行工具交付
二、项目结构与模块组织
做课程项目时,第一件事不是先写某个具体功能,而是先把项目结构搭出来。
如果一开始就把请求代码、业务逻辑、工具实现和配置解析都堆在一个目录里,后面随着模块变多,代码会很快变得难以维护。比较稳妥的做法,是先按职责把目录拆开。
项目目录结构
Go 社区有一套经过验证的标准布局,我们在此基础上针对 AI Agent 项目做了调整:
ai-agent/
├── cmd/ # 可执行入口(每个子目录是一个 binary)
│ ├── server/ # HTTP API 服务
│ ├── agent/ # 命令行 Agent
│ └── ingest/ # 文档摄取工具
│
├── internal/ # 私有包,外部无法 import
│ ├── llm/ # LLM Provider 封装
│ │ ├── provider.go # 统一接口定义
│ │ ├── openai.go # OpenAI 实现
│ │ └── stream.go # 流式处理
│ ├── agent/ # Agent 核心逻辑
│ ├── tools/ # 工具系统
│ ├── memory/ # 记忆与 RAG
│ └── config/ # 配置加载
│
├── pkg/ # 可对外暴露的通用工具包
│ └── retry/ # 通用重试逻辑
│
├── go.mod # 模块定义
├── go.sum # 依赖哈希锁定
├── Makefile # 常用命令
└── .env.example # 环境变量模板项目的目录结构没有标准答案,这里列出来的是我认为合适的方案。
cmd/只放入口,不放业务逻辑。cmd/目录的职责通常只有两件事:读取配置、组装依赖。它不应该承载具体业务逻辑。 例如,cmd/agent/main.go可以做配置加载、日志初始化、HTTP 服务启动,但真正的 Agent 执行流程应该放到internal/agent里。
这样做的好处是,业务逻辑不会和启动流程耦合在一起,后面无论你要做 CLI、Web 服务还是后台任务,都可以复用同一套核心逻辑。
internal/用来表达“这是项目内部实现”。internal/是 Go 在工程上非常有用的一个约束。放在这个目录下的包,只能被当前模块内部引用,不能被外部项目直接依赖。
对课程项目来说,这个约束很有价值。因为像internal/llm、internal/agent、internal/tools这些目录,本来就是项目内部实现细节,并不打算作为公共 SDK 暴露出去。
pkg/放可以复用的公共能力。pkg/不一定每个项目都必须有,但当某部分代码确实需要被多个子系统复用时,把它单独抽出来通常更清楚。
例如,提示词模板、通用结构定义、公共序列化逻辑,都可以放在pkg/目录下。
cmd/ 里不要堆业务逻辑;internal/ 里不要顺手写成“全局工具箱”;目录一旦失去边界,后续会越来越难收拾。初始化项目
# 创建项目
mkdir ai-agent && cd ai-agent
# 初始化 Go module(即使不发布到 GitHub,也用这个格式)
go mod init github.com/yourname/ai-agent
# 安装核心依赖
go get github.com/sashabaranov/go-openai # OpenAI SDKMakefile
通过编写 Makefile, 把常用命令标准化。
# Makefile
.PHONY: run build test lint clean
# 启动开发服务器
run:
go run ./cmd/server/
# 构建所有二进制
build:
CGO_ENABLED=0 go build -ldflags="-w -s" -o bin/server ./cmd/server/
CGO_ENABLED=0 go build -ldflags="-w -s" -o bin/agent ./cmd/agent/
# 运行测试(含竞态检测,这个习惯要从第一天养成)
test:
go test -race -count=1 -coverprofile=coverage.out ./...
# 查看测试覆盖率
cover:
go tool cover -html=coverage.out
# 代码检查
lint:
golangci-lint run ./...
# 清理构建产物
clean:
rm -rf bin/ coverage.out
三、Goroutine + Channel:处理流式输出
为什么流式处理需要 Goroutine?
无论是 OpenAI 风格的 SSE,还是其他 Provider 的事件流,调用方通常都不希望等模型全部生成完再一次性返回,而是希望边生成边消费。
同步方式:发请求 → 等待(3-8秒) → 收到完整回复 → 显示
流式方式:发请求 → 立刻收到第一个 token → 实时显示 → ... → 收到结束标记两种方式的用户体验差距巨大。这类场景非常适合用 Goroutine 和 Channel 处理。
- Goroutine 负责持续读取服务端返回;
- Channel 负责把 token 发送给调用方;
- Context 负责取消和超时控制。
但流式处理有个难点:我们要同时做两件事——接收 token(IO 密集)和处理 token(CPU/显示)。
先看最核心的思路:主流程发起请求之后,另起一个 Goroutine 持续读取流式响应,再通过 Channel 把结果逐步交给调用方。建议返回只读 Channel:
func StreamTokens(ctx context.Context, ...) <-chan Token {
tokens := make(chan Token, 64)
go func() {
defer close(tokens)
// 读取并发送 token
}()
return tokens
}返回 <-chan Token 可以明确告诉调用方:这个 Channel 只能读取,不能写入,也不能关闭。
调用方使用 for range 消费 Channel 时,如果 Channel 不关闭,循环就不会结束。
Token 结构设计
// Token 代表 LLM 流式输出的一个片段。
type Token struct {
Text string
Done bool
Error error
}这里把错误也放到 Token 结构里,是因为调用方在 for range Channel 时无法处理普通的多返回值。这样设计后,调用方只需要消费一个流,就能同时处理文本、结束信号和错误。
完整的流式处理实现
// StreamTokens 调用 OpenAI 流式 API,将 token 通过 channel 返回。
//
// 使用示例:
//
// for tok := range llm.StreamTokens(ctx, client, messages) {
// if tok.Error != nil { /* 处理错误 */ }
// if tok.Done { break }
// fmt.Print(tok.Text)
// }
func StreamTokens(
ctx context.Context,
client *openai.Client,
model string,
messages []openai.ChatCompletionMessage,
) <-chan Token {
// 缓冲 64:即使调用方处理稍慢,LLM 也能提前送入最多 64 个 token,
// 不会立刻阻塞。对于 LLM 这种生产速度快于消费速度的场景很重要。
tokens := make(chan Token, 64)
go func() {
// defer close 是整个函数最重要的一行。
// 无论 goroutine 以何种方式退出(正常/panic/context取消),
// channel 都会被关闭,调用方的 for range 才能正常结束。
defer close(tokens)
// 创建流式请求
stream, err := client.CreateChatCompletionStream(ctx,
openai.ChatCompletionRequest{
Model: model,
Stream: true,
Messages: messages,
})
if err != nil {
// 把错误发送给调用方,然后 goroutine 退出(defer 会 close channel)
tokens <- Token{Error: fmt.Errorf("create stream: %w", err)}
return
}
defer stream.Close()
for {
resp, err := stream.Recv()
if err == io.EOF {
// io.EOF 是流的正常结束信号,不是错误
tokens <- Token{Done: true}
return
}
if err != nil {
tokens <- Token{Error: fmt.Errorf("recv stream: %w", err)}
return
}
content := resp.Choices[0].Delta.Content
if content == "" {
continue // 跳过空 delta(心跳包)
}
// select 同时监听两件事:
// 1. context 被取消(用户主动停止、超时等)
// 2. 能向 channel 发送数据
// 这样即使 channel 满了,context 取消也能立即生效
select {
case <-ctx.Done():
tokens <- Token{Error: fmt.Errorf("context cancelled: %w", ctx.Err())}
return
case tokens <- Token{Text: content}:
// 成功发送
}
}
}()
return tokens
}这段代码里有几个重点:
- Channel 使用缓冲区,避免消费者稍慢时立刻阻塞生产者;
- 使用
defer close(tokens)保证调用方能正常结束; - 使用
defer stream.Close()释放流式连接; - 使用
select同时处理发送 token 和 context 取消; io.EOF是正常结束,不应该当作错误。
调用方代码
func printStream(ctx context.Context, client *openai.Client) {
messages := []openai.ChatCompletionMessage{
{Role: "user", Content: "用三句话解释量子纠缠"},
}
fmt.Print("AI: ")
for tok := range llm.StreamTokens(ctx, client, "gpt-4o-mini", messages) {
if tok.Error != nil {
fmt.Printf("\n错误: %v\n", tok.Error)
return
}
if tok.Done {
fmt.Println()
return
}
fmt.Print(tok.Text)
}
}调用方只关心“读取 token 并处理”,不用关心底层如何读取 SSE、如何关闭连接、如何检测取消。
缓冲大小的选择
// 不同场景的缓冲大小建议:
make(chan Token, 1) // 完全同步,生产者立刻等消费者
make(chan Token, 64) // ✅ LLM 流式:消费者(UI渲染)偶尔慢一点也没关系
make(chan Token, 1024) // 批量处理,允许生产者提前跑很多
四、Context:超时控制与请求取消
Context 解决什么问题?
Agent 系统往往是一条多层调用链:
HTTP Handler → Agent.Run → LLM.Complete → HTTP 请求模型服务如果用户关闭页面,或者请求超过了最大时间限制,后端应该尽快停止后续工作。否则不仅浪费资源,还可能继续消耗模型费用。
Context 的三种常用方式
超时控制
给整个操作设置最大时间限制,这是最常见的超时控制方式。
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() // 重要:即使超时前完成,也要释放资源手动取消
这是手动取消方式,常用于用户主动停止任务,或者上游判断需要停止。
ctx, cancel := context.WithCancel(context.Background())
// 某个条件触发后主动 cancel()传递请求级别的值
WithValue 只适合传递请求级元数据,例如 request_id、trace_id,不应该用来传递业务参数。
type contextKey string
const requestIDKey contextKey = "request_id"
ctx = context.WithValue(ctx, requestIDKey, "req-123")
// 在任何地方取出
reqID := ctx.Value(requestIDKey).(string)正确传递 Context
下面是 ❌ 错误做法:每层调用都创建新的独立 context,这样用户取消外层 context 后,内层还在跑。
func (a *Agent) Run() (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
return a.callLLM(ctx) // 传入了,但如果上层的 ctx 已取消,这个新 ctx 不受影响
}这样会断开和上游请求的取消关系。正确做法是从外部接收 Context,并基于它派生子 Context:
func (a *Agent) Run(ctx context.Context, goal string) (string, error) {
// 每次 LLM 调用单独设置超时,但基于父 ctx 派生
// 这样父 ctx 取消时,子 ctx 也会立刻取消
for step := 0; step < a.maxSteps; step++ {
// 检查父 ctx 是否已取消
if err := ctx.Err(); err != nil {
return "", fmt.Errorf("agent cancelled: %w", err)
}
// 派生子 ctx,单次 LLM 调用不超过 30 秒
llmCtx, llmCancel := context.WithTimeout(ctx, 30*time.Second)
resp, err := a.llm.Complete(llmCtx, req)
llmCancel() // ← 注意:不用 defer,在循环里 defer 会积累!
if err != nil {
return "", err
}
// ...
}
}检测取消的几种方式
方式一:直接检查
适合在循环开头进行检查。
if err := ctx.Err(); err != nil {
return err
}方式二:select
适合等待操作时同时监听取消。
select {
case result := <-resultChan:
return result, nil
case <-ctx.Done():
return nil, ctx.Err()
}方式三:传给支持 ctx 的函数
这也是最常见的方式,例如使用下面这种方式创建的 http 请求会自动在 ctx 取消时中止。
req, err := http.NewRequestWithContext(ctx, "POST", url, body)在 Go 工程实践约定:函数签名里的 ctx context.Context 永远放在第一个参数。
五、接口与泛型:组织可扩展的 Agent 组件
为什么需要统一抽象?
我们的 Agent 程序要支持 OpenAI、Claude、豆包、通义千问四个平台。如果为每个平台写专属代码,Agent 层就会变成这样:
// ❌ 不可维护:每增加一个 provider,Agent 层都要修改
func (a *Agent) callLLM(ctx context.Context, messages []Message) (string, error) {
switch a.providerType {
case "openai":
return a.openaiClient.Complete(ctx, messages)
case "claude":
return a.claudeClient.Complete(ctx, messages)
case "doubao":
return a.doubaoClient.Complete(ctx, messages)
// 每加一个 provider 就要改这里...
}
}所以,在课程一开始就要设计 LLMProvider 统一抽象。 我们希望 Agent 层只依赖一个统一接口:
// LLMProvider 是所有 LLM 平台必须实现的接口。
// 接口尽量小——只包含真正需要的方法。
// 如果以后需要扩展(如图像、音频),用组合而不是修改这个接口。
type LLMProvider interface {
// Complete 发送请求并等待完整响应
Complete(ctx context.Context, req CompletionRequest) (CompletionResponse, error)
// Stream 发送请求并通过 channel 返回流式 token
Stream(ctx context.Context, req CompletionRequest) (<-chan Token, error)
// Name 返回 Provider 标识符,用于日志和监控
Name() string
// ModelID 返回当前使用的模型名称
ModelID() string
// IsAvailable 做轻量健康检查,用于故障转移判断
IsAvailable(ctx context.Context) bool
}统一请求与响应结构
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type CompletionRequest struct {
Messages []Message
MaxTokens int
Temperature float64
Stream bool
}
type Usage struct {
PromptTokens int
CompletionTokens int
TotalTokens int
}
type CompletionResponse struct {
Content string
ProviderName string
ModelName string
Latency time.Duration
Usage Usage
}接口的价值在于,Agent 完全不需要知道背后到底是 OpenAI、Claude 还是豆包:
type ReactAgent struct {
llm llm.LLMProvider
maxSteps int
}
func NewReactAgent(provider llm.LLMProvider, maxSteps int) *ReactAgent {
return &ReactAgent{llm: provider, maxSteps: maxSteps}
}
func (a *ReactAgent) Run(ctx context.Context, goal string) (string, error) {
resp, err := a.llm.Complete(ctx, llm.CompletionRequest{
Messages: []llm.Message{{Role: "user", Content: goal}},
})
// Agent 根本不知道 Complete 是调用了 OpenAI 还是豆包
// ...
}后续切换模型、增加 Provider、做故障转移,都会更简单。
使用泛型处理通用流
// Processor 是泛型流处理器,T 可以是任何类型
// 用法:stream.Process(ctx, tokenChan, func(tok Token) error { ... })
func Process[T any](
ctx context.Context,
ch <-chan T,
handler func(T) error,
) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case item, ok := <-ch:
if !ok {
return nil // channel 关闭,正常结束
}
if err := handler(item); err != nil {
return err
}
}
}
}
// Collect 将 channel 中的所有值收集到 slice
func Collect[T any](ctx context.Context, ch <-chan T) ([]T, error) {
var result []T
err := Process(ctx, ch, func(item T) error {
result = append(result, item)
return nil
})
return result, err
}这类泛型工具函数适合放在 pkg/stream 这类通用包里。它与具体 LLM 无关,可以在后续模块中反复复用。
六、生产级 HTTP 客户端:连接池、重试与限流
不要在生产代码中直接使用:
resp, err := http.Get("https://api.openai.com/v1/...")http.DefaultClient 没有完整的超时、重试和限流策略。对 LLM API 这种外部依赖来说,这很容易造成请求挂起、连接复用差、错误处理不统一等问题。
HTTP 客户端配置
// ClientConfig 控制 HTTP 客户端行为
type ClientConfig struct {
// 连接池配置
MaxIdleConns int // 最大空闲连接总数
MaxIdleConnsPerHost int // 每个 host 最大空闲连接数
IdleConnTimeout time.Duration // 空闲连接存活时间
// 超时配置
DialTimeout time.Duration // 建立 TCP 连接的超时
RequestTimeout time.Duration // 单次请求总超时(包含读取响应体)
// 重试配置
MaxRetries int // 最大重试次数(不含首次请求)
RetryWaitBase time.Duration // 重试等待基础时间(指数退避的底数)
// 限流配置
RequestsPerSecond float64 // 每秒最大请求数,0 表示不限流
}一个适合 LLM API 的默认配置大致如下:
// DefaultClientConfig 返回适合 LLM API 调用的默认配置
func DefaultClientConfig() ClientConfig {
return ClientConfig{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
DialTimeout: 5 * time.Second,
RequestTimeout: 90 * time.Second, // LLM 响应可能较慢
MaxRetries: 3,
RetryWaitBase: 500 * time.Millisecond,
RequestsPerSecond: 10, // 默认每秒 10 个请求
}
}连接池与限流
transport := &http.Transport{
// 连接池:复用 TCP 连接,避免每次请求都重新建立
MaxIdleConns: cfg.MaxIdleConns,
MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
IdleConnTimeout: cfg.IdleConnTimeout,
// TCP 层面的超时控制
DialContext: (&net.Dialer{
Timeout: cfg.DialTimeout,
KeepAlive: 30 * time.Second, // TCP keepalive,防止连接被防火墙静默断开
}).DialContext,
// TLS 握手超时
TLSHandshakeTimeout: 10 * time.Second,
// 等待服务器响应头的超时(不含响应体)
ResponseHeaderTimeout: 30 * time.Second,
// 允许 HTTP/2(OpenAI API 支持 HTTP/2,性能更好)
ForceAttemptHTTP2: true,
}http.Transport 应该在程序生命周期内复用,不要每次请求都创建新的 http.Client。否则连接池无法复用,每次请求都要重新建立 TCP / TLS 连接。
指数退避重试
// Do 执行 HTTP 请求,带限流和指数退避重试
func (c *ProductionClient) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
// 限流检查:如果当前请求速率过高,等待
if c.limiter != nil {
if err := c.limiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("rate limiter: %w", err)
}
}
var (
resp *http.Response
err error
lastErr error
)
for attempt := 0; attempt <= c.cfg.MaxRetries; attempt++ {
// 非首次请求,等待后重试(指数退避)
if attempt > 0 {
wait := c.exponentialBackoff(attempt)
select {
case <-time.After(wait):
// 等待结束,继续重试
case <-ctx.Done():
return nil, fmt.Errorf("retry cancelled: %w", ctx.Err())
}
}
resp, err = c.client.Do(req.WithContext(ctx))
if err == nil && c.shouldRetry(resp.StatusCode) {
// HTTP 错误码也需要重试
resp.Body.Close()
lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
continue
}
if err != nil {
lastErr = err
continue
}
return resp, nil // 成功
}
return nil, fmt.Errorf("after %d retries: %w", c.cfg.MaxRetries, lastErr)
}
// exponentialBackoff 计算指数退避等待时间:base * 2^attempt,最大 30 秒
func (c *ProductionClient) exponentialBackoff(attempt int) time.Duration {
wait := float64(c.cfg.RetryWaitBase) * math.Pow(2, float64(attempt-1))
max := 30 * time.Second
if time.Duration(wait) > max {
return max
}
return time.Duration(wait)
}
// shouldRetry 判断 HTTP 状态码是否应该重试
func (c *ProductionClient) shouldRetry(statusCode int) bool {
switch statusCode {
case 429: // Too Many Requests(限流)
return true
case 500, 502, 503, 504: // 服务端错误
return true
default:
return false
}
}常见可重试场景包括:
429 Too Many Requests500 Internal Server Error502 Bad Gateway503 Service Unavailable504 Gateway Timeout
七、JSON 进阶:处理复杂 API 响应
LLM 接口看起来是在返回 JSON,但和普通业务接口相比,它的响应结构通常更复杂一些。
常见情况包括:
- 某些字段可能不存在,或者值是
null - 同一个字段在不同场景下可能是不同类型
- 流式响应并不是一个完整 JSON,而是一段段事件流
解析工具调用参数
type ToolCall struct {
ID string `json:"id"`
Type string `json:"type"`
Function ToolCallFunction `json:"function"`
}
type ToolCallFunction struct {
Name string `json:"name"`
Arguments string `json:"arguments"`
}
func ParseArguments[T any](tc ToolCall) (T, error) {
var result T
if err := json.Unmarshal([]byte(tc.Function.Arguments), &result); err != nil {
return result, fmt.Errorf("parse arguments for %s: %w", tc.Function.Name, err)
}
return result, nil
}这里容易忽略的一点是:arguments 往往是一个 JSON 字符串,而不是直接的 JSON 对象,所以需要二次解析。
处理联合类型字段
在处理多模态的会话消息时,需要使用联合类型,灵活处理数据。
// ContentPart 代表多模态消息的一个部分(文本或图片)
// 这是"联合类型"的典型场景:一个字段可能是不同结构
type ContentPart struct {
Type string `json:"type"`
Text string `json:"text,omitempty"` // type == "text" 时有值
ImageURL *ImageURL `json:"image_url,omitempty"` // type == "image_url" 时有值
}
type ImageURL struct {
URL string `json:"url"`
Detail string `json:"detail,omitempty"` // "low" | "high" | "auto"
}
// MessageContent 可以是字符串,也可以是 ContentPart 数组
// 这种"多类型字段"需要自定义 JSON 解析
type MessageContent struct {
Text string // 如果是字符串
Parts []ContentPart // 如果是数组
}
func (m *MessageContent) UnmarshalJSON(data []byte) error {
// 先尝试解析为字符串
var text string
if err := json.Unmarshal(data, &text); err == nil {
m.Text = text
return nil
}
// 再尝试解析为数组
var parts []ContentPart
if err := json.Unmarshal(data, &parts); err == nil {
m.Parts = parts
return nil
}
return fmt.Errorf("content: expected string or array, got: %s", string(data))
}
func (m MessageContent) MarshalJSON() ([]byte, error) {
if len(m.Parts) > 0 {
return json.Marshal(m.Parts)
}
return json.Marshal(m.Text)
}这种写法适合处理“字段可能是多种结构”的 API 响应。
安全序列化
// SafeJSON 安全地序列化对象,失败时返回错误字符串而非 panic
func SafeJSON(v any) string {
data, err := json.Marshal(v)
if err != nil {
return fmt.Sprintf(`{"error": "marshal failed: %v"}`, err)
}
return string(data)
}解析 SSE
对于 Server-Sent Events 数据需要根据数据行针对性的解析。
// ParseSSEData 解析 Server-Sent Events 的数据行
// 格式:data: {"choices": [...]} 或 data: [DONE]
func ParseSSEData[T any](line string) (T, bool, error) {
var zero T
if !strings.HasPrefix(line, "data: ") {
return zero, false, nil // 不是数据行(可能是注释或空行)
}
data := strings.TrimPrefix(line, "data: ")
data = strings.TrimSpace(data)
if data == "[DONE]" {
return zero, true, nil // 流结束标记
}
var result T
if err := json.Unmarshal([]byte(data), &result); err != nil {
return zero, false, fmt.Errorf("parse SSE data: %w", err)
}
return result, false, nil
}这类解析函数应该有单元测试,因为流式输出中的边界情况很多。
八、测试与 Mock:为 AI 组件写可靠测试
AI 组件和普通业务代码相比,测试难点更明显:
- LLM 调用依赖外部 API Key
- 真实响应有随机性
- 工具调用可能带副作用
所以,比较稳妥的思路是:把外部依赖抽象成接口,再通过 Mock 和 httptest 做替换。

Mock LLM Provider
type MockProvider struct {
CompleteFunc func(ctx context.Context, req llm.CompletionRequest) (llm.CompletionResponse, error)
Responses []string
callCount int
name string
model string
}Mock 的作用是让测试可以稳定控制模型返回:
- 固定返回某段文本;
- 按顺序返回多轮响应;
- 模拟错误;
- 记录调用次数;
- 配合 Agent 测试多轮执行。
Table-Driven Test
func TestStreamTokens_WithMock(t *testing.T) {
tests := []struct {
name string
response string
wantTokens int
wantErr bool
}{
{name: "正常流式输出", response: "Hello", wantTokens: 5},
{name: "空响应", response: "", wantTokens: 0},
{name: "包含中文字符", response: "你好", wantTokens: 2},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mock := testutil.NewMockProvider(tt.response)
ch, err := mock.Stream(context.Background(), llm.CompletionRequest{})
if err != nil {
t.Fatalf("Stream() error = %v", err)
}
var count int
for tok := range ch {
if tok.Error != nil {
if !tt.wantErr {
t.Fatalf("unexpected error: %v", tok.Error)
}
return
}
if tok.Done {
break
}
count++
}
if count != tt.wantTokens {
t.Fatalf("got %d tokens, want %d", count, tt.wantTokens)
}
})
}
}Table-Driven Test 的好处是可以把多种输入、输出和边界条件集中放在一张表里,新增测试场景时也更清晰。
使用 httptest 模拟 HTTP 服务
对于 HTTP 客户端,httptest.Server 比 Mock 更接近真实场景。它可以模拟:
- 前几次返回 503,最后一次成功;
- 请求超时;
- 返回 429;
- 返回非法 JSON;
- 检查请求 Header 和 Body 是否符合预期。
func TestProductionClient_Retry(t *testing.T) {
requestCount := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
if requestCount < 3 {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{"ok": true}`))
}))
defer server.Close()
cfg := llm.DefaultClientConfig()
cfg.MaxRetries = 3
cfg.RetryWaitBase = 10 * time.Millisecond
client := llm.NewProductionClient(cfg)
req, _ := http.NewRequest("GET", server.URL, nil)
resp, err := client.Do(context.Background(), req)
if err != nil {
t.Fatalf("expected success after retry, got error: %v", err)
}
defer resp.Body.Close()
if requestCount != 3 {
t.Fatalf("got %d requests, want 3", requestCount)
}
}httptest 做依赖替换,测试会简单很多。九、本章实战:搭一个最小 AI Agent 工程骨架
学完这一章后,建议先不要急着接入真实模型,而是先把工程骨架搭起来。
建议最少完成下面几件事:
- 初始化
go.mod,搭好cmd/、internal/、pkg/、config/、docs/目录 - 定义
LLMProvider接口 - 实现一个最小的
OpenAIProvider结构体,先返回固定结果 - 实现
StreamTokens,把流式处理管道打通 - 用
httptest和 Mock 补一组最基本的测试
这样做的目的,是先把后面会反复用到的“骨架”搭起来。到了 M02,我们再在这套骨架上接入真正的模型 API。
小结
这一章的主线可以概括为下面几点:
- AI Agent 开发首先是工程问题,其次才是模型问题。
- 项目结构、并发模式、Context 传播、接口抽象和测试方式,都会贯穿后续所有章节。
- 流式输出和外部 API 调用,是后面系统设计的基础场景。
- 代码从一开始就要为扩展和测试留边界。
完成这一章后,你应该已经有能力搭出一个最小可用的 Go Agent 项目骨架。
课后练习
- 按本章结构创建
go-ai-agent项目骨架。 - 实现一个
LLMProvider接口,并用MockProvider返回固定字符串。 - 实现
StreamTokens,并使用for range实时打印输出。
参考资料
context官方文档- Go Blog: Concurrency Patterns
- 《The Go Programming Language》第 8 章
go-openai源码golangci-lint