16 TopicDeletionManager: Topic是怎么被删除的? 你好,我是胡夕。今天,我们正式进入到第四大模块“状态机”的学习。

Kafka源码中有很多状态机和管理器,比如之前我们学过的Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager,等等。这些管理器和状态机,大多与各自的“宿主”组件关系密切,可以说是大小不同、功能各异。就比如Controller的这两个管理器,必须要与Controller组件紧耦合在一起才能实现各自的功能。

不过,Kafka中还是有一些状态机和管理器具有相对独立的功能框架,不严重依赖使用方,也就是我在这个模块为你精选的TopicDeletionManager(主题删除管理器)、ReplicaStateMachine(副本状态机)和PartitionStateMachine(分区状态机)。

  • TopicDeletionManager:负责对指定Kafka主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。
  • ReplicaStateMachine:负责定义Kafka副本状态、合法的状态转换,以及管理状态之间的转换。
  • PartitionStateMachine:负责定义Kafka分区状态、合法的状态转换,以及管理状态之间的转换。

无论是主题、分区,还是副本,它们在Kafka中的生命周期通常都有多个状态。而这3个状态机,就是来管理这些状态的。而如何实现正确、高效的管理,就是源码要解决的核心问题。

今天,我们先来学习TopicDeletionManager,看一下Kafka是如何删除一个主题的。

课前导读

刚开始学习Kafka的时候,我对Kafka删除主题的认识非常“浅薄”。之前我以为成功执行了kafka-topics.sh –delete命令后,主题就会被删除。我相信,很多人可能都有过这样的错误理解。

这种不正确的认知产生的一个结果就是,我们经常发现主题没有被删除干净。于是,网上流传着一套终极“武林秘籍”:手动删除磁盘上的日志文件,以及手动删除ZooKeeper下关于主题的各个节点。

就我个人而言,我始终不推荐你使用这套“秘籍”,理由有二:

  • 它并不完整。事实上,除非你重启Broker,否则,这套“秘籍”无法清理Controller端和各个Broker上元数据缓存中的待删除主题的相关条目。
  • 它并没有被官方所认证,换句话说就是后果自负。从某种程度上说,它会带来什么不好的结果,是你没法把控的。

所谓“本事大不如不摊上”,我们与其琢磨删除主题失败之后怎么自救,不如踏踏实实地研究下Kafka底层是怎么执行这个操作的。搞明白它的原理之后,再有针对性地使用“秘籍”,才能做到有的放矢。你说是不是?

TopicDeletionManager概览

好了,我们就正式开始学习TopicDeletionManager吧。

这个管理器位于kafka.controller包下,文件名是TopicDeletionManager.scala。在总共不到400行的源码中,它定义了3个类结构以及20多个方法。总体而言,它还是比较容易学习的。

为了让你先有个感性认识,我画了一张TopicDeletionManager.scala的代码UML图:

TopicDeletionManager.scala这个源文件,包括3个部分。

  • DeletionClient接口:负责实现删除主题以及后续的动作,比如更新元数据等。这个接口里定义了4个方法,分别是deleteTopic、deleteTopicDeletions、mutePartitionModifications和sendMetadataUpdate。我们后面再详细学习它们的代码。
  • ControllerDeletionClient类:实现DeletionClient接口的类,分别实现了刚刚说到的那4个方法。
  • TopicDeletionManager类:主题删除管理器类,定义了若干个方法维护主题删除前后集群状态的正确性。比如,什么时候才能删除主题、什么时候主题不能被删除、主题删除过程中要规避哪些操作,等等。

DeletionClient接口及其实现

接下来,我们逐一讨论下这3部分。首先是DeletionClient接口及其实现类。

就像前面说的,DeletionClient接口定义的方法用于删除主题,并将删除主题这件事儿同步给其他Broker。

