0%

kafka API操作之消费者

Kafka 消费者 API 详解与实践

Kafka 消费者(Consumer)是消息的接收端,负责从 Kafka 集群拉取并处理消息。与生产者相比,消费者的核心挑战在于可靠地消费消息(避免丢失或重复)、高效地分配分区负载以及灵活地管理消费位置。本文将详细解析 Kafka 消费者 API 的使用方法,包括配置、订阅方式、偏移量管理及多线程消费等关键场景。

消费者核心配置与初始化

必选配置参数

创建消费者时,需通过 Properties 配置以下核心参数:

参数名 作用 示例值
bootstrap.servers Kafka 集群地址 localhost:9092
group.id 消费者组 ID(同一组内的消费者共同消费主题,避免重复) order-consumer-group
key.deserializer 消息键(Key)的反序列化类 org.apache.kafka.common.serialization.StringDeserializer
value.deserializer 消息值(Value)的反序列化类 org.apache.kafka.common.serialization.StringDeserializer

关键可选配置

参数名 作用 默认值
enable.auto.commit 是否自动提交偏移量 true(自动提交)
auto.commit.interval.ms 自动提交偏移量的间隔(毫秒) 5000
auto.offset.reset 偏移量无效时的重置策略(earliest:从最早消息开始;latest:从最新消息开始) latest
fetch.min.bytes 拉取消息的最小字节数(不足时等待) 1
fetch.max.wait.ms 拉取消息的最大等待时间(毫秒) 500
max.poll.records 一次 poll() 调用返回的最大消息数 500

初始化消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

public class KafkaConsumerDemo {
private static KafkaConsumer<String, String> initConsumer() {
Properties props = new Properties();
// 必选配置
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 可选配置:关闭自动提交,使用手动提交
props.put("enable.auto.commit", "false");
// 偏移量无效时从最早消息开始消费
props.put("auto.offset.reset", "earliest");

return new KafkaConsumer<>(props);
}
}

订阅主题的四种方式

Kafka 提供了灵活的主题订阅方式,支持按主题名、正则表达式订阅,还可指定重平衡监听器。

按主题名订阅(基础方式)

通过 subscribe(Collection<String> topics) 订阅一个或多个主题:

1
2
3
4
5
6
7
8
9
10
11
12
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;

public class TopicSubscription {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = KafkaConsumerDemo.initConsumer();
// 订阅单个主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 订阅多个主题
// consumer.subscribe(Arrays.asList("topic1", "topic2"));
}
}

带重平衡监听器的订阅

