0%

kafka API操作之主题

Kafka 核心 API 详解与实践:主题管理

Kafka 提供了四类核心 API 用于消息生产、消费、流处理和外部系统集成。其中,通过 kafka-clients 库的 AdminClient API 可实现对 Kafka 主题(Topic)的全生命周期管理,包括创建、查询、修改、删除及分区调整等操作。本文将详细介绍这些操作的实现方式,并提供完整代码示例。

Kafka 核心 API 概述

Kafka 的四类核心 API 分别面向不同场景:

API 类型 作用描述
Producer API 用于生产者发送消息到 Kafka 集群,支持自定义分区策略、消息压缩等特性。
Consumer API 用于消费者从 Kafka 集群拉取消息,支持消费组管理、偏移量控制等功能。
Streams API 用于构建流处理应用,实现对实时数据流的转换、聚合等操作。
Connect API 用于连接 Kafka 与外部系统(如数据库、文件系统),实现数据导入 / 导出。

本文重点介绍基于 AdminClient 的主题管理操作,需引入以下依赖:

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version> <!-- 版本可根据实际需求调整 -->
</dependency>
</dependencies>

主题管理实践

初始化 AdminClient

所有主题操作均需通过 AdminClient 完成,其初始化需要指定 Kafka 集群的 bootstrap 服务器地址:

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;

public class KafkaTopicManager {
// 创建 AdminClient 实例
public static AdminClient createAdminClient() {
Properties props = new Properties();
// 指定 Kafka 集群地址(多个用逗号分隔)
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return AdminClient.create(props);
}
}

创建主题(Topic)

创建主题时需指定名称、分区数和副本数,分区数决定并行处理能力,副本数决定数据可靠性(建议副本数 ≥2 用于生产环境)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

/**
* 创建 Kafka 主题
* @param topicName 主题名称
* @param partitions 分区数
* @param replicationFactor 副本数(short类型,需 ≤ 集群Broker数量)
*/
public static void createTopic(String topicName, int partitions, short replicationFactor) {
AdminClient adminClient = createAdminClient();
try {
// 定义新主题
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
// 发送创建请求
CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic));
// 同步等待结果(阻塞直到完成)
result.all().get();
System.out.println("主题 " + topicName + " 创建成功(分区数:" + partitions + ",副本数:" + replicationFactor + ")");
} catch (InterruptedException | ExecutionException e) {
System.err.println("主题 " + topicName + " 创建失败:" + e.getMessage());
} finally {
// 关闭 AdminClient
adminClient.close();
}
}

// 示例调用
public static void main(String[] args) {
createTopic("user-tracking", 3, (short) 2); // 创建3分区、2副本的主题
}

注意

  • 副本数不能超过集群中 Broker 的数量(否则会创建失败)。
  • 若主题已存在,执行创建操作会抛出异常,可通过先查询再创建避免冲突。

查询主题列表

通过 listTopics() 可获取集群中所有主题的名称:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import java.util.Set;
import java.util.concurrent.ExecutionException;

/**
* 列出所有主题名称
*/
public static void listAllTopics() {
AdminClient adminClient = createAdminClient();
try {
ListTopicsResult result = adminClient.listTopics();
// 获取所有主题名称(同步阻塞)
Set<String> topicNames = result.names().get();
System.out.println("Kafka 集群主题列表:");
topicNames.forEach(topic -> System.out.println("- " + topic));
} catch (InterruptedException | ExecutionException e) {
System.err.println("查询主题列表失败:" + e.getMessage());
} finally {
adminClient.close();
}
}

// 示例调用
public static void main(String[] args) {
listAllTopics();
}

查询主题详细信息

通过 describeTopics() 可获取主题的分区分布、副本配置等详细信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
* 查询主题详细信息(分区、副本、Leader等)
* @param topicName 主题名称
*/
public static void describeTopic(String topicName) {
AdminClient adminClient = createAdminClient();
try {
DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topicName));
// 获取主题描述(同步阻塞)
Map<String, TopicDescription> descriptions = result.all().get();
TopicDescription topicDesc = descriptions.get(topicName);

if (topicDesc == null) {
System.out.println("主题 " + topicName + " 不存在");
return;
}

