11 无消息丢失配置怎么实现?

你好,我是胡夕。今天我要和你分享的主题是:如何配置Kafka无消息丢失。

一直以来,很多人对于Kafka丢失消息这件事情都有着自己的理解,因而也就有着自己的解决之道。

在讨论具体的应对方法之前,我觉得我们首先要明确,在Kafka的世界里什么才算是消息丢失,或者说Kafka在什么情况下能保证消息不丢失。这点非常关键,因为很多时候我们容易混淆责任的边界,如果搞不清楚事情由谁负责,自然也就不知道由谁来出解决方案了。

那Kafka到底在什么情况下才能保证消息不丢失呢?

一句话概括,Kafka只对“已提交”的消息(committed message)做有限度的持久化保证。

这句话里面有两个核心要素,我们一一来看。

第一个核心要素是“已提交的消息”。

什么是已提交的消息?

当Kafka的若干个Broker成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。

此时,这条消息在Kafka看来就正式变为“已提交”消息了。

那为什么是若干个Broker呢?

这取决于你对“已提交”的定义。

你可以选择只要有一个Broker成功保存该消息就算是已提交,也可以是令所有Broker都成功保存该消息才算是已提交。

不论哪种情况,Kafka只对已提交的消息做持久化保证这件事情是不变的。

第二个核心要素就是“有限度的持久化保证”,也就是说Kafka不可能保证在任何情况下都做到不丢失消息。

举个极端点的例子,如果地球都不存在了,Kafka还能保存任何消息吗?

显然不能!倘若这种情况下你依然还想要Kafka不丢消息,那么只能在别的星球部署Kafka Broker服务器了。

现在你应该能够稍微体会出这里的“有限度”的含义了吧,其实就是说Kafka不丢消息是有前提条件的。

假如你的消息保存在N个Kafka Broker上,那么这个前提条件就是这N个Broker中至少有1个存活。

只要这个条件成立,Kafka就能保证你的这条消息永远不会丢失。

总结一下,Kafka是能做到不丢失消息的,只不过这些消息必须是已提交的消息,而且还要满足一定的条件。

当然,说明这件事并不是要为Kafka推卸责任,而是为了在出现该类问题时我们能够明确责任边界。

“消息丢失”案例

好了,理解了Kafka是怎样做到不丢失消息的,那接下来我带你复盘一下那些常见的“Kafka消息丢失”案例。

注意,这里可是带引号的消息丢失哦,其实有些时候我们只是冤枉了Kafka而已。

案例1:生产者程序丢失数据

Producer程序丢失消息,这应该算是被抱怨最多的数据丢失场景了。

我来描述一个场景:你写了一个Producer应用向Kafka发送消息,最后发现Kafka没有保存,于是大骂:“Kafka真烂,消息发送居然都能丢失,而且还不告诉我?!”如果你有过这样的经历,那么请先消消气,我们来分析下可能的原因。

目前Kafka Producer是异步发送消息的,也就是说如果你调用的是producer.send(msg)这个API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。

这种发送方式有个有趣的名字,叫“fire and forget”,翻译一下就是“发射后不管”。

这个术语原本属于导弹制导领域,后来被借鉴到计算机领域中,它的意思是,执行完一个操作后不去管它的结果是否成功。

调用producer.send(msg)就属于典型的“fire and forget”,因此如果出现消息丢失,我们是无法知晓的。

这个发送方式挺不靠谱吧,不过有些公司真的就是在使用这个API发送消息。

如果用这个方式,可能会有哪些因素导致消息没有发送成功呢?

其实原因有很多,例如网络抖动,导致消息压根就没有发送到Broker端;或者消息本身不合格导致Broker拒绝接收(比如消息太大了,超过了Broker的承受能力)等。这么来看,让Kafka“背锅”就有点冤枉它了。就像前面说过的,Kafka不认为消息是已提交的,因此也就没有Kafka丢失消息这一说了。

不过,就算不是Kafka的“锅”,我们也要解决这个问题吧。

