Apache Kafka-16-深入 kafka 底层原理
深入 kafka
如果只是为了开发Kafka应用程序,或者只是在生产环境使用Kafka,那么了解Kafka的内部工作原理不是必需的。不过,了解Kafka的内部工作原理有助于理解Kafka的行为,也有助于诊断问题。
本章并不会涵盖Kafka的每一个设计和实现细节,而是集中讨论以下3个有意思的话题:
Kafka如何进行复制;
Kafka如何处理来自生产者和消费者的请求;
Kafka的存储细节,比如文件格式和索引。
在对Kafka进行调优时,深入理解这些问题是很有必要的。了解了内部机制,可以更有目的性地进行深人的调优,而不只是停留在表面,隔靴搔痒。
集群成员关系
Kafka使用Zookeeper来维护集群成员的信息。每个broker都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。
在broker启动的时候,它通过创建临时节点把自己的ID注册到Zookeeper。
Kafka组件订阅Zookeeper的 /brokers/ids
路径(broker在Zookeeper上的注册路径),当有broker加入集群或退出集群时,这些组件就可以获得通知。
如果你要启动另一个具有相同ID的broker,会得到一个错误——新broker会试着进行注册,但不会成功,因为Zookeeper里已经有一个具有相同ID的broker。
在broker停机、出现网络分区或长时间垃圾回收停顿时,broker会从Zookeeper上断开连接,此时broker在启动时创建的临时节点会自动从Zookeeper上移除。
监听broker列表的Kafka组件会被告知该broker已移除。
在关闭broker时,它对应的节点也会消失,不过它的ID会继续存在于其他数据结构中。
例如,主题的副本列表(下面会介绍)里就可能包含这些ID。在完全关闭一个broker之后,如果使用相同的ID启动另一个全新的broker,它会立即加入集群,并拥有与旧broker相同的分区和主题。
控制器
控制器其实就是一个broker,只不过它除了具有一般broker的功能之外,还负责分区首领的选举。
集群里第一个启动的broker通过在Zookeeper里创建一个临时节点 /controller
让自己成为控制器。其他broker在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。其他broker在控制器节点上创建Zookeeperwatch对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。
如果控制器被关闭或者与Zookeeper断开连接,Zookeeper上的临时节点就会消失。集群里的其他broker通过watch对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在Zookeeper里成功创建控制器节点的broker就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建watch对象。每个新选出的控制器通过Zookeeper的条件递增操作获得一个全新的、数值更大的controllerepoch。其他broker在知道当前controllerepoch后,如果收到由控制器发出的包含较旧epoch的消息,就会忽略它们。
当控制器发现一个broker已经离开集群(通过观察相关的Zookeeper路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个broker上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本),然后向所有包含新首领或现有跟随者的broker发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者开始从新首领那里复制消息。
当控制器发现一个broker加入集群时,它会使用brokerID来检查新加人的broker是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加入的broker和其他broker,新broker上的副本开始从首领那里复制消息。
简而言之,Kafka使用Zookeeper的临时节点来选举控制器,并在节点加人集群或退出集群时通知控制器。控制器负责在节点加人或离开集群时进行分区首领选举。
控制器使用epoch来避免“脑裂”。
“脑裂”是指两个节点同时认为自己是当前的控制器。
复制
复制功能是Kafka架构的核心。
在Kafka的文档里,Kafka把自己描述成“一个分布式的、可分区的、可复制的提交日志服务”。
复制之所以这么关键,是因为它可以在个别节点失效时仍能保证Kafka的可用性和持久性。
Kafka使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在broker上,每个broker可以保存成百上千个属于不同主题和分区的副本。
副本类型
副本有以下两种类型。
- 首领副本
每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。
- 跟随者副本
首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。
如果首领发生崩溃,其中的一个跟随者会被提升为新首领。
首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的状态一致,在有新消息到达时尝试从首领那里复制消息,不过有各种原因会导致同步失败。
例如,网络拥塞导致复制变慢,broker发生崩溃导致复制滞后,直到重启broker后复制才会继续。
核心流程
为了与首领保持同步,跟随者向首领发送获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。首领将响应消息发给跟随者。请求消息里包含了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。
一个跟随者副本先请求消息1,接着请求消息2,然后请求消息3,在收到这3个请求的响应之前,它是不会发送第4个请求消息的。如果跟随者发送了请求消息4,那么首领就知道它已经收到了前面3个请求的响应。
通过查看每个跟随者请求的最新偏移量,首领就会知道每个跟随者复制的进度。如果跟随者在10s内没有请求任何消息,或者虽然在请求消息,但在10s内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无法与首领保持一致,在首领发生失效时,它就不可能成为新首领——毕竟它没有包含全部的消息,
相反,持续请求得到的最新消息副本被称为同步的副本,在首领发生失效时,只有同步副本才有可能被选为新首领。
跟随者的正常不活跃时间或在成为不同步副本之前的时间是通过 replica.lag.time.max.ms
参数来配置的。
这个时间间隔直接影响着首领选举期间的客户端行为和数据保留机制。
我们将在后续讨论可靠性保证,到时候会深入讨论这个问题。
除了当前首领之外,每个分区都有一个首选首领——创建主题时选定的首领就是分区的首选首领。
之所以把它叫作首选首领,是因为在创建分区时,需要在broker之间均衡首领(后面会介绍在broker间分布副本和首领的算法)。
因此,我们希望首选首领在成为真正的首领时,broker间的负载最终会得到均衡。
默认情况下,Kafka的 auto.leader.rebalance.enable
被设为true.它会检查首选首领是不是当前首领,如果不是,并且该副本是同步的,那么就会触发首领选举,让首选首领成为当前首领。
找到首选首领
从分区的副本清单里可以很容易找到首选首领(可以使用 kafka.topics.sh 工具查看副本和分区的详细信息,我们将在后续章节介绍管理工具)。
清单里的第一个副本一般就是首选首领。不管当前首领是哪一个副本,都不会改变这个事实,即使使用副本分配工具将副本重新分配给其他broker。
要记住,如果你手动进行副本分配,第一个指定的副本就是首选首领,所以要确保首选首领被传播到其他broker上,避免让包含了首领的broker负载过重,而其他broker却无法为它们分担负载。
处理请求
broker的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。
Kafka提供了一个二进制协议(基于TCP),指定了请求消息的格式以及broker如何对请求作出响应——包括成功处理请求或在处理请求过程中遇到错误。客户端发起连接并发送请求,broker处理请求并作出响应。broker按照请求到达的顺序来处理它们——这种顺序保证让Kafka具有了消息队列的特性,同时保证保存的消息也是有序的。
消息头
所有的请求消息都包含一个标准消息头:
Request type (也就是 API key)
Request version(broker可以处理不同版本的客户端请求,并根据客户端版本作出不同的响应)
Correlation ID-一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)
Client ID——用于标识发送请求的客户端
我们不打算在这里描述该协议,因为在Kafka文档里已经有很详细的说明。
不过,了解broker如何处理请求还是有必要的——后面在我们讨论Kafka监控和各种配置选项时,你就会了解到那些与队列和线程有关的度量指标和配置参数。
broker会在它所监听的每一个端口上运行一个Acceptor线程,这个线程会创建一个连接,并把它交给Processor线程去处理。
Processor线程(也被叫作“网络线程”)的数量是可配置的。网络线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。图5-1为Kafka处理请求的内部流程。
请求消息被放到请求队列后,IO线程会负责处理它们。
请求类型
下面是几种最常见的请求类型。
元数据请求
在消费者和跟随者副本需要从broker读取消息时发送的请求。
生产请求和获取请求都必须发送给分区的首领副本。
如果broker收到一个针对特定分区的请求,而该分区的首领在另一个broker上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。
当针对特定分区的获取请求被发送到一个不含有该分区首领的broker上,也会出现同样的错误。Kafka客户端要自己负责把生产请求和获取请求发送到正确的 broker 上.
那么客户端怎么知道该往哪里发送请求呢?
客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。
服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是首领。
元数据请求可以发送给任意一个broker,因为所有broker都缓存了这些信息。
一般情况下,客户端会把这些信息缓存起来,并直接往目标broker上发送生产请求和获取请求。
它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过 metadata.max.age.ms
参数来配置),从而知道元数据是否发生了变更——比如,在新broker加人集群时,部分副本会被移动到新的broker上(如图5-2所示)。
另外,如果客户端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息,之前的请求被发到了错误的broker上。
- 5.2 客户端路由请求

