kafka 3.5 日志定时清理(源码)

一、定时任务入口

这里选择kraft的模式启动的定时任务,所以入口是在BrokerServer.scala文件中,如果选择ZooKeeper模式的入口在KafkaServer.scala

 def startup(): Unit = {
    if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
    try {
      info("Starting broker")

      /* start scheduler 开始定时任务*/,
      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      kafkaScheduler.startup()
	  //省略。。。
      //创建日志管理器,但不要启动它,因为我们需要延迟任何潜在的不干净关闭日志恢复
      // 直到我们赶上元数据日志并拥有最新的主题和代理配置。
      logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
        brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
		//省略。。。
    } catch {
      case e: Throwable =>
        maybeChangeStatus(STARTING, STARTED)
        fatal("Fatal error during broker startup. Prepare to shutdown", e)
        shutdown()
        throw e
    }
  }

二、LopManager(这个是日志抽象层,实际逻辑不在这里)

object LogManager {
  val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
  val LogStartOffsetCheckpointFile = "log-start-offset-checkpoint"
  val ProducerIdExpirationCheckIntervalMs = 10 * 60 * 1000

  def apply(config: KafkaConfig,
            initialOfflineDirs: Seq[String],
            configRepository: ConfigRepository,
            kafkaScheduler: KafkaScheduler,
            time: Time,
            brokerTopicStats: BrokerTopicStats,
            logDirFailureChannel: LogDirFailureChannel,
            keepPartitionMetadataFile: Boolean): LogManager = {
    val defaultProps = LogConfig.extractLogConfigMap(config)

    LogConfig.validateValues(defaultProps)
    val defaultLogConfig = LogConfig(defaultProps)

    val cleanerConfig = LogCleaner.cleanerConfig(config)

    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
      configRepository = configRepository,
      initialDefaultConfig = defaultLogConfig,
      cleanerConfig = cleanerConfig,
      recoveryThreadsPerDataDir = config.numRecoveryThreadsPerDataDir,
      flushCheckMs = config.logFlushSchedulerIntervalMs,
      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
      //log.retention.check.interval.ms 日志保留检查间隔 ms
      retentionCheckMs = config.logCleanupIntervalMs,
      maxPidExpirationMs = config.transactionalIdExpirationMs,
      scheduler = kafkaScheduler,
      brokerTopicStats = brokerTopicStats,
      logDirFailureChannel = logDirFailureChannel,
      time = time,
      keepPartitionMetadataFile = keepPartitionMetadataFile,
      interBrokerProtocolVersion = config.interBrokerProtocolVersion)
  }

}

上面的new LopManager 是下面的

1 把日志清理加入定时任务中

/**
 * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
 * All read and write operations are delegated to the individual log instances.
 *kafka 日志管理子系统的入口点。日志管理器负责日志的创建、检索和清理。 所有读写操作都委托给各个日志实例
 * The log manager maintains logs in one or more directories. New logs are created in the data directory
 * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
 * size or I/O rate.
 *日志管理器在一个或多个目录中维护日志。新日志在数据目录 中创建,日志最少。事后不会尝试移动分区或根据  大小或 I/O 速率进行平衡
 * A background thread handles log retention by periodically truncating excess log segments.
 * 后台线程通过定期截断多余的日志段来处理日志保留
 */
