0%

kafka镜像操作

Kafka 镜像操作详解:跨集群数据同步

Kafka 镜像操作(MirrorMaker)是实现跨集群数据同步的核心工具,通过消费源集群的消息并生产到目标集群,实现两个 Kafka 集群之间的数据镜像。这种机制适用于灾备、数据迁移、多区域部署等场景。本文将详细介绍 MirrorMaker 的工作原理、配置方法及操作步骤。

镜像操作核心原理

MirrorMaker 的工作机制本质是 “消费 - 生产” 模式:

  1. 消费者:从源集群的指定主题拉取消息。
  2. 生产者:将拉取的消息推送到目标集群的同名(或指定)主题。

通过这种方式,目标集群会实时同步源集群的消息,形成 “镜像”。MirrorMaker 支持通过正则表达式(--whitelist)过滤需要同步的主题,灵活控制同步范围。

镜像操作工具:kafka-mirror-maker.sh

Kafka 提供 kafka-mirror-maker.sh(Linux/Mac)或 kafka-mirror-maker.bat(Windows)脚本执行镜像操作,核心参数如下:

参数 作用
--consumer.config 源集群消费者配置文件路径(必传)。
--producer.config 目标集群生产者配置文件路径(必传)。
--whitelist 正则表达式,指定需要同步的源集群主题(如 `test-mirror topic.*`)。
--blacklist 正则表达式,指定无需同步的源集群主题(与 --whitelist 互斥)。
--num.streams 消费线程数(默认 1,增加可提升同步吞吐量)。

操作步骤

环境准备

  • 源集群(待同步数据的集群)和目标集群(接收同步数据的集群)已正常运行。
  • 确保源集群的主题在目标集群中已创建(可手动创建或配置自动创建)。

配置文件

(1)消费者配置文件(源集群)

创建 consumer-mirror.properties,配置源集群连接信息:

1
2
3
4
5
6
7
8
9
10
11
12
# 源集群 Broker 地址
bootstrap.servers=source-broker1:9092,source-broker2:9092

# 消费者组 ID(用于跟踪消费进度,确保断点续传)
group.id=mirror-maker-group

# 偏移量重置策略(首次同步从最早消息开始)
auto.offset.reset=earliest

# 自动提交偏移量(默认 true,也可手动提交)
enable.auto.commit=true
auto.commit.interval.ms=5000
(2)生产者配置文件(目标集群)

创建 producer-mirror.properties,配置目标集群连接信息:

1
2
3
4
5
6
7
8
9
10
11
12
# 目标集群 Broker 地址
bootstrap.servers=target-broker1:9092,target-broker2:9092

# 消息确认级别(确保数据可靠写入目标集群)
acks=all

# 失败重试次数
retries=3

# 批量发送配置(提升吞吐量)
batch.size=16384
linger.ms=50

启动镜像同步

执行 kafka-mirror-maker.sh,指定配置文件和待同步主题:

1
2
3
4
5
6
# 同步源集群中所有匹配 "test-mirror" 的主题
./kafka-mirror-maker.sh \
--consumer.config consumer-mirror.properties \
--producer.config producer-mirror.properties \
--whitelist "test-mirror" \
--num.streams 3 # 3 个消费线程并行同步
  • 参数说明:
    • --whitelist "test-mirror":仅同步源集群中名为 test-mirror 的主题。
    • --num.streams 3:启动 3 个消费者线程,对应 3 个生产者线程,提升同步效率。

验证同步结果

(1)在源集群发送消息
1
2
3
4
5
6
# 源集群生产者发送消息
./kafka-console-producer.sh \
--bootstrap-server source-broker1:9092 \
--topic test-mirror
>hello mirror
>this is a test
(2)在目标集群消费消息
1
2
3
4
5
6
7
# 目标集群消费者接收消息
./kafka-console-consumer.sh \
--bootstrap-server target-broker1:9092 \
--topic test-mirror \
--from-beginning
hello mirror
this is a test

若目标集群成功接收消息,说明镜像同步生效。

高级配置与优化

主题名称映射

默认情况下,目标集群的主题名称与源集群一致。如需修改,可通过 --rewrite.topics 参数实现映射(如 source-topic:target-topic):

1
2
3
4
5
./kafka-mirror-maker.sh \
--consumer.config consumer-mirror.properties \
--producer.config producer-mirror.properties \
--whitelist "source-topic" \
--rewrite.topics "source-topic:target-topic"

吞吐量优化

  • 增加 --num.streams 数量(如 5-10),利用多线程并行同步。
  • 调整生产者配置:增大 batch.size(如 65536)和 linger.ms(如 100),提升批量发送效率。

可靠性保障

  • 生产者配置 acks=all,确保消息被目标集群所有副本确认。
  • 消费者配置 enable.auto.commit=false,结合手动提交偏移量(需自定义 MirrorMaker 逻辑),避免消息丢失。

过滤不需要的消息

通过 --blacklist 排除无需同步的主题:

1
2
3
4
5
# 同步所有主题,除了 "internal-*"
./kafka-mirror-maker.sh \
--consumer.config consumer-mirror.properties \
--producer.config producer-mirror.properties \
--blacklist "internal-.*"

适用场景

  1. 灾备集群:将主集群数据同步到灾备集群,主集群故障时可切换到灾备集群。
  2. 数据迁移:跨数据中心迁移 Kafka 集群,通过 MirrorMaker 逐步同步数据。
  3. 多区域部署:不同区域的集群通过镜像同步实现数据一致性,降低跨区域访问延迟

欢迎关注我的其它发布渠道