跳至内容
M01 Go 语言 AI 开发基础

M01 Go 语言 AI 开发基础

这一章是整套课程的工程基础。

后面的 LLM API 接入、工具调用、记忆系统、RAG、多智能体协作,虽然是在讲不同主题,但最终落到代码层面,都要求具备以下 Go 语言工程能力:

  • 项目结构怎么组织
  • HTTP 请求怎么发
  • 流式输出怎么处理
  • 取消信号怎么传播
  • 接口怎么抽象
  • 代码怎么测试

M01 的重点通过「AI 开发场景驱动」的方式讲解 Go 特性,每个知识点都对应后续模块的具体需求。 学完这一章之后,你应该能搭出一个结构清楚、接口明确、便于扩展和测试的 Go 项目骨架,为后续章节打好基础。

这一章不会重新讲解 Go 基础语法,只关注后续 Agent 开发一定会反复用到的工程能力。

学习目标

完成这一章后,你应该能够:

  • 理解 Go 模块系统和 AI Agent 项目的基本目录结构
  • 掌握使用 Goroutine 和 Channel 处理流式输出的基本模式
  • 理解 context.Context 在超时控制和请求取消中的作用
  • 能够使用接口和泛型组织可扩展的 Agent 组件
  • 能够配置适合 LLM 调用场景的 HTTP 客户端
  • 掌握常见 JSON 响应处理方式和基础测试方法

本章内容

  • 为什么 AI Agent 开发需要扎实的 Go 工程基础
  • 如何组织一个适合 Agent 项目的目录结构
  • 如何用 Goroutine 和 Channel 处理流式输出
  • 如何使用 context.Context 传递超时与取消信号
  • 如何用接口和泛型组织 Provider 与工具系统
  • 如何实现可复用的 HTTP 客户端、重试和限流
  • 如何处理常见的 JSON 响应结构
  • 如何为 AI 组件编写单元测试和 Mock

一、Go 在 AI Agent 系统中的作用

AI Agent 开发不是单纯“调一下模型接口”。一个完整的 Agent 系统通常至少包含下面这些部分:

  • 接收用户请求
  • 组织 Prompt 和上下文
  • 调用模型
  • 执行工具
  • 聚合结果
  • 控制超时、重试、限流与错误处理
  • 记录日志、指标和测试结果

这些工作里,真正“由模型完成”的部分其实只占一部分。Go 更重要的角色,是把模型能力组织成一个稳定、可维护、可测试的工程系统。

Go 在 AI Agent 系统中的角色

Go 不是负责“让模型变聪明”,而是负责让 AI 应用真正可落地。

从工程视角看,Go 在这类项目里通常有几个明显优势:

  • 并发处理简单,适合流式响应和多工具调用
  • 标准库完整,HTTP、JSON、Context、测试都能直接用
  • 接口和包管理清楚,适合把系统拆成稳定的层次
  • 部署简单,适合作为服务端或命令行工具交付

二、项目结构与模块组织

做课程项目时,第一件事不是先写某个具体功能,而是先把项目结构搭出来。

如果一开始就把请求代码、业务逻辑、工具实现和配置解析都堆在一个目录里,后面随着模块变多,代码会很快变得难以维护。比较稳妥的做法,是先按职责把目录拆开。

项目目录结构

Go 社区有一套经过验证的标准布局,我们在此基础上针对 AI Agent 项目做了调整:

ai-agent/
├── cmd/                    # 可执行入口(每个子目录是一个 binary)
│   ├── server/             # HTTP API 服务
│   ├── agent/              # 命令行 Agent
│   └── ingest/             # 文档摄取工具
├── internal/               # 私有包,外部无法 import
│   ├── llm/                # LLM Provider 封装
│   │   ├── provider.go     # 统一接口定义
│   │   ├── openai.go       # OpenAI 实现
│   │   └── stream.go       # 流式处理
│   ├── agent/              # Agent 核心逻辑
│   ├── tools/              # 工具系统
│   ├── memory/             # 记忆与 RAG
│   └── config/             # 配置加载
├── pkg/                    # 可对外暴露的通用工具包
│   └── retry/              # 通用重试逻辑
├── go.mod                  # 模块定义
├── go.sum                  # 依赖哈希锁定
├── Makefile                # 常用命令
└── .env.example            # 环境变量模板