System.out.println("主题 " + topicName + " 详细信息:");
System.out.println(" - 分区数:" + topicDesc.partitions().size());
System.out.println(" - 分区详情:");
topicDesc.partitions().forEach(partitionInfo -> {
System.out.println(" 分区ID:" + partitionInfo.partition());
System.out.println(" Leader Broker:" + partitionInfo.leader().id());
System.out.println(" 副本Broker列表:" +
partitionInfo.replicas().stream()
.map(replica -> String.valueOf(replica.id()))
.reduce((a, b) -> a + ", " + b)
.orElse("无"));
});
} catch (InterruptedException | ExecutionException e) {
System.err.println("查询主题 " + topicName + " 详情失败:" + e.getMessage());
} finally {
adminClient.close();
}
}

// 示例调用
public static void main(String[] args) {
describeTopic("user-tracking");
}

修改主题配置

通过 incrementalAlterConfigs() 可修改主题的动态配置(如最大消息大小、保留时间等):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
* 修改主题配置(如最大消息大小)
* @param topicName 主题名称
* @param configKey 配置项键(参考 TopicConfig 类)
* @param configValue 配置项值
*/
public static void updateTopicConfig(String topicName, String configKey, String configValue) {
AdminClient adminClient = createAdminClient();
try {
// 定义配置资源(主题级别)
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
// 定义配置项及操作(SET 表示更新)
ConfigEntry configEntry = new ConfigEntry(configKey, configValue);
AlterConfigOp alterOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET);

Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
configs.put(resource, Collections.singletonList(alterOp));

// 发送更新请求
AlterConfigsResult result = adminClient.incrementalAlterConfigs(configs);
result.all().get(); // 同步等待结果
System.out.println("主题 " + topicName + " 配置更新成功:" + configKey + " = " + configValue);
} catch (InterruptedException | ExecutionException e) {
System.err.println("主题 " + topicName + " 配置更新失败:" + e.getMessage());
} finally {
adminClient.close();
}
}

// 示例调用:修改最大消息大小为 100KB(102400 字节)
public static void main(String[] args) {
updateTopicConfig("user-tracking", TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "102400");
}

常用配置项

  • TopicConfig.RETENTION_MS_CONFIG:消息保留时间(毫秒)。
  • TopicConfig.MAX_MESSAGE_BYTES_CONFIG:单条消息最大字节数。
  • TopicConfig.CLEANUP_POLICY_CONFIG:日志清理策略(deletecompact)。

增加主题分区

Kafka 支持增加主题的分区数(不支持减少分区),以提升吞吐量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreatePartitionsResult;
import org.apache.kafka.clients.admin.NewPartitions;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
* 增加主题分区(只能增加,不能减少)
* @param topicName 主题名称
* @param totalPartitions 目标总分区数(需大于当前分区数)
*/
public static void addPartitions(String topicName, int totalPartitions) {
AdminClient adminClient = createAdminClient();
try {
// 定义目标分区数(必须大于当前分区数)
NewPartitions newPartitions = NewPartitions.increaseTo(totalPartitions);
Map<String, NewPartitions> partitionsMap = new HashMap<>();
partitionsMap.put(topicName, newPartitions);

// 发送增加分区请求
CreatePartitionsResult result = adminClient.createPartitions(partitionsMap);
result.all().get(); // 同步等待结果
System.out.println("主题 " + topicName + " 分区数已更新为:" + totalPartitions);
} catch (InterruptedException | ExecutionException e) {
System.err.println("主题 " + topicName + " 增加分区失败:" + e.getMessage());
} finally {
adminClient.close();
}
}

// 示例调用:将分区数从3增加到5
public static void main(String[] args) {
addPartitions("user-tracking", 5);
}

注意

  • 分区数增加后,原有消息的分区分布不变,新消息会按新分区策略分配。
  • 若消费者组正在消费该主题,增加分区会触发重平衡(Rebalance)。

删除主题

删除主题需确保 Kafka 集群配置 delete.topic.enable=true(默认开启):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

/**
* 删除主题
* @param topicName 主题名称
*/
public static void deleteTopic(String topicName) {
AdminClient adminClient = createAdminClient();
try {
// 发送删除请求
DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(topicName));
result.all().get(); // 同步等待结果
System.out.println("主题 " + topicName + " 删除成功");
} catch (InterruptedException | ExecutionException e) {
System.err.println("主题 " + topicName + " 删除失败:" + e.getMessage());
} finally {
adminClient.close();
}
}

// 示例调用
public static void main(String[] args) {
deleteTopic("user-tracking");
}

注意:删除主题会永久删除所有消息和元数据,操作需谨慎

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10