@threadsafe
class LogManager(logDirs: Seq[File],
                 initialOfflineDirs: Seq[File],
                 configRepository: ConfigRepository,
                 val initialDefaultConfig: LogConfig,
                 val cleanerConfig: CleanerConfig,
                 recoveryThreadsPerDataDir: Int,
                 val flushCheckMs: Long,
                 val flushRecoveryOffsetCheckpointMs: Long,
                 val flushStartOffsetCheckpointMs: Long,
                 val retentionCheckMs: Long,
                 val maxPidExpirationMs: Int,
                 interBrokerProtocolVersion: ApiVersion,
                 scheduler: Scheduler,
                 brokerTopicStats: BrokerTopicStats,
                 logDirFailureChannel: LogDirFailureChannel,
                 time: Time,
                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {

  import LogManager._
//省略。。。。

  /**
   *  Start the background threads to flush logs and do log cleanup
   *  启动后台线程以刷新日志并执行日志清理
   */
  def startup(topicNames: Set[String]): Unit = {
    // ensure consistency between default config and overrides
    //确保默认配置和覆盖之间的一致性
    val defaultConfig = currentDefaultConfig
    startupWithConfigOverrides(defaultConfig, fetchTopicConfigOverrides(defaultConfig, topicNames))
  }
  //把定时任务加入到scheduler中
 private[log] def startupWithConfigOverrides(defaultConfig: LogConfig, topicConfigOverrides: Map[String, LogConfig]): Unit = {
    loadLogs(defaultConfig, topicConfigOverrides) // this could take a while if shutdown was not clean

    /* Schedule the cleanup task to delete old logs */
    if (scheduler != null) {
      scheduler.schedule("kafka-log-retention",
                         () => cleanupLogs(),
                         InitialTaskDelayMs,
                         retentionCheckMs)
   //省略代码
}                         

三、清理符合条件的日志

 /**
   * 删除任何符合条件的日志。返回删除的段数。
   * 只考虑未压缩的日志。
   */
  def cleanupLogs(): Unit = {
     //省略代码
    try {
      deletableLogs.foreach {
        case (topicPartition, log) =>
          debug(s"Garbage collecting '${log.name}'")
          total += log.deleteOldSegments()
      }
    } finally {
      //省略代码
    }
  }

上面log.deleteOldSegments() 调用的是Log.scaladeleteOldSegments

/**
   *如果启用了主题删除,请删除由于基于时间的保留 ,或由于日志大小大于保留大小而过期的任何日志段。
   * 无论是否启用删除,删除日志开始偏移量之前的所有日志段
   */
  def deleteOldSegments(): Int = {
    if (config.delete) {
      //删除LogStartOffset日志之前的所有的segment
      deleteLogStartOffsetBreachedSegments() +
      //删除超出的保留大小段
        deleteRetentionSizeBreachedSegments() +
      //删除超过保存时间的
        deleteRetentionMsBreachedSegments()
    } else {
      //删除LogStartOffset日志之前的所有的segment
      deleteLogStartOffsetBreachedSegments()
    }
  }

  private def deleteRetentionMsBreachedSegments(): Int = {
    //retention.ms 小于0代表不设置保存时间
    if (config.retentionMs < 0) return 0
    val startMs = time.milliseconds

    //当前时间减去segment追后修改时间,是否大于设置的超时时间
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      startMs - segment.largestTimestamp > config.retentionMs
    }

    deleteOldSegments(shouldDelete, RetentionMsBreach(this))
  }

  private def deleteRetentionSizeBreachedSegments(): Int = {
    //retention.bytes 小于0代表不删,或者当前topic的此分区的日志大小小于设置的值,也不用删
    if (config.retentionSize < 0 || size < config.retentionSize) return 0
    //retentionSize代表分区最小保留大小,而diff代表超过多少
    var diff = size - config.retentionSize
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {

      if (diff - segment.size >= 0) {
        //意味着删除segment后,仍然有足够的空间(diff是一个表示剩余segment的变量),
        // 那么该segment应该被删除。在这种情况下,diff的值会减去segment的大小,并返回true。
        diff -= segment.size
        true
      } else {
        //如果diff - segment.size的结果小于0,意味着删除segment后剩余的segment不满足retentionSize
        false
      }

    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
  }

  private def deleteLogStartOffsetBreachedSegments(): Int = {
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
    }

    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
  }
 /**
   * 删除从最旧的段开始并向前移动的任何本地日志段,直到用户提供的谓词为 false 或达到包含当前高水位线的段。
   * 我们不会删除偏移量等于或超过高水位线的段,以确保日志开始偏移量永远不会超过它。
   * 如果高水位线尚未初始化,则没有段符合删除条件。
   * @param predicate  一个函数,它接受候选日志段和下一个更高的段(如果有的话)并返回 true if 它是可删除的
   * @param reason The reason for the segment deletion 段删除的原因
   * @return The number of segments deleted
   */
  private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean,
                                reason: SegmentDeletionReason): Int = {
    //相当于在predicate外额外加的条件                 
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      //检查高水位标记(highWatermark)是否大于等于下一个日志段的基础偏移量(baseOffset),以及predicate函数的返回值
      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
        predicate(segment, nextSegmentOpt)
    }
    lock synchronized {
      //localLog.deletableSegments(shouldDelete)获取可删除的日志段
      val deletable = localLog.deletableSegments(shouldDelete)
      if (deletable.nonEmpty) {
        //调用deleteSegments方法删除这些日志段
        deleteSegments(deletable, reason)
      } else
        0
    }
  }