项目的目录结构没有标准答案,这里列出来的是我认为合适的方案。

    1. cmd/ 只放入口,不放业务逻辑。
      cmd/ 目录的职责通常只有两件事:读取配置、组装依赖。它不应该承载具体业务逻辑。 例如,cmd/agent/main.go 可以做配置加载、日志初始化、HTTP 服务启动,但真正的 Agent 执行流程应该放到 internal/agent 里。
      这样做的好处是,业务逻辑不会和启动流程耦合在一起,后面无论你要做 CLI、Web 服务还是后台任务,都可以复用同一套核心逻辑。
    1. internal/ 用来表达“这是项目内部实现”。
      internal/ 是 Go 在工程上非常有用的一个约束。放在这个目录下的包,只能被当前模块内部引用,不能被外部项目直接依赖。
      对课程项目来说,这个约束很有价值。因为像 internal/llminternal/agentinternal/tools 这些目录,本来就是项目内部实现细节,并不打算作为公共 SDK 暴露出去。
    1. pkg/ 放可以复用的公共能力。
      pkg/ 不一定每个项目都必须有,但当某部分代码确实需要被多个子系统复用时,把它单独抽出来通常更清楚。
      例如,提示词模板、通用结构定义、公共序列化逻辑,都可以放在 pkg/ 目录下。
cmd/ 里不要堆业务逻辑;internal/ 里不要顺手写成“全局工具箱”;目录一旦失去边界,后续会越来越难收拾。

初始化项目

# 创建项目
mkdir ai-agent && cd ai-agent

# 初始化 Go module(即使不发布到 GitHub,也用这个格式)
go mod init github.com/yourname/ai-agent

# 安装核心依赖
go get github.com/sashabaranov/go-openai  # OpenAI SDK

Makefile

通过编写 Makefile, 把常用命令标准化。

# Makefile

.PHONY: run build test lint clean

# 启动开发服务器
run:
	go run ./cmd/server/

# 构建所有二进制
build:
	CGO_ENABLED=0 go build -ldflags="-w -s" -o bin/server ./cmd/server/
	CGO_ENABLED=0 go build -ldflags="-w -s" -o bin/agent  ./cmd/agent/

# 运行测试(含竞态检测,这个习惯要从第一天养成)
test:
	go test -race -count=1 -coverprofile=coverage.out ./...

# 查看测试覆盖率
cover:
	go tool cover -html=coverage.out

# 代码检查
lint:
	golangci-lint run ./...

# 清理构建产物
clean:
	rm -rf bin/ coverage.out

三、Goroutine + Channel:处理流式输出

为什么流式处理需要 Goroutine?

无论是 OpenAI 风格的 SSE,还是其他 Provider 的事件流,调用方通常都不希望等模型全部生成完再一次性返回,而是希望边生成边消费。

同步方式:发请求 → 等待(3-8秒) → 收到完整回复 → 显示
流式方式:发请求 → 立刻收到第一个 token → 实时显示 → ... → 收到结束标记

两种方式的用户体验差距巨大。这类场景非常适合用 Goroutine 和 Channel 处理。

  • Goroutine 负责持续读取服务端返回;
  • Channel 负责把 token 发送给调用方;
  • Context 负责取消和超时控制。

但流式处理有个难点:我们要同时做两件事——接收 token(IO 密集)和处理 token(CPU/显示)。

先看最核心的思路:主流程发起请求之后,另起一个 Goroutine 持续读取流式响应,再通过 Channel 把结果逐步交给调用方。建议返回只读 Channel:

func StreamTokens(ctx context.Context, ...) <-chan Token {
    tokens := make(chan Token, 64)

    go func() {
        defer close(tokens)
        // 读取并发送 token
    }()

    return tokens
}

返回 <-chan Token 可以明确告诉调用方:这个 Channel 只能读取,不能写入,也不能关闭。

忘记 defer close(tokens) 是非常常见的错误。
调用方使用 for range 消费 Channel 时,如果 Channel 不关闭,循环就不会结束。