生产请求
生产者发送的请求,它包含客户端要写入broker的消息。
我们在以前讨论如何配置生产者的时候,提到过acks这个配置参数——该参数指定了需要多少个broker确认才可以认为一个消息写入是成功的。
不同的配置对“写入成功”的界定是不一样的,如果acks=1,那么只要首领收到消息就认为写入成功;如果acks=all,那么需要所有同步副本收到消息才算写入成功;如果acks=0,那么生产者在把消息发出去之后,完全不需要等待broker的响应。
包含首领副本的broker在收到生产请求时,会对请求做一些验证。
发送数据的用户是否有主题写入权限?
请求里包含的acks值是否有效(只允许出现0、1或alL)?
如果acks=all,是否有足够多的同步副本保证消息已经被安全写入?(我们可以对broker进行配置,如果同步副本的数量不足,broker可以拒绝处理新消息。在第6章介绍Kafka持久性和可靠性保证时,我们会讨论更多这方面的细节。)
之后,消息被写入本地磁盘。在Linux系统上,消息会被写到文件系统缓存里,并不保证它们何时会被刷新到磁盘上。Kafka不会一直等待数据被写到磁盘上——它依赖复制功能来保证消息的持久性。
在消息被写入分区的首领之后,broker开始检查acks配置参数——如果acks被设为0或1,那么broker立即返回响应;如果acks被设为all,那么请求会被保存在一个叫作炼狱的缓冲区里,直到首领发现所有跟随者副本都复制了消息,响应才会被返回给客户端。
获取请求
broker 处理获取请求的方式与处理生产请求的方式很相似。
客户端发送请求,向broker请求主题分区里具有特定偏移量的消息,好像在说:“请把主题Test分区0偏移量从53开始的消息以及主题Test分区3偏移量从64开始的消息发给我。”客户端还可以指定broker最多可以从一个分区里返回多少数据。
这个限制是非常重要的,因为客户端需要为broker返回的数据分配足够的内存。如果没有这个限制,broker返回的大量数据有可能耗尽客户端的内存。
请求需要先到达指定的分区首领上,然后客户端通过查询元数据来确保请求的路由是正确的。
首领在收到请求时,它会先检查请求是否有效——比如,指定的偏移量在分区上是否存在?如果客户端请求的是已经被删除的数据,或者请求的偏移量不存在,那么broker将返回一个错误。
如果请求的偏移量存在,broker将按照客户端指定的数量上限从分区里读取消息,再把消息返回给客户端。
Kafka使用零复制技术向客户端发送消息——也就是说,Kafka直接把消息从文件(或者更确切地说是Linux文件系统缓存)里发送到网络通道,而不需要经过任何中间缓冲区。
这是Kafka与其他大部分数据库系统不一样的地方,其他数据库在将数据发送给客户端之前会先把它们保存在本地缓存里。这项技术避免了字节复制,也不需要管理内存缓冲区,从而获得更好的性能。
客户端除了可以设置broker返回数据的上限,也可以设置下限。
例如,如果把下限设置为10KB,就好像是在告诉broker:“等到有10KB数据的时候再把它们发送给我。”在主题消息流量不是很大的情况下,这样可以减少CPU和网络开销。
客户端发送一个请求,broker等到有足够的数据时才把它们返回给客户端,然后客户端再发出请求,而不是让客户端每隔几毫秒就发送一次请求,每次只能得到很少的数据甚至没有数据。(如图5-3所示。)
对比这两种情况,它们最终读取的数据总量是一样的,但前者的来回传送次数更少,因此开销也更小。
- 图5-3:broker延迟作出响应以便累积足够的数据