看上面的代码就知道,如果你配置了按最大超时时间日志最大存储大小,那两个定时清理都会被执行
而针对LogStartOffset的清理,都会执行

1、deletableSegments(把需要删除的segment加入待删除的集合)


  /**
   * Find segments starting from the oldest until the user-supplied predicate is false.
   * A final segment that is empty will never be returned.
   * 查找从最旧的段开始,直到用户提供的谓词为 false。最后一个为空的段将永远不会返回。
   * @param predicate A function that takes in a candidate log segment, the next higher segment
   *                  (if there is one). It returns true iff the segment is deletable.
   *                  一个函数,它接收候选日志段,下一个更高的段(如果有的话)。如果段可删除,则返回 true。
   * @return the segments ready to be deleted
   */
  private[log] def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
    if (segments.isEmpty) {
      Seq.empty
    } else {
      val deletable = ArrayBuffer.empty[LogSegment]
      val segmentsIterator = segments.values.iterator
      var segmentOpt = nextOption(segmentsIterator)
      //遍历所有的segment
      while (segmentOpt.isDefined) {
        val segment = segmentOpt.get
        val nextSegmentOpt = nextOption(segmentsIterator)
        //下一个segment为null,并且当前segment的大小为0则不允许删除
        val isLastSegmentAndEmpty = nextSegmentOpt.isEmpty && segment.size == 0
        //还需要通过predicate方法判断,这个是通过传入的
        if (predicate(segment, nextSegmentOpt) && !isLastSegmentAndEmpty) {
          deletable += segment
          segmentOpt = nextSegmentOpt
        } else {
          segmentOpt = Option.empty
        }
      }
      deletable
    }
  }

这里需要注意,不管是哪种删除方式把segment加入待删除集合,必须有isLastSegmentAndEmpty的判断,代表的是如果此segment是最后一个(nextSegmentOpt.isEmpty为true时,代表没有下一个segment并且最后一个segment中数据为不存在(空文件)是不允许删除segment,原因是没有删除的必要

(1)如果主题分区的logStartOffset大于segment的baseOffset

deleteLogStartOffsetBreachedSegments函数传给deleteOldSegmentspredicate

 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }

deleteOldSegments传给deletableSegmentslocalLog.deletableSegments的predicate是

 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      //1、检查高水位标记(highWatermark)是否大于等于下一个日志段的基础偏移量(baseOffset),如果baseOffset不存在,则获得当前主题分区的logEndOffset值,
      //2、deleteLogStartOffsetBreachedSegments传过来的predicate函数
      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
        predicate(segment, nextSegmentOpt)
}

主题分区的logStartOffset 要大于下一个segmentbaseOffset,所以deleteLogStartOffsetBreachedSegments方法要把segment加入到待删除的列表条件

  1. 至少两个segment,后面第二个segmentbaseOffset要大于等于主题分区的logStartOffset
  2. 要把第一个segment加入待删除列表,后面第二个segmentbaseOffset(如果不存在,则用主题分区的logEndOffset代替)要大于等于主题分区的highWatermark
  3. 当前segment为空文件,不允许删

(2)如果主题分区文件总大小大于配置中retention.bytes(最大留存大小)

deleteRetentionSizeBreachedSegments函数传给deleteOldSegmentspredicate

 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
       if (diff - segment.size >= 0) {
        //意味着删除segment后,仍然有足够的空间(diff是一个表示剩余segment的变量),
        // 那么该segment应该被删除。在这种情况下,diff的值会减去segment的大小,并返回true。
        diff -= segment.size
        true
      } else {
        //如果diff - segment.size的结果小于0,意味着删除segment后剩余的segment不满足retentionSize
        false
      }
    }