Token 结构设计

// Token 代表 LLM 流式输出的一个片段。
type Token struct {
    Text  string
    Done  bool
    Error error
}

这里把错误也放到 Token 结构里,是因为调用方在 for range Channel 时无法处理普通的多返回值。这样设计后,调用方只需要消费一个流,就能同时处理文本、结束信号和错误。

完整的流式处理实现

// StreamTokens 调用 OpenAI 流式 API,将 token 通过 channel 返回。
//
// 使用示例:
//
//	for tok := range llm.StreamTokens(ctx, client, messages) {
//	    if tok.Error != nil { /* 处理错误 */ }
//	    if tok.Done { break }
//	    fmt.Print(tok.Text)
//	}
func StreamTokens(
    ctx context.Context,
    client *openai.Client,
    model string,
    messages []openai.ChatCompletionMessage,
) <-chan Token {
    // 缓冲 64:即使调用方处理稍慢,LLM 也能提前送入最多 64 个 token,
    // 不会立刻阻塞。对于 LLM 这种生产速度快于消费速度的场景很重要。
    tokens := make(chan Token, 64)

    go func() {
        // defer close 是整个函数最重要的一行。
        // 无论 goroutine 以何种方式退出(正常/panic/context取消),
        // channel 都会被关闭,调用方的 for range 才能正常结束。
        defer close(tokens)

        // 创建流式请求
        stream, err := client.CreateChatCompletionStream(ctx,
            openai.ChatCompletionRequest{
                Model:    model,
                Stream:   true,
                Messages: messages,
            })
        if err != nil {
            // 把错误发送给调用方,然后 goroutine 退出(defer 会 close channel)
            tokens <- Token{Error: fmt.Errorf("create stream: %w", err)}
            return
        }
        defer stream.Close()

        for {
            resp, err := stream.Recv()
            if err == io.EOF {
                // io.EOF 是流的正常结束信号,不是错误
                tokens <- Token{Done: true}
                return
            }
            if err != nil {
                tokens <- Token{Error: fmt.Errorf("recv stream: %w", err)}
                return
            }

            content := resp.Choices[0].Delta.Content
            if content == "" {
                continue // 跳过空 delta(心跳包)
            }

            // select 同时监听两件事:
            // 1. context 被取消(用户主动停止、超时等)
            // 2. 能向 channel 发送数据
            // 这样即使 channel 满了,context 取消也能立即生效
            select {
            case <-ctx.Done():
                tokens <- Token{Error: fmt.Errorf("context cancelled: %w", ctx.Err())}
                return
            case tokens <- Token{Text: content}:
                // 成功发送
            }
        }
    }()

    return tokens
}

这段代码里有几个重点:

  • Channel 使用缓冲区,避免消费者稍慢时立刻阻塞生产者;
  • 使用 defer close(tokens) 保证调用方能正常结束;
  • 使用 defer stream.Close() 释放流式连接;
  • 使用 select 同时处理发送 token 和 context 取消;
  • io.EOF 是正常结束,不应该当作错误。

调用方代码

func printStream(ctx context.Context, client *openai.Client) {
    messages := []openai.ChatCompletionMessage{
        {Role: "user", Content: "用三句话解释量子纠缠"},
    }

    fmt.Print("AI: ")
    for tok := range llm.StreamTokens(ctx, client, "gpt-4o-mini", messages) {
        if tok.Error != nil {
            fmt.Printf("\n错误: %v\n", tok.Error)
            return
        }
        if tok.Done {
            fmt.Println()
            return
        }
        fmt.Print(tok.Text)
    }
}

调用方只关心“读取 token 并处理”,不用关心底层如何读取 SSE、如何关闭连接、如何检测取消。

缓冲大小的选择

// 不同场景的缓冲大小建议:
make(chan Token, 1)    // 完全同步,生产者立刻等消费者
make(chan Token, 64)   // ✅ LLM 流式:消费者(UI渲染)偶尔慢一点也没关系
make(chan Token, 1024) // 批量处理,允许生产者提前跑很多

流式处理与 Context 传播

四、Context:超时控制与请求取消