目前,DeletionClient这个接口只有一个实现类,即ControllerDeletionClient。我们看下这个实现类的代码: class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient { // 删除给定主题 override def deleteTopic(topic: String, epochZkVersion: Int): Unit = { // 删除/brokers/topics/节点 zkClient.deleteTopicZNode(topic, epochZkVersion) // 删除/config/topics/节点 zkClient.deleteTopicConfigs(Seq(topic), epochZkVersion) // 删除/admin/delete_topics/节点 zkClient.deleteTopicDeletions(Seq(topic), epochZkVersion) } // 删除/admin/delete_topics下的给定topic子节点 override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit = { zkClient.deleteTopicDeletions(topics, epochZkVersion) } // 取消/brokers/topics/节点数据变更的监听 override def mutePartitionModifications(topic: String): Unit = { controller.unregisterPartitionModificationsHandlers(Seq(topic)) } // 向集群Broker发送指定分区的元数据更新请求 override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = { controller.sendUpdateMetadataRequest( controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions) } }

这个类的构造函数接收两个字段。同时,由于是DeletionClient接口的实现类,因而该类实现了DeletionClient接口定义的四个方法。

先来说构造函数的两个字段:KafkaController实例和KafkaZkClient实例。KafkaController实例,我们已经很熟悉了,就是Controller组件对象;而KafkaZkClient实例,就是Kafka与ZooKeeper交互的客户端对象。

接下来,我们再结合代码看下DeletionClient接口实现类ControllerDeletionClient定义的4个方法。我来简单介绍下这4个方法大致是做什么的。

1. deleteTopic

它用于删除主题在ZooKeeper上的所有“痕迹”。具体方法是,分别调用KafkaZkClient的3个方法去删除ZooKeeper下/brokers/topics/节点、/config/topics/节点和/admin/delete_topics/节点。

2. deleteTopicDeletions

它用于删除ZooKeeper下待删除主题的标记节点。具体方法是,调用KafkaZkClient的deleteTopicDeletions方法,批量删除一组主题在/admin/delete_topics下的子节点。注意,deleteTopicDeletions这个方法名结尾的Deletions,表示/admin/delete_topics下的子节点。所以,deleteTopic是删除主题,deleteTopicDeletions是删除/admin/delete_topics下的对应子节点。

到这里,我们还要注意的一点是,这两个方法里都有一个epochZkVersion的字段,代表期望的Controller Epoch版本号。如果你使用一个旧的Epoch版本号执行这些方法,ZooKeeper会拒绝,因为和它自己保存的版本号不匹配。如果一个Controller的Epoch值小于ZooKeeper中保存的,那么这个Controller很可能是已经过期的Controller。这种Controller就被称为Zombie Controller。epochZkVersion字段的作用,就是隔离Zombie Controller发送的操作。

3. mutePartitionModifications

它的作用是屏蔽主题分区数据变更监听器,具体实现原理其实就是取消/brokers/topics/节点数据变更的监听。这样当该主题的分区数据发生变更后,由于对应的ZooKeeper监听器已经被取消了,因此不会触发Controller相应的处理逻辑。

那为什么要取消这个监听器呢?其实,主要是为了避免操作之间的相互干扰。设想下,用户A发起了主题删除,而同时用户B为这个主题新增了分区。此时,这两个操作就会相互冲突,如果允许Controller同时处理这两个操作,势必会造成逻辑上的混乱以及状态的不一致。为了应对这种情况,在移除主题副本和分区对象前,代码要先执行这个方法,以确保不再响应用户对该主题的其他操作。

mutePartitionModifications方法的实现原理很简单,它会调用unregisterPartitionModificationsHandlers,并接着调用KafkaZkClient的unregisterZNodeChangeHandler方法,取消ZooKeeper上对给定主题的分区节点数据变更的监听。

4. sendMetadataUpdate

它会调用KafkaController的sendUpdateMetadataRequest方法,给集群所有Broker发送更新请求,告诉它们不要再为已删除主题的分区提供服务。代码如下: override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = { // 给集群所有Broker发送UpdateMetadataRequest // 通知它们给定partitions的状态变化 controller.sendUpdateMetadataRequest( controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions) }

该方法会给集群中的所有Broker发送更新元数据请求,告知它们要同步给定分区的状态。

TopicDeletionManager定义及初始化

有了这些铺垫,我们再来看主题删除管理器的主要入口:TopicDeletionManager类。这个类的定义代码,如下: class TopicDeletionManager( // KafkaConfig类,保存Broker端参数 config: KafkaConfig, // 集群元数据 controllerContext: ControllerContext, // 副本状态机,用于设置副本状态 replicaStateMachine: ReplicaStateMachine, // 分区状态机,用于设置分区状态 partitionStateMachine: PartitionStateMachine, // DeletionClient接口,实现主题删除 client: DeletionClient) extends Logging { this.logIdent = s”[Topic Deletion Manager ${config.brokerId}] “ // 是否允许删除主题 val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable …… }

该类主要的属性有6个,我们分别来看看。

  • config:KafkaConfig实例,可以用作获取Broker端参数delete.topic.enable的值。该参数用于控制是否允许删除主题,默认值是true,即Kafka默认允许用户删除主题。
  • controllerContext:Controller端保存的元数据信息。删除主题必然要变更集群元数据信息,因此TopicDeletionManager需要用到controllerContext的方法,去更新它保存的数据。
  • replicaStateMachine和partitionStateMachine:副本状态机和分区状态机。它们各自负责副本和分区的状态转换,以保持副本对象和分区对象在集群上的一致性状态。这两个状态机是后面两讲的重要知识点。
  • client:前面介绍的DeletionClient接口。TopicDeletionManager通过该接口执行ZooKeeper上节点的相应更新。
  • isDeleteTopicEnabled:表明主题是否允许被删除。它是Broker端参数delete.topic.enable的值,默认是true,表示Kafka允许删除主题。源码中大量使用这个字段判断主题的可删除性。前面的config参数的主要目的就是设置这个字段的值。被设定之后,config就不再被源码使用了。

好了,知道这些字段的含义,我们再看看TopicDeletionManager类实例是如何被创建的。

实际上,这个实例是在KafkaController类初始化时被创建的。在KafkaController类的源码中,你可以很容易地找到这行代码: val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine, partitionStateMachine, new ControllerDeletionClient(this, zkClient))

可以看到,代码实例化了一个全新的ControllerDeletionClient对象,然后利用这个对象实例和replicaStateMachine、partitionStateMachine,一起创建TopicDeletionManager实例。

为了方便你理解,我再给你画一张流程图:

TopicDeletionManager重要方法

除了类定义和初始化,TopicDeletionManager类还定义了16个方法。在这些方法中,最重要的当属resumeDeletions方法。它是重启主题删除操作过程的方法。

主题因为某些事件可能一时无法完成删除,比如主题分区正在进行副本重分配等。一旦这些事件完成后,主题重新具备可删除的资格。此时,代码就需要调用resumeDeletions重启删除操作。

这个方法之所以很重要,是因为它还串联了TopicDeletionManager类的很多方法,如completeDeleteTopic和onTopicDeletion等。因此,你完全可以从resumeDeletions方法开始,逐渐深入到其他方法代码的学习。

那我们就先学习resumeDeletions的实现代码吧。 private def resumeDeletions(): Unit = { // 从元数据缓存中获取要删除的主题列表 val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted // 待重试主题列表 val topicsEligibleForRetry = mutable.Set.empty[String] // 待删除主题列表 val topicsEligibleForDeletion = mutable.Set.empty[String] if (topicsQueuedForDeletion.nonEmpty) info(s”Handling deletion for topics ${topicsQueuedForDeletion.mkString(“,”)}”) // 遍历每个待删除主题 topicsQueuedForDeletion.foreach { topic => // 如果该主题所有副本已经是ReplicaDeletionSuccessful状态 // 即该主题已经被删除 if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) { // 调用completeDeleteTopic方法完成后续操作即可 completeDeleteTopic(topic) info(s”Deletion of topic $topic successfully completed”) // 如果主题删除尚未开始并且主题当前无法执行删除的话 } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) { if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) { // 把该主题加到待重试主题列表中用于后续重试 topicsEligibleForRetry += topic } } // 如果该主题能够被删除 if (isTopicEligibleForDeletion(topic)) { info(s”Deletion of topic $topic (re)started”) topicsEligibleForDeletion += topic } } // 重试待重试主题列表中的主题删除操作 if (topicsEligibleForRetry.nonEmpty) { retryDeletionForIneligibleReplicas(topicsEligibleForRetry) } // 调用onTopicDeletion方法,对待删除主题列表中的主题执行删除操作 if (topicsEligibleForDeletion.nonEmpty) { onTopicDeletion(topicsEligibleForDeletion) } }

通过代码我们发现,这个方法首先从元数据缓存中获取要删除的主题列表,之后定义了两个空的主题列表,分别保存待重试删除主题和待删除主题。

然后,代码遍历每个要删除的主题,去看它所有副本的状态。如果副本状态都是ReplicaDeletionSuccessful,就表明该主题已经被成功删除,此时,再调用completeDeleteTopic方法,完成后续的操作就可以了。对于那些删除操作尚未开始,并且暂时无法执行删除的主题,源码会把这类主题加到待重试主题列表中,用于后续重试;如果主题是能够被删除的,就将其加入到待删除列表中。

最后,该方法调用retryDeletionForIneligibleReplicas方法,来重试待重试主题列表中的主题删除操作。对于待删除主题列表中的主题则调用onTopicDeletion删除之。

值得一提的是,retryDeletionForIneligibleReplicas方法用于重试主题删除。这是通过将对应主题副本的状态,从ReplicaDeletionIneligible变更到OfflineReplica来完成的。这样,后续再次调用resumeDeletions时,会尝试重新删除主题。

看到这里,我们再次发现,Kafka的方法命名真的是非常规范。得益于这一点,很多时候,我们不用深入到方法内部,就能知道这个方法大致是做什么用的。比如:

  • topicsQueuedForDeletion方法,应该是保存待删除的主题列表;
  • controllerContext.isAnyReplicaInState方法,应该是判断某个主题下是否有副本处于某种状态;
  • 而onTopicDeletion方法,应该就是执行主题删除操作用的。

这时,你再去阅读这3个方法的源码,就会发现它们的作用确实如其名字标识的那样。这也再次证明了Kafka源码质量是非常不错的。因此,不管你是不是Kafka的使用者,都可以把Kafka的源码作为阅读开源框架源码、提升自己竞争力的一个选择。

下面,我再用一张图来解释下resumeDeletions方法的执行流程:

到这里,resumeDeletions方法的逻辑我就讲完了,它果然是串联起了TopicDeletionManger中定义的很多方法。其中,比较关键的两个操作是completeDeleteTopiconTopicDeletion。接下来,我们就分别看看。

先来看completeDeleteTopic方法代码,我给每行代码都加标了注解。 private def completeDeleteTopic(topic: String): Unit = { // 第1步:注销分区变更监听器,防止删除过程中因分区数据变更 // 导致监听器被触发,引起状态不一致 client.mutePartitionModifications(topic) // 第2步:获取该主题下处于ReplicaDeletionSuccessful状态的所有副本对象, // 即所有已经被成功删除的副本对象 val replicasForDeletedTopic = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful) // 第3步:利用副本状态机将这些副本对象转换成NonExistentReplica状态。 // 等同于在状态机中删除这些副本 replicaStateMachine.handleStateChanges( replicasForDeletedTopic.toSeq, NonExistentReplica) // 第4步:更新元数据缓存中的待删除主题列表和已开始删除的主题列表 // 因为主题已经成功删除了,没有必要出现在这两个列表中了 controllerContext.topicsToBeDeleted -= topic controllerContext.topicsWithDeletionStarted -= topic // 第5步:移除ZooKeeper上关于该主题的一切“痕迹” client.deleteTopic(topic, controllerContext.epochZkVersion) // 第6步:移除元数据缓存中关于该主题的一切“痕迹” controllerContext.removeTopic(topic) }

整个过程如行云流水般一气呵成,非常容易理解,我就不多解释了。

再来看看onTopicDeletion方法的代码: private def onTopicDeletion(topics: Set[String]): Unit = { // 找出给定待删除主题列表中那些尚未开启删除操作的所有主题 val unseenTopicsForDeletion = topics.diff(controllerContext.topicsWithDeletionStarted) if (unseenTopicsForDeletion.nonEmpty) { // 获取到这些主题的所有分区对象 val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic) // 将这些分区的状态依次调整成OfflinePartition和NonExistentPartition // 等同于将这些分区从分区状态机中删除 partitionStateMachine.handleStateChanges( unseenPartitionsForDeletion.toSeq, OfflinePartition) partitionStateMachine.handleStateChanges( unseenPartitionsForDeletion.toSeq, NonExistentPartition) // 把这些主题加到“已开启删除操作”主题列表中 controllerContext.beginTopicDeletion(unseenTopicsForDeletion) } // 给集群所有Broker发送元数据更新请求,告诉它们不要再为这些主题处理数据了 client.sendMetadataUpdate( topics.flatMap(controllerContext.partitionsForTopic)) // 分区删除操作会执行底层的物理磁盘文件删除动作 onPartitionDeletion(topics) }

我在代码中用注释的方式,已经把onTopicDeletion方法的逻辑解释清楚了。你可以发现,这个方法也基本是串行化的流程,没什么难理解的。我再给你梳理下其中的核心点。

onTopicDeletion方法会多次使用分区状态机,来调整待删除主题的分区状态。在后两讲的分区状态机和副本状态机的课里面,我还会和你详细讲解它们,包括它们都定义了哪些状态,这些状态彼此之间的转换规则都是什么,等等。

onTopicDeletion方法的最后一行调用了onPartitionDeletion方法,来执行真正的底层物理磁盘文件删除。实际上,这是通过副本状态机状态转换操作完成的。下节课,我会和你详细聊聊这个事情。

学到这里,我还想提醒你的是,在学习TopicDeletionManager方法的时候,非常重要一点的是,你要理解主题删除的脉络。对于其他部分的源码,也是这个道理。一旦你掌握了整体的流程,阅读那些细枝末节的方法代码就没有任何难度了。照着这个方法去读源码,搞懂Kafka源码也不是什么难事!

总结

今天,我们主要学习了TopicDeletionManager.scala中关于主题删除的代码。这里有几个要点,需要你记住。

  • 在主题删除过程中,Kafka会调整集群中三个地方的数据:ZooKeeper、元数据缓存和磁盘日志文件。删除主题时,ZooKeeper上与该主题相关的所有ZNode节点必须被清除;Controller端元数据缓存中的相关项,也必须要被处理,并且要被同步到集群的其他Broker上;而磁盘日志文件,更是要清理的首要目标。这三个地方必须要统一处理,就好似我们常说的原子性操作一样。现在回想下开篇提到的那个“秘籍”,你就会发现它缺少了非常重要的一环,那就是:无法清除Controller端的元数据缓存项。因此,你要尽力避免使用这个“大招”。
  • DeletionClient接口的作用,主要是操作ZooKeeper,实现ZooKeeper节点的删除等操作。
  • TopicDeletionManager,是在KafkaController创建过程中被初始化的,主要通过与元数据缓存进行交互的方式,来更新各类数据。

今天我们看到的代码中出现了大量的replicaStateMachine和partitionStateMachine,其实这就是我今天反复提到的副本状态机和分区状态机。接下来两讲,我会逐步带你走进它们的代码世界,让你领略下Kafka通过状态机机制管理副本和分区的风采。

课后讨论

上节课,我们在学习processTopicDeletion方法的代码时,看到了一个名为markTopicIneligibleForDeletion的方法,这也是和主题删除相关的方法。现在,你能说说,它是做什么用的、它的实现原理又是怎样的吗?

欢迎你在留言区畅所欲言,跟我交流讨论,也欢迎你把今天的内容分享给你的朋友。

参考资料

https://learn.lianglianglee.com/%e4%b8%93%e6%a0%8f/Kafka%e6%a0%b8%e5%bf%83%e6%ba%90%e7%a0%81%e8%a7%a3%e8%af%bb/16%20TopicDeletionManager%ef%bc%9a%20Topic%e6%98%af%e6%80%8e%e4%b9%88%e8%a2%ab%e5%88%a0%e9%99%a4%e7%9a%84%ef%bc%9f.md