0%

Kafka 消费者详解:从命令行到源码实现

Kafka 消费者是消息处理的终端,负责从 Kafka 集群拉取(pull)消息并进行业务处理。与生产者的推(push)模式不同,消费者采用拉取模式主动获取消息,且通过消费者组(Consumer Group) 机制实现消息的负载均衡与重复消费控制。本文将从命令行工具、核心配置、工作原理到源码实现,全面解析 Kafka 消费者。

消费者核心概念

  1. 消费者组:多个消费者组成一个组,共同消费一个或多个主题。同一组内的消费者分工合作,每个分区只能被组内一个消费者消费(避免重复消费)。
  2. 消息拉取:消费者主动从 Broker 的分区拉取消息,而非 Broker 推送,灵活控制消费速度。
  3. 偏移量(Offset):记录消费者已消费到的位置,确保重启后从断点继续消费。偏移量存储在 Kafka 内部主题 __consumer_offsets 中(0.9 版本后)。
  4. 单播与广播:
    • 单播:同一条消息仅被同一消费者组的一个消费者消费(默认模式)。
    • 广播:不同消费者组可消费同一条消息(通过多组订阅实现)。

命令行消费者工具:kafka-console-consumer

Kafka 提供 kafka-console-consumer.sh(Linux/Mac)或 kafka-console-consumer.bat(Windows)工具,用于快速测试消费消息。

基本使用

(1)消费最新消息

从主题的最新位置(latest)开始消费,仅接收新写入的消息:

1
2
3
4
5
# Linux/Mac
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic

# Windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
(2)从开头消费所有消息

通过 --from-beginning 从主题的最早位置(earliest)开始消费历史消息:

阅读全文 »

Kafka 生产者深度解析:从命令行到源码实现

Kafka 生产者是消息流入 Kafka 集群的入口,负责将业务数据可靠、高效地发送到指定主题。无论是通过命令行工具快速发送消息,还是通过 KafkaProducer 客户端进行编程式集成,理解其工作机制、配置参数及底层流程都是优化性能和确保可靠性的关键。本文将从命令行工具、核心配置、性能测试到源码实现,全面解析 Kafka 生产者。

命令行生产者工具:kafka-console-producer

Kafka 提供 kafka-console-producer.sh(Linux/Mac)或 kafka-console-producer.bat(Windows)工具,用于快速通过命令行发送消息,适合测试和调试场景。

基本使用

(1)发送无 Key 消息
1
2
3
4
5
# Linux/Mac
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic

# Windows
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic

输入消息并回车即可发送,每条输入会作为一条消息发送到 test-topic

(2)发送带 Key 消息

通过 --property parse.key=true 启用 Key 解析,Key 与 Value 用Tab 键分隔:

1
2
3
4
5
# 发送带 Key 的消息(Key 为 "user1",Value 为 "hello")
./kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic test-topic \
--property parse.key=true \
--property key.separator=$'\t' # 指定分隔符为 Tab(默认)

输入 user1<Tab>hello 并回车,消息 Key 为 user1,Value 为 hello

核心参数说明