Context 解决什么问题?

Agent 系统往往是一条多层调用链:

HTTP Handler → Agent.Run → LLM.Complete → HTTP 请求模型服务

如果用户关闭页面,或者请求超过了最大时间限制,后端应该尽快停止后续工作。否则不仅浪费资源,还可能继续消耗模型费用。

Context 的三种常用方式

超时控制

给整个操作设置最大时间限制,这是最常见的超时控制方式。

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() // 重要:即使超时前完成,也要释放资源

手动取消

这是手动取消方式,常用于用户主动停止任务,或者上游判断需要停止。

ctx, cancel := context.WithCancel(context.Background())
// 某个条件触发后主动 cancel()

传递请求级别的值

WithValue 只适合传递请求级元数据,例如 request_idtrace_id,不应该用来传递业务参数。

type contextKey string
const requestIDKey contextKey = "request_id"

ctx = context.WithValue(ctx, requestIDKey, "req-123")
// 在任何地方取出
reqID := ctx.Value(requestIDKey).(string)

正确传递 Context

下面是 ❌ 错误做法:每层调用都创建新的独立 context,这样用户取消外层 context 后,内层还在跑。

func (a *Agent) Run() (string, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()
    return a.callLLM(ctx) // 传入了,但如果上层的 ctx 已取消,这个新 ctx 不受影响
}

这样会断开和上游请求的取消关系。正确做法是从外部接收 Context,并基于它派生子 Context:

func (a *Agent) Run(ctx context.Context, goal string) (string, error) {
    // 每次 LLM 调用单独设置超时,但基于父 ctx 派生
    // 这样父 ctx 取消时,子 ctx 也会立刻取消
    for step := 0; step < a.maxSteps; step++ {
        // 检查父 ctx 是否已取消
        if err := ctx.Err(); err != nil {
            return "", fmt.Errorf("agent cancelled: %w", err)
        }

        // 派生子 ctx,单次 LLM 调用不超过 30 秒
        llmCtx, llmCancel := context.WithTimeout(ctx, 30*time.Second)
        resp, err := a.llm.Complete(llmCtx, req)
        llmCancel() // ← 注意:不用 defer,在循环里 defer 会积累!
        
        if err != nil {
            return "", err
        }
        // ...
    }
}
后面的工具调用、RAG 检索、多智能体协作,都会沿用同一套 Context 传播思路。

检测取消的几种方式

方式一:直接检查

适合在循环开头进行检查。

if err := ctx.Err(); err != nil {
    return err
}

方式二:select

适合等待操作时同时监听取消。

select {
case result := <-resultChan:
    return result, nil
case <-ctx.Done():
    return nil, ctx.Err()
}

方式三:传给支持 ctx 的函数

这也是最常见的方式,例如使用下面这种方式创建的 http 请求会自动在 ctx 取消时中止。

req, err := http.NewRequestWithContext(ctx, "POST", url, body)

在 Go 工程实践约定:函数签名里的 ctx context.Context 永远放在第一个参数

五、接口与泛型:组织可扩展的 Agent 组件

为什么需要统一抽象?

我们的 Agent 程序要支持 OpenAI、Claude、豆包、通义千问四个平台。如果为每个平台写专属代码,Agent 层就会变成这样:

// ❌ 不可维护:每增加一个 provider,Agent 层都要修改
func (a *Agent) callLLM(ctx context.Context, messages []Message) (string, error) {
    switch a.providerType {
    case "openai":
        return a.openaiClient.Complete(ctx, messages)
    case "claude":
        return a.claudeClient.Complete(ctx, messages)
    case "doubao":
        return a.doubaoClient.Complete(ctx, messages)
    // 每加一个 provider 就要改这里...
    }
}

所以,在课程一开始就要设计 LLMProvider 统一抽象。 我们希望 Agent 层只依赖一个统一接口:

// LLMProvider 是所有 LLM 平台必须实现的接口。
// 接口尽量小——只包含真正需要的方法。
// 如果以后需要扩展(如图像、音频),用组合而不是修改这个接口。
type LLMProvider interface {
    // Complete 发送请求并等待完整响应
    Complete(ctx context.Context, req CompletionRequest) (CompletionResponse, error)

    // Stream 发送请求并通过 channel 返回流式 token
    Stream(ctx context.Context, req CompletionRequest) (<-chan Token, error)

    // Name 返回 Provider 标识符,用于日志和监控
    Name() string

    // ModelID 返回当前使用的模型名称
    ModelID() string

    // IsAvailable 做轻量健康检查,用于故障转移判断
    IsAvailable(ctx context.Context) bool
}

统一请求与响应结构

type Message struct {
    Role    string `json:"role"`
    Content string `json:"content"`
}

type CompletionRequest struct {
    Messages    []Message
    MaxTokens   int
    Temperature float64
    Stream      bool
}

type Usage struct {
    PromptTokens     int
    CompletionTokens int
    TotalTokens      int
}

type CompletionResponse struct {
    Content      string
    ProviderName string
    ModelName    string
    Latency      time.Duration
    Usage        Usage
}

接口的价值在于,Agent 完全不需要知道背后到底是 OpenAI、Claude 还是豆包:

type ReactAgent struct {
    llm      llm.LLMProvider
    maxSteps int
}

func NewReactAgent(provider llm.LLMProvider, maxSteps int) *ReactAgent {
    return &ReactAgent{llm: provider, maxSteps: maxSteps}
}

func (a *ReactAgent) Run(ctx context.Context, goal string) (string, error) {
    resp, err := a.llm.Complete(ctx, llm.CompletionRequest{
        Messages: []llm.Message{{Role: "user", Content: goal}},
    })
    // Agent 根本不知道 Complete 是调用了 OpenAI 还是豆包
    // ...
}

后续切换模型、增加 Provider、做故障转移,都会更简单。

使用泛型处理通用流

// Processor 是泛型流处理器,T 可以是任何类型
// 用法:stream.Process(ctx, tokenChan, func(tok Token) error { ... })
func Process[T any](
    ctx context.Context,
    ch <-chan T,
    handler func(T) error,
) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case item, ok := <-ch:
            if !ok {
                return nil // channel 关闭,正常结束
            }
            if err := handler(item); err != nil {
                return err
            }
        }
    }
}

// Collect 将 channel 中的所有值收集到 slice
func Collect[T any](ctx context.Context, ch <-chan T) ([]T, error) {
    var result []T
    err := Process(ctx, ch, func(item T) error {
        result = append(result, item)
        return nil
    })
    return result, err
}

这类泛型工具函数适合放在 pkg/stream 这类通用包里。它与具体 LLM 无关,可以在后续模块中反复复用。

六、生产级 HTTP 客户端:连接池、重试与限流

不要在生产代码中直接使用:

resp, err := http.Get("https://api.openai.com/v1/...")

http.DefaultClient 没有完整的超时、重试和限流策略。对 LLM API 这种外部依赖来说,这很容易造成请求挂起、连接复用差、错误处理不统一等问题。

HTTP 客户端配置

// ClientConfig 控制 HTTP 客户端行为
type ClientConfig struct {
    // 连接池配置
    MaxIdleConns        int           // 最大空闲连接总数
    MaxIdleConnsPerHost int           // 每个 host 最大空闲连接数
    IdleConnTimeout     time.Duration // 空闲连接存活时间

    // 超时配置
    DialTimeout     time.Duration // 建立 TCP 连接的超时
    RequestTimeout  time.Duration // 单次请求总超时(包含读取响应体)

    // 重试配置
    MaxRetries    int           // 最大重试次数(不含首次请求)
    RetryWaitBase time.Duration // 重试等待基础时间(指数退避的底数)

    // 限流配置
    RequestsPerSecond float64 // 每秒最大请求数,0 表示不限流
}

一个适合 LLM API 的默认配置大致如下:

// DefaultClientConfig 返回适合 LLM API 调用的默认配置
func DefaultClientConfig() ClientConfig {
    return ClientConfig{
        MaxIdleConns:        100,
        MaxIdleConnsPerHost: 10,
        IdleConnTimeout:     90 * time.Second,
        DialTimeout:         5 * time.Second,
        RequestTimeout:      90 * time.Second, // LLM 响应可能较慢
        MaxRetries:          3,
        RetryWaitBase:       500 * time.Millisecond,
        RequestsPerSecond:   10, // 默认每秒 10 个请求
    }
}

连接池与限流

transport := &http.Transport{
	// 连接池:复用 TCP 连接,避免每次请求都重新建立
	MaxIdleConns:        cfg.MaxIdleConns,
	MaxIdleConnsPerHost: cfg.MaxIdleConnsPerHost,
	IdleConnTimeout:     cfg.IdleConnTimeout,

	// TCP 层面的超时控制
	DialContext: (&net.Dialer{
		Timeout:   cfg.DialTimeout,
		KeepAlive: 30 * time.Second, // TCP keepalive,防止连接被防火墙静默断开
	}).DialContext,

	// TLS 握手超时
	TLSHandshakeTimeout: 10 * time.Second,

	// 等待服务器响应头的超时(不含响应体)
	ResponseHeaderTimeout: 30 * time.Second,

	// 允许 HTTP/2(OpenAI API 支持 HTTP/2,性能更好)
	ForceAttemptHTTP2: true,
}

http.Transport 应该在程序生命周期内复用,不要每次请求都创建新的 http.Client。否则连接池无法复用,每次请求都要重新建立 TCP / TLS 连接。

指数退避重试

// Do 执行 HTTP 请求,带限流和指数退避重试
func (c *ProductionClient) Do(ctx context.Context, req *http.Request) (*http.Response, error) {
    // 限流检查:如果当前请求速率过高,等待
    if c.limiter != nil {
        if err := c.limiter.Wait(ctx); err != nil {
            return nil, fmt.Errorf("rate limiter: %w", err)
        }
    }

    var (
        resp     *http.Response
        err      error
        lastErr  error
    )

    for attempt := 0; attempt <= c.cfg.MaxRetries; attempt++ {
        // 非首次请求,等待后重试(指数退避)
        if attempt > 0 {
            wait := c.exponentialBackoff(attempt)
            select {
            case <-time.After(wait):
                // 等待结束,继续重试
            case <-ctx.Done():
                return nil, fmt.Errorf("retry cancelled: %w", ctx.Err())
            }
        }

        resp, err = c.client.Do(req.WithContext(ctx))
        if err == nil && c.shouldRetry(resp.StatusCode) {
            // HTTP 错误码也需要重试
            resp.Body.Close()
            lastErr = fmt.Errorf("HTTP %d", resp.StatusCode)
            continue
        }
        if err != nil {
            lastErr = err
            continue
        }
        return resp, nil // 成功
    }

    return nil, fmt.Errorf("after %d retries: %w", c.cfg.MaxRetries, lastErr)
}

// exponentialBackoff 计算指数退避等待时间:base * 2^attempt,最大 30 秒
func (c *ProductionClient) exponentialBackoff(attempt int) time.Duration {
    wait := float64(c.cfg.RetryWaitBase) * math.Pow(2, float64(attempt-1))
    max := 30 * time.Second
    if time.Duration(wait) > max {
        return max
    }
    return time.Duration(wait)
}

// shouldRetry 判断 HTTP 状态码是否应该重试
func (c *ProductionClient) shouldRetry(statusCode int) bool {
    switch statusCode {
    case 429: // Too Many Requests(限流)
        return true
    case 500, 502, 503, 504: // 服务端错误
        return true
    default:
        return false
    }
}

常见可重试场景包括:

  • 429 Too Many Requests
  • 500 Internal Server Error
  • 502 Bad Gateway
  • 503 Service Unavailable
  • 504 Gateway Timeout
重试不能无限进行。重试次数、退避时间、整体超时都必须受控,否则可能放大故障。

七、JSON 进阶:处理复杂 API 响应

LLM 接口看起来是在返回 JSON,但和普通业务接口相比,它的响应结构通常更复杂一些。

常见情况包括:

  • 某些字段可能不存在,或者值是 null
  • 同一个字段在不同场景下可能是不同类型
  • 流式响应并不是一个完整 JSON,而是一段段事件流

解析工具调用参数