当然,我们不会让客户端一直等待broker累积数据。
在等待了一段时间之后,就可以把可用的数据拿回处理,而不是一直等待下去。所以,客户端可以定义一个超时时间,告诉broker:“如果你无法在X毫秒内累积满足要求的数据量,那么就把当前这些数据返回给我。”
有意思的是,并不是所有保存在分区首领上的数据都可以被客户端读取。大部分客户端只能读取已经被写入所有同步副本的消息(跟随者副本也不行,尽管它们也是消费者——否则复制功能就无法工作)。分区首领知道每个消息会被复制到哪个副本上,在消息还没有被写入所有同步副本之前,是不会发送给消费者的——尝试获取这些消息的请求会得到空的响应而不是错误。
数据一致性保证
因为还没有被足够多副本复制的消息被认为是“不安全”的——如果首领发生崩溃,另一个副本成为新首领,那么这些消息就丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。
试想,一个消费者读取并处理了这样的一个消息,而另一个消费者发现这个消息其实并不存在。
所以,我们会等到所有同步副本复制了这些消息,才允许消费者读取它们(如图5-4所示)。
这也意味着,如果broker间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms
来配置,它指定了副本在复制消息时可被允许的最大延迟时间。

其他请求
到此为止,我们讨论了Kafka最为常见的几种请求类型:元数据请求、生产请求和获取请求。
重要的是,我们讨论的是客户端在网络上使用的通用二进制协议。
Kafka内置了由开源社区贡献者实现和维护的Java客户端,同时也有用其他语言实现的客户端,如CPython、Go语言等。Kafka网站上有它们的完整清单,这些客户端就是使用这个二进制协议与broker通信的。另外,broker之间也使用同样的通信协议。它们之间的请求发生在Kafka内部,客户端不应该使用这些请求。
例如,当一个新首领被选举出来,控制器会发送 LeaderAndIsr 请求给新首领(这样它就可以开始接收来自客户端的请求)和跟随者(这样它们就知道要开始跟随新首领)。在我们写这本书的时候,Kafka协议可以处理20种不同类型的请求,而且会有更多的类型加入进来。协议在持续演化——随着客户端功能的不断增加,我们需要改进协议来满足需求。
例如,之前的Kafka消费者使用Zookeeper来跟踪偏移量,在消费者启动的时候,它通过检查保存在Zookeeper上的偏移量就可以知道从哪里开始处理消息。因为各种原因,我们决定不再使用Zookeeper来保存偏移量,而是把偏移量保存在特定的Kafka主题上。为了达到这个目的,我们不得不往协议里增加几种请求类型:OffsetComnitRequest、 offsetFetchRequest 和 ListOffsetsRequest. 现在,程序调用 commitOffset() 方法时,客户端不再把偏移量写入Zookeeper,而是往Kafka发送offsetCommitRequest请求。
主题的创建仍然需要通过命令行工具来完成,命令行工具会直接更新Zookeeper里的主题列表,broker监听这些主题列表,在有新主题加人时,它们会收到通知。
我们正在改进Kafka,增加了CreateTopicRequest请求类型,这样客户端(包括那些不支持Zookeeper客户端的编程语言)就可以直接向broker请求创建新主题了。
除了往协议里增加新的请求类型外,我们也会通过修改已有的请求类型来给它们增加新功能。
例如,从Kafka0.9.0到Kafka0.10.0,我们希望能够让客户端知道谁是当前的控制器,于是把控制器信息添加到元数据响应消息里。
我们还在元数据请求消息和响应消息里添加了一个新的version字段。
现在,0.9.0版本的客户端发送的元数据请求里version为0(0.9.0版本客户端的version不会是1)。
不管是0.9.0版本的broker,还是0.10.0版本的broker,它们都知道应该返回version为0的响应,也就是不包含控制器信息的响应。
0.9.0版本的客户端不需要控制器的信息,而且也没必要知道如何去解析它。
0.10.0版本的客户端会发送version为1的元数据请求,0.10.0版本的broker会返回version为1的响应,里面包含了控制器的信息。如果0.10.0版本的客户端发送version为1的请求给0.9.0版本的broker,这个版本的broker不知道该如何处理这个请求,就会返回一个错误。这就是为什么我们建议在升级客户端之前先升级broker,因为新的broker知道如何处理旧的请求,反过来则不然。
我们在0.10.0版本的Kafka里加人了ApiVersionRequest—客户端可以询问broker支持哪些版本的请求,然后使用正确的版本与broker通信。如果能够正确使用这个新功能,客户端就可以与旧版本的broker通信,只要broker支持这个版本的协议。
物理存储
Kafka的基本存储单元是分区。
分区无法在多个broker间进行再细分,也无法在同一个broker的多个磁盘上进行再细分。所以,分区的大小受到单个挂载点可用空间的限制(一个挂载点由单个磁盘或多个磁盘组成,如果配置了JBOD,就是单个磁盘,如果配置了RAID,就是多个磁盘。)。
在配置Kafka的时候,管理员指定了一个用于存储分区的目录清单——也就是log.dirs参数的值(不要把它与存放错误日志的目录混淆了,日志目录是配置在log4j.properties文件里的)。该参数一般会包含每个挂载点的目录。
接下来我们会介绍Kafka是如何使用这些目录来存储数据的。
首先,我们要知道数据是如何被分配到集群的broker上以及broker的目录里的。然后,我们还要知道broker是如何管理这些文件的,特别是如何进行数据保留的。随后,我们会深人探讨文件和索引格式。最后,我们会讨论日志压缩及其工作原理。
日志压缩是Kafka的一个高级特性,因为有了这个特性,Kafka可以用来长时间地保存数据。
分区分配
在创建主题时,Kafka首先会决定如何在broker间分配分区。假设你有6个broker,打算创建一个包含10个分区的主题,并且复制系数为3。
那么Kafka就会有30个分区副本,它们可以被分配给6个broker,在进行分区分配时,我们要达到如下的目标。
(1)在broker间平均地分布分区副本。对于我们的例子来说,就是要保证每个broker可以分到5个副本。
(2)确保每个分区的每个副本分布在不同的broker上。假设分区0的首领副本在broker2上,那么可以把跟随者副本放在broker3和broker4上,但不能放在broker2上,也不能两个都放在broker3上。
(3)如果为broker指定了机架信息,那么尽可能把每个分区的副本分配到不同机架的broker上。这样做是为了保证一个机架的不可用不会导致整体的分区不可用。
为了实现这个目标,我们先随机选择一个broker(假设是4),然后使用轮询的方式给每个broker分配分区来确定首领分区的位置。
于是,首领分区0会在broker4上,首领分区1会在broker5上,首领分区2会在broker0上(只有6个broker),并以此类推。
然后,我们从分区首领开始,依次分配跟随者副本。
如果分区0的首领在broker4上,那么它的第一个跟随者副本会在broker5上,第二个跟随者副本会在broker0上。分区1的首领在broker5上,那么它的第一个跟随者副本在broker0上,第二个跟随者副本在brokerl上。如果配置了机架信息,那么就不是按照数字顺序来选择broker了,而是按照交替机架的方式来选择broker。假设broker0、broker1和broker2放置在同一个机架上,broker3、broker4和broker5分别放置在其他不同的机架上。我们不是按照从0到5的顺序来选择broker,而是按照0,3,1,4,2,5的顺序来选择,这样每个相邻的broker都在不同的机架上(如图5-5所示)。于是,如果分区0的首领在broker4上,那么第一个跟随者副本会在broker2上,这两个broker在不同的机架上。如果第一个机架下线,还有其他副本仍然活跃着,所以分区仍然可用。这对所有副本来说都是一样的,因此在机架下线时仍然能够保证可用性。
- 图5-5:分配给不同机架broker的分区和副本