实际上,解决此问题的方法非常简单:Producer永远要使用带有回调通知的发送API,也就是说不要使用producer.send(msg),而要使用producer.send(msg, callback)

不要小瞧这里的callback(回调),它能准确地告诉你消息是否真的提交成功了。

一旦出现消息提交失败的情况,你就可以有针对性地进行处理。

举例来说,如果是因为那些瞬时错误,那么仅仅让Producer重试就可以了;如果是消息不合格造成的,那么可以调整消息格式后再次发送。

总之,处理发送失败的责任在Producer端而非Broker端。

你可能会问,发送失败真的没可能是由Broker端的问题造成的吗?

当然可能!如果你所有的Broker都宕机了,那么无论Producer端怎么重试都会失败的,此时你要做的是赶快处理Broker端的问题。

但之前说的核心论据在这里依然是成立的:Kafka依然不认为这条消息属于已提交消息,故对它不做任何持久化保证。

案例2:消费者程序丢失数据

Consumer端丢失数据主要体现在Consumer端要消费的消息不见了。

Consumer程序有个“位移”的概念,表示的是这个Consumer当前消费到的Topic分区的位置。

下面这张图来自于官网,它清晰地展示了Consumer端的位移数据。

比如对于Consumer A而言,它当前的位移值就是9;Consumer B的位移值是11。

这里的“位移”类似于我们看书时使用的书签,它会标记我们当前阅读了多少页,下次翻书的时候我们能直接跳到书签页继续阅读。

正确使用书签有两个步骤:第一步是读书,第二步是更新书签页。如果这两步的顺序颠倒了,就可能出现这样的场景:当前的书签页是第90页,我先将书签放到第100页上,之后开始读书。

当阅读到第95页时,我临时有事中止了阅读。

那么问题来了,当我下次直接跳到书签页阅读时,我就丢失了第96~99页的内容,即这些消息就丢失了。

同理,Kafka中Consumer端的消息丢失就是这么一回事。

要对抗这种消息丢失,办法很简单:维持先消费消息(阅读),再更新位移(书签)的顺序即可。这样就能最大限度地保证消息不丢失。

当然,这种处理方式可能带来的问题是消息的重复处理,类似于同一页书被读了很多遍,但这不属于消息丢失的情形。在专栏后面的内容中,我会跟你分享如何应对重复消费的问题。

除了上面所说的场景,其实还存在一种比较隐蔽的消息丢失场景。

我们依然以看书为例。假设你花钱从网上租借了一本共有10章内容的电子书,该电子书的有效阅读时间是1天,过期后该电子书就无法打开,但如果在1天之内你完成阅读就退还租金。

为了加快阅读速度,你把书中的10个章节分别委托给你的10个朋友,请他们帮你阅读,并拜托他们告诉你主旨大意。当电子书临近过期时,这10个人告诉你说他们读完了自己所负责的那个章节的内容,于是你放心地把该书还了回去。不料,在这10个人向你描述主旨大意时,你突然发现有一个人对你撒了谎,他并没有看完他负责的那个章节。那么很显然,你无法知道那一章的内容了。

对于Kafka而言,这就好比Consumer程序从Kafka获取到消息后开启了多个线程异步处理消息,而Consumer程序自动地向前更新位移。

假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于Consumer而言实际上是丢失了。

这里的关键在于Consumer自动提交位移,与你没有确认书籍内容被全部读完就将书归还类似,你没有真正地确认消息是否真的被消费就“盲目”地更新了位移。

这个问题的解决方案也很简单:如果是多线程异步处理消费消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移

在这里我要提醒你一下,单个Consumer程序使用多线程来消费消息说起来容易,写成代码却异常困难,因为你很难正确地处理位移的更新,也就是说避免无消费消息丢失很简单,但极易出现消息被消费了多次的情况。

最佳实践