type ToolCall struct {
    ID       string           `json:"id"`
    Type     string           `json:"type"`
    Function ToolCallFunction `json:"function"`
}

type ToolCallFunction struct {
    Name      string `json:"name"`
    Arguments string `json:"arguments"`
}

func ParseArguments[T any](tc ToolCall) (T, error) {
    var result T
    if err := json.Unmarshal([]byte(tc.Function.Arguments), &result); err != nil {
        return result, fmt.Errorf("parse arguments for %s: %w", tc.Function.Name, err)
    }
    return result, nil
}

这里容易忽略的一点是:arguments 往往是一个 JSON 字符串,而不是直接的 JSON 对象,所以需要二次解析。

处理联合类型字段

在处理多模态的会话消息时,需要使用联合类型,灵活处理数据。

// ContentPart 代表多模态消息的一个部分(文本或图片)
// 这是"联合类型"的典型场景:一个字段可能是不同结构
type ContentPart struct {
    Type     string      `json:"type"`
    Text     string      `json:"text,omitempty"`      // type == "text" 时有值
    ImageURL *ImageURL   `json:"image_url,omitempty"` // type == "image_url" 时有值
}

type ImageURL struct {
    URL    string `json:"url"`
    Detail string `json:"detail,omitempty"` // "low" | "high" | "auto"
}

// MessageContent 可以是字符串,也可以是 ContentPart 数组
// 这种"多类型字段"需要自定义 JSON 解析
type MessageContent struct {
    Text  string        // 如果是字符串
    Parts []ContentPart // 如果是数组
}

func (m *MessageContent) UnmarshalJSON(data []byte) error {
    // 先尝试解析为字符串
    var text string
    if err := json.Unmarshal(data, &text); err == nil {
        m.Text = text
        return nil
    }

    // 再尝试解析为数组
    var parts []ContentPart
    if err := json.Unmarshal(data, &parts); err == nil {
        m.Parts = parts
        return nil
    }

    return fmt.Errorf("content: expected string or array, got: %s", string(data))
}

func (m MessageContent) MarshalJSON() ([]byte, error) {
    if len(m.Parts) > 0 {
        return json.Marshal(m.Parts)
    }
    return json.Marshal(m.Text)
}

这种写法适合处理“字段可能是多种结构”的 API 响应。

安全序列化

// SafeJSON 安全地序列化对象,失败时返回错误字符串而非 panic
func SafeJSON(v any) string {
    data, err := json.Marshal(v)
    if err != nil {
        return fmt.Sprintf(`{"error": "marshal failed: %v"}`, err)
    }
    return string(data)
}

解析 SSE

对于 Server-Sent Events 数据需要根据数据行针对性的解析。

// ParseSSEData 解析 Server-Sent Events 的数据行
// 格式:data: {"choices": [...]}  或  data: [DONE]
func ParseSSEData[T any](line string) (T, bool, error) {
    var zero T

    if !strings.HasPrefix(line, "data: ") {
        return zero, false, nil // 不是数据行(可能是注释或空行)
    }

    data := strings.TrimPrefix(line, "data: ")
    data = strings.TrimSpace(data)

    if data == "[DONE]" {
        return zero, true, nil // 流结束标记
    }

    var result T
    if err := json.Unmarshal([]byte(data), &result); err != nil {
        return zero, false, fmt.Errorf("parse SSE data: %w", err)
    }

    return result, false, nil
}

这类解析函数应该有单元测试,因为流式输出中的边界情况很多。

八、测试与 Mock:为 AI 组件写可靠测试

AI 组件和普通业务代码相比,测试难点更明显:

  • LLM 调用依赖外部 API Key
  • 真实响应有随机性
  • 工具调用可能带副作用

所以,比较稳妥的思路是:把外部依赖抽象成接口,再通过 Mock 和 httptest 做替换。

接口、Mock 与测试关系图

Mock LLM Provider

type MockProvider struct {
    CompleteFunc func(ctx context.Context, req llm.CompletionRequest) (llm.CompletionResponse, error)
    Responses    []string
    callCount    int
    name         string
    model        string
}

