0%

kafka安全机制

Kafka 安全机制详解:身份认证与权限控制

Kafka 作为分布式消息系统,需保障数据传输的安全性和访问的可控性。其安全机制主要包含身份认证(验证连接方身份)和权限控制(限制操作范围)两部分,可通过配置实现客户端与 Broker、Broker 之间及 Broker 与 ZooKeeper 之间的安全通信。本文将详细介绍这两种机制的实现方式及操作步骤。

身份认证

身份认证用于验证连接发起方的合法性,防止未授权节点接入集群。Kafka 支持多种认证机制,如 SSL、SASL(含 PLAIN、SCRAM 等),其中 SASL/PLAIN 因配置简单被广泛用于内部环境。

SASL/PLAIN 认证原理

SASL(Simple Authentication and Security Layer)是一种通用认证框架,PLAIN 是其最简单的机制,基于用户名 / 密码验证。适用于客户端与 Broker、Broker 之间及 Broker 与 ZooKeeper 的认证。

配置步骤(SASL/PLAIN)

(1)环境准备
  • 确保 Kafka 集群(含 ZooKeeper)已安装并正常运行。
  • 准备 3 类 JAAS(Java Authentication and Authorization Service)配置文件:ZooKeeper 服务端、Kafka 服务端、Kafka 客户端。
(2)配置 ZooKeeper 认证
① 修改 zookeeper.properties

开启 SASL 认证,添加以下配置:

1
2
3
4
5
6
# 启用 SASL 认证插件
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
# 要求客户端必须使用 SASL 认证
requireClientAuthScheme=sasl
# 认证信息刷新间隔(毫秒)
jaasLoginRenew=3600000
② 创建 ZooKeeper JAAS 文件(kafka_zk_jaas.conf

定义 ZooKeeper 服务端的认证用户(如管理员 admin):

1
2
3
4
5
6
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" # ZooKeeper 内部通信用户名
password="admin" # 密码
user_admin="admin"; # 定义用户 admin 的密码(格式:user_<用户名>="<密码>")
};
③ 配置 ZooKeeper 启动参数

zookeeper-server-start.sh(或 .bat)中添加 JVM 参数,指定 JAAS 文件路径:

1
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_zk_jaas.conf"
(3)配置 Kafka Broker 认证
① 修改 server.properties

开启 SASL 监听,配置 Broker 间通信协议:

1
2
3
4
5
6
7
8
9
10
# 监听地址(SASL_PLAINTEXT 表示无加密的 SASL 认证)
listeners=SASL_PLAINTEXT://localhost:9092
# Broker 之间的通信协议
security.inter.broker.protocol=SASL_PLAINTEXT
# 启用的 SASL 机制(此处为 PLAIN)
sasl.enabled.mechanisms=PLAIN
# Broker 间认证使用的 SASL 机制
sasl.mechanism.inter.broker.protocol=PLAIN
# 超级管理员(拥有所有权限,格式:User:<用户名>)
super.users=User:admin
② 创建 Kafka 服务端 JAAS 文件(kafka_server_jaas.conf

定义 Broker 自身认证信息及连接 ZooKeeper 的认证信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Kafka 服务端认证配置(Broker 之间及客户端连接)
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" # Broker 间通信的用户名
password="admin" # 密码
user_admin="admin" # 定义用户 admin(用于客户端或 Broker 连接)
user_morton="kafka"; # 定义普通用户 morton(密码 kafka)
};

# Kafka 连接 ZooKeeper 的认证配置
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin" # 对应 ZooKeeper 的 admin 用户
password="admin";
};
③ 配置 Kafka Broker 启动参数

kafka-server-start.sh 中添加 JVM 参数,指定服务端 JAAS 文件:

1
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
(4)配置 Kafka 客户端认证(生产者 / 消费者)
① 创建客户端 JAAS 文件(kafka_client_jaas.conf

定义客户端连接 Kafka 时的认证用户(如使用 adminmorton):

1
2
3
4
5
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="morton" # 客户端用户名(需与服务端定义的 user_<用户名> 匹配)
password="kafka"; # 对应密码
};
② 配置客户端启动参数

kafka-console-producer.shkafka-console-consumer.sh 中添加 JVM 参数:

1
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
③ 启动客户端时指定安全协议

通过参数或配置文件指定 SASL 协议:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 启动生产者(直接指定参数)
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic acls-test \
--producer-property security.protocol=SASL_PLAINTEXT \
--producer-property sasl.mechanism=PLAIN

