0%

kafka自定义组件

Kafka 自定义组件详解:分区器、序列化器与拦截器

Kafka 提供了灵活的扩展机制,允许用户通过自定义组件(如分区器、序列化器、拦截器)满足特定业务需求。这些组件可深度集成到消息生产流程中,实现消息路由、格式转换、内容增强等个性化功能。本文将详细介绍如何开发和使用这些自定义组件,并解析其执行顺序与应用场景。

自定义分区器(Partitioner)

Kafka 默认分区策略基于消息键(Key)的哈希值分配分区,但在复杂业务场景(如按地区、用户类型路由消息)中,需自定义分区逻辑。

核心接口与方法

自定义分区器需实现 org.apache.kafka.clients.producer.Partitioner 接口,核心方法:

方法 作用
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) 计算消息应发送到的分区编号,返回值为分区索引(从 0 开始)。
void configure(Map<String, ?> configs) 初始化配置(如从生产者配置中读取参数)。
void close() 资源清理(如关闭连接、释放内存)。

实现示例:按业务类型分区

假设需将包含 “order” 的消息发送到分区 0,包含 “log” 的消息发送到分区 1,其他消息按 Key 哈希分配:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.kafka.clients.producer.Partitioner; 
import org.apache.kafka.common.Cluster;
import java.util.Map;

public class BusinessTypePartitioner implements Partitioner {

// 分区总数(可从配置中读取,此处简化为固定值)
private int totalPartitions = 3;

@Override
public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) {
// 从集群元数据中获取实际分区数(更灵活的方式)
totalPartitions = cluster.partitionsForTopic (topic).size ();

if (value == null) {
// 无消息体时,使用默认分区策略(按 Key 哈希)
return key == null ? 0 : Math.abs (key.hashCode () % totalPartitions);
}

String valueStr = value.toString ();
// 按消息内容中的业务类型分区
if (valueStr.contains ("order")) {
return 0; // 订单消息固定到分区 0
} else if (valueStr.contains ("log")) {
return 1; // 日志消息固定到分区 1
} else {
// 其他消息按 Key 哈希分配到剩余分区
return key == null ? 2 : Math.abs (key.hashCode () % (totalPartitions - 2) + 2);
}
}


@Override
public void configure (Map<String, ?> configs) {
// 从配置中读取自定义参数(如分区数)
Object partitions = configs.get ("business.partitions");
if (partitions != null) {
totalPartitions = Integer.parseInt (partitions.toString ());
}
}


@Override
public void close () {
// 无需清理资源,空实现
}
}

使用自定义分区器

在生产者配置中指定分区器类名,即可启用自定义逻辑:

1
2
3
4
5
6
7
8
9
10
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, BusinessTypePartitioner.class.getName());
// 传递自定义参数(可选)
props.put("business.partitions", 5);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

自定义序列化器(Serializer)

Kafka 默认提供字符串、整数等基础类型的序列化器,但对于自定义对象(如 Java 实体类),需自定义序列化逻辑,将对象转换为字节数组。

核心接口与方法

实现 org.apache.kafka.common.serialization.Serializer 接口,核心方法:

方法 作用
byte[] serialize(String topic, T data) 将对象序列化为字节数组。
void configure(Map<String, ?> configs, boolean isKey) 初始化配置(isKey 标识是否为 Key 序列化)。
void close() 资源清理。

实现示例:JSON 序列化器

使用 Jackson 库将 Java 对象序列化为 JSON 字节数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import com.fasterxml.jackson.core.JsonProcessingException; 
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

private final ObjectMapper objectMapper = new ObjectMapper();

@Override
public byte [] serialize (String topic, T data) {
if (data == null) {
return new byte [0]; // 空对象返回空字节数组
}
try {
// 将对象转换为 JSON 字节数组
return objectMapper.writeValueAsBytes (data);
} catch (JsonProcessingException e) {
throw new RuntimeException ("JSON 序列化失败:" + e.getMessage (), e);
}
}

@Override
public void configure (Map<String, ?> configs, boolean isKey) {
// 可配置 JSON 序列化特性(如日期格式)
}

@Override
public void close () {
// 无需清理资源
}
}

对应的反序列化器

消费者需使用对应的反序列化器解析 JSON 字节数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

private final ObjectMapper objectMapper = new ObjectMapper();
private Class<T> targetClass; // 目标对象类型

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// 从配置中获取目标类(需提前设置)
String className = (String) configs.get("target.class");
try {
targetClass = (Class<T>) Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("无法加载目标类:" + className, e);
}
}

