0%

kafka之日志管理器

Kafka 日志管理器详解:存储、索引与清理机制

Kafka 的日志管理器(Log Manager)是数据持久化的核心组件,负责消息的存储、检索、维护和清理,直接影响 Kafka 的性能、可靠性和磁盘占用。本文将从日志存储结构、索引机制、检索流程到日志清理策略,全面解析日志管理器的工作原理。

日志存储基础

Kafka 中的 “日志” 并非传统意义上的文本日志,而是消息的持久化存储结构。其设计目标是高效支持高吞吐的写入和随机读取,核心特点是 “顺序写、分段存储”。

存储结构概览

  • 主题与分区映射:每个主题(Topic)的每个分区(Partition)对应一个独立的日志目录,命名格式为 {topic}-{partition}(如 test-topic-0test-topic-1)。
  • 副本存储:分区的每个副本(Replica)在不同 Broker 上拥有独立的日志目录,确保数据冗余。
  • 日志文件组织:每个日志目录包含多个日志段(LogSegment),每个 LogSegment 由 3 个文件组成:
    • .log:存储消息的实际内容(二进制格式)。
    • .index:偏移量索引文件,映射消息偏移量到 .log 文件中的物理位置。
    • .timeindex:时间戳索引文件,映射消息时间戳到偏移量。

日志分段(LogSegment)

Kafka 将每个分区的日志拆分为多个 LogSegment(默认最大 1GB),而非一个巨大的文件,原因如下:

  • 便于管理:单个大文件难以高效定位和删除,分段后可按段操作(如删除老数据)。
  • 提升性能:索引文件随分段变小,可缓存至内存,加速查询。
  • 并行处理:分段操作可分布式进行,避免单文件锁竞争。

日志文件的名称是以偏移量进行命名的,这样是为了方便知道数据在哪个日志段中(采用了跳跃表的方式)

在Log对象中维护了一个ConcurrentSkipListMap(底层是跳跃表),保存该主题所有分区对应的所有LogSegment

1
2
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

LogSegment中封装有一个FileRecords对象(日志文件),一个OffsetIndex对象(偏移量索引文件)和一个TimeIndex对象(时间戳索引文件)

