Kafka 核心 API 详解与实践:主题管理
Kafka 提供了四类核心 API 用于消息生产、消费、流处理和外部系统集成。其中,通过 kafka-clients 库的 AdminClient API 可实现对 Kafka 主题(Topic)的全生命周期管理,包括创建、查询、修改、删除及分区调整等操作。本文将详细介绍这些操作的实现方式,并提供完整代码示例。
Kafka 核心 API 概述
Kafka 的四类核心 API 分别面向不同场景:
| API 类型 |
作用描述 |
| Producer API |
用于生产者发送消息到 Kafka 集群,支持自定义分区策略、消息压缩等特性。 |
| Consumer API |
用于消费者从 Kafka 集群拉取消息,支持消费组管理、偏移量控制等功能。 |
| Streams API |
用于构建流处理应用,实现对实时数据流的转换、聚合等操作。 |
| Connect API |
用于连接 Kafka 与外部系统(如数据库、文件系统),实现数据导入 / 导出。 |
本文重点介绍基于 AdminClient 的主题管理操作,需引入以下依赖:
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency> </dependencies>
|
主题管理实践
初始化 AdminClient
所有主题操作均需通过 AdminClient 完成,其初始化需要指定 Kafka 集群的 bootstrap 服务器地址:
1 2 3 4 5 6 7 8 9 10 11 12 13
| import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import java.util.Properties;
public class KafkaTopicManager { public static AdminClient createAdminClient() { Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); return AdminClient.create(props); } }
|
创建主题(Topic)
创建主题时需指定名称、分区数和副本数,分区数决定并行处理能力,副本数决定数据可靠性(建议副本数 ≥2 用于生产环境)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import java.util.Collections; import java.util.concurrent.ExecutionException;
public static void createTopic(String topicName, int partitions, short replicationFactor) { AdminClient adminClient = createAdminClient(); try { NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor); CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(newTopic)); result.all().get(); System.out.println("主题 " + topicName + " 创建成功(分区数:" + partitions + ",副本数:" + replicationFactor + ")"); } catch (InterruptedException | ExecutionException e) { System.err.println("主题 " + topicName + " 创建失败:" + e.getMessage()); } finally { adminClient.close(); } }
public static void main(String[] args) { createTopic("user-tracking", 3, (short) 2); }
|
注意:
- 副本数不能超过集群中 Broker 的数量(否则会创建失败)。
- 若主题已存在,执行创建操作会抛出异常,可通过先查询再创建避免冲突。
查询主题列表
通过 listTopics() 可获取集群中所有主题的名称:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ListTopicsResult; import java.util.Set; import java.util.concurrent.ExecutionException;
public static void listAllTopics() { AdminClient adminClient = createAdminClient(); try { ListTopicsResult result = adminClient.listTopics(); Set<String> topicNames = result.names().get(); System.out.println("Kafka 集群主题列表:"); topicNames.forEach(topic -> System.out.println("- " + topic)); } catch (InterruptedException | ExecutionException e) { System.err.println("查询主题列表失败:" + e.getMessage()); } finally { adminClient.close(); } }
public static void main(String[] args) { listAllTopics(); }
|
查询主题详细信息
通过 describeTopics() 可获取主题的分区分布、副本配置等详细信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; import org.apache.kafka.clients.admin.TopicDescription; import java.util.Collections; import java.util.Map; import java.util.concurrent.ExecutionException;
public static void describeTopic(String topicName) { AdminClient adminClient = createAdminClient(); try { DescribeTopicsResult result = adminClient.describeTopics(Collections.singletonList(topicName)); Map<String, TopicDescription> descriptions = result.all().get(); TopicDescription topicDesc = descriptions.get(topicName); if (topicDesc == null) { System.out.println("主题 " + topicName + " 不存在"); return; } System.out.println("主题 " + topicName + " 详细信息:"); System.out.println(" - 分区数:" + topicDesc.partitions().size()); System.out.println(" - 分区详情:"); topicDesc.partitions().forEach(partitionInfo -> { System.out.println(" 分区ID:" + partitionInfo.partition()); System.out.println(" Leader Broker:" + partitionInfo.leader().id()); System.out.println(" 副本Broker列表:" + partitionInfo.replicas().stream() .map(replica -> String.valueOf(replica.id())) .reduce((a, b) -> a + ", " + b) .orElse("无")); }); } catch (InterruptedException | ExecutionException e) { System.err.println("查询主题 " + topicName + " 详情失败:" + e.getMessage()); } finally { adminClient.close(); } }
public static void main(String[] args) { describeTopic("user-tracking"); }
|
修改主题配置
通过 incrementalAlterConfigs() 可修改主题的动态配置(如最大消息大小、保留时间等):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.config.ConfigResource; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException;
public static void updateTopicConfig(String topicName, String configKey, String configValue) { AdminClient adminClient = createAdminClient(); try { ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName); ConfigEntry configEntry = new ConfigEntry(configKey, configValue); AlterConfigOp alterOp = new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET); Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>(); configs.put(resource, Collections.singletonList(alterOp)); AlterConfigsResult result = adminClient.incrementalAlterConfigs(configs); result.all().get(); System.out.println("主题 " + topicName + " 配置更新成功:" + configKey + " = " + configValue); } catch (InterruptedException | ExecutionException e) { System.err.println("主题 " + topicName + " 配置更新失败:" + e.getMessage()); } finally { adminClient.close(); } }
public static void main(String[] args) { updateTopicConfig("user-tracking", TopicConfig.MAX_MESSAGE_BYTES_CONFIG, "102400"); }
|
常用配置项:
TopicConfig.RETENTION_MS_CONFIG:消息保留时间(毫秒)。
TopicConfig.MAX_MESSAGE_BYTES_CONFIG:单条消息最大字节数。
TopicConfig.CLEANUP_POLICY_CONFIG:日志清理策略(delete 或 compact)。
增加主题分区
Kafka 支持增加主题的分区数(不支持减少分区),以提升吞吐量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.CreatePartitionsResult; import org.apache.kafka.clients.admin.NewPartitions; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException;
public static void addPartitions(String topicName, int totalPartitions) { AdminClient adminClient = createAdminClient(); try { NewPartitions newPartitions = NewPartitions.increaseTo(totalPartitions); Map<String, NewPartitions> partitionsMap = new HashMap<>(); partitionsMap.put(topicName, newPartitions); CreatePartitionsResult result = adminClient.createPartitions(partitionsMap); result.all().get(); System.out.println("主题 " + topicName + " 分区数已更新为:" + totalPartitions); } catch (InterruptedException | ExecutionException e) { System.err.println("主题 " + topicName + " 增加分区失败:" + e.getMessage()); } finally { adminClient.close(); } }
public static void main(String[] args) { addPartitions("user-tracking", 5); }
|
注意:
- 分区数增加后,原有消息的分区分布不变,新消息会按新分区策略分配。
- 若消费者组正在消费该主题,增加分区会触发重平衡(Rebalance)。
删除主题
删除主题需确保 Kafka 集群配置 delete.topic.enable=true(默认开启):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DeleteTopicsResult; import java.util.Collections; import java.util.concurrent.ExecutionException;
public static void deleteTopic(String topicName) { AdminClient adminClient = createAdminClient(); try { DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(topicName)); result.all().get(); System.out.println("主题 " + topicName + " 删除成功"); } catch (InterruptedException | ExecutionException e) { System.err.println("主题 " + topicName + " 删除失败:" + e.getMessage()); } finally { adminClient.close(); } }
public static void main(String[] args) { deleteTopic("user-tracking"); }
|
注意:删除主题会永久删除所有消息和元数据,操作需谨慎
v1.3.10