Kafka 消费者 API 详解与实践 Kafka 消费者(Consumer)是消息的接收端,负责从 Kafka 集群拉取并处理消息。与生产者相比,消费者的核心挑战在于可靠地消费消息 (避免丢失或重复)、高效地分配分区负载 以及灵活地管理消费位置 。本文将详细解析 Kafka 消费者 API 的使用方法,包括配置、订阅方式、偏移量管理及多线程消费等关键场景。
消费者核心配置与初始化 必选配置参数 创建消费者时,需通过 Properties 配置以下核心参数:
参数名
作用
示例值
bootstrap.servers
Kafka 集群地址
localhost:9092
group.id
消费者组 ID(同一组内的消费者共同消费主题,避免重复)
order-consumer-group
key.deserializer
消息键(Key)的反序列化类
org.apache.kafka.common.serialization.StringDeserializer
value.deserializer
消息值(Value)的反序列化类
org.apache.kafka.common.serialization.StringDeserializer
关键可选配置
参数名
作用
默认值
enable.auto.commit
是否自动提交偏移量
true(自动提交)
auto.commit.interval.ms
自动提交偏移量的间隔(毫秒)
5000
auto.offset.reset
偏移量无效时的重置策略(earliest:从最早消息开始;latest:从最新消息开始)
latest
fetch.min.bytes
拉取消息的最小字节数(不足时等待)
1
fetch.max.wait.ms
拉取消息的最大等待时间(毫秒)
500
max.poll.records
一次 poll() 调用返回的最大消息数
500
初始化消费者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class KafkaConsumerDemo { private static KafkaConsumer<String, String> initConsumer () { Properties props = new Properties (); props.put("bootstrap.servers" , "localhost:9092" ); props.put("group.id" , "test-group" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("enable.auto.commit" , "false" ); props.put("auto.offset.reset" , "earliest" ); return new KafkaConsumer <>(props); } }
订阅主题的四种方式 Kafka 提供了灵活的主题订阅方式,支持按主题名、正则表达式订阅,还可指定重平衡监听器。
按主题名订阅(基础方式) 通过 subscribe(Collection<String> topics) 订阅一个或多个主题: