0%

kafka API操作之生产者

Kafka 生产者 API 详解与实践

Kafka 生产者(Producer)是消息的发送端,负责将业务数据发送到 Kafka 集群。通过 Kafka 提供的 Producer API,我们可以灵活配置生产者行为,支持同步 / 异步发送、自定义分区策略、消息压缩等功能。本文将详细介绍生产者的实现步骤、核心配置及不同发送方式的代码示例

生产者核心概念与配置

核心配置参数

创建生产者时,需通过 Properties 对象配置关键参数,其中必选参数有三个:

参数名 作用 示例值
bootstrap.servers 指定 Kafka 集群地址(多个用逗号分隔) localhost:9092
key.serializer 消息键(Key)的序列化类(需实现 Serializer 接口) org.apache.kafka.common.serialization.StringSerializer
value.serializer 消息值(Value)的序列化类 org.apache.kafka.common.serialization.StringSerializer

常用可选参数(参考 ProducerConfig 类):

参数名 作用 默认值
acks 消息确认级别(0:不确认;1:Leader 确认;-1/all:Leader + 所有 ISR 副本确认) 1
retries 发送失败后的重试次数 0
batch.size 批次大小(达到该值后批量发送,单位:字节) 16384(16KB)
linger.ms 批处理等待时间(若未达 batch.size,超时后也会发送) 0(立即发送)
buffer.memory 发送缓冲区大小(消息暂存此处等待发送) 33554432(32MB)
compression.type 消息压缩算法(none/gzip/snappy/lz4) none

生产者工作原理

Kafka 生产者发送消息的流程如下:

  1. 消息被封装为 ProducerRecord(包含主题、键、值、分区等信息)。
  2. 消息经序列化后进入发送缓冲区RecordAccumulator),按分区聚合为批次(RecordBatch)。
  3. 后台 Sender 线程从缓冲区拉取批次,发送到目标 Broker 的对应分区。
  4. 收到 Broker 确认后,触发回调(若配置)或返回结果。

生产者实现方式

单线程生产者

单线程生产者适用于消息量不大的场景,实现步骤如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaSingleThreadProducer {
// 消息总数
private static final int MSG_COUNT = 5;
// 目标主题
private static final String TOPIC_NAME = "test-topic";
// 生产者实例
private static KafkaProducer<String, String> producer;

static {
// 初始化生产者配置
Properties props = initConfig();
producer = new KafkaProducer<>(props);
}

/**
* 初始化生产者配置
*/
private static Properties initConfig() {
Properties props = new Properties();
// 必选配置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

// 可选配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 最高可靠性确认
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 失败重试3次
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16KB批次
props.put(ProducerConfig.LINGER_MS_CONFIG, 100); // 最多等待100ms批量发送
return props;
}

/**
* 同步发送消息
* (通过Future.get()阻塞等待结果)
*/
public static void sendSync() {
try {
for (int i = 0; i < MSG_COUNT; i++) {
// 创建消息(主题、键、值)
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC_NAME,
"sync-key-" + i,
"sync-value-" + i
);

// 同步发送(阻塞直到收到确认)
RecordMetadata metadata = producer.send(record).get();
System.out.printf("同步发送成功 - 分区:%d,偏移量:%d%n",
metadata.partition(), metadata.offset());

Thread.sleep(500); // 模拟业务间隔
}
} catch (InterruptedException | ExecutionException e) {
System.err.println("同步发送失败:" + e.getMessage());
} finally {
producer.close(); // 关闭生产者,释放资源
}
}

/**
* 异步发送消息
* (通过Callback回调处理结果,非阻塞)
*/
public static void sendAsync() {
try {
for (int i = 0; i < MSG_COUNT; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC_NAME,
"async-key-" + i,
"async-value-" + i
);

// 异步发送,通过回调处理结果
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
// 发送成功
System.out.printf("异步发送成功 - 分区:%d,偏移量:%d%n",
metadata.partition(), metadata.offset());
} else {
// 发送失败
System.err.println("异步发送失败:" + exception.getMessage());
}
}
});