deleteOldSegments传给deletableSegmentslocalLog.deletableSegmentspredicate

 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      //1、检查高水位标记(highWatermark)是否大于等于下一个日志段的基础偏移量(baseOffset),如果baseOffset不存在,则获得当前主题分区的logEndOffset值,
      //2、deleteRetentionSizeBreachedSegments传过来的predicate函数
      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
        predicate(segment, nextSegmentOpt)
}

segment遍历时,会迭代diff判断是否大于设置的阈值(retention.bytes),deleteRetentionSizeBreachedSegments方法要把符合条件的segment加入到待删除的列表

  1. 分区的全部的segment文件大小总和大于配置的retention.bytes
  2. 要把第一个segment加入待删除列表,后面第二个segmentbaseOffset(如果不存在,则用主题分区的logEndOffset代替)要大于等于主题分区的highWatermark
  3. 当前segment为空文件,不允许删

(3)如果主题分区各个segment的修改时间和当前时间差大于配置的retention.ms(最大留存时间)

deleteRetentionMsBreachedSegments函数传给deleteOldSegmentspredicate

val startMs = time.milliseconds
 //当前时间减去segment追后修改时间,是否大于设置的超时时间
    def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      startMs - segment.largestTimestamp > config.retentionMs
 }

deleteOldSegments传给deletableSegmentslocalLog.deletableSegmentspredicate

 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = {
      //1、检查高水位标记(highWatermark)是否大于等于下一个日志段的基础偏移量(baseOffset),如果baseOffset不存在,则获得当前主题分区的logEndOffset值,
      //2、deleteRetentionMsBreachedSegments传过来的predicate函数
      highWatermark >= nextSegmentOpt.map(_.baseOffset).getOrElse(localLog.logEndOffset) &&
        predicate(segment, nextSegmentOpt)
}

当前的segment文件的largestTimestamp和当前系统时间的差值大于retention.ms,则会判定需要删除此segment
所以条件总结:

  1. segment的最后修改时间和当前系统时间差值大于配置的retention.bytes
  2. 要把第一个segment加入待删除列表,后面第二个segmentbaseOffset(如果不存在,则用主题分区的logEndOffset代替)要大于等于主题分区的highWatermark
  3. 当前segment为空文件,不允许删

2、把上面待删除segment执行删除操作

如果上面待删除segment集合不为空则执行删除操作

if (deletable.nonEmpty) {
        //调用deleteSegments方法删除这些日志段
        deleteSegments(deletable, reason)
 }

其中deleteSegments的方法实现如下

  // deletable是一个可迭代对象,表示要删除的日志段。
  //reason是一个SegmentDeletionReason类型的参数,表示删除日志段的原因。
  private def deleteSegments(deletable: Iterable[LogSegment], reason: SegmentDeletionReason): Int = {
    maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
      //numToDelete表示要删除的日志段数量。
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        //如果要删除的日志段数量等于当前日志的所有日志段数量,即localLog.segments.numberOfSegments等于numToDelete,则先创建一个新的日志段。
        var segmentsToDelete = deletable
        if (localLog.segments.numberOfSegments == numToDelete) {
          val newSegment = roll()
          //如果要删除的最后一个日志段的基准偏移量等于新创建的日志段的基准偏移量,说明删除了一个空的活动日志段并重新创建了一个新的日志段,会打印警告信息。
          if (deletable.last.baseOffset == newSegment.baseOffset) {
            warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
            segmentsToDelete = deletable.dropRight(1)
          }
        }
        //用于检查内存映射缓冲区是否关闭。
        localLog.checkIfMemoryMappedBufferClosed()
        //用于从日志中移除并删除指定的日志段。asyncDelete=true代表采用异步的方式删除日志端
        localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)
        //删除生产者快照。
        deleteProducerSnapshots(deletable, asyncDelete = true)
        //根据删除的日志段更新日志的起始偏移量。
        maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, LogStartOffsetIncrementReason.SegmentDeletion)
      }
      numToDelete
    }
  }

1、把segment文件后缀改成.deleted