参数 作用 默认值
--bootstrap-server Kafka 集群地址(替代旧版 --broker-list 无(必传)
--topic 目标主题名称 无(必传)
--property 自定义属性(如 parse.key=true 启用 Key 解析)
--producer-property 生产者配置(如 acks=all
--sync 同步发送消息(默认异步) false
--compression-codec 消息压缩算法(none/gzip/snappy 等) none

生产者核心配置参数

Kafka 生产者的行为由一系列配置参数控制,可通过配置文件或代码指定。核心参数如下:

阅读全文 »

Kafka 主题(Topic)操作全解析:命令、源码与实践

Kafka 主题(Topic)是消息的逻辑容器,所有消息都需发送到指定主题。Kafka 提供了 kafka-topics.sh(或 .bat)脚本用于主题的创建、删除、查询和修改等操作。本文将详细介绍主题的核心操作命令、底层实现原理及最佳实践,帮助深入理解主题管理机制。

主题操作脚本与核心原理

脚本入口:kafka-topics

Kafka 主题操作的脚本是 kafka-topics.sh(Linux/Mac)或 kafka-topics.bat(Windows),其核心是调用 kafka.admin.TopicCommand 类执行操作:

1
2
# 脚本核心逻辑(kafka-topics.sh)
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

TopicCommand 类的 main 方法解析命令参数,根据操作类型(创建、删除等)调用对应逻辑,底层通过 ZooKeeper 或 Kafka Admin API 交互。

主题创建的底层流程

主题创建分为客户端发起服务端处理两个阶段:

  1. 客户端阶段
    • 通过 kafka-topics 脚本发送创建请求,包含主题名、分区数、副本数等参数。
    • 客户端将主题元数据(如分区 - 副本分配方案)写入 ZooKeeper 的 /brokers/topics/[topic] 节点。
  2. 服务端阶段
    • Kafka 控制器(Controller)监听 ZooKeeper 节点变化,发现新主题后触发创建流程。
    • 控制器在指定 Broker 上创建分区目录(如 topic-0topic-1),并初始化 Leader 和 Follower 副本。

核心主题操作命令

创建主题(—create)

创建主题需指定分区数、副本数和 Kafka 集群地址,核心参数:

阅读全文 »

Kafka 启动与关闭全流程详解

Kafka 是一个依赖 ZooKeeper 的分布式消息系统,其启动过程涉及 ZooKeeper 初始化、Kafka 核心组件加载等步骤。本文将详细介绍 Kafka 及依赖的 ZooKeeper 的启动配置、操作步骤、源码级启动流程,以及安全关闭的方法,帮助理解其运行机制。

前置依赖:ZooKeeper 启动

Kafka 依赖 ZooKeeper 存储集群元数据(如 Broker 信息、主题配置、消费者偏移量等),需先启动 ZooKeeper。Kafka 自带 ZooKeeper 组件,推荐使用自带版本以避免版本兼容问题。

ZooKeeper 配置文件(zookeeper.properties)

Kafka 的 ZooKeeper 配置文件位于 config/zookeeper.properties,核心配置如下:

1
2
3
4
5
6
7
8
# 快照文件存储目录(默认/tmp/zookeeper,建议修改为持久化路径)
dataDir=E:\\zookeeper\\data
# 事务日志存储目录(独立设置可提升性能)
dataLogDir=E:\\zookeeper\\logs
# 客户端连接端口
clientPort=2181
# 允许客户端最大连接数(0表示无限制)
maxClientCnxns=0
  • 注意:路径需避免包含空格,否则可能导致启动失败(报错 “系统找不到指定的路径”)。

启动 ZooKeeper

(1)Windows 环境

进入 Kafka 安装目录的 bin/windows 文件夹,执行启动命令:

1
2
# 启动 ZooKeeper(指定配置文件)
zookeeper-server-start ../../config/zookeeper.properties
(2)Linux/Mac 环境

进入 bin 目录:

1
2
# 启动 ZooKeeper
./zookeeper-server-start.sh ../config/zookeeper.properties
(3)验证启动

ZooKeeper 启动成功后,日志会显示 binding to port 0.0.0.0/0.0.0.0:2181,表示已监听 2181 端口。

Kafka 启动流程

Kafka 核心配置(server.properties)

启动前需确保 config/server.properties 配置正确,关键配置包括:

阅读全文 »

CyclicBarrier和CountDownLatch

这两个类都在jdk的并发包中,都可以用来表示代码运行到某个点上

两者的区别

  • CyclicBarrier表示达到一定数量的线程才会运行;CountDownLatch每来一个线程进行减一操作,直到0为止
  • CyclicBarrier只能唤起一个任务;CountDownLatch可以唤起多个任务
  • CyclicBarrier可重用;CountDownLatch不可重用,只能触发一次事件,值为0后就不可再用了
  • CyclicBarrier允许N个线程相互等待;CountDownLatch是允许1或N个线程等待其他线程完成执行

核心区别对比表

特性 CyclicBarrier CountDownLatch
计数器机制 初始值为 parties,每次 await() 减 1,归 0 后自动重置 初始值为 count,每次 countDown() 减 1,归 0 后不可用
线程协作模式 N 个线程互相等待,全部到达屏障后继续执行 1 个(或多个)线程等待其他线程完成操作
重用性 支持循环使用(通过 reset() 或自动重置) 一次性使用,计数器归 0 后无法重置
屏障动作 支持 Runnable 任务,在所有线程到达后执行 无特殊动作,仅唤醒等待线程
典型场景 并行计算的多阶段同步(如 MapReduce) 主线程等待多个子线程完成初始化
阅读全文 »