看完这两个案例之后,我来分享一下Kafka无消息丢失的配置,每一个其实都能对应上面提到的问题。

  • 不要使用producer.send(msg),而要使用producer.send(msg, callback)。记住,一定要使用带有回调通知的send方法。
  • 设置acks = all。acks是Producer的一个参数,代表了你对“已提交”消息的定义。如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
  • 设置retries为一个较大的值。这里的retries同样是Producer的参数,对应前面提到的Producer自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了retries > 0的Producer能够自动重试消息发送,避免消息丢失。
  • 设置unclean.leader.election.enable = false。这是Broker端的参数,它控制的是哪些Broker有资格竞选分区的Leader。如果一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息的丢失。故一般都要将该参数设置成false,即不允许这种情况的发生。
  • 设置replication.factor >= 3。这也是Broker端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
  • 设置min.insync.replicas > 1。这依然是Broker端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于1可以提升消息持久性。在实际环境中千万不要使用默认值1。
  • 确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成replication.factor = min.insync.replicas + 1。
  • 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。就像前面说的,这对于单Consumer多线程处理的场景而言是至关重要的。

小结

今天,我们讨论了Kafka无消息丢失的方方面面。

我们先从什么是消息丢失开始说起,明确了Kafka持久化保证的责任边界,随后以这个规则为标尺衡量了一些常见的数据丢失场景,最后通过分析这些场景,我给出了Kafka无消息丢失的“最佳实践”。

总结起来,我希望你今天能有两个收获:

  • 明确Kafka持久化保证的含义和限定条件。
  • 熟练配置Kafka无消息丢失参数。

开放讨论

其实,Kafka还有一种特别隐秘的消息丢失场景:增加主题分区。

当增加主题分区后,在某段“不凑巧”的时间间隔后,Producer先于Consumer感知到新增加的分区,而Consumer设置的是“从最新位移处”开始读取消息,因此在Consumer感知到新分区前,Producer发送的这些消息就全部“丢失”了,或者说Consumer无法读取到这些消息。

严格来说这是Kafka设计上的一个小缺陷,你有什么解决的办法吗?

欢迎写下你的思考和答案,我们一起讨论。

如果你觉得有所收获,也欢迎把文章分享给你的朋友。

chat

Q: 谈一下 kafka 如何保证消息不丢失的?

Kafka 是一个分布式流处理平台,被广泛用于构建实时数据流应用程序。

它设计的目标之一就是保证消息不丢失。以下是 Kafka 如何保证消息不丢失的一些主要机制:

  1. 持久化存储: Kafka 使用持久化存储来确保消息在发布后不会丢失。消息被写入到磁盘上的日志文件中,而不是仅存在于内存中。这使得即使在发生节点故障或重启时,消息仍然可靠地恢复。

  2. 副本机制: Kafka 通过使用分区的副本来提高可靠性。每个分区可以有多个副本,它们分布在不同的节点上。当一个节点发生故障时,其他副本可以继续提供服务。通过配置适当数量的副本,可以容忍多个节点的故障而不丢失数据。

  3. 生产者确认机制: 生产者在将消息发送到 Kafka 时可以配置确认级别。确认级别决定了生产者在认为消息已经发送成功之前需要收到多少个副本的确认。高确认级别可以提高可靠性,但会降低吞吐量。

  4. 复制同步机制: Kafka 允许配置生产者发送同步或异步复制。在同步复制中,生产者等待所有副本都成功接收消息后才认为消息发送成功。这确保了在所有副本上都有相同的数据。

  5. 消息过期: Kafka 允许为消息设置过期时间。如果消息在一定时间内没有被消费,它将被视为过期并被删除。这有助于释放存储空间,并防止旧消息对系统造成影响。

  6. ISR(In-Sync Replicas): Kafka 使用 ISR 机制来确保只有处于同步状态的副本才能参与消息的复制。这有助于防止因为某个副本的同步滞后而导致消息丢失的情况。

总体而言,通过这些机制,Kafka 构建了一个强大的消息传递系统,确保了消息在分布式环境中的可靠性和持久性。

Q: kafka 持久化存储是怎么保证消息不丢失的?

Kafka 的持久化存储是通过日志(Log)的方式来保证消息不丢失的。

