0%

kafka主题

Kafka 主题(Topic)操作全解析:命令、源码与实践

Kafka 主题(Topic)是消息的逻辑容器,所有消息都需发送到指定主题。Kafka 提供了 kafka-topics.sh(或 .bat)脚本用于主题的创建、删除、查询和修改等操作。本文将详细介绍主题的核心操作命令、底层实现原理及最佳实践,帮助深入理解主题管理机制。

主题操作脚本与核心原理

脚本入口:kafka-topics

Kafka 主题操作的脚本是 kafka-topics.sh(Linux/Mac)或 kafka-topics.bat(Windows),其核心是调用 kafka.admin.TopicCommand 类执行操作:

1
2
# 脚本核心逻辑(kafka-topics.sh)
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

TopicCommand 类的 main 方法解析命令参数,根据操作类型(创建、删除等)调用对应逻辑,底层通过 ZooKeeper 或 Kafka Admin API 交互。

主题创建的底层流程

主题创建分为客户端发起服务端处理两个阶段:

  1. 客户端阶段
    • 通过 kafka-topics 脚本发送创建请求,包含主题名、分区数、副本数等参数。
    • 客户端将主题元数据(如分区 - 副本分配方案)写入 ZooKeeper 的 /brokers/topics/[topic] 节点。
  2. 服务端阶段
    • Kafka 控制器(Controller)监听 ZooKeeper 节点变化,发现新主题后触发创建流程。
    • 控制器在指定 Broker 上创建分区目录(如 topic-0topic-1),并初始化 Leader 和 Follower 副本。

核心主题操作命令

创建主题(—create)

创建主题需指定分区数、副本数和 Kafka 集群地址,核心参数:

  • --bootstrap-server:Kafka 集群地址(替代旧版 --zookeeper 参数)。
  • --partitions:分区数(必选,决定并行处理能力)。
  • --replication-factor:副本数(必选,需 ≤ Broker 数量)。
  • --topic:主题名。
  • --config:可选,主题级别的配置(如最大消息大小)。
示例:创建主题
1
2
3
4
5
6
7
8
9
10
11
12
13
# Linux/Mac
./kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--partitions 3 \
--replication-factor 1 \
--topic kafka-action

# Windows
kafka-topics.bat --create ^
--bootstrap-server localhost:9092 ^
--partitions 3 ^
--replication-factor 1 ^
--topic kafka-action
  • 成功输出Created topic kafka-action.
  • 结果验证
    主题创建后,Kafka 数据目录(log.dirs 配置)会生成分区文件夹(如 kafka-action-0kafka-action-1)。

查看主题列表(—list)

列出集群中所有主题:

1
2
3
4
5
6
# 查看所有主题
./kafka-topics.sh --list --bootstrap-server localhost:9092

# 输出示例
kafka-action
test-topic

查看主题详情(—describe)

查看主题的分区分布、副本状态等详细信息:

1
2
3
4
5
6
7
8
# 查看指定主题详情
./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic kafka-action

# 输出示例
Topic: kafka-action PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: kafka-action Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-action Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: kafka-action Partition: 2 Leader: 0 Replicas: 0 Isr: 0
  • 字段说明:
    • Leader:负责读写的副本所在 Broker ID。
    • Replicas:所有副本的 Broker ID 列表。
    • Isr:与 Leader 同步的副本列表(In-Sync Replicas)。
高级查询:
  • 查看所有主题详情:--describe(不加 --topic)。

  • 查看异常分区:

    1
    2
    3
    4
    5
    # 查看同步延迟的分区(under-replicated)
    ./kafka-topics.sh --describe --bootstrap-server localhost:9092 --under-replicated-partitions

    # 查看无Leader的分区
    ./kafka-topics.sh --describe --bootstrap-server localhost:9092 --unavailable-partitions

修改主题(—alter)

支持增加分区修改配置(注意:分区数只能增加,不能减少)。

(1)增加分区
1
2
3
4
5
# 将 kafka-action 的分区数从3增加到5
./kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--topic kafka-action \
--partitions 5
  • 限制:分区数增加后,原有消息的分区分布不变,新消息按新分区策略分配。
(2)修改主题配置

注意kafka-topics 的配置修改功能已 deprecated,推荐使用 kafka-configs.sh

1
2
3
4
5
6
7
8
9
10
# 使用 kafka-configs 调整最大消息大小(200KB)
./kafka-configs.sh --alter \
--bootstrap-server localhost:9092 \
--topic kafka-action \
--add-config max.message.bytes=204800

# 验证配置
./kafka-configs.sh --describe \
--bootstrap-server localhost:9092 \
--topic kafka-action
  • 常用配置:
    • max.message.bytes:单条消息最大字节数(默认 1MB)。
    • retention.ms:消息保留时间(默认 7 天)。

删除主题(—delete)

删除主题需确保 Broker 配置 delete.topic.enable=true(默认 true 从 Kafka 1.0 开始):

1
2
3
4
5
6
7
# 删除主题
./kafka-topics.sh --delete \
--bootstrap-server localhost:9092 \
--topic kafka-action

# 输出示例
Topic kafka-action is marked for deletion.
  • 原理:删除操作会标记主题为 “待删除”,控制器随后清理分区目录和 ZooKeeper 元数据。

自动创建主题

Kafka 默认开启自动创建主题(auto.create.topics.enabled=true),当生产者发送消息到不存在的主题时,会自动创建主题,默认配置:

  • 分区数:num.partitions(默认 1)。
  • 副本数:default.replication.factor(默认 1)。

建议:生产环境关闭自动创建(auto.create.topics.enabled=false),避免意外创建无用主题。

最佳实践

  1. 分区数设计
    • 分区数决定并行度,建议根据吞吐量需求设置(如每分区目标吞吐量 1000-5000 条 / 秒)。
    • 分区数过多会增加元数据管理开销,过少则无法充分利用集群资源。
  2. 副本数设计
    • 生产环境副本数 ≥ 2,确保单 Broker 故障时数据不丢失。
    • 副本数 ≤ Broker 数量(否则无法分配)。
  3. 配置管理
    • 核心主题单独配置(如更长的 retention.ms),非核心主题使用默认配置。
    • 通过 kafka-configs.sh 统一管理配置,避免使用 deprecated 命令。
  4. 主题命名规范
    • 采用 业务域-功能-环境 格式(如 order-payment-prod),便于管理和监控

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10