pulsar 介绍及Pulsar Go client 使用指南
承蒙大家厚爱,我的《Go语言之路》的纸质版图书已经上架京东,有需要的朋友请点击 此链接 购买。
Pulsar 是一种分布式消息流平台,具有高性能、可扩展性和多租户支持,适用于实时数据处理和消息传递。
Pulsar 介绍
Pulsar 是一种多租户、高性能的服务器到服务器消息传递解决方案。Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。
Pulsar 支持以下关键特性。
- 单实例原生支持多集群部署,实现跨集群消息的无缝地理级复制(geo-replication),保障灾难恢复能力和全球数据同步。
- 发布(publish)与端到端(end-to-end)极低延迟。
- 支持无缝扩展至百万级独立消息主题(topic)。
- 提供简洁统一的客户端 API,支持 Java / Go / Python / C++ 主流开发语言。
- 多种主题订阅模式
- 依托 Apache BookKeeper 提供持久化存储,基于 serverless 架构的轻量级计算框架 Pulsar Functions,支持原生流数据处理能力。
- 基于 Pulsar Functions 构建的无服务器连接器框架 Pulsar IO,可无缝实现 Apache Pulsar 与其他系统的双向数据流动。
- 分级存储机制会根据数据生命周期策略,自动将热存储/温存储中的冷数据迁移到长期存储介质(例如Amazon S3、Google Cloud Storage),实现存储成本优化。
Pulsar 架构及核心概念
Pulsar 架构
在最高层级上,一个 Pulsar 实例 由一个或多个 Pulsar 集群 组成。同一实例内的集群之间可相互复制数据,实现高可用性。
Pulsar 集群的核心组件 每个 Pulsar 集群包含以下关键服务:
- Broker 节点(1个或多个)
- 接收生产者(Producer)发送的消息,进行负载均衡后分发给消费者(Consumer)。
- 与 Pulsar 配置存储 交互,处理集群元数据管理、租户配置等协调任务。
- 将消息持久化到 BookKeeper 实例(又称 Bookies)中。
- 依赖 集群专属 ZooKeeper 集群 完成节点注册、Topic 分配等任务。
- BookKeeper 集群
- 组成:由1个或多个 Bookie 节点 组成。
- 职责:提供分布式日志存储(Ledger),确保消息的持久化和高可靠性。
- ZooKeeper 集群
- 作用范围:专属服务于单个 Pulsar 集群。
- 职责:协调 Broker 节点间的状态同步、Topic 分区元数据管理等任务。
架构示意图说明 下图展示了一个 Pulsar 集群的典型部署结构,其中:
- Broker 负责消息路由与计算逻辑。
- BookKeeper 专注存储层,通过多副本(默认3副本)保障数据可靠性。
- ZooKeeper 作为协调中心,确保集群元数据一致性。
核心概念
- Topic: Pulsar 中的消息主题,分为持久化和非持久化两种类型。
- Producer: 消息生产者,负责向 Topic 发送消息。
- Consumer: 消息消费者,负责从 Topic 订阅并消费消息。
- Subscription: 订阅模式,支持独占(Exclusive)、故障转移(Failover)、共享(Shared)和键共享(Key_Shared)四种模式。
- Broker: Pulsar 的代理节点,负责消息的接收、存储和分发。
- BookKeeper: Pulsar 使用 Apache BookKeeper 作为持久化存储层,确保消息的可靠存储。
- ZooKeeper: 用于 Pulsar 的元数据存储和集群协调。
Pulsar VS Kafka
Pulsar 和 Kafka 都是流行的消息中间件,它们在架构上的最大不同是 Pulsar 是计算和存储分离,而 Kafka 由 Broker 进行消息的收发和持久化,数据存储在本地文件系统,由 Broker 统一管理。这也意味着数据和消息处理是耦合的。
此外,Pulsar 相对 Kafka 还有以下优势。
- 扩展灵活
- 多租户
- 更灵活的订阅方式
- 分层存储
搭建本地 Docker 环境
执行下面的命令,使用 Docker 快速搭建一个Pulsar 环境。
Mac
docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:4.0.3 \
bin/pulsar standalone
Windows
docker run -it ^
-p 6650:6650 ^
-p 8080:8080 ^
--mount source=pulsardata,target=/pulsar/data ^
--mount source=pulsarconf,target=/pulsar/conf ^
apachepulsar/pulsar:4.0.3 ^
bin/pulsar standalone
Pulsar go 客户端示例
Pulsar 目前支持主流的编程语言,具体客户端的特性支持情况请查看[官方说明](Pulsar Client Feature Matrix)。
安装依赖
执行以下命令,安装官方 Go 客户端库。
go get -u "github.com/apache/pulsar-client-go/pulsar"
在你的项目代码中引入该依赖库。
import "github.com/apache/pulsar-client-go/pulsar"
初始化 Client
要与 Pulsar 进行通信,首先需要初始化一个 Client 对象。
import (
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
defer client.Close()
}
如果有多个 broker,可以通过以下方式连接。
import (
"log"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://localhost:6650,localhost:6651,localhost:6652",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client: %v", err)
}
defer client.Close()
}
更多详细配置,可以查看 pulsar.ClientOptions。
Producer
通过生产者向 Pulsar 发送消息。
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
log.Fatal(err)
}
_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
Payload: []byte("hello"),
})
defer producer.Close()
if err != nil {
fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")
Consumer
订阅
Pulsar 消费者可以订阅一个或多个 Pulsar topic,并订阅相关 topic 的消息。可以使用 pulsar.ConsumerOptions 对象配置 Go 消费者。
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1", // 订阅指定的 topic
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
其中:
- Topic:当前消费者订阅的主题
- SubscriptionName:当前订阅者的名称
- Type:消费消息的模式,有
Exclusive
、Shared
、Failover
、KeyShared
四种。
支持订阅多个 topic。
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topics: []string{"my-topic", "q1mi-topic"},
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
或基于正则进行订阅。
topicsPattern := "persistent://public/default/topic.*"
opts := pulsar.ConsumerOptions{
// 通过指定 `TopicsPattern` 字段创建一个基于正则的消费者
TopicsPattern: topicsPattern,
SubscriptionName: "regex-sub",
}
consumer, err := client.Subscribe(opts)
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
除上述参数外,以下 pulsar.ConsumerOptions
配置项也较常用。
MessageChannel
:为消费组指定消息投递通道,新到达的消息将被实时推送到该通道等待处理。ReceiverQueueSize
:接收队列大小设置消费者接收队列的容量。消费者接收队列控制应用程序调用Consumer.receive()
前可由Consumer
累积的消息数量。设置较大值可能通过预取消息提升消费吞吐量,但会相应增加内存占用。默认值为1000
条消息,适用于绝大多数场景。若需极致性能或处理消息尖峰,可适当调大;若内存敏感型服务,则可酌情调小。EnableAutoScaledReceiverQueueSize
:启用自动扩展接收队列容量。当该选项启用时,消费者接收队列将根据实时吞吐量自动调整容量。此时ReceiverQueueSize 参数将作为队列可扩展的最大容量限制(即队列绝不会超过该值,但可能在其范围内动态缩放)。默认值为false(不启用自动扩展),此时接收队列保持固定大小(由ReceiverQueueSize 参数直接控制)。SubscriptionInitialPosition
:订阅时开始消费的初始位置。可选值有Earliest
(最早的)和Latest
(最新的),默认为Latest
。
消费消息
订阅 topic 成功后,需要在业务代码中接收并处理消息。
// 循环接收消息
for {
// 没有消息则阻塞,
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}
fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
msg.ID(), string(msg.Payload()))
consumer.Ack(msg) // 消息处理成功,回复 Ack
}
if err := consumer.Unsubscribe(); err != nil {
log.Fatal(err)
}
消息处理成功,可以回复 Ack
,如果消息处理失败,则需要回复 Nack
。
当一个消息被回复 Nack
后,将在一段固定延迟后被标记为重新投递。在使用ConsumerOptions 构建消费者时,可通过 ConsumerOptions.NackRedeliveryDelay
配置该延迟的时间。
Reader
Pulsar Reader 处理 Pulsar topic 的消息。Reader 与 Consumer 不同,在使用 Reader 时,需要明确指定要从流中的哪个消息开始(Consumer 会自动从最近 noacked
的消息开始)。可以使用 ReaderOptions 对象配置 Go Reader。
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic: "topic-1",
StartMessageID: pulsar.EarliestMessageID(), // 指定从最早的消息开始消费
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
参考链接
- https://pulsar.apache.org/docs/
- https://streamnative.io/blog/apache-pulsar-client-application-best-practices
- https://pulsar.apache.org/docs/4.0.x/client-libraries-go/