当消费者组发生重平衡(Rebalance)时(如新增消费者、分区变化),ConsumerRebalanceListener 可回调处理偏移量提交或重置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class RebalanceListenerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "rebalance-test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // 手动提交偏移量

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅主题并指定重平衡监听器
consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
/**
* 重平衡开始前、消费者停止拉取消息后调用
* 可在此提交当前消费的偏移量,避免重平衡后重复消费
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("重平衡前,提交偏移量:" + partitions);
consumer.commitSync(); // 同步提交当前偏移量
}

/**
* 重平衡完成后、消费者开始拉取消息前调用
* 可在此重置偏移量(如从数据库恢复)
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("重平衡后,分配到的分区:" + partitions);
// 示例:从已提交的偏移量开始消费
for (TopicPartition partition : partitions) {
OffsetAndMetadata offset = consumer.committed(partition);
if (offset != null) {
consumer.seek(partition, offset.offset()); // 重置偏移量
}
}
}
});

// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("分区:%d,偏移量:%d,消息:%s%n",
record.partition(), record.offset(), record.value());
}
// 手动提交偏移量
consumer.commitAsync();
}
}
}

按正则表达式订阅

通过 subscribe(Pattern pattern) 订阅匹配正则表达式的主题(适用于动态创建的主题):

1
2
3
4
import java.util.regex.Pattern;

// 订阅所有以"log-"开头的主题
consumer.subscribe(Pattern.compile("log-.*"));

偏移量管理:避免消息丢失与重复

偏移量(Offset)是消费者消费进度的标识,确保消费者重启后能从断点继续消费。Kafka 提供自动提交手动提交两种方式,核心是平衡可靠性与性能。

自动提交(简单但有风险)

  • 原理:消费者每隔 auto.commit.interval.ms 自动提交当前最大偏移量。
  • 风险:若消费后未提交偏移量时消费者宕机,重启后会重复消费未提交的消息。
1
2
3
4
5
6
7
8
9
10
11
12
// 配置自动提交(默认开启)
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000); // 1秒提交一次

// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
processMessage(record); // 处理消息
// 无需手动提交,后台自动执行
}
}

手动提交(更可靠)

手动提交分为同步提交commitSync())和异步提交commitAsync()),适用于对可靠性要求高的场景。

(1)同步提交
  • 特点:阻塞等待提交成功,失败会重试,可靠性高但吞吐量低。
1
2
3
4
5
6
7
8
9
10
props.put("enable.auto.commit", "false"); // 关闭自动提交

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
processMessage(record); // 确保消息处理完成
}
// 全部处理完成后同步提交
consumer.commitSync(); // 阻塞直到提交成功
}
(2)异步提交
  • 特点:非阻塞,不等待结果,失败不会重试,吞吐量高但需手动处理异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class AsyncCommitDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "async-commit-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("处理消息:分区=%d,偏移量=%d,内容=%s%n",
record.partition(), record.offset(), record.value());
processMessage(record); // 处理业务逻辑
}

// 异步提交,通过回调处理结果
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// 提交失败,记录异常(可重试或人工处理)
System.err.println("偏移量提交失败:" + exception.getMessage());
// 示例:重试提交失败的偏移量
if (offsets != null) {
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
System.err.printf("失败的分区:%s,偏移量:%d%n",
entry.getKey(), entry.getValue().offset());
}
}
} else {
System.out.println("偏移量提交成功");
}
});
}
}

private static void processMessage(ConsumerRecord<String, String> record) {
// 模拟业务处理
}
}

自定义偏移量存储(高级场景)

对于核心业务(如金融交易),可将偏移量存储在外部系统(如 MySQL、Redis),实现更精细的控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;

public class CustomOffsetStorage {
// 模拟外部存储(实际可用MySQL/Redis)
private static Map<TopicPartition, Long> offsetStore = new HashMap<>();

public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "custom-offset-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 重平衡前,将当前偏移量保存到外部存储
saveOffsetsToExternal(partitions, consumer);
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 重平衡后,从外部存储恢复偏移量
restoreOffsetsFromExternal(partitions, consumer);
}
});

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("处理消息:分区=%d,偏移量=%d%n",
record.partition(), record.offset());
processMessage(record);

// 实时更新偏移量到外部存储(或批量更新)
TopicPartition tp = new TopicPartition(record.topic(), record.partition());
offsetStore.put(tp, record.offset() + 1); // 下次从下一个偏移量开始
}
// 批量提交到外部存储(如每处理100条消息)
}
}

// 保存偏移量到外部存储
private static void saveOffsetsToExternal(Collection<TopicPartition> partitions, KafkaConsumer<?, ?> consumer) {
for (TopicPartition tp : partitions) {
Long offset = offsetStore.get(tp);
if (offset != null) {
// 实际场景:写入MySQL/Redis
System.out.printf("保存偏移量到外部:%s -> %d%n", tp, offset);
}
}
}

// 从外部存储恢复偏移量
private static void restoreOffsetsFromExternal(Collection<TopicPartition> partitions, KafkaConsumer<?, ?> consumer) {
for (TopicPartition tp : partitions) {
Long offset = offsetStore.get(tp);
if (offset != null) {
consumer.seek(tp, offset); // 重置消费位置
System.out.printf("从外部恢复偏移量:%s -> %d%n", tp, offset);
} else {
// 若无记录,从最早位置开始
consumer.seekToBeginning(Collections.singletonList(tp));
}
}
}

private static void processMessage(ConsumerRecord<String, String> record) {
// 业务处理
}
}

高级特性:订阅指定分区与消费控制

订阅指定分区

通过 assign(Collection<TopicPartition>) 可直接指定消费的分区(绕过自动分配),适用于需要精确控制分区消费的场景:

1
2
3
4
5
6
7
8
9
10
11
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;

// 仅消费test-topic的分区0和分区1
consumer.assign(Arrays.asList(
new TopicPartition("test-topic", 0),
new TopicPartition("test-topic", 1)
));

// 从分区0的偏移量100开始消费
consumer.seek(new TopicPartition("test-topic", 0), 100);

消费速度控制

通过 pause()resume() 暂停 / 恢复指定分区的消费,适用于流量削峰:

1
2
3
4
5
6
7
// 暂停分区0的消费
TopicPartition tp = new TopicPartition("test-topic", 0);
consumer.pause(Collections.singletonList(tp));

// 5秒后恢复消费
Thread.sleep(5000);
consumer.resume(Collections.singletonList(tp));

多线程消费者实现

KafkaConsumer 是非线程安全的(不能多线程共享一个实例),多线程消费需采用 “一个线程一个消费者” 的模式,每个线程独立处理部分分区:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// 消费者线程
class ConsumerWorker implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final String topic;

public ConsumerWorker(Properties props, String topic) {
this.consumer = new KafkaConsumer<>(props);
this.topic = topic;
}

@Override
public void run() {
try {
consumer.subscribe(Collections.singletonList(topic));
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (var record : records) {
System.out.printf("线程ID:%d,分区:%d,偏移量:%d,消息:%s%n",
Thread.currentThread().getId(),
record.partition(),
record.offset(),
record.value());
// 处理消息
}
consumer.commitAsync(); // 异步提交
}
} catch (Exception e) {
System.err.println("消费线程异常:" + e.getMessage());
} finally {
consumer.close(); // 关闭消费者
}
}
}

// 多线程消费者管理器
public class MultiThreadConsumer {
private static final String TOPIC = "test-topic";
private static final int THREAD_COUNT = 3; // 线程数(建议 ≤ 分区数)

public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-thread-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
for (int i = 0; i < THREAD_COUNT; i++) {
executor.submit(new ConsumerWorker(props, TOPIC));
}
}
}

注意

  • 线程数建议 ≤ 主题分区数(多余线程会空闲)。
  • 同一消费者组内的线程会通过重平衡自动分配分区,实现负载均衡。

最佳实践

  1. 偏移量提交策略
    • 非核心业务:使用自动提交(简单高效)。
    • 核心业务:使用手动提交(同步保证可靠性,异步提升吞吐量)。
  2. 重平衡处理
    • 重平衡前提交偏移量(避免重复消费)。
    • 重平衡后从正确位置恢复消费(如从外部存储读取偏移量)。
  3. 消费速度优化
    • 合理设置 max.poll.records(一次拉取的消息数),避免处理超时。
    • 若消费缓慢,增加消费者线程数(不超过分区数)。
  4. 异常处理
    • 异步提交失败时记录日志并重试。
    • 消费者宕机前确保偏移量提交(如通过 finally 块)

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10