为分区和副本选好合适的broker之后,接下来要决定这些分区应该使用哪个目录。
我们单独为每个分区分配目录,规则很简单:计算每个目录里的分区数量,新的分区总是被添加到数量最小的那个目录里。也就是说,如果添加了一个新磁盘,所有新的分区都会被创建到这个磁盘上。因为在完成分配工作之前,新磁盘的分区数量总是最少的。
注意磁盘空间
要注意,在为broker分配分区时并没有考虑可用空间和工作负载问题,但在、将分区分配到磁盘上时会考虑分区数量,不过不考虑分区大小。
也就是说,如果有些broker的磁盘空间比其他broker要大(有可能是因为集群同时使用了旧服务器和新服务器),有些分区异常大,或者同一个broker上有大小不同的磁盘,那么在分配分区时要格外小心。在后面的章节中,我们会讨论Kafka管理员该如何解决这种broker负载不均衡的问题。
文件管理
保留数据是Kafka的一个基本特性,Kafka不会一直保留数据,也不会等到所有消费者都读取了消息之后才删除消息。
相反,Kafka管理员为每个主题配置了数据保留期限,规定数据被删除之前可以保留多长时间,或者清理数据之前可以保留的数据量大小。
因为在一个大文件里查找和删除消息是很费时的,也很容易出错,所以我们把分区分成若干个片段。
默认情况下,每个片段包含1GB或一周的数据,以较小的那个为准。在 broker 往分区写入数据时,如果达到片段上限,就关闭当前文件,并打开一个新文件。
当前正在写入数据的片段叫作活跃片段。活动片段永远不会被除,所以如果你要保留数据1天,但片段里包含了5天的数据,那么这些数据会被保留5天,因为在片段被关闭之前这些数据无法被删除。如果你要保留数据一周,而且每天使用一个新片段,那么你就会看到,每天在使用一个新片段的同时会删除一个最老的片段——所以大部分时间该分区会有7个片段存在。
我们在第2章讲过,broker会为分区里的每个片段打开一个文件句柄,哪怕片段是不活跃的。
这样会导致打开过多的文件句柄,所以操作系统必须根据实际情况做一些调优。
文件格式
我们把Kafka的消息和偏移量保存在文件里。
保存在磁盘上的数据格式与从生产者发送过来或者发送给消费者的消息格式是一样的,因为使用了相同的消息格式进行磁盘存储和网络传输,Kafka可以使用零复制技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压和再压缩。
除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法(Snappy、GZip或LZ4)和时间戮(在0.10.0版本里引入的)。
时间截可以是生产者发送消息的时间,也可以是消息到达broker的时间,这个是可配置的。
如果生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一起,被当作“包装消息”进行发送(如图5-6所示)。
于是,broker就会收到一个这样的消息,然后再把它发送给消费者。
消费者在解压这个消息之后,会看到整个批次的消息,它们都有自己的时间戳和偏移量。

