Pulsar 是一种分布式消息流平台,具有高性能、可扩展性和多租户支持,适用于实时数据处理和消息传递。

Pulsar 介绍

Pulsar 是一种多租户、高性能的服务器到服务器消息传递解决方案。Pulsar 最初由 Yahoo 开发,目前由 Apache 软件基金会管理。

Pulsar 支持以下关键特性。

  1. 单实例原生支持多集群部署,实现跨集群消息的无缝地理级复制(geo-replication),保障灾难恢复能力和全球数据同步。
  2. 发布(publish)与端到端(end-to-end)极低延迟。
  3. 支持无缝扩展至百万级独立消息主题(topic)。
  4. 提供简洁统一的客户端 API,支持 Java / Go / Python / C++ 主流开发语言。
  5. 多种主题订阅模式
  6. 依托 Apache BookKeeper 提供持久化存储,基于 serverless 架构的轻量级计算框架 Pulsar Functions,支持原生流数据处理能力。
  7. 基于 Pulsar Functions 构建的无服务器连接器框架 Pulsar IO,可无缝实现 Apache Pulsar 与其他系统的双向数据流动。
  8. 分级存储机制会根据数据生命周期策略,自动将热存储/温存储中的冷数据迁移到长期存储介质(例如Amazon S3、Google Cloud Storage),实现存储成本优化。

Pulsar 架构及核心概念

Pulsar 架构

在最高层级上,一个 Pulsar 实例 由一个或多个 Pulsar 集群 组成。同一实例内的集群之间可相互复制数据,实现高可用性。

Pulsar 集群的核心组件 每个 Pulsar 集群包含以下关键服务:

  1. Broker 节点(1个或多个)
    • 接收生产者(Producer)发送的消息,进行负载均衡后分发给消费者(Consumer)。
    • Pulsar 配置存储 交互,处理集群元数据管理、租户配置等协调任务。
    • 将消息持久化到 BookKeeper 实例(又称 Bookies)中。
    • 依赖 集群专属 ZooKeeper 集群 完成节点注册、Topic 分配等任务。
  2. BookKeeper 集群
    • 组成:由1个或多个 Bookie 节点 组成。
    • 职责:提供分布式日志存储(Ledger),确保消息的持久化和高可靠性。
  3. ZooKeeper 集群
    • 作用范围:专属服务于单个 Pulsar 集群。
    • 职责:协调 Broker 节点间的状态同步、Topic 分区元数据管理等任务。

架构示意图说明 下图展示了一个 Pulsar 集群的典型部署结构,其中:

  • Broker 负责消息路由与计算逻辑。
  • BookKeeper 专注存储层,通过多副本(默认3副本)保障数据可靠性。
  • ZooKeeper 作为协调中心,确保集群元数据一致性。

pulsar-system-architecture

核心概念

  • Topic: Pulsar 中的消息主题,分为持久化和非持久化两种类型。
  • Producer: 消息生产者,负责向 Topic 发送消息。
  • Consumer: 消息消费者,负责从 Topic 订阅并消费消息。
  • Subscription: 订阅模式,支持独占(Exclusive)、故障转移(Failover)、共享(Shared)和键共享(Key_Shared)四种模式。
  • Broker: Pulsar 的代理节点,负责消息的接收、存储和分发。
  • BookKeeper: Pulsar 使用 Apache BookKeeper 作为持久化存储层,确保消息的可靠存储。
  • ZooKeeper: 用于 Pulsar 的元数据存储和集群协调。

Pulsar VS Kafka

Pulsar 和 Kafka 都是流行的消息中间件,它们在架构上的最大不同是 Pulsar 是计算和存储分离,而 Kafka 由 Broker 进行消息的收发和持久化,数据存储在本地文件系统,由 Broker 统一管理。这也意味着数据和消息处理是耦合的。

此外,Pulsar 相对 Kafka 还有以下优势。

  • 扩展灵活
  • 多租户
  • 更灵活的订阅方式
  • 分层存储

搭建本地 Docker 环境

执行下面的命令,使用 Docker 快速搭建一个Pulsar 环境。

Mac

docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:4.0.3 \
bin/pulsar standalone

Windows

docker run -it ^
-p 6650:6650 ^
-p 8080:8080 ^
--mount source=pulsardata,target=/pulsar/data ^
--mount source=pulsarconf,target=/pulsar/conf ^
apachepulsar/pulsar:4.0.3 ^
bin/pulsar standalone

Pulsar go 客户端示例

Pulsar 目前支持主流的编程语言,具体客户端的特性支持情况请查看[官方说明](Pulsar Client Feature Matrix)。

安装依赖

执行以下命令,安装官方 Go 客户端库。

go get -u "github.com/apache/pulsar-client-go/pulsar"

