# Kafka Consumer API: Details and Practice

The Kafka consumer is the receiving end of the pipeline: it pulls messages from the Kafka cluster and processes them. Compared with the producer, the consumer's core challenges are consuming messages reliably (avoiding loss and duplication), distributing partition load efficiently, and managing the consumption position flexibly. This article walks through the consumer API in detail: configuration, subscription styles, offset management, and multi-threaded consumption.
## Core Configuration and Initialization

### Required parameters

When creating a consumer, the following core parameters must be set via `Properties`:

| Parameter | Purpose | Example |
| --- | --- | --- |
| `bootstrap.servers` | Address of the Kafka cluster | `localhost:9092` |
| `group.id` | Consumer group ID (consumers in the same group share the topic's partitions, so each message is processed once per group) | `order-consumer-group` |
| `key.deserializer` | Deserializer class for message keys | `org.apache.kafka.common.serialization.StringDeserializer` |
| `value.deserializer` | Deserializer class for message values | `org.apache.kafka.common.serialization.StringDeserializer` |
### Key optional parameters

| Parameter | Purpose | Default |
| --- | --- | --- |
| `enable.auto.commit` | Whether offsets are committed automatically | `true` (auto commit) |
| `auto.commit.interval.ms` | Interval between automatic offset commits (ms) | `5000` |
| `auto.offset.reset` | Reset policy when no valid committed offset exists (`earliest`: start from the oldest message; `latest`: start from the newest) | `latest` |
| `fetch.min.bytes` | Minimum bytes a fetch must return (the broker waits if less data is available) | `1` |
| `fetch.max.wait.ms` | Maximum time the broker waits to satisfy `fetch.min.bytes` (ms) | `500` |
| `max.poll.records` | Maximum number of records returned by one `poll()` call | `500` |
### Initializing a consumer

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Properties;

public class KafkaConsumerDemo {

    // public (not private) so the other demos in this article can reuse it
    public static KafkaConsumer<String, String> initConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");   // commit offsets manually
        props.put("auto.offset.reset", "earliest"); // start from the oldest message if no committed offset
        return new KafkaConsumer<>(props);
    }
}
```
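Since `KafkaConsumer` implements `AutoCloseable`, callers of `initConsumer()` can use try-with-resources so the consumer leaves its group and releases its sockets even on error. A minimal usage sketch (the class and topic names are placeholders, not part of the original example):

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;

public class InitConsumerUsage {
    public static void main(String[] args) {
        // close() is called automatically when the block exits
        try (KafkaConsumer<String, String> consumer = KafkaConsumerDemo.initConsumer()) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}
```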
## Four Ways to Subscribe to Topics

Kafka supports flexible subscription: by topic name, by topic name with a rebalance listener, and by regular expression, all shown below. The fourth way, direct partition assignment via `assign()`, is covered in the advanced section.
### Subscribe by topic name (basic)

Subscribe to one or more topics via `subscribe(Collection<String> topics)`:
```java
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;

public class TopicSubscription {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = KafkaConsumerDemo.initConsumer();
        // Subscribe to a single topic; pass a larger collection to subscribe to several
        consumer.subscribe(Collections.singletonList("test-topic"));
    }
}
```
### Subscribe with a rebalance listener

When the consumer group rebalances (e.g., a consumer joins or the partition count changes), a `ConsumerRebalanceListener` receives callbacks in which you can commit or reset offsets:
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class RebalanceListenerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "rebalance-test-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before partitions are taken away: commit what we have processed
                System.out.println("Before rebalance, committing offsets for: " + partitions);
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called after new partitions are assigned: resume from the committed position
                System.out.println("After rebalance, assigned partitions: " + partitions);
                for (TopicPartition partition : partitions) {
                    OffsetAndMetadata offset = consumer.committed(partition);
                    if (offset != null) {
                        consumer.seek(partition, offset.offset());
                    }
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition: %d, offset: %d, message: %s%n",
                        record.partition(), record.offset(), record.value());
            }
            consumer.commitAsync();
        }
    }
}
```
### Subscribe by regular expression

Subscribe to every topic matching a pattern via `subscribe(Pattern pattern)` (useful for dynamically created topics):
```java
import java.util.regex.Pattern;

// Matches every topic whose name starts with "log-", including ones created later
consumer.subscribe(Pattern.compile("log-.*"));
```
## Offset Management: Avoiding Loss and Duplication

The offset marks how far a consumer has read, so a restarted consumer can resume from where it left off. Kafka offers automatic and manual commits; choosing between them is a trade-off between reliability and performance.
### Automatic commit (simple but risky)

- Mechanism: every `auto.commit.interval.ms`, the consumer commits the highest offsets returned by `poll()`.
- Risk: if the consumer crashes after processing messages but before the next commit, those messages are consumed again on restart (duplicates); conversely, if the commit fires before processing finishes, a crash can silently lose messages.
```java
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", 1000); // commit once per second

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // Offsets are committed in the background, whether or not this succeeds
        processMessage(record);
    }
}
```
### Manual commit (more reliable)

Manual commits come in two flavors, synchronous (`commitSync()`) and asynchronous (`commitAsync()`), and suit workloads with strict reliability requirements.
**(1) Synchronous commit**

- Behavior: blocks until the commit succeeds and retries on failure; highly reliable, but it lowers throughput.
```java
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        processMessage(record);
    }
    // Blocks until the broker confirms; retries transient failures internally
    consumer.commitSync();
}
```
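With no arguments, `commitSync()` commits the highest offsets returned by the last `poll()`. There is also an overload taking an explicit offset map, useful for committing a precise position; a minimal sketch, where the topic, partition, and offset values are placeholders:

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;

// Convention: commit the offset of the NEXT record to read,
// i.e., last processed offset + 1.
long lastProcessed = 41L; // placeholder value
Map<TopicPartition, OffsetAndMetadata> toCommit = Collections.singletonMap(
        new TopicPartition("test-topic", 0),
        new OffsetAndMetadata(lastProcessed + 1));
consumer.commitSync(toCommit);
```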
**(2) Asynchronous commit**

- Behavior: non-blocking; it neither waits for the result nor retries on failure, so throughput is high but errors must be handled in the callback.
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class AsyncCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "async-commit-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("processing: partition=%d, offset=%d, value=%s%n",
                        record.partition(), record.offset(), record.value());
                processMessage(record);
            }
            // Non-blocking commit; failures surface only in the callback
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null) {
                    System.err.println("offset commit failed: " + exception.getMessage());
                    if (offsets != null) {
                        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                            System.err.printf("failed partition: %s, offset: %d%n",
                                    entry.getKey(), entry.getValue().offset());
                        }
                    }
                } else {
                    System.out.println("offset commit succeeded");
                }
            });
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        // business logic goes here
    }
}
```
### Custom offset storage (advanced)

For critical workloads (e.g., financial transactions), offsets can be kept in an external system (MySQL, Redis, ...) for finer-grained control:
```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class CustomOffsetStorage {

    // Stand-in for an external store (MySQL, Redis, ...)
    private static final Map<TopicPartition, Long> offsetStore = new HashMap<>();

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "custom-offset-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                saveOffsetsToExternal(partitions, consumer);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                restoreOffsetsFromExternal(partitions, consumer);
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("processing: partition=%d, offset=%d%n",
                        record.partition(), record.offset());
                processMessage(record);
                // Store the NEXT offset to read: last processed + 1
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                offsetStore.put(tp, record.offset() + 1);
            }
        }
    }

    private static void saveOffsetsToExternal(Collection<TopicPartition> partitions,
                                              KafkaConsumer<?, ?> consumer) {
        for (TopicPartition tp : partitions) {
            Long offset = offsetStore.get(tp);
            if (offset != null) {
                System.out.printf("saving offset to external store: %s -> %d%n", tp, offset);
            }
        }
    }

    private static void restoreOffsetsFromExternal(Collection<TopicPartition> partitions,
                                                   KafkaConsumer<?, ?> consumer) {
        for (TopicPartition tp : partitions) {
            Long offset = offsetStore.get(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
                System.out.printf("restored offset from external store: %s -> %d%n", tp, offset);
            } else {
                consumer.seekToBeginning(Collections.singletonList(tp));
            }
        }
    }

    private static void processMessage(ConsumerRecord<String, String> record) {
        // business logic goes here
    }
}
```
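In the demo the "external store" is only an in-memory map. With a real database, `saveOffsetsToExternal` would write through JDBC instead. A minimal sketch of such a writer; the `consumer_offsets` table schema and the `DataSource` wiring are assumptions for illustration, not part of the original example:

```java
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JdbcOffsetStore {
    // Assumed table: CREATE TABLE consumer_offsets (
    //   topic_name VARCHAR(255), partition_id INT, offset_value BIGINT,
    //   PRIMARY KEY (topic_name, partition_id));
    private final DataSource dataSource;

    public JdbcOffsetStore(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Upsert one partition's position (MySQL syntax)
    public void save(String topic, int partition, long offset) throws SQLException {
        String sql = "INSERT INTO consumer_offsets (topic_name, partition_id, offset_value) "
                + "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset_value = VALUES(offset_value)";
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, topic);
            ps.setInt(2, partition);
            ps.setLong(3, offset);
            ps.executeUpdate();
        }
    }
}
```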
## Advanced Features: Assigned Partitions and Flow Control

### Consuming specific partitions

`assign(Collection<TopicPartition>)` lets you pick the exact partitions to consume (bypassing automatic assignment), useful when you need precise control over partition consumption:
```java
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;

// assign() bypasses group management: no rebalancing, these partitions are yours
consumer.assign(Arrays.asList(
        new TopicPartition("test-topic", 0),
        new TopicPartition("test-topic", 1)
));

// Jump to offset 100 in partition 0
consumer.seek(new TopicPartition("test-topic", 0), 100);
```
### Controlling the consumption rate

`pause()` and `resume()` suspend and resume fetching from specific partitions, e.g., to shave traffic peaks:
```java
TopicPartition tp = new TopicPartition("test-topic", 0);

// Stop fetching from this partition; poll() still keeps the group membership alive
consumer.pause(Collections.singletonList(tp));

Thread.sleep(5000); // throws InterruptedException; handle or declare it in real code

// Resume fetching
consumer.resume(Collections.singletonList(tp));
```
## Multi-Threaded Consumption

`KafkaConsumer` is **not thread-safe** (an instance must never be shared across threads). Multi-threaded consumption therefore uses the "one consumer per thread" pattern, with each thread independently handling a subset of the partitions:
```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One KafkaConsumer per thread: each instance is confined to its own thread
class ConsumerWorker implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public ConsumerWorker(Properties props, String topic) {
        this.consumer = new KafkaConsumer<>(props);
        this.topic = topic;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singletonList(topic));
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (var record : records) {
                    System.out.printf("thread: %d, partition: %d, offset: %d, message: %s%n",
                            Thread.currentThread().getId(), record.partition(),
                            record.offset(), record.value());
                }
                consumer.commitAsync();
            }
        } catch (Exception e) {
            System.err.println("consumer thread error: " + e.getMessage());
        } finally {
            consumer.close();
        }
    }
}

public class MultiThreadConsumer {
    private static final String TOPIC = "test-topic";
    private static final int THREAD_COUNT = 3;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "multi-thread-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // All workers share the same group.id, so they split the topic's partitions
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            executor.submit(new ConsumerWorker(props, TOPIC));
        }
    }
}
```
Notes:

- Keep the thread count ≤ the topic's partition count (extra threads receive no partitions and sit idle); see the sizing sketch below.
- Threads in the same consumer group are assigned partitions automatically through rebalancing, which balances the load.
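Rather than hard-coding the thread count, the partition count can be read from topic metadata at startup. A minimal sketch, assuming it reuses `props`, `TOPIC`, and `THREAD_COUNT` from the example above and a short-lived consumer just for the metadata lookup:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Cap the pool size at the topic's partition count
try (KafkaConsumer<String, String> metaConsumer = new KafkaConsumer<String, String>(props)) {
    int partitionCount = metaConsumer.partitionsFor(TOPIC).size();
    int threadCount = Math.min(THREAD_COUNT, partitionCount);
    System.out.println("using " + threadCount + " consumer threads for "
            + partitionCount + " partitions");
}
```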
## Best Practices

- **Offset commit strategy**:
  - Non-critical workloads: automatic commit (simple and efficient).
  - Critical workloads: manual commit (synchronous for reliability, asynchronous for throughput).
- **Rebalance handling**:
  - Commit offsets before a rebalance (avoids duplicate consumption).
  - Resume from the correct position after a rebalance (e.g., read offsets back from external storage).
- **Throughput tuning**:
  - Set `max.poll.records` (records per poll) so that processing finishes before the poll loop times out.
  - If consumption lags, add consumer threads (up to the partition count).
- **Error handling**:
  - When an asynchronous commit fails, log it and retry.
  - Make sure offsets are committed before the consumer shuts down (e.g., in a `finally` block; see the sketch below).
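A common pattern for that last point is to trigger shutdown from another thread (here a JVM shutdown hook) via `consumer.wakeup()`, the one thread-safe `KafkaConsumer` method, and do the final commit in a `finally` block. A minimal sketch reusing the `initConsumer()` helper from earlier; the class name is a placeholder:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;

public class GracefulShutdownDemo {
    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = KafkaConsumerDemo.initConsumer();
        consumer.subscribe(Collections.singletonList("test-topic"));

        // wakeup() makes a blocked poll() throw WakeupException so the loop can exit
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join(); // wait for the consumer loop to finish cleanup
            } catch (InterruptedException ignored) {
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d%n", record.offset());
                }
                consumer.commitAsync(); // fast path while running
            }
        } catch (WakeupException e) {
            // expected on shutdown; fall through to cleanup
        } finally {
            consumer.commitSync(); // final reliable commit before leaving the group
            consumer.close();
        }
    }
}
```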