0%

Kafka 生产者 API 详解与实践

Kafka 生产者(Producer)是消息的发送端,负责将业务数据发送到 Kafka 集群。通过 Kafka 提供的 Producer API,我们可以灵活配置生产者行为,支持同步 / 异步发送、自定义分区策略、消息压缩等功能。本文将详细介绍生产者的实现步骤、核心配置及不同发送方式的代码示例

生产者核心概念与配置

核心配置参数

创建生产者时,需通过 Properties 对象配置关键参数,其中必选参数有三个:

参数名 作用 示例值
bootstrap.servers 指定 Kafka 集群地址(多个用逗号分隔) localhost:9092
key.serializer 消息键(Key)的序列化类(需实现 Serializer 接口) org.apache.kafka.common.serialization.StringSerializer
value.serializer 消息值(Value)的序列化类 org.apache.kafka.common.serialization.StringSerializer

常用可选参数(参考 ProducerConfig 类):

参数名 作用 默认值
acks 消息确认级别(0:不确认;1:Leader 确认;-1/all:Leader + 所有 ISR 副本确认) 1
retries 发送失败后的重试次数 0
batch.size 批次大小(达到该值后批量发送,单位:字节) 16384(16KB)
linger.ms 批处理等待时间(若未达 batch.size,超时后也会发送) 0(立即发送)
buffer.memory 发送缓冲区大小(消息暂存此处等待发送) 33554432(32MB)
compression.type 消息压缩算法(none/gzip/snappy/lz4) none

生产者工作原理

Kafka 生产者发送消息的流程如下:

阅读全文 »

Kafka 消费者 API 详解与实践

Kafka 消费者(Consumer)是消息的接收端,负责从 Kafka 集群拉取并处理消息。与生产者相比,消费者的核心挑战在于可靠地消费消息(避免丢失或重复)、高效地分配分区负载以及灵活地管理消费位置。本文将详细解析 Kafka 消费者 API 的使用方法,包括配置、订阅方式、偏移量管理及多线程消费等关键场景。

消费者核心配置与初始化

必选配置参数

创建消费者时,需通过 Properties 配置以下核心参数:

参数名 作用 示例值
bootstrap.servers Kafka 集群地址 localhost:9092
group.id 消费者组 ID(同一组内的消费者共同消费主题,避免重复) order-consumer-group
key.deserializer 消息键(Key)的反序列化类 org.apache.kafka.common.serialization.StringDeserializer
value.deserializer 消息值(Value)的反序列化类 org.apache.kafka.common.serialization.StringDeserializer

关键可选配置

参数名 作用 默认值
enable.auto.commit 是否自动提交偏移量 true(自动提交)
auto.commit.interval.ms 自动提交偏移量的间隔(毫秒) 5000
auto.offset.reset 偏移量无效时的重置策略(earliest:从最早消息开始;latest:从最新消息开始) latest
fetch.min.bytes 拉取消息的最小字节数(不足时等待) 1
fetch.max.wait.ms 拉取消息的最大等待时间(毫秒) 500
max.poll.records 一次 poll() 调用返回的最大消息数 500

初始化消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Properties;

public class KafkaConsumerDemo {
private static KafkaConsumer<String, String> initConsumer() {
Properties props = new Properties();
// 必选配置
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// 可选配置:关闭自动提交,使用手动提交
props.put("enable.auto.commit", "false");
// 偏移量无效时从最早消息开始消费
props.put("auto.offset.reset", "earliest");

return new KafkaConsumer<>(props);
}
}

订阅主题的四种方式

Kafka 提供了灵活的主题订阅方式,支持按主题名、正则表达式订阅,还可指定重平衡监听器。

按主题名订阅(基础方式)

通过 subscribe(Collection<String> topics) 订阅一个或多个主题:

阅读全文 »

Spring 集成 Kafka 详解:生产者、消费者与监听机制

Spring Kafka 是 Spring 生态对 Kafka 的集成封装,简化了 Kafka 生产者和消费者的配置与使用。它通过注解驱动、模板类(KafkaTemplate)和监听容器(MessageListenerContainer),实现了与 Spring 应用的无缝对接。本文将详细介绍 Spring Kafka 的核心组件、生产者 / 消费者实现及监听器生命周期管理。

Spring Kafka 核心组件

Spring Kafka 的设计遵循 Spring 一贯的 “模板 + 注解” 风格,核心组件包括:

组件 作用描述
KafkaTemplate 生产者模板类,封装了 Kafka 生产者 API,提供同步 / 异步发送消息的方法。
@KafkaListener 消费者注解,标注在方法上即可监听指定主题,自动接收并处理消息。
MessageListenerContainer 消息监听容器,管理消费者线程和生命周期,有单线程(KafkaMessageListenerContainer)和多线程(ConcurrentMessageListenerContainer)两种实现。
KafkaListenerEndpointRegistry 监听器容器的注册表,用于控制监听器的启动、暂停、恢复等生命周期操作。
ConsumerFactory/ProducerFactory 消费者 / 生产者工厂,负责创建 Kafka 消费者 / 生产者实例,封装配置参数。

环境配置

引入依赖

在 Maven 项目中添加 spring-kafka 依赖(需与 Kafka 版本兼容,如 Kafka 2.3.x 对应 Spring Kafka 2.3.x):

阅读全文 »

Redis 安装指南:从 macOS 到源码编译的完整步骤

Redis(Remote Dictionary Server)是一款高性能的开源键值对数据库,支持多种数据结构,广泛用于缓存、会话存储、消息队列等场景。本文详细介绍在 macOS 系统中通过包管理工具(Homebrew)和源码编译两种方式安装 Redis 的步骤,以及基本的服务管理操作。

macOS 下通过 Homebrew 安装(推荐)

Homebrew 是 macOS 下的包管理工具,通过它安装 Redis 简单高效,适合大多数用户。

1. 安装 Homebrew(若未安装)

打开终端,执行以下命令安装 Homebrew:

1
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

2. 安装 Redis

1
brew install redis
  • 该命令会自动下载并安装最新稳定版 Redis,同时配置环境变量,确保 redis-serverredis-cli 等命令可直接使用。

3. 配置 Redis(可选)

Redis 的配置文件 redis.conf 是核心配置入口,通过 Homebrew 安装的 Redis 配置文件路径可通过以下命令查看:

1
brew list redis  # 列出 Redis 安装的所有文件,其中包含 redis.conf 的路径(通常为 /usr/local/etc/redis.conf)

常用配置修改(使用文本编辑器打开 redis.conf):

  • 后台启动:默认 Redis 以前台模式运行,修改为后台启动:

阅读全文 »

Java IO 全解析:从基础流到序列化

Java 的 I/O(输入 / 输出)机制是程序与外部世界(文件、网络、控制台等)交互的核心,其设计围绕 “流(Stream)” 展开 —— 通过流的方式有序传输数据。本文将从 I/O 操作模式出发,详细解析 Java IO 体系的核心类、使用方法及最佳实践。

I/O 操作模式:五种基本类型

I/O 操作的本质是 “数据从设备到内核缓冲区,再从缓冲区到用户进程” 的过程。根据等待数据的方式不同,分为五种模式:

模式 核心特点 适用场景
阻塞 I/O 进程发起请求后阻塞,直到数据复制完成才唤醒。 简单场景(如单线程读取小文件)
非阻塞 I/O 进程不阻塞,定期轮询缓冲区状态,数据就绪后再复制(复制时可能阻塞)。 需快速响应的场景
I/O 复用 单进程监控多个 I/O 通道,数据就绪后通知进程处理(如 select/epoll)。 高并发网络编程(如 NIO)
信号驱动 I/O 进程不阻塞,数据就绪后内核通过信号通知,再处理复制。 实时性要求高的场景
异步 I/O 进程发起请求后完全不阻塞,内核自动完成全流程,完成后通知进程。 高性能 I/O 场景(如磁盘操作)

注:Java 传统 IO(java.io)主要基于阻塞 I/O,而 NIO(java.nio)引入了 I/O 复用机制。

Java IO 核心体系:流的分类

Java IO 包(java.io)的类按功能可分为四大类,核心是 “字节流” 和 “字符流”:

类型 核心接口 / 类 处理数据类型 典型用途
字节流 InputStream(输入)、OutputStream(输出) 二进制数据(字节) 图片、视频、压缩文件等
字符流 Reader(输入)、Writer(输出) 文本数据(字符) 文本文件、配置文件等
磁盘操作 File 文件 / 目录元数据 创建 / 删除文件、获取路径
网络操作(java.net SocketServerSocket 网络字节流 客户端 / 服务器通信

字节流:处理二进制数据

字节流以字节(8 位)为单位传输数据,是所有 I/O 操作的基础。核心基类为抽象类 InputStream(输入)和 OutputStream(输出)。

字节输入流(InputStream

InputStream

所有字节输入流均继承自 InputStream,用于从数据源读取字节。

核心子类及功能
数据源 核心功能 构造器参数示例
ByteArrayInputStream 内存字节数组 从内存缓冲区读取数据,无需磁盘 I/O。 new byte[] {1,2,3}
FileInputStream 本地文件 从文件读取字节,是文件输入的基础类。 "/data/test.bin"new File(...)
PipedInputStream 管道输出流 PipedOutputStream 配合,实现线程间通信。 new PipedOutputStream()
SequenceInputStream 多个输入流 合并多个流为一个,按顺序读取。 new InputStream[] {in1, in2}
装饰器子类(FilterInputStream

通过 “装饰器模式” 为基础流添加功能(如缓冲、数据类型转换):

阅读全文 »