也就是说,如果在生产者端使用了压缩功能(极力推荐),那么发送的批次越大,就意味着在网络传输和磁盘存储方面会获得越好的压缩性能,同时意味着如果修改了消费者使用的消息格式(例如,在消息里增加了时间戳),那么网络传输和磁盘存储的格式也要随之修改,而且broker要知道如何处理包含了两种消息格式的文件。
Kafka附带了一个叫DumpLogSegment的工具,可以用它查看片段的内容。它可以显示每个消息的偏移量、校验和、魔术数字节、消息大小和压缩算法。
运行该工具的方法如下:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments
如果使用了 --deep-iteration
参数,可以显示被压缩到包装消息里的消息。
索引
消费者可以从Kafka的任意可用偏移量位置开始读取消息。
假设消费者要读取从偏移量100开始的1MB消息,那么broker必须立即定位到偏移量100(可能是在分区的任意一个片段里),然后开始从这个位置读取消息。
为了帮助broker更快地定位到指定的偏移量,Kafka为每个分区维护了一个索引。
索引把偏移量映射到片段文件和偏移量在文件里的位置。索引也被分成片段,所以在删除消息时,也可以删除相应的索引。
Kafka不维护索引的校验和。如果索引出现损坏,Kafka会通过重新读取消息并录制偏移量和位置来重新生成索引。
如果有必要,管理员可以删除索引,这样做是绝对安全的,Kafka会自动重新生成这些索引。
清理
一般情况下,Kafka会根据设置的时间保留数据,把超过时效的旧数据删除掉。
不过,试想一下这样的场景,如果你使用Kafka保存客户的收货地址,那么保存客户的最新地址比保存客户上周甚至去年的地址要有意义得多,这样你就不用担心会用错旧地址,而且短时间内客户也不会修改新地址。
另外一个场景,一个应用程序使用Kafka保存它的状态,每次状态发生变化,它就把状态写入Kafka。
在应用程序从崩溃中恢复时,它从Kafka读取消息来恢复最近的状态。在这种情况下,应用程序只关心它在崩溃前的那个状态,而不关心运行过程中的那些状态。
Kafka通过改变主题的保留策略来满足这些使用场景。早于保留时间的旧事件会被删除,为每个键保留最新的值,从而达到清理的效果。很显然,只有当应用程序生成的事件里包含了键值对时,为这些主题设置compact策略才有意义。如果主题包含null键,清理就会失败。
清理的工作原理
每个日志片段可以分为以下两个部分。
- 干净的部分
这些消息之前被清理过,每个键只有一个对应的值,这个值是上一次清理时保留下来的。
- 污浊的部分
这些消息是在上一次清理之后写入的。
两个部分的日志片段示意如图5-7所示。
- 图 5-7:包含干净和污浊两个部分的分区

