
Apache Pulsar 云原生时代消息流系统采用存储计算分离架构,支持大集群、多租户、百万级 Topic、企业级和金融级功能,如跨区域数据复制、持久存储、分层存储、高可扩展性等。Apache Pulsar 提供统一的消费模式,支持消息队列和流程场景,不仅可以为队列场景提供企业读写服务质量和强一致性保证,还可以为流程场景提供高吞吐量和低延迟。
Apache Pulsar 腾讯云已经获得了大规模的生产实践,并在过去的一年里承担了许多不同的工业生态使用场景。在实际生产实践中,腾讯云针对 Apache Pulsar 为保证用户在不同场景下系统的稳定高效运行,做了一系列的性能优化和稳定功能工作。本文围绕腾讯云近一年 Pulsar 优化稳定性和性能的最佳实践。
为什么选择在生产环境中使用? Pulsar?
用户以前用过 Kafka 由于业务的特定场景,集群的整体流量相对较小,但需要使用 Topic 较多。此前使用 Kafka 因为集群 Kafka 由于自身结构的限制,用户无法在集群中创建更多 Topic,因此,为了满足多的业务 Topic 多套使用场景需要部署 Kafka 集群满足业务使用,导致业务使用成本高。
Pulsar 除了拥有 Pub-Sub 的传统 MQ 除了功能外,其底层架构计算存储分离,在存储层分层分片,可以轻松使用 BookKeeper 中的数据 offload 便宜存放。Pulsar Functions 是 Serverless 轻量化计算框架为用户提供 Topic 中转能力。在开源之前,Pulsar 已在 Yahoo! 体验生产环境 5 年度打磨,容量容易扩大,支撑多 Topic 场景。为了降低使用成本,满足需求 Topic 用户切换到业务场景 Pulsar 的集群上。
目前该用户的一套 Pulsar 集群可承载 60W 左右的 Topic,在降低使用成本的同时,时,降低了使用成本。
使用 Shared 订阅模式或单条 Ack 用户户经常遇到新闻模型时 Ack 空洞的情况。Pulsar 单独抽象 individuallyDeletedMessages 收集记录空洞信息。该集合为开闭区间集合,开闭区间表示消息为空洞消息,闭合区间表示消息已处理。早期 Pulsar 支持单条 Ack 和批量 Ack 两种模型,后者对标 Kafka 的 Ack Offset。引入单条 Ack 模型主要针对在线业务场景,但也带来了 Ack 空洞问题。Ack 下图中的空洞 individuallyDeletedMessage 展示的集合。
如何理解 individuallyDeletedMessage?以下图为例:

第一个记录 Ledger id 是 5:1280,这个集合是一个封闭的范围,说明消息已经被封闭了 Ack;之后的 5:1281 是开区间,说明消息没有被打开 Ack。在这里,我们将以开闭区间的形式区分新闻是否被区分 Ack。
Ack 空洞的出现可能是因为 Broker 由于早期版本的设计缺陷,处理失败,Ack 处理无返回值。在 2.8.0 以上版本介绍了事务新闻支持 AckResponse 支持返回值的概念。因此,在早期版本中调用 Ack 后无法确保 Broker 能正确处理 Ack 请求。第二个原因可能是客户端因为各种原因没有调用 Ack,生产实践中出现较多。
为了规避 Ack 一种方法是精确计算空洞 Backlog Size。因为在 Broker 上解析 Batch 在 Pulsar 中对 Batch 消费者侧的消息分析,所以一个 Entry 可能是单个消息,也可能是单个消息 Batch 消息的。后者情况下 Batch 新闻的数量或形式是未知的。为此准确计算 Backlog Size,但经调查发现,这种方法复杂而困难。
另一种方法是 Broker 主动补偿策略。因为 individuallyDeletedMessage 每一个都存储 ManagedCursor,也就是说,每个订阅对象都会到达 Broker 实际类中的映射。每个订阅都可以得到相应的 individuallyDeletedMessage 集合,Broker 您可以主动将集合推送到客户端,即主动补偿。
接下来我们来了解一下 Broker 主动补偿机制,即 Backlog 策略。在了解补偿机制之前,首先要了解补偿机制 Topic 可能的分布和组成。

生产者向正常 Topic 消费者从发布消息 Topic 接收消息。如上图所示,红、灰、蓝代表新闻 Topic 三种形式。Pulsar 中引入了 Backlog 用来描述生产者和消费者之间的策略 Gap。该策略提供了三种选项,包括 Producer Exception、Producer Request Hold 和 Consumer Backlog Eviction。
其中,Producer Exception 在生产环境中比用户友好更常用。当消息积累到一定程度,消费者处理消息的能力不足时,Producer Exception 会通知生产者有问题。Producer Request Hold 原理是一样的,但是 Producer Request Hold 它只会让生产者停止发送,而不会告知原因(即不会返回到业务侧)。用户感知它 Producer 停止发送消息,但无异常抛出。而 Consumer Backlog Eviction 最早的消息会自动丢弃,以确保消息的持续处理,这可能会导致消息丢失。

另外需要注意的是, Pulsar 计算 Backlog Size 的方式。上图可以理解为事件流,生产者源源不断 append message。Pulsar 计算 Backlog Size 从现在开始计算时间 MarkedDeletedPosition 的位置,到 ReadPosition 位置前 Backlog Size,而后结合 Producer Exception 暴露策略。如果 Ack 空洞,比如 Broker 侧请求失败或客户代码异常 Ack 永远不会被调用,Backlog Size 达到一定速度相当于限制生产者。上图中,M4 和 M2 是两条空洞消息,生产者的发送流迟早会被打断。

Broker 实现主动补偿机制如上图所示。由于 individuallyDeletedMessage 记录所有信息 Ack 成功与否的状态可以从中获得 MarkedDeletedPosition 位置消息,打开一个 Executor Service 定期任务,设置监控频率,间隔将消息推送到客户端,实现 Broker 主动补偿,避免 Ack 空洞导致 Producer Exception 频繁触发。
先来看看这三个概念:
如果 TTL 和 Retention 同时,如何计算消息的生命周期?以下代码:
void updateCursor (ManagedCursor Impl cursor, PositionImpl newPosition) t Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated (cursor, newPosition); if (pair == nulL){ Cursor has been removed in the meantime trimConsumedLedgersInBackground(); return; }PositionImplpreviousSlowestReader = pair.getLeftO); PositionImpl currentSlowestReader = pair.getRightO); if (previousSlowestReader.compareTo(currentSlowestReader)==0){ // The slowest consumer has not changed position. Nothing to do right now return; }//Only trigger a trimming when switching to the next Ledger if (previousSlowestReader.getLedgerId() != newPosition.getLedgerId0)) f trimConsumedLedgersInBackground(); }
复制代码
在上述代码的最后三行中,以前最慢的 LedgerId 与 newPosition 的 LedgerId 对比,检查 ManagedLedger 切换是否发生,一旦切换调用 trimConsumedLedgersInBackground()。该函数方法的核心代码策略是 Retention 的逻辑。
由此可知:
又出现了一个新问题:TTL 为什么要选择策略? Ledger 触发切换时间 Ledger 删除操作呢?



