0%

数据输入

MapReduce 数据输入机制:InputFormat 深度解析

在 MapReduce 框架中,数据输入阶段是处理流程的起点,负责将原始数据读取、分片并转换为 Map 任务可处理的键值对。InputFormat 作为数据输入的核心组件,决定了数据如何分片、如何读取以及如何转换为键值对。本文将从 InputSplit 原理到具体实现类,全面解析 MapReduce 的数据输入机制。

InputFormat 核心作用与架构

核心职责

InputFormat 是 MapReduce 数据输入的抽象层,主要完成三项任务:

  • 数据分片:将输入数据分割为若干 InputSplit(逻辑分片),每个分片对应一个 MapTask;
  • 读取数据:通过 RecordReader 将 InputSplit 转换为 <key, value> 键值对,供 Map 函数处理;
  • 校验输入:检查输入路径的有效性,确保数据可访问。

架构层次

InputFormat 采用抽象类 + 实现类的设计模式,层次结构如下:

1
2
3
4
5
6
7
InputFormat(抽象类)  
├─ FileInputFormat(文件输入基类)
│ ├─ TextInputFormat(默认文本输入)
│ ├─ KeyValueTextInputFormat(键值分隔输入)
│ ├─ NLineInputFormat(按行数分片输入)
│ └─ CombineTextInputFormat(小文件合并输入)
└─ 自定义 InputFormat(继承 FileInputFormat 或 InputFormat)

所有文件类输入格式均继承自 FileInputFormat,它提供了文件分片的基础实现,子类只需重写数据读取逻辑即可。

InputSplit:逻辑分片的核心

InputSplit 本质

InputSplit 是 MapReduce 对输入数据的 逻辑分片,代表 MapTask 处理的数据范围。它并非实际存储数据,而是记录了数据的位置信息长度信息

  • 长度:分片的字节大小,用于排序分片(优化 MapTask 调度);
  • 位置:存储该分片数据的 DataNode 节点列表(用于本地化调度,优先将 MapTask 分配到数据所在节点)。

InputSplit 核心方法

1
2
3
4
5
6
7
8
9
10
11
12
public abstract class InputSplit {  
// 获取分片大小(字节)
public abstract long getLength() throws IOException, InterruptedException;

// 获取分片所在的节点位置(DataNode 主机名)
public abstract String[] getLocations() throws IOException, InterruptedException;

// 获取更详细的位置信息(如机架信息),用于高级调度
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}

分片与 Block 的关系

在 HDFS 中,数据物理存储以 Block(默认 128MB)为单位,而 InputSplit 是逻辑分片,两者的关系需注意:

  • 默认情况:TextInputFormat 的分片大小与 HDFS Block 大小一致(128MB),即一个 Block 对应一个 InputSplit;
  • 灵活性:可通过配置调整分片大小,使其大于或小于 Block 大小(如合并小文件为大分片);
  • 无重叠:InputSplit 之间无数据重叠,确保每个数据只被一个 MapTask 处理。

FileInputFormat:文件输入的基类

FileInputFormat 是所有文件类 InputFormat 的父类,提供了文件分片的核心逻辑,其子类只需实现数据读取(RecordReader)即可。

分片逻辑(核心算法)

FileInputFormat 的分片大小由以下公式计算:

1
splitSize = max(minSize, min(maxSize, blockSize))  
  • 参数说明
    • blockSize:HDFS 块大小(默认 128MB,由 dfs.blocksize 配置);
    • minSize:最小分片大小(默认 1B,由 mapreduce.input.fileinputformat.split.minsize 配置);
    • maxSize:最大分片大小(默认 Long.MAX_VALUE,由 mapreduce.input.fileinputformat.split.maxsize 配置)。
  • 示例
    • maxSize=64MB,则 splitSize=64MB(小于 BlockSize),一个 Block 会被分为 2 个分片;
    • minSize=256MB,则 splitSize=256MB(大于 BlockSize),多个 Block 会被合并为一个分片。

关键配置参数

配置参数 含义 默认值 作用
mapreduce.input.fileinputformat.split.minsize 最小分片大小 1B 强制分片不小于该值,可合并小文件
mapreduce.input.fileinputformat.split.maxsize 最大分片大小 Long.MAX_VALUE 强制分片不大于该值,可拆分大文件
mapreduce.input.fileinputformat.input.dir 输入目录 指定输入数据所在的 HDFS 目录
mapreduce.input.fileinputformat.input.dir.recursive 是否递归读取子目录 false 设置为 true 可读取嵌套目录文件

常用 InputFormat 实现类详解

1. TextInputFormat(默认输入格式)

TextInputFormat 是 MapReduce 的默认输入格式,适用于文本文件(如日志、CSV 等),按行读取数据。

核心特性
  • 数据分片:按 splitSize 分片,与 HDFS Block 大小默认一致;

  • 键值对格式

    • key:LongWritable 类型,代表该行在文件中的起始字节偏移量;
    • value:Text 类型,代表该行的文本内容(不含换行符);
  • 示例

    若文件内容为:

    1
    2
    hello world  
    hadoop mapreduce

    则 Map 输入的键值对为:

    1
    2
    (0, "hello world")  
    (12, "hadoop mapreduce")
适用场景
  • 普通文本文件处理(如日志分析、词频统计);
  • 无特殊格式要求的文本数据。

2. CombineTextInputFormat(小文件合并输入)

