MapReduce 数据清洗(ETL):从杂乱数据到可用信息
数据清洗是大数据处理的前置关键步骤,旨在去除无效、错误或冗余数据,为后续分析提供高质量输入。在 MapReduce 中,数据清洗通常只需通过 Mapper 程序实现,无需 Reduce 阶段,大幅提升效率。本文将详细解析数据清洗的原理、实现方式及实战案例。
数据清洗的核心目标与场景
核心目标
数据清洗(ETL 中的 “Extract-Transform” 环节)的核心是 “去伪存真”,具体包括:
- 去除无效数据:过滤格式错误、缺失关键字段或不符合业务规则的数据;
- 标准化格式:将数据转换为统一格式(如日期、数值格式统一);
- 去除冗余数据:删除重复记录或无用字段;
- 数据补全:对缺失的非关键字段进行合理填充(如默认值)。
典型应用场景
- 日志清洗:从用户行为日志中过滤爬虫数据、格式错误的日志;
- 数据校验:验证订单数据中的金额、日期等字段合法性;
- 格式转换:将非结构化文本转换为结构化键值对;
- 重复数据删除:去除数据库同步过程中产生的重复记录。
MapReduce 数据清洗的实现原理
为何无需 Reduce 阶段?
数据清洗的核心是 过滤和转换,通常无需聚合操作,因此可省略 Reduce 阶段:
- Mapper 负责清洗:读取原始数据,按规则过滤或转换后直接输出;
- 无 Shuffle 开销:不设置 ReduceTask(
job.setNumReduceTasks(0)),避免中间数据传输和排序;
- 效率更高:数据直接从 Mapper 输出到 HDFS,减少磁盘 I/O 和 CPU 消耗。
执行流程
- 输入:原始数据文件(如日志、CSV 等);
- 处理:Mapper 对每条记录执行清洗逻辑(过滤、转换);
- 输出:清洗后的有效数据,直接写入目标路径。
数据清洗实战:日志过滤案例
场景描述
假设需要清洗 Nginx 访问日志,过滤掉状态码为 404/500 的错误日志,仅保留正常访问记录(状态码 200)。
原始日志格式(简化):
1 2 3
| 192.168.1.1 - [2023-10-01 12:00:00] "GET /index.html" 200 1024 192.168.1.2 - [2023-10-01 12:01:00] "GET /error.html" 404 512 192.168.1.3 - [2023-10-01 12:02:00] "POST /submit" 200 2048
|
清洗目标:
仅保留状态码为 200 的记录,输出格式不变。
代码实现
(1)Mapper 类:核心清洗逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException;
public class LogCleanMapper extends Mapper<LongWritable, Text, Text, Text> { private Text outputKey = new Text(); private Text outputValue = new Text();
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString();
if (line == null || line.trim().isEmpty()) { return; }
String[] fields = line.split(" "); if (fields.length < 9) { return; }
String status = fields[8];
if ("200".equals(status)) { outputValue.set(line); context.write(null, outputValue); } } }
|
(2)Driver 类:配置作业
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogCleanDriver { public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: LogClean <input path> <output path>"); System.exit(-1); }
Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "Log Clean");
job.setJarByClass(LogCleanDriver.class); job.setMapperClass(LogCleanMapper.class);
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1); } }
|
关键配置说明
job.setNumReduceTasks(0):核心配置,禁用 Reduce 阶段,避免 Shuffle 过程;
- 输出格式:默认使用
TextOutputFormat,直接输出清洗后的文本行;
- 输入格式:根据原始数据格式选择(如日志文件用
TextInputFormat,CSV 用 KeyValueTextInputFormat)。
高级清洗场景:多路径输出与复杂转换
多路径输出(区分有效 / 无效数据)
有时需要将有效数据和无效数据分别输出到不同路径,可通过 MultipleOutputs 实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class LogCleanMapper extends Mapper<LongWritable, Text, Text, Text> { private MultipleOutputs<Text, Text> multipleOutputs;
@Override protected void setup(Context context) { multipleOutputs = new MultipleOutputs<>(context); }
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(" ");
if (fields.length >= 9 && "200".equals(fields[8])) { multipleOutputs.write("valid", null, new Text(line), "valid/part"); } else { multipleOutputs.write("invalid", null, new Text(line), "invalid/part"); } }
@Override protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } }
|
在 Driver 中配置多输出:
1 2
| MultipleOutputs.addNamedOutput(job, "valid", TextOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(job, "invalid", TextOutputFormat.class, Text.class, Text.class);
|
数据转换与补全
对缺失字段进行补全(如默认值填充),示例:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override protected void map(LongWritable key, Text value, Context context) { String line = value.toString(); String[] fields = line.split(",");
if (fields.length < 3) return;
String userId = fields[0].isEmpty() ? "unknown" : fields[0]; String cleanedLine = userId + "," + fields[1] + "," + fields[2]; context.write(null, new Text(cleanedLine)); }
|
性能优化与最佳实践
1. 性能优化技巧
1 2
| FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
|
2. 最佳实践
- 先采样再清洗:对原始数据采样分析,明确清洗规则后再批量处理;
- 日志分级输出:通过多路径输出区分不同类型的异常数据,便于问题排查;
- 可复用清洗规则:将通用清洗逻辑封装为工具类(如日期校验、格式转换);
- 增量清洗:对新增数据仅清洗增量部分,避免重复处理全量数据。
v1.3.10