0%

数据清洗

MapReduce 数据清洗(ETL):从杂乱数据到可用信息

数据清洗是大数据处理的前置关键步骤,旨在去除无效、错误或冗余数据,为后续分析提供高质量输入。在 MapReduce 中,数据清洗通常只需通过 Mapper 程序实现,无需 Reduce 阶段,大幅提升效率。本文将详细解析数据清洗的原理、实现方式及实战案例。

数据清洗的核心目标与场景

核心目标

数据清洗(ETL 中的 “Extract-Transform” 环节)的核心是 “去伪存真”,具体包括:

  • 去除无效数据:过滤格式错误、缺失关键字段或不符合业务规则的数据;
  • 标准化格式:将数据转换为统一格式(如日期、数值格式统一);
  • 去除冗余数据:删除重复记录或无用字段;
  • 数据补全:对缺失的非关键字段进行合理填充(如默认值)。

典型应用场景

  • 日志清洗:从用户行为日志中过滤爬虫数据、格式错误的日志;
  • 数据校验:验证订单数据中的金额、日期等字段合法性;
  • 格式转换:将非结构化文本转换为结构化键值对;
  • 重复数据删除:去除数据库同步过程中产生的重复记录。

MapReduce 数据清洗的实现原理

为何无需 Reduce 阶段?

数据清洗的核心是 过滤和转换,通常无需聚合操作,因此可省略 Reduce 阶段:

  • Mapper 负责清洗:读取原始数据,按规则过滤或转换后直接输出;
  • 无 Shuffle 开销:不设置 ReduceTask(job.setNumReduceTasks(0)),避免中间数据传输和排序;
  • 效率更高:数据直接从 Mapper 输出到 HDFS,减少磁盘 I/O 和 CPU 消耗。

执行流程

  • 输入:原始数据文件(如日志、CSV 等);
  • 处理:Mapper 对每条记录执行清洗逻辑(过滤、转换);
  • 输出:清洗后的有效数据,直接写入目标路径。

数据清洗实战:日志过滤案例

场景描述

假设需要清洗 Nginx 访问日志,过滤掉状态码为 404/500 的错误日志,仅保留正常访问记录(状态码 200)。

原始日志格式(简化):
1
2
3
192.168.1.1 - [2023-10-01 12:00:00] "GET /index.html" 200 1024  
192.168.1.2 - [2023-10-01 12:01:00] "GET /error.html" 404 512
192.168.1.3 - [2023-10-01 12:02:00] "POST /submit" 200 2048
清洗目标:

仅保留状态码为 200 的记录,输出格式不变。

代码实现

(1)Mapper 类:核心清洗逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class LogCleanMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text outputKey = new Text();
private Text outputValue = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 读取一行日志
String line = value.toString();

// 跳过空行
if (line == null || line.trim().isEmpty()) {
return;
}

// 解析日志(按空格分割,状态码在第 8 个字段)
String[] fields = line.split(" ");
if (fields.length < 9) { // 过滤格式错误的日志
return;
}

// 提取状态码(第 8 个字段,索引 8)
String status = fields[8];

// 过滤状态码为 200 的记录
if ("200".equals(status)) {
// 输出原始日志(key 可设为空,或保留偏移量)
outputValue.set(line);
context.write(null, outputValue);
}
}
}
(2)Driver 类:配置作业
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogCleanDriver {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: LogClean <input path> <output path>");
System.exit(-1);
}

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Log Clean");

// 设置主类和 Mapper 类
job.setJarByClass(LogCleanDriver.class);
job.setMapperClass(LogCleanMapper.class);

// 不设置 Reducer(省略 Reduce 阶段)
job.setNumReduceTasks(0);

// 设置输出键值对类型(此处 value 为清洗后的日志行)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// 设置输入输出路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 提交作业并等待完成
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

关键配置说明

  • job.setNumReduceTasks(0):核心配置,禁用 Reduce 阶段,避免 Shuffle 过程;
  • 输出格式:默认使用 TextOutputFormat,直接输出清洗后的文本行;
  • 输入格式:根据原始数据格式选择(如日志文件用 TextInputFormat,CSV 用 KeyValueTextInputFormat)。

高级清洗场景:多路径输出与复杂转换

多路径输出(区分有效 / 无效数据)

有时需要将有效数据和无效数据分别输出到不同路径,可通过 MultipleOutputs 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class LogCleanMapper extends Mapper<LongWritable, Text, Text, Text> {  
private MultipleOutputs<Text, Text> multipleOutputs;

@Override
protected void setup(Context context) {
multipleOutputs = new MultipleOutputs<>(context);
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");

if (fields.length >= 9 && "200".equals(fields[8])) {
// 有效数据输出到 valid/ 目录
multipleOutputs.write("valid", null, new Text(line), "valid/part");
} else {
// 无效数据输出到 invalid/ 目录
multipleOutputs.write("invalid", null, new Text(line), "invalid/part");
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
multipleOutputs.close(); // 释放资源
}
}

在 Driver 中配置多输出:

1
2
MultipleOutputs.addNamedOutput(job, "valid", TextOutputFormat.class, Text.class, Text.class);  
MultipleOutputs.addNamedOutput(job, "invalid", TextOutputFormat.class, Text.class, Text.class);

数据转换与补全

对缺失字段进行补全(如默认值填充),示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 对缺失用户 ID 的日志补全默认值  
@Override
protected void map(LongWritable key, Text value, Context context) {
String line = value.toString();
String[] fields = line.split(",");

// 假设格式为:user_id,action,time
if (fields.length < 3) return;

String userId = fields[0].isEmpty() ? "unknown" : fields[0]; // 补全用户 ID
String cleanedLine = userId + "," + fields[1] + "," + fields[2];
context.write(null, new Text(cleanedLine));
}

性能优化与最佳实践

1. 性能优化技巧

  • 减少数据读取:通过 InputFormat 过滤不需要的文件(如 RegexInputFormat 按文件名过滤);

  • 启用压缩输出:对清洗后的大文件启用压缩(如 Gzip),节省存储:

1
2
FileOutputFormat.setCompressOutput(job, true);  
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
  • 合理设置 Mapper 内存:调大 mapreduce.map.memory.mb(如 2048MB),处理大文件时避免 OOM;

  • 避免小文件:清洗后若生成大量小文件,可后续通过 CombineFileInputFormat 合并。

2. 最佳实践

  • 先采样再清洗:对原始数据采样分析,明确清洗规则后再批量处理;
  • 日志分级输出:通过多路径输出区分不同类型的异常数据,便于问题排查;
  • 可复用清洗规则:将通用清洗逻辑封装为工具类(如日期校验、格式转换);
  • 增量清洗:对新增数据仅清洗增量部分,避免重复处理全量数据。

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10