针对小文件过多的场景(如大量 KB 级文件),CombineTextInputFormat 可将多个小文件合并为一个逻辑分片,减少 MapTask 数量(避免大量小任务导致的资源浪费)。

核心原理:虚拟存储
  1. 虚拟存储分割:将小文件按 maxSplitSize 划分为 “虚拟块”(如设置 maxSplitSize=20MB,则 5MB 小文件会生成 1 个虚拟块,30MB 小文件生成 2 个虚拟块);
  2. 合并虚拟块:将虚拟块合并为实际 InputSplit,每个 Split 大小不超过 maxSplitSize
使用配置
1
2
3
4
// 在 Driver 类中设置输入格式  
job.setInputFormatClass(CombineTextInputFormat.class);
// 设置最大分片大小(20MB)
CombineTextInputFormat.setMaxInputSplitSize(job, 20 * 1024 * 1024);
适用场景
  • 大量小文件场景(如图片、文档、日志碎片);
  • 需减少 MapTask 数量以提升效率的场景。

3. KeyValueTextInputFormat(键值分隔输入)

KeyValueTextInputFormat 适用于每行数据包含键值对的文件(如配置文件、TSV 格式),可通过分隔符自动拆分键和值。

核心特性
  • 键值对格式
    • key:Text 类型,分隔符前的内容;
    • value:Text 类型,分隔符后的内容;
  • 默认分隔符:Tab 键(\t),可通过配置修改(如空格、逗号)。
配置示例
1
2
3
4
// 在 Driver 类中设置分隔符为逗号  
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, ",");
// 设置输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
示例

若文件内容为:

1
2
name,zhangsan  
age,20

则 Map 输入的键值对为:

1
2
("name", "zhangsan")  
("age", "20")
适用场景
  • 键值对格式文件(如配置文件、TSV/CSV 数据);
  • 需要按固定分隔符拆分每行数据的场景。

4. NLineInputFormat(按行数分片输入)

NLineInputFormat 按指定行数划分 InputSplit,即每个 MapTask 处理固定行数的数据(而非按大小分片)。

核心特性
  • 分片规则:若输入文件总行数为 L,指定行数为 N,则分片数 = ceil(L / N)
  • 键值对格式:与 TextInputFormat 一致(偏移量为 key,行为 value)。
使用配置
1
2
3
4
// 在 Driver 类中设置每行处理的行数 N=5  
conf.setInt("mapreduce.input.lineinputformat.linespermap", 5);
// 设置输入格式
job.setInputFormatClass(NLineInputFormat.class);
示例

若文件共 12 行,N=5,则生成 3 个分片:

  • 分片 1:处理第 1-5 行;
  • 分片 2:处理第 6-10 行;
  • 分片 3:处理第 11-12 行。
适用场景
  • 需要按行数均匀分配任务的场景(如按批次处理数据);
  • 每行数据处理时间相近的计算任务。

5. 自定义 InputFormat(处理特殊格式)

对于特殊格式文件(如二进制文件、自定义协议文件),可通过继承 FileInputFormat 并实现 RecordReader 自定义 InputFormat。

自定义步骤
  1. 继承 FileInputFormat:重写 isSplitable 方法(指定文件是否可分片);
  2. 实现 RecordReader:重写 nextKeyValue()getCurrentKey()getCurrentValue() 等方法,定义键值对转换逻辑;
  3. 在 Driver 中配置:设置自定义 InputFormat 为作业输入格式。
示例:读取二进制文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 自定义 InputFormat  
public class BinaryInputFormat extends FileInputFormat<LongWritable, BytesWritable> {
@Override
public RecordReader<LongWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
return new BinaryRecordReader();
}

// 二进制文件通常不可分片(如图片),需禁用分片
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}

// 自定义 RecordReader
public class BinaryRecordReader extends RecordReader<LongWritable, BytesWritable> {
// 实现 nextKeyValue() 方法读取二进制数据
// ...
}
适用场景
  • 二进制文件(如图片、视频、序列化文件);
  • 自定义格式文件(如私有协议数据)。

InputFormat 选择指南

场景 推荐 InputFormat 优势
普通文本文件(大文件) TextInputFormat 默认格式,无需额外配置
大量小文件(<100MB) CombineTextInputFormat 合并小文件,减少 MapTask 数量
键值对格式文件(如 TSV) KeyValueTextInputFormat 自动按分隔符拆分键值对
按行数均匀分配任务 NLineInputFormat 精确控制每个 MapTask 处理的行数
特殊格式文件(二进制 / 自定义) 自定义 InputFormat 灵活适配特殊数据格式

常见问题与优化

1. 小文件过多导致 MapTask 激增?

  • 问题:每个小文件对应一个 InputSplit,导致 MapTask 数量过多,资源调度开销增大;
  • 解决:使用 CombineTextInputFormat 合并小文件,设置合理的 maxSplitSize(如 128MB)。

2. 大文件分片不合理?

  • 问题:单个大文件分片过大(导致 MapTask 处理时间过长)或过小(任务过多);
  • 解决:调整 mapreduce.input.fileinputformat.split.maxsizeminSize,使分片大小与集群处理能力匹配(如每个 MapTask 处理 128-256MB 数据)。

3. 数据本地化不足?

  • 问题:MapTask 被调度到非数据所在节点,导致大量网络传输;
  • 解决
    • 确保 InputSplit 位置信息准确(依赖 FileInputFormat 的 getLocations 方法);
    • 避免分片跨节点(合理设置 blockSize 与分片大小一致)。

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10