@Override
public T deserialize(String topic, byte[] data) {
if (data == null || data.length == 0) {
return null;
}
try {
// 将JSON字节数组转换为对象
return objectMapper.readValue(data, targetClass);
} catch (Exception e) {
throw new RuntimeException("JSON反序列化失败:" + e.getMessage(), e);
}
}

@Override
public void close() {
// 无需清理资源
}
}

使用自定义序列化器

生产者和消费者需分别配置序列化器和反序列化器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 生产者配置(序列化自定义对象)
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定值的序列化器为自定义JSON序列化器
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());

// 消费者配置(反序列化自定义对象)
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "json-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 指定值的反序列化器为自定义JSON反序列化器
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class.getName());
// 传递目标类名(需与生产者发送的对象类型一致)
consumerProps.put("target.class", "com.example.Order");

自定义拦截器(ProducerInterceptor)

拦截器用于在消息发送前或发送后(回调前)对消息进行加工(如添加时间戳、过滤无效消息),支持多拦截器形成链式调用。

核心接口与方法

实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口,核心方法:

方法 作用
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) 消息发送前调用,可修改消息内容(如添加头信息),返回修改后的消息。
void onAcknowledgement(RecordMetadata metadata, Exception exception) 消息发送成功或失败后调用,可记录发送结果(如统计成功率)。
void close() 资源清理。

实现示例:消息增强与统计拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import org.apache.kafka.clients.producer.ProducerInterceptor; 
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class MessageEnhanceInterceptor implements ProducerInterceptor<String, String> {

// 统计发送总数和成功数
private final AtomicInteger totalCount = new AtomicInteger (0);
private final AtomicInteger successCount = new AtomicInteger (0);


@Override
public ProducerRecord<String, String> onSend (ProducerRecord<String, String> record) {
// 1. 消息发送前增强:添加时间戳头信息
long timestamp = System.currentTimeMillis ();
return new ProducerRecord<>(
record.topic (),
record.partition (),
record.timestamp (),
record.key (),
// 消息体添加时间戳前缀
String.format ("[% d] % s", timestamp, record.value ()),
record.headers ()
);
}



@Override
public void onAcknowledgement (RecordMetadata metadata, Exception exception) {
// 2. 消息发送后统计结果
totalCount.incrementAndGet ();
if (exception == null) {
successCount.incrementAndGet ();
} else {
System.err.println ("消息发送失败:" + exception.getMessage ());
}
// 每发送 10 条消息打印一次统计
if (totalCount.get () % 10 == 0) {
double successRate = (double) successCount.get () /totalCount.get ();
System.out.printf ("发送统计 - 总数:% d,成功:% d,成功率:%.2f%%% n",
totalCount.get (), successCount.get (), successRate * 100);
}
}

@Override
public void close () {
// 3. 关闭时打印最终统计
System.out.printf ("最终统计 - 总数:% d,成功:% d% n", totalCount.get (), successCount.get ());
}

@Override
public void configure (Map<String, ?> configs) {
// 初始化配置(如统计频率)
}
}

使用拦截器链

通过 interceptor.classes 配置多个拦截器,按顺序执行:

1
2
3
4
5
6
7
8
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 配置拦截器链(多个拦截器按逗号分隔,顺序执行)
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.example.MessageEnhanceInterceptor," +
"com.example.LoggingInterceptor"); // 假设另有一个日志拦截器

组件执行顺序与最佳实践

执行顺序

消息从生产者发送到 Broker 的流程中,自定义组件的执行顺序为:
拦截器(onSend)→ 序列化器 → 分区器 → 拦截器(onAcknowledgement

  • onSend:在消息序列化和分区前调用,可修改消息内容或 Key。
  • 序列化器:将 Key 和 Value 转换为字节数组。
  • 分区器:根据 Key 或 Value 计算目标分区。
  • onAcknowledgement:在消息发送结果返回后调用,可统计或处理异常。

最佳实践

  • 分区器
    • 避免复杂逻辑(如网络请求),以免阻塞发送线程。
    • 确保分区算法均匀分布消息,避免个别分区负载过高。
  • 序列化器
    • 优先使用成熟格式(如 JSON、Protobuf),避免自定义二进制格式。
    • 处理空值和异常,确保序列化过程稳定。
  • 拦截器
    • onSend 中避免修改消息的 Topic 或分区,以免影响路由逻辑。
    • onAcknowledgement 中避免 heavy 操作(如磁盘 IO),以免拖慢发送效率

欢迎关注我的其它发布渠道