# 或通过配置文件(producer.properties)
# producer.properties 中添加:
# security.protocol=SASL_PLAINTEXT
# sasl.mechanism=PLAIN
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic acls-test \
--producer.config producer.properties

权限控制

权限控制用于限制已认证用户的操作范围(如是否允许读写某主题),Kafka 自带 SimpleAclAuthorizer 实现基于 ACL(Access Control List)的权限管理,ACL 规则存储在 ZooKeeper 中。

核心概念

(1)权限类型

Kafka 定义了多种操作权限,覆盖主题、集群等资源:

权限 描述
READ 消费消息、查看消费组信息等读操作
WRITE 发送消息、创建主题(特定场景)等写操作
DELETE 删除主题、消费组等操作
CREATE 创建主题、分区等操作
ALTER 修改主题配置、分区等操作
DESCRIBE 查看主题元数据(如分区数、副本分布)
ClusterAction 集群级操作(如更新元数据、关闭 Broker)
ALL 包含上述所有权限
(2)ACL 存储
  • ACL 规则存储在 ZooKeeper 的 /kafka-acl 节点。
  • 变更记录存储在 /kafka-acl-changes 节点,用于追踪权限修改历史。

启用权限控制

(1)修改 server.properties

指定权限控制实现类,并配置默认权限策略:

1
2
3
4
5
6
# 启用 SimpleAclAuthorizer
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# 未配置 ACL 时是否允许所有用户访问(默认 false,仅超级用户可操作)
# 建议生产环境设为 false,测试环境可设为 true
allow.everyone.if.no.acl.found=false
(2)重启 Kafka Broker

使配置生效。

权限操作(kafka-acls.sh)

kafka-acls.sh 脚本用于管理 ACL 规则,支持添加、查询、删除权限。

(1)查询权限

查看当前所有 ACL 规则:

1
2
3
kafka-acls.sh \
--authorizer-properties zookeeper.connect=localhost:2181 \
--list

查看指定主题的权限:

1
2
3
4
kafka-acls.sh \
--authorizer-properties zookeeper.connect=localhost:2181 \
--list \
--topic acls-test
(2)添加权限

允许用户 morton 向主题 acls-test 发送消息(包含 WRITEDESCRIBE 权限):

1
2
3
4
5
6
kafka-acls.sh \
--authorizer-properties zookeeper.connect=localhost:2181 \
--add \
--allow-principal User:morton \ # 授权用户(格式:User:<用户名>)
--producer \ # 等价于授予 WRITE + DESCRIBE + CREATE 权限
--topic acls-test # 目标主题

允许用户 morton 消费主题 acls-test(包含 READDESCRIBE 权限):

1
2
3
4
5
6
7
kafka-acls.sh \
--authorizer-properties zookeeper.connect=localhost:2181 \
--add \
--allow-principal User:morton \
--consumer \ # 等价于授予 READ + DESCRIBE 权限
--topic acls-test \
--group test-group # 允许访问的消费组(* 表示所有组)

指定具体权限(如仅允许 READ):

1
2
3
4
5
6
kafka-acls.sh \
--authorizer-properties zookeeper.connect=localhost:2181 \
--add \
--allow-principal User:morton \
--operation READ \ # 仅授予读权限
--topic acls-test
(3)删除权限

删除主题 acls-test 上所有权限:

1
2
3
4
kafka-acls.sh \
--authorizer-properties zookeeper.connect=localhost:2181 \
--remove \
--topic acls-test

删除指定用户的权限:

1
2
3
4
5
kafka-acls.sh \
--authorizer-properties zookeeper.connect=localhost:2181 \
--remove \
--allow-principal User:morton \
--topic acls-test

最佳实践

  1. 生产环境安全配置
    • 优先使用 SASL/SCRAMSSL 替代 PLAIN(PLAIN 密码明文传输,需配合加密)。
    • 超级用户(如 admin)仅用于管理,业务客户端使用普通用户。
  2. 权限最小化原则
    • 仅授予客户端必要权限(如生产者仅需 WRITE + DESCRIBE)。
    • 定期清理无用 ACL 规则,避免权限冗余。
  3. 监控与审计
    • 通过 ZooKeeper 监控 /kafka-acl-changes 节点,追踪权限变更。
    • 结合 Kafka 审计日志(如 log4j 配置)记录认证失败和权限拒绝事件。
  4. 集群内通信安全
    • Broker 之间通信启用 SASL 或 SSL,防止内部节点被篡改

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10