0%

kafka启动和关闭

Kafka 启动与关闭全流程详解

Kafka 是一个依赖 ZooKeeper 的分布式消息系统,其启动过程涉及 ZooKeeper 初始化、Kafka 核心组件加载等步骤。本文将详细介绍 Kafka 及依赖的 ZooKeeper 的启动配置、操作步骤、源码级启动流程,以及安全关闭的方法,帮助理解其运行机制。

前置依赖:ZooKeeper 启动

Kafka 依赖 ZooKeeper 存储集群元数据(如 Broker 信息、主题配置、消费者偏移量等),需先启动 ZooKeeper。Kafka 自带 ZooKeeper 组件,推荐使用自带版本以避免版本兼容问题。

ZooKeeper 配置文件(zookeeper.properties)

Kafka 的 ZooKeeper 配置文件位于 config/zookeeper.properties,核心配置如下:

1
2
3
4
5
6
7
8
# 快照文件存储目录(默认/tmp/zookeeper,建议修改为持久化路径)
dataDir=E:\\zookeeper\\data
# 事务日志存储目录(独立设置可提升性能)
dataLogDir=E:\\zookeeper\\logs
# 客户端连接端口
clientPort=2181
# 允许客户端最大连接数(0表示无限制)
maxClientCnxns=0
  • 注意:路径需避免包含空格,否则可能导致启动失败(报错 “系统找不到指定的路径”)。

启动 ZooKeeper

(1)Windows 环境

进入 Kafka 安装目录的 bin/windows 文件夹,执行启动命令:

1
2
# 启动 ZooKeeper(指定配置文件)
zookeeper-server-start ../../config/zookeeper.properties
(2)Linux/Mac 环境

进入 bin 目录:

1
2
# 启动 ZooKeeper
./zookeeper-server-start.sh ../config/zookeeper.properties
(3)验证启动

ZooKeeper 启动成功后,日志会显示 binding to port 0.0.0.0/0.0.0.0:2181,表示已监听 2181 端口。

Kafka 启动流程

Kafka 核心配置(server.properties)

启动前需确保 config/server.properties 配置正确,关键配置包括:

  • broker.id=0(Broker 唯一标识)
  • log.dirs=/tmp/kafka-logs(消息日志存储路径)
  • zookeeper.connect=localhost:2181(ZooKeeper 地址)

启动 Kafka 服务

(1)基本启动命令
  • Windowsbin/windows 目录):

    1
    2
    # 启动 Kafka(指定配置文件)
    kafka-server-start ../../config/server.properties
  • Linux/Macbin 目录):

    1
    ./kafka-server-start.sh ../config/server.properties
(2)后台运行

添加 -daemon 参数可让 Kafka 在后台运行(不占用终端):

1
2
3
4
5
# Windows 后台启动
kafka-server-start -daemon ../../config/server.properties

# Linux/Mac 后台启动
./kafka-server-start.sh -daemon ../config/server.properties

启动脚本解析(kafka-server-start)

启动脚本的核心逻辑是加载配置、设置 JVM 参数,并执行 Kafka 主类 kafka.Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 检查参数(必须指定配置文件)
if [ $# -lt 1 ]; then
echo "USAGE: $0 [-daemon] server.properties"
exit 1
fi

# 配置日志(默认使用 config/log4j.properties)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi

# 配置 JVM 内存(默认 1G 堆内存)
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

# 执行 Kafka 主类
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

源码级启动流程(Kafka 核心组件初始化)

Kafka 启动的核心逻辑在 kafka.Kafka 类的 main 方法中,最终调用 KafkaServer.startup() 完成组件初始化,步骤如下:

(1)初始化 ZooKeeper 连接

创建 ZooKeeper 客户端,在 ZooKeeper 中创建元数据节点(如 /brokers/ids/consumers/config 等),用于存储 Broker 信息、主题配置等。

(2)生成集群 ID(cluster.id)

生成全局唯一的集群 ID(UUID),存储在 ZooKeeper 的 /cluster/id 节点,标识整个 Kafka 集群。

(3)启动核心组件
  • 调度器(KafkaScheduler):基于线程池的定时任务管理器,负责日志清理、副本同步等后台任务。
  • 日志管理器(LogManager):管理消息日志的存储与刷盘,初始化 log.dirs 配置的目录。
  • 副本管理器(ReplicaManager):管理分区副本的同步与 Leader 选举,确保数据可靠性。
  • 控制器(KafkaController):协调集群中 Broker 的状态(如 Leader 选举、分区分配),每个集群有一个 Leader 控制器。
  • 组协调器(GroupCoordinator):管理消费者组的重平衡与偏移量提交,存储偏移量到 __consumer_offsets 主题。
  • 网络服务(SocketServer):启动 NIO 服务器,监听客户端连接(默认 9092 端口)。
(4)状态变迁

Kafka Broker 启动过程中,状态从 Not RunningStartingRunningAsBroker,最终日志输出 started 表示启动完成。

验证 Kafka 启动成功

(1)查看日志

Kafka 启动日志默认输出到控制台,成功启动会显示:

1
[KafkaServer id=0] started
(2)通过 ZooKeeper 验证

使用 Kafka 自带的 zookeeper-shell 查看元数据:

1
2
3
4
5
6
7
8
9
10
# 进入 ZooKeeper 客户端(Windows)
zookeeper-shell localhost:2181

# 查看 Broker 列表(应包含 broker.id=0
ls /brokers/ids
# 输出:[0]

# 查看控制器信息(Leader 控制器的 Broker ID)
get /controller
# 输出示例:{"version":1,"brokerid":0,"timestamp":"1589255159885"}

Kafka 日志配置

Kafka 日志由 config/log4j.properties 配置,主要日志文件及用途如下:

日志文件 用途
server.log Broker 运行主日志(核心操作记录)
controller.log 控制器(KafkaController)操作日志
state-change.log 分区状态变更日志(如 Leader 切换)
kafka-request.log 客户端请求日志(包含请求类型、耗时等)
log-cleaner.log 日志清理线程操作日志

Kafka 关闭流程

关闭命令

(1)Windows 环境(bin/windows 目录):
1
kafka-server-stop
(2)Linux/Mac 环境(bin 目录):
1
./kafka-server-stop.sh

关闭脚本解析(kafka-server-stop)

关闭脚本通过查找 Kafka 进程并发送终止信号(默认 TERM)实现安全关闭:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查找 Kafka 进程 PID(基于类名 kafka.Kafka)
if [[ $(uname -s) == "OS/390" ]]; then
PIDS=$(ps -A -o pid,jobname,comm | grep -i "KAFKSTRT" | grep java | awk '{print $1}')
else
PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
fi

# 发送终止信号
if [ -z "$PIDS" ]; then
echo "No kafka server to stop"
else
kill -s $SIGNAL $PIDS # SIGNAL 默认为 TERM
fi
  • 安全关闭:Kafka 收到 TERM 信号后,会执行优雅关闭(如提交日志、释放资源),状态从 RunningAsBrokerBrokerShuttingDownNot Running

强制关闭(谨慎使用)

若正常关闭失败,可强制杀死进程(可能导致数据不一致):

1
2
3
4
# Linux/Mac 强制关闭
kill -9 <Kafka进程PID>

# Windows 强制关闭(通过任务管理器结束 java.exe 进程)

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10