Kafka 生产者 API 详解与实践
Kafka 生产者(Producer)是消息的发送端,负责将业务数据发送到 Kafka 集群。通过 Kafka 提供的 Producer API,我们可以灵活配置生产者行为,支持同步 / 异步发送、自定义分区策略、消息压缩等功能。本文将详细介绍生产者的实现步骤、核心配置及不同发送方式的代码示例
生产者核心概念与配置
核心配置参数
创建生产者时,需通过 Properties 对象配置关键参数,其中必选参数有三个:
| 参数名 | 作用 | 示例值 |
|---|---|---|
bootstrap.servers |
指定 Kafka 集群地址(多个用逗号分隔) | localhost:9092 |
key.serializer |
消息键(Key)的序列化类(需实现 Serializer 接口) |
org.apache.kafka.common.serialization.StringSerializer |
value.serializer |
消息值(Value)的序列化类 | org.apache.kafka.common.serialization.StringSerializer |
常用可选参数(参考 ProducerConfig 类):
| 参数名 | 作用 | 默认值 |
|---|---|---|
acks |
消息确认级别(0:不确认;1:Leader 确认;-1/all:Leader + 所有 ISR 副本确认) | 1 |
retries |
发送失败后的重试次数 | 0 |
batch.size |
批次大小(达到该值后批量发送,单位:字节) | 16384(16KB) |
linger.ms |
批处理等待时间(若未达 batch.size,超时后也会发送) |
0(立即发送) |
buffer.memory |
发送缓冲区大小(消息暂存此处等待发送) | 33554432(32MB) |
compression.type |
消息压缩算法(none/gzip/snappy/lz4) | none |
生产者工作原理
Kafka 生产者发送消息的流程如下:
