0%

spring集成kafka

Spring 集成 Kafka 详解:生产者、消费者与监听机制

Spring Kafka 是 Spring 生态对 Kafka 的集成封装,简化了 Kafka 生产者和消费者的配置与使用。它通过注解驱动、模板类(KafkaTemplate)和监听容器(MessageListenerContainer),实现了与 Spring 应用的无缝对接。本文将详细介绍 Spring Kafka 的核心组件、生产者 / 消费者实现及监听器生命周期管理。

Spring Kafka 核心组件

Spring Kafka 的设计遵循 Spring 一贯的 “模板 + 注解” 风格,核心组件包括:

组件 作用描述
KafkaTemplate 生产者模板类,封装了 Kafka 生产者 API,提供同步 / 异步发送消息的方法。
@KafkaListener 消费者注解,标注在方法上即可监听指定主题,自动接收并处理消息。
MessageListenerContainer 消息监听容器,管理消费者线程和生命周期,有单线程(KafkaMessageListenerContainer)和多线程(ConcurrentMessageListenerContainer)两种实现。
KafkaListenerEndpointRegistry 监听器容器的注册表,用于控制监听器的启动、暂停、恢复等生命周期操作。
ConsumerFactory/ProducerFactory 消费者 / 生产者工厂,负责创建 Kafka 消费者 / 生产者实例,封装配置参数。

环境配置

引入依赖

在 Maven 项目中添加 spring-kafka 依赖(需与 Kafka 版本兼容,如 Kafka 2.3.x 对应 Spring Kafka 2.3.x):

1
2
3
4
5
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>

核心配置

通过 Spring 配置类或 application.properties 配置 Kafka 连接信息、生产者 / 消费者参数:

(1)application.properties 配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# Kafka 集群地址
spring.kafka.bootstrap-servers=localhost:9092

# 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=all
spring.kafka.producer.retries=3

# 消费者配置
spring.kafka.consumer.group-id=spring-kafka-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false # 关闭自动提交,使用手动提交

# 监听容器配置(多线程并发消费)
spring.kafka.listener.concurrency=3 # 并发线程数(建议 ≤ 分区数)
spring.kafka.listener.ack-mode=manual_immediate # 手动提交偏移量
(2)Java 配置类(可选)

如需更灵活的配置,可通过 @Configuration 类自定义工厂和模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

// 生产者工厂
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configProps.put(ProducerConfig.ACKS_CONFIG, "all");
return new DefaultKafkaProducerFactory<>(configProps);
}

// KafkaTemplate(生产者模板)
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

// 消费者工厂
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-kafka-group");
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(configProps);
}

// 监听容器工厂(多线程)
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3); // 并发线程数
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 手动提交
return factory;
}
}

生产者实现:KafkaTemplate

KafkaTemplate 是 Spring Kafka 提供的生产者模板,简化了消息发送逻辑,支持同步和异步发送。

异步发送(推荐)

异步发送通过回调函数处理发送结果(成功 / 失败),不阻塞主线程,适合高吞吐量场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class KafkaProducer {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

/**
* 异步发送消息
* @param topic 主题
* @param message 消息内容
*/
public void sendAsync(String topic, String message) {
// 发送消息并添加回调
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
// 发送成功
System.out.printf("异步发送成功 - 主题:%s,分区:%d,偏移量:%d%n",
topic,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
// 发送失败
System.err.println("异步发送失败:" + ex.getMessage());
// 可选:重试发送
// kafkaTemplate.send(topic, message);
}
});
}
}

同步发送

同步发送通过 future.get() 阻塞等待结果,适合需要立即知道发送状态的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 同步发送消息
* @param topic 主题
* @param message 消息内容
*/
public void sendSync(String topic, String message) {
try {
// 阻塞等待发送结果
SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
System.out.printf("同步发送成功 - 分区:%d,偏移量:%d%n",
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
} catch (Exception e) {
System.err.println("同步发送失败:" + e.getMessage());
}
}

消费者实现:@KafkaListener 注解

@KafkaListener 是 Spring Kafka 最核心的注解,通过标注方法即可实现消息监听,无需手动管理消费者线程。

基础用法:监听主题

最简单的监听方式,直接指定主题名称:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

/**
* 监听指定主题,手动提交偏移量
* @param message 消息内容
* @param acknowledgment 偏移量确认对象
*/
@KafkaListener(topics = "test-topic", id = "test-listener")
public void listen(String message, Acknowledgment acknowledgment) {
try {
System.out.println("收到消息:" + message);
// 处理业务逻辑
processMessage(message);
// 手动提交偏移量(需配置 ack-mode=manual_immediate)
acknowledgment.acknowledge();
} catch (Exception e) {
System.err.println("处理消息失败:" + e.getMessage());
// 可选:不提交偏移量,等待重试
}
}

private void processMessage(String message) {
// 业务处理逻辑
}
}

高级用法:指定分区与初始偏移量

通过 topicPartitions 可精确指定监听的分区及初始偏移量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class AdvancedKafkaConsumer {

/**
* 监听指定分区,并设置初始偏移量
*/
@KafkaListener(
topicPartitions = {
// 监听 test-topic 的分区0,从偏移量10开始消费
@TopicPartition(
topic = "test-topic",
partitions = "0",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "10")
),
// 监听 test-topic 的分区1,从偏移量100开始消费
@TopicPartition(
topic = "test-topic",
partitions = "1",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")
)
},
concurrency = "2", // 并发线程数(与分区数匹配)
errorHandler = "kafkaListenerErrorHandler", // 自定义异常处理器
id = "partition-listener"
)
public void listenPartition(String message) {
System.out.println("收到分区消息:" + message);
}
}

自定义异常处理器

当消息处理抛出异常时,可通过 KafkaListenerErrorHandler 自定义处理逻辑(如重试、记录日志):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component("kafkaListenerErrorHandler")
public class CustomKafkaErrorHandler implements KafkaListenerErrorHandler {

@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
System.err.println("消息处理异常:" + exception.getMessage());
System.err.println("异常消息内容:" + message.getPayload());
// 可选:返回默认值或触发重试
return null;
}
}

监听器生命周期管理

KafkaListenerEndpointRegistry 用于管理监听器容器的生命周期(启动、暂停、恢复等),需通过 @Autowired 注入后使用。

控制指定监听器

通过 @KafkaListenerid 属性标识监听器,再通过注册表操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListenerLifecycleManager {

@Autowired
private KafkaListenerEndpointRegistry registry;

/**
* 启动指定监听器
* @param listenerId 监听器ID(@KafkaListener的id属性)
*/
public void startListener(String listenerId) {
MessageListenerContainer container = registry.getListenerContainer(listenerId);
if (container != null && !container.isRunning()) {
container.start();
System.out.println("监听器 " + listenerId + " 已启动");
}
}

/**
* 暂停指定监听器
*/
public void pauseListener(String listenerId) {
MessageListenerContainer container = registry.getListenerContainer(listenerId);
if (container != null && container.isRunning()) {
container.pause();
System.out.println("监听器 " + listenerId + " 已暂停");
}
}

/**
* 恢复指定监听器
*/
public void resumeListener(String listenerId) {
MessageListenerContainer container = registry.getListenerContainer(listenerId);
if (container != null && container.isPaused()) {
container.resume();
System.out.println("监听器 " + listenerId + " 已恢复");
}
}
}

全局控制(所有监听器)

1
2
3
4
5
// 启动所有监听器
registry.start();

// 暂停所有监听器
registry.stop();

最佳实践

  1. 生产者
    • 优先使用异步发送,通过回调处理失败并重试。
    • 关键业务需配置 acks=allretries>0,确保消息可靠投递。
  2. 消费者
    • 核心业务使用手动提交偏移量(ack-mode=manual),确保消息处理完成后再提交。
    • 并发数(concurrency)建议与主题分区数一致,避免线程空闲。
    • 通过 @TopicPartition 精确控制分区消费,适合需要负载均衡的场景。
  3. 异常处理
    • 实现 KafkaListenerErrorHandler 统一处理消费异常,避免监听器终止。
    • 失败消息可转发到 “死信队列”(DLQ),后续人工处理。
  4. 性能优化
    • 生产者启用批量发送(batch.size)和压缩(compression.type)。
    • 消费者合理设置 max.poll.records,避免单次拉取过多消息导致处理超时

欢迎关注我的其它发布渠道