0%

操作Parquet

hadoop操作Parquet 文件详解

在 Hadoop 生态中,Parquet 作为一种列式存储格式,凭借其高效的压缩率和查询性能,被广泛应用于大数据处理场景。当数据存储格式从 Text 转为 Parquet 后,如何正确读取 Parquet 文件成为关键问题。本文将详细介绍 Hadoop 读取 Parquet 文件的方法及相关注意事项。

依赖配置

读取 Parquet 文件前,需在项目中添加必要的 Parquet 依赖。以下是 Maven 配置示例:

1
2
3
4
5
6
7
8
9
10
11
<!--添加Parquet依赖-->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-column</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.8.1</version>
</dependency>

注意:需确保 Parquet 依赖版本与 Hadoop 版本兼容,避免出现版本冲突问题(如 Guava 库版本不一致等)。

核心读取代码实现

Parquet 文件的读取主要通过自定义 Mapper 来实现,核心在于正确配置输入格式和解析 Parquet 数据。

Mapper 类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class H2EMapper extends Mapper<Void, Group, NullWritable, Text> {

private static final Logger LOGGER = LoggerFactory.getLogger(H2EMapper.class);
Gson gson = new Gson();
Text v = new Text();

@Override
public void run(Context context) throws IOException, InterruptedException {
setup(context);

// 创建Parquet输入格式对象,指定GroupReadSupport作为读取支持类
ParquetInputFormat<Group> parquetInputFormat = new ParquetInputFormat<>(GroupReadSupport.class);
try {
while (context.nextKeyValue()) {
InputSplit inputSplit = context.getInputSplit();

// 判断输入分片是否为文件分片
if(inputSplit instanceof FileSplit){
// 获取文件名
String name = ((FileSplit) inputSplit).getPath().getName();

// 创建任务尝试上下文
TaskAttemptContext hadoopAttemptContext = new TaskAttemptContextImpl(
context.getConfiguration(),
context.getTaskAttemptID()
);

// 创建记录读取器并初始化
try(RecordReader<Void,Group> recordReader = parquetInputFormat.createRecordReader(
inputSplit,
hadoopAttemptContext
)){
recordReader.initialize(inputSplit,hadoopAttemptContext);

// 读取每条记录并处理
while (recordReader.nextKeyValue()){
map(recordReader.getCurrentKey(), recordReader.getCurrentValue(), context);
}
}
}
}
} finally {
cleanup(context);
}
}

@Override
protected void map(Void key, Group value, Context context) throws IOException, InterruptedException {
// 将Parquet Group转换为JSON字符串
v.set(groupToJson(value));
context.write(NullWritable.get(), v);
}

// 解析Parquet Group对象为JSON字符串
private String groupToJson(Group value){
Map<String,Object> object = new HashMap<>();

// 按照Parquet文件中的字段名进行解析
// 注意:需根据实际Parquet schema调整字段名和数据类型
object.put("name", value.getString("name", 0));
object.put("age", value.getInteger("age", 0));

return gson.toJson(object);
}
}

关键组件解析

  1. ParquetInputFormat:Parquet 文件的输入格式类,负责将 Parquet 文件解析为键值对形式
  2. GroupReadSupport:用于支持将 Parquet 文件读取为 Group 对象
  3. RecordReader:具体负责读取 Parquet 文件内容的组件
  4. Group 对象:Parquet 文件中的一条记录,包含多个字段,需根据实际 schema 解析

注意事项

  1. Schema 匹配:解析 Parquet 文件时,groupToJson方法中的字段名和数据类型必须与 Parquet 文件的实际 schema 完全匹配,否则会出现读取错误
  2. 版本兼容性
    • 确保 Parquet 依赖版本与 Hadoop 版本兼容
    • 不同版本的 Parquet 可能有 API 差异,需根据实际版本调整代码
  3. 性能优化
    • 可通过配置 Parquet 的压缩方式提升读取性能
    • 对于大文件,可合理设置分片大小提高并行处理效率
  4. 异常处理:实际生产环境中,建议增加异常处理逻辑,特别是针对字段不存在或类型不匹配的情况

扩展知识

Parquet 文件还可以通过以下方式读取:

  • 使用 Spark SQL 或 Hive SQL 直接查询 Parquet 文件
  • 使用 Parquet 的 Schema 类进行动态 schema 解析,无需硬编码字段名
  • 结合 Avro 或 Protocol Buffers 定义 schema,实现更灵活的数据读写

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10