spark数据读取与保存全解析
Spark 支持多种数据格式的读写操作,包括文本文件、JSON、CSV、SequenceFiles 等。不同格式有其特定的读写方式和优化策略,本文将详细介绍各格式的操作要点及最佳实践。
文本文件(TextFile)
文本文件是最基础的数据格式,Spark 通过 textFile 和 saveAsTextFile 方法进行读写。
核心 API
- 读取:
sc.textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] - 写入:
rdd.saveAsTextFile(path: String)
示例代码
1 | // 读取本地文本文件(每行作为一个元素) |
注意事项
- 分区控制:
minPartitions参数可指定最小分区数,但实际分区数可能受文件块大小影响(如 HDFS 默认 128MB / 块); - 写入格式:输出为目录,每个分区生成一个
part-*文件,需使用文件系统命令合并(如hdfs dfs -getmerge); - 编码问题:默认使用 UTF-8 编码,可通过
spark.io.encoding配置修改。
JSON 格式
JSON 是半结构化数据的主流格式,Spark 支持通过自定义解析或 DataFrame API 处理。
手动解析 JSON(RDD 方式)
使用第三方库(如 Jackson、Gson)手动解析 JSON 字符串。
示例代码
1 | import com.fasterxml.jackson.databind.ObjectMapper |
使用 DataFrame API(推荐)
Spark SQL 的 DataFrame 提供更高效的 JSON 处理能力。
示例代码
1 | import org.apache.spark.sql.SparkSession |
注意事项
- Schema 推断:DataFrame 会自动推断 JSON 结构,但嵌套复杂的 JSON 可能需要手动指定 Schema;
- 性能对比:DataFrame API 比手动解析更高效,因为底层使用 Catalyst 优化器;
- 多行 JSON:若 JSON 对象跨越多行,需设置
multiLine=true(spark.read.option("multiLine", true).json(path))。
CSV 格式
CSV(逗号分隔值)是表格数据的常用格式,Spark 支持通过 RDD 或 DataFrame 读写。
手动处理 CSV(RDD 方式)
使用 OpenCSV 等库手动解析 CSV 行。
示例代码
1 | import com.opencsv.CSVReader |
使用 DataFrame API(推荐)
DataFrame 提供更强大的 CSV 处理能力,支持表头、分隔符、引号等配置。
示例代码
1 | import org.apache.spark.sql.SparkSession |
注意事项
- 复杂分隔符:通过
option("delimiter", "|")指定非逗号分隔符; - 引号处理:通过
option("quote", "\"")处理带引号的字段; - 空值处理:通过
option("nullValue", "nan")自定义空值表示。
SequenceFiles 格式
SequenceFiles 是 Hadoop 专用的键值对二进制格式,适合高效存储二进制数据。
核心 API
- 读取:
sc.sequenceFile[K, V](path: String): RDD[(K, V)] - 写入:
rdd.saveAsSequenceFile(path: String)
示例代码
1 | import org.apache.hadoop.io.{IntWritable, Text} |
注意事项
- 类型匹配:读取时需明确指定 Key 和 Value 的 Hadoop 类型(如
Text、IntWritable); - 序列化:自定义类型需实现 Hadoop 的
Writable接口; - 压缩:支持多种压缩格式(如
BZip2Codec、SnappyCodec),通过spark.hadoop.io.seqfile.compression.type配置。
对象序列化
Spark 支持将自定义对象序列化为二进制格式存储,需确保对象实现 Serializable 接口。
示例代码
1 | // 定义可序列化的类 |
注意事项
- 序列化方式:默认使用 Java 序列化,性能较低;推荐使用 Kryo 序列化(通过
spark.serializer配置); - 兼容性:反序列化时需确保类定义一致,否则可能抛出
ClassNotFoundException; - 适用场景:适合存储中间结果或复杂对象,不适合跨语言数据交换。
不同格式性能对比与选择建议
| 格式 | 读写性能 | 空间效率 | 结构化支持 | 跨语言兼容性 | 适用场景 |
|---|---|---|---|---|---|
| 文本文件 | 高 | 低 | 无 | 高 | 日志、简单文本数据 |
| JSON | 中 | 中 | 部分 | 高 | 半结构化数据、API 响应 |
| CSV | 高 | 中 | 表格 | 高 | 表格数据、批量导入导出 |
| SequenceFiles | 高 | 高 | 键值对 | 低 | Hadoop 生态内部数据交换 |
| 对象序列化 | 高 | 高 | 自定义 | 低 | Spark 内部中间结果缓存 |
选择建议
- 结构化数据:优先使用 DataFrame API 处理 CSV/JSON,利用 Catalyst 优化器;
- 性能敏感场景:使用 SequenceFiles 或 Parquet(列式存储,后续文章会详细介绍);
- 跨语言需求:选择文本、JSON 或 Avro 格式;
- 临时数据:使用对象序列化(需配置 Kryo 提升性能)。
高级优化技巧
压缩配置
通过配置压缩算法减少存储开销:
1 | // 全局启用压缩(影响所有输出格式) |
分区写入
按字段值分区存储,加速后续查询:
1 | // 按日期字段分区写入 |
并行度控制
通过 repartition 或 coalesce 调整输出分区数:
1 | // 调整为 20 个分区写入(避免生成过多小文件) |