以下是 Kafka 如何通过日志实现消息的持久化存储和可靠性保障的主要机制:

  1. 日志结构: Kafka 的存储方式是采用日志结构(log-structured),所有的消息都追加到日志文件的末尾。这意味着写入的消息会按顺序追加到文件中,而不会在文件中覆盖或删除已有的数据。这种方式有助于提高写入性能,并保证消息不会因为覆盖而丢失。

  2. 分段存储: 日志被划分为多个分段(segment),每个分段都有一个固定的大小(通过配置指定,默认是1GB)。当一个分段达到最大大小时,会被关闭,不再接受新的消息写入。然后,一个新的分段会被创建,继续接收新的消息。这种分段存储的方式有助于提高文件操作的效率,并简化消息的删除和过期处理。

  3. 数据刷写: Kafka 的消息是先写入到操作系统的页缓存,然后通过定期的刷写机制将数据刷写到磁盘上的日志文件。这确保了即使在发生故障时,部分数据也会被写入磁盘,从而减少数据丢失的可能性。

  4. 副本机制: Kafka 通过分区的副本机制提供了冗余性。每个分区可以配置多个副本,它们分布在不同的节点上。当消息被写入一个分区时,会被复制到该分区的所有副本中。如果某个节点发生故障,其他副本仍然可以提供服务,确保数据的可靠性。

  5. 确认机制: Kafka 的生产者在发送消息后可以配置确认机制,确保消息被成功写入到指定数量的副本后才认为发送成功。这提供了对消息写入的更强的控制,从而确保消息不会因为写入未完成而丢失。

通过这些机制,Kafka 实现了可靠的消息持久化存储,确保即使在发生节点故障或其他异常情况时,数据也能够被安全地保留和恢复。

如果机器挂了,kafka 如何保证消息不丢失的?

Kafka 通过副本机制来保证在机器挂了的情况下消息不丢失。

以下是 Kafka 如何通过副本机制实现高可用性和数据不丢失的:

  1. 分区的副本: Kafka 的每个主题(topic)被划分为多个分区,每个分区可以配置多个副本。这些副本分布在不同的节点上,通常包括领导副本(Leader Replica)和追随副本(Follower Replica)。

  2. 领导副本和追随副本: 每个分区有一个领导副本,它负责处理读写请求。其他副本称为追随副本,它们复制领导副本上的数据。领导副本负责处理客户端的写入请求,并确保将数据复制到所有的追随副本。

  3. 同步复制: Kafka 使用同步复制机制,确保追随副本与领导副本保持同步。在同步复制中,生产者发送消息到领导副本并等待所有的同步追随副本都确认接收到消息,然后才认为消息发送成功。这样可以确保即使领导副本挂了,某个追随副本也能够接管领导角色,继续提供服务。

  4. ISR(In-Sync Replicas): ISR 是指与领导副本保持同步的追随副本的集合。Kafka 确保只有属于 ISR 的副本才能参与消息的复制。如果某个追随副本无法及时复制领导副本的数据,它会被从 ISR 中移除,直到追赶上来再重新加入。

  5. 副本的重新分配: 当某个节点挂了,Kafka 会重新分配该节点上的副本到其他可用节点上。这确保了即使发生节点故障,仍然有足够数量的副本存活,从而保障数据的可用性和不丢失。

总体来说,通过副本机制,Kafka 能够在机器挂了的情况下继续提供服务,并确保数据不丢失。

这种机制使得 Kafka 具有高可用性和容错性,适用于构建可靠的分布式消息传递系统。

参考资料

https://learn.lianglianglee.com/%e4%b8%93%e6%a0%8f/Kafka%e6%a0%b8%e5%bf%83%e6%8a%80%e6%9c%af%e4%b8%8e%e5%ae%9e%e6%88%98/11%20%e6%97%a0%e6%b6%88%e6%81%af%e4%b8%a2%e5%a4%b1%e9%85%8d%e7%bd%ae%e6%80%8e%e4%b9%88%e5%ae%9e%e7%8e%b0%ef%bc%9f.md