/**
   *此方法通过对每个日志段执行以下操作来删除给定的日志段:
   * - 它从段映射中删除段,以便它不再用于读取。
   * - 它通过将 .delete 附加到相应的文件名来重命名索引和日志文件
   * - 它可以计划将来发生的异步删除操作或同步执行删除
   *异步删除允许在不同步的情况下并发进行读取,并且不可能在读取文件时物理删除文件。
   *此方法不会将 IOException 转换为 KafkaStorageException,直接调用方应捕获并处理 IOException。
   * @param segmentsToDelete The log segments to schedule for deletion 要计划删除的日志段
   * @param asyncDelete Whether the segment files should be deleted as ynchronously 是否应异步删除段文件
   * @param reason The reason for the segment deletion 段删除的原因
   */
  private[log] def removeAndDeleteSegments(segmentsToDelete: Iterable[LogSegment],
                                           asyncDelete: Boolean,
                                           reason: SegmentDeletionReason): Unit = {
    if (segmentsToDelete.nonEmpty) {
      val toDelete = segmentsToDelete.toList
      reason.logReason(toDelete)
      toDelete.foreach { segment =>
        segments.remove(segment.baseOffset)
      }
      LocalLog.deleteSegmentFiles(toDelete, asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent)
    }
  }
/**
   *对给定段的索引和日志文件执行物理删除。在删除之前,通过将 .delete 附加到相应的文件名来重命名索引和日志文件。允许选择性地异步删除这些文件。
   *这个方法假定该文件存在。
   * 它不需要将IOException(从changeFile后缀抛出)转换为KafkaStorageException,
   * 因为它要么在加载所有日志之前调用,要么调用方将捕获并处理IOException。
   */
  private[log] def deleteSegmentFiles(segmentsToDelete: immutable.Iterable[LogSegment],
                                      asyncDelete: Boolean,
                                      dir: File,
                                      topicPartition: TopicPartition,
                                      config: LogConfig,
                                      scheduler: Scheduler,
                                      logDirFailureChannel: LogDirFailureChannel,
                                      logPrefix: String): Unit = {
    //遍历所有的待删除的segment,把segment文件后缀改成.deleted
    segmentsToDelete.foreach { segment =>
      if (!segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
        segment.changeFileSuffixes("", LogFileUtils.DELETED_FILE_SUFFIX)
    }
    //删除segment具体方法
    def deleteSegments(): Unit = {
      info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
      //获取dir的父目录,并赋值给parentDir变量。
      val parentDir = dir.getParent
      maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
        //遍历segmentsToDelete列表中的每个元素(即要删除的文件),并调用deleteIfExists()方法删除文件。
        segmentsToDelete.foreach { segment =>
          segment.deleteIfExists()
        }
      }
    }
    //asyncDelete为true,代表是异步删除,加入到scheduler,并且只执行一次
    if (asyncDelete)
      scheduler.scheduleOnce("delete-file", () => deleteSegments(), config.fileDeleteDelayMs)
    else
      deleteSegments()
  }

2、异步执行删除文件后缀是.delete的文件(加入只执行一次定时任务中)

上面的deleteSegments方法内
调用 localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, reason)asyncDelete为true,所以采用异步方式删除segment

四,segment相关文件有哪些整理

下面的每一个segment有什么,可以看Kafka的Log存储原理再析,下面是我复制过来的一张图

主要是index,log,timeindexs三种文件,
在这里插入图片描述
而index和log文件的映射是下面这种
在这里插入图片描述

定时删除日志段就会把要删除的log文件后缀改成.delete

五、问题:

1、如果segment最大留存时间和分区最大留存文件大小都不配置会出现什么情况?

答:分区的segment不会被删除,可能会出现磁盘满了,而根据LogStartOffset删除只是删除补偿机制,只要LogStartOffset不主动更新,则不会删除旧的segment

2、如果只配置segment最大留存时间,并且数据量非常小,能消费最大留存时间之前的数据吗?

答:不可以,因为如果segment会删除重新建一个新的segemnt,

3、如果只是配置分区最大留存文件大小,并且数据量非常小,能消费到很久以前的吗?

答:可以,前提是数据量要小于分区最大留存文件大小,举例,segment为1G,retention.bytes=1G,现在这个分区的从开始生产消息到现在才500M数据,消费者就可以配置消费策略,重头开始消费,就能消费到生产者很久以前生产到此分区的第一条数据