在你的项目代码中引入该依赖库。

import "github.com/apache/pulsar-client-go/pulsar"

初始化 Client

要与 Pulsar 进行通信,首先需要初始化一个 Client 对象。

import (
    "log"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:               "pulsar://localhost:6650",
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()
}

如果有多个 broker,可以通过以下方式连接。

import (
    "log"
    "time"
    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650,localhost:6651,localhost:6652",
        OperationTimeout:  30 * time.Second,
        ConnectionTimeout: 30 * time.Second,
    })
    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }

    defer client.Close()
}

更多详细配置,可以查看 pulsar.ClientOptions

Producer

通过生产者向 Pulsar 发送消息。

producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic: "my-topic",
})

if err != nil {
    log.Fatal(err)
}

_, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
    Payload: []byte("hello"),
})

defer producer.Close()

if err != nil {
    fmt.Println("Failed to publish message", err)
}
fmt.Println("Published message")

Consumer

订阅

Pulsar 消费者可以订阅一个或多个 Pulsar topic,并订阅相关 topic 的消息。可以使用 pulsar.ConsumerOptions 对象配置 Go 消费者。

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic:            "topic-1",  // 订阅指定的 topic
    SubscriptionName: "my-sub",
    Type:             pulsar.Shared,
})
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

其中:

  • Topic:当前消费者订阅的主题
  • SubscriptionName:当前订阅者的名称
  • Type:消费消息的模式,有 ExclusiveSharedFailoverKeyShared四种。

支持订阅多个 topic。

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topics:           []string{"my-topic", "q1mi-topic"},
	SubscriptionName: "my-sub",
	Type:             pulsar.Shared,
})

或基于正则进行订阅。

topicsPattern := "persistent://public/default/topic.*"
opts := pulsar.ConsumerOptions{
    // 通过指定 `TopicsPattern` 字段创建一个基于正则的消费者
    TopicsPattern:    topicsPattern,
    SubscriptionName: "regex-sub",
}

consumer, err := client.Subscribe(opts)
if err != nil {
    log.Fatal(err)
}
defer consumer.Close()

除上述参数外,以下 pulsar.ConsumerOptions 配置项也较常用。

  • MessageChannel:为消费组指定消息投递通道,新到达的消息将被实时推送到该通道等待处理。
  • ReceiverQueueSize:接收队列大小设置消费者接收队列的容量。消费者接收队列控制应用程序调用Consumer.receive()前可由Consumer累积的消息数量。设置较大值可能通过预取消息提升消费吞吐量,但会相应增加内存占用。默认值为1000条消息,适用于绝大多数场景。若需极致性能或处理消息尖峰,可适当调大;若内存敏感型服务,则可酌情调小。
  • EnableAutoScaledReceiverQueueSize:启用自动扩展接收队列容量。当该选项启用时,消费者接收队列将根据实时吞吐量自动调整容量。此时ReceiverQueueSize 参数将作为队列可扩展的最大容量限制(即队列绝不会超过该值,但可能在其范围内动态缩放)。默认值为false(不启用自动扩展),此时接收队列保持固定大小(由ReceiverQueueSize 参数直接控制)。
  • SubscriptionInitialPosition:订阅时开始消费的初始位置。可选值有 Earliest (最早的)和 Latest (最新的),默认为 Latest

消费消息

订阅 topic 成功后,需要在业务代码中接收并处理消息。

// 循环接收消息
for {
    // 没有消息则阻塞,
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
        msg.ID(), string(msg.Payload()))

    consumer.Ack(msg)  // 消息处理成功,回复 Ack
}

if err := consumer.Unsubscribe(); err != nil {
    log.Fatal(err)
}

消息处理成功,可以回复 Ack,如果消息处理失败,则需要回复 Nack

当一个消息被回复 Nack 后,将在一段固定延迟后被标记为重新投递。在使用ConsumerOptions 构建消费者时,可通过 ConsumerOptions.NackRedeliveryDelay 配置该延迟的时间。

Reader

Pulsar Reader 处理 Pulsar topic 的消息。Reader 与 Consumer 不同,在使用 Reader 时,需要明确指定要从流中的哪个消息开始(Consumer 会自动从最近 noacked 的消息开始)。可以使用 ReaderOptions 对象配置 Go Reader。

reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          "topic-1",
    StartMessageID: pulsar.EarliestMessageID(),  // 指定从最早的消息开始消费
})
if err != nil {
    log.Fatal(err)
}
defer reader.Close()

参考链接

  1. https://pulsar.apache.org/docs/
  2. https://streamnative.io/blog/apache-pulsar-client-application-best-practices
  3. https://pulsar.apache.org/docs/4.0.x/client-libraries-go/

扫码关注微信公众号