Mock 的作用是让测试可以稳定控制模型返回:

  • 固定返回某段文本;
  • 按顺序返回多轮响应;
  • 模拟错误;
  • 记录调用次数;
  • 配合 Agent 测试多轮执行。

Table-Driven Test

func TestStreamTokens_WithMock(t *testing.T) {
    tests := []struct {
        name       string
        response   string
        wantTokens int
        wantErr    bool
    }{
        {name: "正常流式输出", response: "Hello", wantTokens: 5},
        {name: "空响应", response: "", wantTokens: 0},
        {name: "包含中文字符", response: "你好", wantTokens: 2},
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            mock := testutil.NewMockProvider(tt.response)
            ch, err := mock.Stream(context.Background(), llm.CompletionRequest{})
            if err != nil {
                t.Fatalf("Stream() error = %v", err)
            }

            var count int
            for tok := range ch {
                if tok.Error != nil {
                    if !tt.wantErr {
                        t.Fatalf("unexpected error: %v", tok.Error)
                    }
                    return
                }
                if tok.Done {
                    break
                }
                count++
            }

            if count != tt.wantTokens {
                t.Fatalf("got %d tokens, want %d", count, tt.wantTokens)
            }
        })
    }
}

Table-Driven Test 的好处是可以把多种输入、输出和边界条件集中放在一张表里,新增测试场景时也更清晰。

使用 httptest 模拟 HTTP 服务

对于 HTTP 客户端,httptest.Server 比 Mock 更接近真实场景。它可以模拟:

  • 前几次返回 503,最后一次成功;
  • 请求超时;
  • 返回 429;
  • 返回非法 JSON;
  • 检查请求 Header 和 Body 是否符合预期。
func TestProductionClient_Retry(t *testing.T) {
    requestCount := 0
    server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        requestCount++
        if requestCount < 3 {
            w.WriteHeader(http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
        _, _ = w.Write([]byte(`{"ok": true}`))
    }))
    defer server.Close()

    cfg := llm.DefaultClientConfig()
    cfg.MaxRetries = 3
    cfg.RetryWaitBase = 10 * time.Millisecond

    client := llm.NewProductionClient(cfg)
    req, _ := http.NewRequest("GET", server.URL, nil)
    resp, err := client.Do(context.Background(), req)
    if err != nil {
        t.Fatalf("expected success after retry, got error: %v", err)
    }
    defer resp.Body.Close()

    if requestCount != 3 {
        t.Fatalf("got %d requests, want 3", requestCount)
    }
}
先面向接口编程,再通过 Mock 和 httptest 做依赖替换,测试会简单很多。

九、本章实战:搭一个最小 AI Agent 工程骨架

学完这一章后,建议先不要急着接入真实模型,而是先把工程骨架搭起来。

建议最少完成下面几件事:

  1. 初始化 go.mod,搭好 cmd/internal/pkg/config/docs/ 目录
  2. 定义 LLMProvider 接口
  3. 实现一个最小的 OpenAIProvider 结构体,先返回固定结果
  4. 实现 StreamTokens,把流式处理管道打通
  5. httptest 和 Mock 补一组最基本的测试

这样做的目的,是先把后面会反复用到的“骨架”搭起来。到了 M02,我们再在这套骨架上接入真正的模型 API。

小结

这一章的主线可以概括为下面几点:

  1. AI Agent 开发首先是工程问题,其次才是模型问题。
  2. 项目结构、并发模式、Context 传播、接口抽象和测试方式,都会贯穿后续所有章节。
  3. 流式输出和外部 API 调用,是后面系统设计的基础场景。
  4. 代码从一开始就要为扩展和测试留边界。

完成这一章后,你应该已经有能力搭出一个最小可用的 Go Agent 项目骨架。

课后练习

  1. 按本章结构创建 go-ai-agent 项目骨架。
  2. 实现一个 LLMProvider 接口,并用 MockProvider 返回固定字符串。
  3. 实现 StreamTokens,并使用 for range 实时打印输出。

参考资料

  • context 官方文档
  • Go Blog: Concurrency Patterns
  • 《The Go Programming Language》第 8 章
  • go-openai 源码
  • golangci-lint
最后更新于 • Q1mi