1
2
3
4
5
6
7
8
class LogSegment private[log] (val log: FileRecords,
val offsetIndex: OffsetIndex,
val timeIndex: TimeIndex,
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging

在存储结构上每个分区副本对应一个目录,每个分区副本由一个或多个日志段(LogSegment)组成。每个日志段在物理结构上对应一个以.index后缀的偏移量索引文件、一个以.timeindex后缀的时间戳索引文件和一个以.log结尾的日志文件

使用.index后缀的偏移量索引文件是为了方便定位数据,在.index文件中记录了许多偏移量的索引,每隔一个范围区间创建一个索引,这种方式称之为稀疏索引。这样可以避免索引文件过大,从而使得内存中可以保存更多的索引

使用.timeindex文件是为了kafka清理数据准备的,kafka默认是保留七天内的数据的,主要根据timeindex时间索引文件里最大的时间来判断的,如果最大时间与当前时间差值超过7天,那么对应的数据段就会被清理掉

关键配置:
  • log.segment.bytes:单个 LogSegment 的最大大小(默认 1073741824 字节,即 1GB)。
  • log.roll.ms/log.roll.hours:LogSegment 滚动周期(即使未达大小上限,超时也会创建新段,默认 7 天)。

日志文件与索引机制

1. 日志文件(.log)

.log 文件是消息的实际存储载体,消息以追加(Append) 方式写入(顺序写磁盘,性能远高于随机写)。每条消息包含以下核心字段:

  • 偏移量(Offset):分区内唯一的消息编号(单调递增)。
  • 时间戳(Timestamp):消息创建或写入的时间(可通过 log.message.timestamp.type 配置)。
  • 键(Key)值(Value):消息的实际内容(经序列化后的字节数组)。
  • 头部(Headers):可选的附加元数据(如业务标识)。
查看日志内容:

通过 kafka.tools.DumpLogSegments 工具可解析 .log 文件:

1
2
3
4
# 打印日志文件内容(包括偏移量、时间戳、消息体)
kafka-run-class.sh kafka.tools.DumpLogSegments \
--files /path/to/test-topic-0/00000000000000000000.log \
--print-data-log

2. 偏移量索引(.index)

.index 文件是稀疏索引(非每条消息都有索引),用于快速定位指定偏移量的消息在 .log 中的物理位置(如字节偏移量)。

索引结构:

每个索引条目为 8 字节(4 字节偏移量 + 4 字节物理位置),例如:

偏移量(Offset) 物理位置(Position)
0 0
100 4096
200 8192
索引间隔:

索引并非每条消息都记录,而是每隔一定字节(log.index.interval.bytes,默认 4096 字节)创建一条索引。例如,若消息平均大小为 100 字节,约每 40 条消息创建一条索引。

查找流程:

给定目标偏移量 targetOffset,查找步骤如下:

  1. 定位 LogSegment:通过文件名(如 00000000000000000100.log 表示起始偏移量为 100)找到包含 targetOffset 的 LogSegment。
  2. 二分查找索引:在 .index 文件中二分查找小于等于 targetOffset 的最大偏移量,获取其物理位置。
  3. 扫描日志文件:从该物理位置开始顺序扫描 .log 文件,直到找到 targetOffset 对应的消息。

3. 时间戳索引(.timeindex)

.timeindex 文件用于根据时间戳定位消息,支持按时间范围查询(如 “获取近 1 小时的消息”),是日志清理的核心依据。

索引结构:

每个索引条目为 12 字节(8 字节时间戳 + 4 字节偏移量),例如:

时间戳(Timestamp) 偏移量(Offset)
1620000000000 0
1620000300000 100
查找流程:

给定目标时间戳 targetTs,步骤类似偏移量查找:

  1. 定位包含 targetTs 的 LogSegment(通过段的最大时间戳)。
  2. .timeindex 中二分查找小于等于 targetTs 的最大时间戳,获取对应偏移量。
  3. 结合 .index 文件定位消息。

日志检索流程

无论是通过偏移量还是时间戳检索消息,Kafka 都遵循 “索引定位 + 日志扫描” 的高效流程,示例如下:

场景:查找分区 test-topic-0 中偏移量为 150 的消息。

  1. 定位 LogSegment
    假设日志目录包含 00000000000000000000.log(起始偏移量 0)和 00000000000000000100.log(起始偏移量 100),150 属于后者。
  2. 查询 .index 文件
    00000000000000000100.index 中二分查找 ≤150 的最大偏移量(假设为 140,对应物理位置 5800)。
  3. 扫描 .log 文件
    .log 文件的 5800 字节位置开始顺序读取,直到找到偏移量为 150 的消息。

日志清理机制

Kafka 不会永久存储消息,日志管理器会定期清理过期或超量的数据,避免磁盘占满。清理策略分为基于时间基于大小两种,可同时生效(满足任一条件即清理)。

1. 基于时间的清理

根据消息的保留时长删除老数据,核心配置:

  • log.retention.ms:消息最大保留时间(默认 604800000 毫秒,即 7 天)。
    (注:log.retention.hours/log.retention.minutes 已过时,优先使用 log.retention.ms
  • log.retention.check.interval.ms:清理检查间隔(默认 300000 毫秒,即 5 分钟)。
清理逻辑:
  • 对每个 LogSegment,检查其最大时间戳(.timeindex 中记录)。
  • 若最大时间戳与当前时间的差值 ≥ log.retention.ms,则删除该段。

2. 基于大小的清理

当分区日志总大小超过阈值时,删除最老的 LogSegment,核心配置:

  • log.retention.bytes:分区日志的最大总大小(默认 -1,即不限制)。
清理逻辑:
  • 累计分区所有 LogSegment 的大小,若超过 log.retention.bytes,则从最老的段开始删除,直至总大小达标。

3. 清理触发时机

  • 定时任务:日志管理器每 log.retention.check.interval.ms 执行一次清理检查。
  • LogSegment 滚动时:新段创建后,自动检查是否需要清理老段。

日志刷写与恢复

1. 日志刷写(Flush)

为平衡性能与可靠性,Kafka 并非每条消息写入后立即刷盘,而是通过以下机制控制:

  • 定时刷写log.flush.interval.ms(默认 300000 毫秒,即 5 分钟)。
  • 定量刷写log.flush.interval.messages(默认 Long.MaxValue,即不限制)。

刷写由 LogFlusher 线程负责,将内存中的消息批量写入磁盘,避免频繁 IO 操作。

2. 日志恢复

Broker 重启时,日志管理器需从磁盘加载日志并恢复状态:

  1. 扫描分区目录下的所有 LogSegment 文件,按起始偏移量排序。
  2. 加载 .index.timeindex 索引至内存,重建偏移量与时间戳映射。
  3. 检查日志完整性(如 CRC 校验),修复损坏的消息(若配置 log.checksum.type

欢迎关注我的其它发布渠道