Thread.sleep(500);
}
} catch (InterruptedException e) {
System.err.println("线程中断:" + e.getMessage());
} finally {
producer.close();
}
}

public static void main(String[] args) {
// 测试同步发送
System.out.println("===== 同步发送 =====");
sendSync();

// 测试异步发送
System.out.println("\n===== 异步发送 =====");
sendAsync();
}
}

关键区别

  • 同步发送:通过 producer.send(record).get() 阻塞等待结果,确保消息发送状态,但吞吐量较低。
  • 异步发送:通过 Callback 回调非阻塞处理结果,吞吐量更高,适合高并发场景。

多线程生产者

当消息量较大时,可使用多线程生产者提高发送效率。注意:KafkaProducer 是线程安全的,可被多个线程共享,无需为每个线程创建单独实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KafkaMultiThreadProducer {
private static final int MSG_COUNT = 10;
private static final String TOPIC_NAME = "test-topic";
private static KafkaProducer<String, String> producer;

static {
Properties props = initConfig();
producer = new KafkaProducer<>(props);
}

private static Properties initConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, 2);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 启用snappy压缩
return props;
}

/**
* 消息发送任务(实现Runnable)
*/
static class ProducerTask implements Runnable {
private ProducerRecord<String, String> record;

public ProducerTask(ProducerRecord<String, String> record) {
this.record = record;
}

@Override
public void run() {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("线程 " + Thread.currentThread().getId() + " 发送失败:" + exception.getMessage());
} else {
System.out.printf("线程 %d 发送成功 - 分区:%d,偏移量:%d%n",
Thread.currentThread().getId(),
metadata.partition(),
metadata.offset());
}
}
});
}
}

public static void main(String[] args) {
// 创建线程池(4个线程)
ExecutorService executor = Executors.newFixedThreadPool(4);

try {
for (int i = 0; i < MSG_COUNT; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
TOPIC_NAME,
"multi-key-" + i,
"multi-value-" + i
);
// 提交发送任务
executor.submit(new ProducerTask(record));
}
} finally {
executor.shutdown(); // 关闭线程池
producer.close(); // 关闭生产者
}
}
}

优势

  • 利用线程池并行处理发送任务,提高吞吐量。
  • 共享单个 KafkaProducer 实例,减少资源消耗(避免重复创建连接)。

自定义分区策略

Kafka 默认根据消息键(Key)的哈希值分配分区,若需自定义分区规则(如按业务类型分配),可实现 Partitioner 接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;

/**
* 自定义分区器:按消息值中包含的"order"或"log"关键词分配到不同分区
*/
public class CustomPartitioner implements Partitioner {

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取主题的分区总数
int numPartitions = cluster.partitionsForTopic(topic).size();

String valueStr = value.toString();
if (valueStr.contains("order")) {
// 包含"order"的消息分配到分区0
return 0 % numPartitions;
} else if (valueStr.contains("log")) {
// 包含"log"的消息分配到分区1
return 1 % numPartitions;
} else {
// 其他消息按Key哈希分配
return (key == null) ? 0 : Math.abs(key.hashCode() % numPartitions);
}
}

@Override
public void close() {
// 关闭资源(如需要)
}

@Override
public void configure(Map<String, ?> configs) {
// 读取配置(如需要)
}
}

使用自定义分区器
在生产者配置中添加:

1
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.CustomPartitioner");

最佳实践

  1. 选择合适的发送方式
    • 同步发送:适用于对消息可靠性要求高、吞吐量要求低的场景(如金融交易)。
    • 异步发送:适用于高吞吐量场景(如日志收集),通过回调处理异常。
  2. 优化性能
    • 启用批量发送(batch.size + linger.ms)和压缩(compression.type)。
    • 合理设置缓冲区大小(buffer.memory),避免缓冲区满导致阻塞。
  3. 确保可靠性
    • 关键业务设置 acks=all + 适当的重试次数(retries)。
    • 配合回调函数处理发送失败,实现消息重试或落库补偿。
  4. 线程安全
    • KafkaProducer 是线程安全的,多线程场景下应共享实例而非创建多个

欢迎关注我的其它发布渠道