Spring 集成 Kafka 详解:生产者、消费者与监听机制 Spring Kafka 是 Spring 生态对 Kafka 的集成封装,简化了 Kafka 生产者和消费者的配置与使用。它通过注解驱动、模板类(KafkaTemplate)和监听容器(MessageListenerContainer),实现了与 Spring 应用的无缝对接。本文将详细介绍 Spring Kafka 的核心组件、生产者 / 消费者实现及监听器生命周期管理。
Spring Kafka 核心组件 Spring Kafka 的设计遵循 Spring 一贯的 “模板 + 注解” 风格,核心组件包括:
组件
作用描述
KafkaTemplate
生产者模板类,封装了 Kafka 生产者 API,提供同步 / 异步发送消息的方法。
@KafkaListener
消费者注解,标注在方法上即可监听指定主题,自动接收并处理消息。
MessageListenerContainer
消息监听容器,管理消费者线程和生命周期,有单线程(KafkaMessageListenerContainer)和多线程(ConcurrentMessageListenerContainer)两种实现。
KafkaListenerEndpointRegistry
监听器容器的注册表,用于控制监听器的启动、暂停、恢复等生命周期操作。
ConsumerFactory/ProducerFactory
消费者 / 生产者工厂,负责创建 Kafka 消费者 / 生产者实例,封装配置参数。
环境配置 引入依赖 在 Maven 项目中添加 spring-kafka 依赖(需与 Kafka 版本兼容,如 Kafka 2.3.x 对应 Spring Kafka 2.3.x):
1 2 3 4 5 <dependency > <groupId > org.springframework.kafka</groupId > <artifactId > spring-kafka</artifactId > <version > 2.3.12.RELEASE</version > </dependency >
核心配置 通过 Spring 配置类或 application.properties 配置 Kafka 连接信息、生产者 / 消费者参数:
(1)application.properties 配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 spring.kafka.bootstrap-servers =localhost:9092 spring.kafka.producer.key-serializer =org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer =org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.acks =all spring.kafka.producer.retries =3 spring.kafka.consumer.group-id =spring-kafka-group spring.kafka.consumer.key-deserializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer =org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.auto-offset-reset =earliest spring.kafka.consumer.enable-auto-commit =false # 关闭自动提交,使用手动提交 spring.kafka.listener.concurrency =3 # 并发线程数(建议 ≤ 分区数) spring.kafka.listener.ack-mode =manual_immediate # 手动提交偏移量
(2)Java 配置类(可选) 如需更灵活的配置,可通过 @Configuration 类自定义工厂和模板:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.producer.ProducerConfig;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;import org.springframework.kafka.config.KafkaListenerContainerFactory;import org.springframework.kafka.core.*;import java.util.HashMap;import java.util.Map;@Configuration public class KafkaConfig { @Bean public ProducerFactory<String, String> producerFactory () { Map<String, Object> configProps = new HashMap <>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" ); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" ); configProps.put(ProducerConfig.ACKS_CONFIG, "all" ); return new DefaultKafkaProducerFactory <>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate () { return new KafkaTemplate <>(producerFactory()); } @Bean public ConsumerFactory<String, String> consumerFactory () { Map<String, Object> configProps = new HashMap <>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092" ); configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-kafka-group" ); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer" ); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer" ); configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" ); return new DefaultKafkaConsumerFactory <>(configProps); } @Bean public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory <>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3 ); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }
生产者实现:KafkaTemplate KafkaTemplate 是 Spring Kafka 提供的生产者模板,简化了消息发送逻辑,支持同步和异步发送。
异步发送(推荐) 异步发送通过回调函数处理发送结果(成功 / 失败),不阻塞主线程,适合高吞吐量场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.kafka.support.SendResult;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.util.concurrent.ListenableFutureCallback;@Component public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendAsync (String topic, String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(new ListenableFutureCallback <SendResult<String, String>>() { @Override public void onSuccess (SendResult<String, String> result) { System.out.printf("异步发送成功 - 主题:%s,分区:%d,偏移量:%d%n" , topic, result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } @Override public void onFailure (Throwable ex) { System.err.println("异步发送失败:" + ex.getMessage()); } }); } }
同步发送 同步发送通过 future.get() 阻塞等待结果,适合需要立即知道发送状态的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void sendSync (String topic, String message) { try { SendResult<String, String> result = kafkaTemplate.send(topic, message).get(); System.out.printf("同步发送成功 - 分区:%d,偏移量:%d%n" , result.getRecordMetadata().partition(), result.getRecordMetadata().offset()); } catch (Exception e) { System.err.println("同步发送失败:" + e.getMessage()); } }
消费者实现:@KafkaListener 注解 @KafkaListener 是 Spring Kafka 最核心的注解,通过标注方法即可实现消息监听,无需手动管理消费者线程。
基础用法:监听主题 最简单的监听方式,直接指定主题名称:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.support.Acknowledgment;import org.springframework.stereotype.Component;@Component public class KafkaConsumer { @KafkaListener(topics = "test-topic", id = "test-listener") public void listen (String message, Acknowledgment acknowledgment) { try { System.out.println("收到消息:" + message); processMessage(message); acknowledgment.acknowledge(); } catch (Exception e) { System.err.println("处理消息失败:" + e.getMessage()); } } private void processMessage (String message) { } }
高级用法:指定分区与初始偏移量 通过 topicPartitions 可精确指定监听的分区及初始偏移量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.PartitionOffset;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;@Component public class AdvancedKafkaConsumer { @KafkaListener( topicPartitions = { // 监听 test-topic 的分区0,从偏移量10开始消费 @TopicPartition( topic = "test-topic", partitions = "0", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "10") ), // 监听 test-topic 的分区1,从偏移量100开始消费 @TopicPartition( topic = "test-topic", partitions = "1", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100") ) }, concurrency = "2", // 并发线程数(与分区数匹配) errorHandler = "kafkaListenerErrorHandler", // 自定义异常处理器 id = "partition-listener" ) public void listenPartition (String message) { System.out.println("收到分区消息:" + message); } }
自定义异常处理器 当消息处理抛出异常时,可通过 KafkaListenerErrorHandler 自定义处理逻辑(如重试、记录日志):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import org.springframework.kafka.listener.KafkaListenerErrorHandler;import org.springframework.kafka.listener.ListenerExecutionFailedException;import org.springframework.messaging.Message;import org.springframework.stereotype.Component;@Component("kafkaListenerErrorHandler") public class CustomKafkaErrorHandler implements KafkaListenerErrorHandler { @Override public Object handleError (Message<?> message, ListenerExecutionFailedException exception) { System.err.println("消息处理异常:" + exception.getMessage()); System.err.println("异常消息内容:" + message.getPayload()); return null ; } }
监听器生命周期管理 KafkaListenerEndpointRegistry 用于管理监听器容器的生命周期(启动、暂停、恢复等),需通过 @Autowired 注入后使用。
控制指定监听器 通过 @KafkaListener 的 id 属性标识监听器,再通过注册表操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.config.KafkaListenerEndpointRegistry;import org.springframework.kafka.listener.MessageListenerContainer;import org.springframework.stereotype.Component;@Component public class ListenerLifecycleManager { @Autowired private KafkaListenerEndpointRegistry registry; public void startListener (String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && !container.isRunning()) { container.start(); System.out.println("监听器 " + listenerId + " 已启动" ); } } public void pauseListener (String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && container.isRunning()) { container.pause(); System.out.println("监听器 " + listenerId + " 已暂停" ); } } public void resumeListener (String listenerId) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && container.isPaused()) { container.resume(); System.out.println("监听器 " + listenerId + " 已恢复" ); } } }
全局控制(所有监听器) 1 2 3 4 5 registry.start(); registry.stop();
最佳实践
生产者 :
优先使用异步发送,通过回调处理失败并重试。
关键业务需配置 acks=all 和 retries>0,确保消息可靠投递。
消费者 :
核心业务使用手动提交偏移量(ack-mode=manual),确保消息处理完成后再提交。
并发数(concurrency)建议与主题分区数一致,避免线程空闲。
通过 @TopicPartition 精确控制分区消费,适合需要负载均衡的场景。
异常处理 :
实现 KafkaListenerErrorHandler 统一处理消费异常,避免监听器终止。
失败消息可转发到 “死信队列”(DLQ),后续人工处理。
性能优化 :
生产者启用批量发送(batch.size)和压缩(compression.type)。
消费者合理设置 max.poll.records,避免单次拉取过多消息导致处理超时