如果在Kafka启动时启用了清理功能(通过配置 log.cleaner.enabled
参数),每个broker会启动一个清理管理器线程和多个清理线程,它们负责执行清理任务。这些线程会选择污浊率(污浊消息占分区总大小的比例)较高的分区进行清理。
为了清理分区,清理线程会读取分区的污浊部分,并在内存里创建一个map。map里的每个元素包含了消息键的散列值和消息的偏移量,键的散列值是16B,加上偏移量总共是24B。如果要清理一个1GB的日志片段,并假设每个消息大小为1KB,那么这个片段就包含一百万个消息,而我们只需要用24MB的map就可以清理这个片段。(如果有重复的键,可以重用散列项,从而使用更少的内存。)这是非常高效的!
管理员在配置Kafka时可以对map使用的内存大小进行配置,每个线程都有自己的map,而这个参数指的是所有线程可使用的内存总大小。如果你为map分配了1GB内存,并使用了5个清理线程,那么每个线程可以使用200MB内存来创建自己的map。
Kafka并不要求分区的整个污浊部分来适应这个map的大小,但要求至少有一个完整的片段必须符合。
如果不符合,那么Kafka就会报错,管理员要么分配更多的内存,要么减少清理线程数量。如果只有少部分片段可以完全符合,Kafka将从最旧的片段开始清理,等待下一次清理剩余的部分。
清理线程在创建好偏移量map后,开始从干净的片段处读取消息,从最旧的消息开始,把它们的内容与map里的内容进行比对。它会检查消息的键是否存在于map中,如果不存在,那么说明消息的值是最新的,就把消息复制到替换片段上。如果键已存在,消息会被忽略,因为在分区的后部已经有一个具有相同键的消息存在。在复制完所有的消息之后,我们就将替换片段与原始片段进行交换,然后开始清理下一个片段。完成整个清理过程之后,每个键对应一个不同的消息—―这些消息的值都是最新的。
清理前后的分区片段如图5-8所示。
- 图5-8:清理前后的分区片段

被删除的事件
如果只为每个键保留最近的一个消息,那么当需要删除某个特定键所对应的所有消息时,我们该怎么办?这种情况是有可能发生的,比如一个用户不再使用我们的服务,那么完全可以把与这个用户相关的所有信息从系统中删除。
为了彻底把一个键从系统里删除,应用程序必须发送一个包含该键且值为null的消息。清理线程发现该消息时,会先进行常规的清理,只保留值为null的消息。该消息(被称为墓碑消息)会被保留一段时间,时间长短是可配置的。
在这期间,消费者可以看到这个墓碑消息,并且发现它的值已经被删除。于是,如果消费者往数据库里复制Kafka的数据,当它看到这个墓碑消息时,就知道应该要把相关的用户信息从数据库里删除。
在这个时间段过后,清理线程会移除这个墓碑消息,这个键也将从Kafka分区里消失。
重要的是,要留给消费者足够多的时间,让他看到墓碑消息,因为如果消费者离线几个小时并错过了墓碑消息,就看不到这个键,也就不知道它已经从Kafka里删除,从而也就不会去删除数据库里的相关数据了。
何时会清理主题
就像deLete策略不会删除当前活跃的片段一样,compact策略也不会对当前片段进行清理。只有旧片段里的消息才会被清理。
在0.10.0和更早的版本里,Kafka会在包含脏记录的主题数量达到50%时进行清理。
这样做的目的是避免太过频繁的清理(因为清理会影响主题的读写性能),同时也避免存在太多脏记录(因为它们会占用磁盘空间)。浪费50%的磁盘空间给主题存放脏记录,然后进行一次清理,这是个合理的折中,管理员也可以对它进行调整。
我们计划在未来的版本中加人宽限期,在宽限期内,我们保证消息不会被清理。对于想看到主题的每个消息的应用程序来说,它们就有了足够的时间,即使时间有点滞后。
参考资料
《kafka 权威指南》