0%

Kafka 消息丢失与重复消费问题全解析:原因与解决方案

Kafka 作为分布式消息系统,消息的可靠性(不丢失)和一致性(不重复)是核心需求。但在实际使用中,由于配置不当、网络异常或故障处理等原因,可能出现消息丢失或重复消费的问题。本文将从消息丢失重复消费两个维度,详细分析原因及解决方案。

消息丢失问题

消息丢失可能发生在生产者发送Kafka 集群存储消费者消费三个环节,需针对性解决。

生产者端消息丢失

原因分析
  • acks 配置不当:
    • acks=0:生产者不等待 Kafka 确认,直接发送下一条消息。若 Broker 崩溃或网络中断,消息可能未写入磁盘而丢失。
    • acks=1:仅 Leader 写入成功后确认。若 Leader 写入后未同步到 Follower 就崩溃,新 Leader(从 Follower 选举)会丢失该消息。
  • 异步发送缓冲区溢出
    异步发送时,消息先存入缓冲区(buffer.memory),若缓冲区满且 block.on.buffer.full=false(默认 true 已废弃,由 max.block.ms 替代),生产者会丢弃消息。
  • 未启用重试机制
    网络波动等临时故障导致发送失败时,若 retries=0,生产者不会重试,消息丢失。
解决方案
  • 合理设置 acks
    关键业务场景设置 acks=-1(或 acks=all),确保 Leader 和所有 ISR 中的 Follower 均写入成功后才确认,从源头避免丢失。
  • 优化异步发送配置:
    • 设置 max.block.ms=60000(默认 60 秒),缓冲区满时阻塞等待,而非丢弃。
    • 调整 buffer.memory(默认 32MB)和 batch.size(默认 16KB),避免缓冲区频繁满溢。
  • 启用重试并合理配置:
    • 设置 retries=3(默认重试次数极高,可根据业务调整),配合 retry.backoff.ms=100(重试间隔),应对临时故障。

Kafka 集群(消息队列)消息丢失

原因分析
阅读全文 »

hive压缩配置详解:提升存储与计算效率

Hive 作为基于 Hadoop 的数据仓库,其数据存储和计算依赖 HDFS 和 MapReduce/Spark 引擎。压缩技术是优化 Hive 性能的关键手段,可显著减少磁盘存储占用和网络数据传输量。本文详细讲解 Hive 压缩的配置方式、适用场景及最佳实践,帮助开发者合理启用压缩提升效率。

Hive 压缩的核心作用

Hive 处理的多为海量数据,未压缩的数据存在两大问题:

  1. 存储成本高:TB 级数据占用大量 HDFS 存储空间;
  2. 计算效率低:MapReduce/Spark 任务中,大量未压缩数据的传输和读写会消耗更多网络带宽和 I/O 资源。

压缩的核心价值

  • 减少存储占用(压缩比通常为 3:1~5:1);
  • 降低网络传输量,加速 Map 与 Reduce 阶段的数据交换;
  • 减少磁盘 I/O 次数,提升任务执行速度。

Hive 压缩的两个关键阶段

Hive 的压缩配置需区分 Map 输出阶段Reduce 输出阶段,两者适用场景和配置参数不同。

1. Map 输出阶段压缩(中间数据压缩)

Map 阶段的输出数据(Map 结果)会作为 Reduce 阶段的输入,若数据量大,传输耗时占比极高。启用 Map 输出压缩可减少传输数据量,加速 Reduce 阶段。

配置参数及说明
参数 作用 默认值 推荐配置
hive.exec.compress.intermediate 启用 Hive 中间数据压缩 false true(开启)
mapreduce.map.output.compress 启用 MapReduce Map 输出压缩 true 保持 true
mapreduce.map.output.compress.codec Map 输出压缩算法 org.apache.hadoop.io.compress.DefaultCodec 推荐 org.apache.hadoop.io.compress.SnappyCodec(平衡速度与压缩比)
配置步骤

在 Hive 客户端或 hive-site.xml 中设置:

阅读全文 »

RabbitMQ 核心概念与工作原理详解

RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的开源消息中间件,以高可靠性、灵活的路由机制和丰富的功能著称。它通过消息队列实现组件间的解耦,支持复杂的消息路由场景。本文将深入解析 RabbitMQ 的核心概念、交换机类型及工作原理,帮助理解其在分布式系统中的应用。

RabbitMQ 核心组件

RabbitMQ 的架构围绕 “消息路由” 设计,核心组件包括以下部分:

组件 作用描述
Broker 消息队列服务器实体(即 RabbitMQ 服务实例),负责接收、存储和转发消息。
Exchange 消息交换机,接收生产者发送的消息,根据路由规则将消息转发到绑定的队列。本身不存储消息,未绑定队列的消息会被丢弃。
Queue 消息队列,实际存储消息的容器,消息最终被投递到队列中等待消费者获取。队列是持久化的(默认情况下),即使 Broker 重启,消息也不会丢失(需配置持久化)。
Binding 绑定关系,用于将 Exchange 与 Queue 关联,并指定路由规则(如 Routing Key 或匹配条件)。
Routing Key 路由关键字,生产者发送消息时指定,Exchange 根据该关键字和绑定规则决定消息流向。
VHost 虚拟主机,用于隔离不同的消息环境(如开发、测试、生产),每个 VHost 拥有独立的交换机、队列和权限控制。默认 VHost 为 /
Producer 消息生产者,向 Exchange 发送消息的应用程序。
Consumer 消息消费者,从 Queue 中获取并处理消息的应用程序。
Channel 消息通道,在客户端与 Broker 的连接中创建的轻量级会话,用于发送 / 接收消息。复用 TCP 连接,减少连接开销,支持并发操作。

核心工作流程

RabbitMQ 的消息传递流程可概括为:

阅读全文 »

网络模型:从 OSI 到 TCP/IP 的分层体系

网络模型是计算机网络通信的 “设计蓝图”,通过将复杂的通信过程拆分为分层的功能模块,降低了协议设计的复杂度,实现了不同设备和系统的互联互通。目前主流的网络模型包括OSI 七层模型TCP/IP 四层模型五层协议模型,它们虽结构不同,但核心思想一致:分层负责、接口标准化

OSI/RM 七层模型(理论基石)

OSI(Open Systems Interconnection,开放系统互连)模型由 ISO(国际标准化组织)于 1984 年提出,是网络分层理论的经典框架。它将网络通信分为 7 层,从下到上依次为物理层、数据链路层、网络层、传输层、会话层、表示层、应用层。尽管 OSI 模型因过于复杂未被广泛应用,但其分层思想为后续模型奠定了基础。

各层功能与核心特征

层级 名称 核心功能 典型协议 / 设备 数据单位
7 应用层 为用户应用程序提供网络服务(如文件传输、邮件收发) HTTP、FTP、SMTP、Telnet 数据
6 表示层 处理数据格式转换(如加密 / 解密、压缩 / 解压、编码转换) JPEG、ASCII、SSL/TLS(部分) 数据
5 会话层 建立、管理和终止进程间的会话连接(如断点续传的会话控制) RPC、NetBIOS 数据
4 传输层 为端到端进程提供可靠或高效的数据传输(如分段、流量控制、重传) TCP、UDP 段(TCP)/ 用户数据报(UDP)
3 网络层 负责跨网络的数据包路由和转发(如 IP 寻址、路径选择) IP、ICMP、ARP、路由协议 分组 / 数据包
2 数据链路层 处理同一链路内的帧传输(如 MAC 寻址、差错校验、冲突处理) Ethernet、PPP、MAC 协议
1 物理层 传输原始比特流,定义物理介质的电气特性(如电压、接口、传输速率) 双绞线、光纤、集线器(Hub) 比特(bit)

OSI 模型的设计原则

  • 功能分离:每一层专注于单一功能(如物理层仅负责比特传输,网络层仅负责路由)。
  • 接口标准化:层与层之间通过明确的接口交互,上层无需关心下层的实现细节。
  • 分层适度:层数足够多以避免功能混杂,同时避免过多导致复杂度上升。
阅读全文 »

MyBatis 注解开发全指南:从基础到高级用法

MyBatis 注解方式提供了一种无需 XML 配置即可编写映射语句的方案,适用于 SQL 逻辑简单、追求代码集中管理的场景。系统梳理注解开发的完整用法,包括基础 CRUD、高级映射、动态 SQL 及最佳实践,帮助你灵活选择 XML 或注解方式。

基础注解:CRUD 操作

MyBatis 提供了 @Insert@Update@Delete@Select 四个核心注解,分别对应 XML 中的 <insert><update><delete><select> 标签,使用简单直接。

1. @Insert:插入数据

1
2
3
4
5
6
7
8
9
10
public interface UserMapper {
// 基础插入
@Insert("INSERT INTO user(username, age, email) VALUES(#{username}, #{age}, #{email})")
int insertUser(User user);

// 插入并返回自增主键(等价于 XML 中的 useGeneratedKeys)
@Insert("INSERT INTO user(username, age) VALUES(#{username}, #{age})")
@Options(useGeneratedKeys = true, keyProperty = "id", keyColumn = "id")
int insertUserWithId(User user);
}
  • @Options 用于配置插入策略,useGeneratedKeys=true 启用自增主键,keyProperty 绑定实体类属性,keyColumn 绑定数据库列(可选,默认与属性名一致)。

2. @Update@Delete:更新与删除

1
2
3
4
5
6
7
8
9
public interface UserMapper {
// 更新用户
@Update("UPDATE user SET username=#{username}, age=#{age} WHERE id=#{id}")
int updateUser(User user);

// 根据 ID 删除
@Delete("DELETE FROM user WHERE id=#{id}")
int deleteUserById(Integer id);
}

3. @Select:查询数据

阅读全文 »