0%

数据读取与保存

spark数据读取与保存全解析

Spark 支持多种数据格式的读写操作,包括文本文件、JSON、CSV、SequenceFiles 等。不同格式有其特定的读写方式和优化策略,本文将详细介绍各格式的操作要点及最佳实践。

文本文件(TextFile)

文本文件是最基础的数据格式,Spark 通过 textFilesaveAsTextFile 方法进行读写。

核心 API

  • 读取sc.textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
  • 写入rdd.saveAsTextFile(path: String)

示例代码

1
2
3
4
5
6
7
8
// 读取本地文本文件(每行作为一个元素)  
val textRDD = sc.textFile("file:///path/to/local/file.txt")

// 读取 HDFS 文本文件
val hdfsRDD = sc.textFile("hdfs://namenode:port/path/to/hdfs/file.txt")

// 写入文本文件(输出为目录,包含多个 part-* 文件)
textRDD.saveAsTextFile("hdfs:///output/text_result")

注意事项

  1. 分区控制minPartitions 参数可指定最小分区数,但实际分区数可能受文件块大小影响(如 HDFS 默认 128MB / 块);
  2. 写入格式:输出为目录,每个分区生成一个 part-* 文件,需使用文件系统命令合并(如 hdfs dfs -getmerge);
  3. 编码问题:默认使用 UTF-8 编码,可通过 spark.io.encoding 配置修改。

JSON 格式

JSON 是半结构化数据的主流格式,Spark 支持通过自定义解析或 DataFrame API 处理。

手动解析 JSON(RDD 方式)

使用第三方库(如 Jackson、Gson)手动解析 JSON 字符串。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.fasterxml.jackson.databind.ObjectMapper  

// 定义 JSON 数据结构
case class Person(name: String, age: Int)

val jsonRDD = sc.textFile("file:///path/to/json/file.json")

// 使用 Jackson 解析 JSON
val mapper = new ObjectMapper()
val personRDD = jsonRDD.map(line => {
try {
mapper.readValue(line, classOf[Person])
} catch {
case e: Exception => null // 处理解析失败的情况
}
}).filter(_ != null) // 过滤解析失败的记录

// 将对象序列化为 JSON 并保存
personRDD.map(person => mapper.writeValueAsString(person))
.saveAsTextFile("hdfs:///output/json_result")

使用 DataFrame API(推荐)

Spark SQL 的 DataFrame 提供更高效的 JSON 处理能力。

示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.spark.sql.SparkSession  

val spark = SparkSession.builder()
.appName("JSON Processing")
.getOrCreate()

// 读取 JSON 文件为 DataFrame
val df = spark.read.json("file:///path/to/json/file.json")

// 执行 SQL 查询
df.createOrReplaceTempView("people")
val resultDF = spark.sql("SELECT name, age FROM people WHERE age > 18")

// 保存为 JSON
resultDF.write.json("hdfs:///output/json_result")

注意事项

  • Schema 推断:DataFrame 会自动推断 JSON 结构,但嵌套复杂的 JSON 可能需要手动指定 Schema;
  • 性能对比:DataFrame API 比手动解析更高效,因为底层使用 Catalyst 优化器;
  • 多行 JSON:若 JSON 对象跨越多行,需设置 multiLine=truespark.read.option("multiLine", true).json(path))。

CSV 格式

CSV(逗号分隔值)是表格数据的常用格式,Spark 支持通过 RDD 或 DataFrame 读写。

手动处理 CSV(RDD 方式)

使用 OpenCSV 等库手动解析 CSV 行。

示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import com.opencsv.CSVReader  

val csvRDD = sc.textFile("file:///path/to/csv/file.csv")

// 解析 CSV 行(跳过表头)
val header = csvRDD.first()
val dataRDD = csvRDD.filter(_ != header)
.map(line => {
val reader = new CSVReader(new java.io.StringReader(line))
reader.readNext() // 返回 String 数组
})

// 写入 CSV
dataRDD.map(arr => arr.mkString(","))
.saveAsTextFile("hdfs:///output/csv_result")

使用 DataFrame API(推荐)

DataFrame 提供更强大的 CSV 处理能力,支持表头、分隔符、引号等配置。

示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.apache.spark.sql.SparkSession  

val spark = SparkSession.builder()
.appName("CSV Processing")
.getOrCreate()

// 读取 CSV 文件(自动推断 Schema)
val df = spark.read
.option("header", "true") // 第一行为表头
.option("inferSchema", "true") // 自动推断数据类型
.csv("file:///path/to/csv/file.csv")

// 写入 CSV
df.write
.option("header", "true")
.csv("hdfs:///output/csv_result")

注意事项

  • 复杂分隔符:通过 option("delimiter", "|") 指定非逗号分隔符;
  • 引号处理:通过 option("quote", "\"") 处理带引号的字段;
  • 空值处理:通过 option("nullValue", "nan") 自定义空值表示。

SequenceFiles 格式

SequenceFiles 是 Hadoop 专用的键值对二进制格式,适合高效存储二进制数据。

核心 API

  • 读取sc.sequenceFile[K, V](path: String): RDD[(K, V)]
  • 写入rdd.saveAsSequenceFile(path: String)

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.hadoop.io.{IntWritable, Text}  

// 读取 SequenceFile(指定 Key 和 Value 类型)
val seqRDD = sc.sequenceFile[Text, IntWritable]("hdfs:///path/to/seqfile")

// 转换为普通 RDD
val stringIntRDD = seqRDD.map { case (k, v) =>
(k.toString, v.get())
}

// 写入 SequenceFile(需转换为 Hadoop 类型)
stringIntRDD.map { case (k, v) =>
(new Text(k), new IntWritable(v))
}.saveAsSequenceFile("hdfs:///output/seq_result")

注意事项

  • 类型匹配:读取时需明确指定 Key 和 Value 的 Hadoop 类型(如 TextIntWritable);
  • 序列化:自定义类型需实现 Hadoop 的 Writable 接口;
  • 压缩:支持多种压缩格式(如 BZip2CodecSnappyCodec),通过 spark.hadoop.io.seqfile.compression.type 配置。

对象序列化

Spark 支持将自定义对象序列化为二进制格式存储,需确保对象实现 Serializable 接口。

示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 定义可序列化的类  
case class User(id: Long, name: String) extends Serializable

// 创建对象 RDD
val users = sc.parallelize(Seq(
User(1, "Alice"),
User(2, "Bob")
))

// 保存为对象文件(使用 Java 序列化)
users.saveAsObjectFile("hdfs:///output/objects")

// 读取对象文件
val loadedUsers = sc.objectFile<User>("hdfs:///output/objects")

注意事项

  • 序列化方式:默认使用 Java 序列化,性能较低;推荐使用 Kryo 序列化(通过 spark.serializer 配置);
  • 兼容性:反序列化时需确保类定义一致,否则可能抛出 ClassNotFoundException
  • 适用场景:适合存储中间结果或复杂对象,不适合跨语言数据交换。

不同格式性能对比与选择建议

格式 读写性能 空间效率 结构化支持 跨语言兼容性 适用场景
文本文件 日志、简单文本数据
JSON 部分 半结构化数据、API 响应
CSV 表格 表格数据、批量导入导出
SequenceFiles 键值对 Hadoop 生态内部数据交换
对象序列化 自定义 Spark 内部中间结果缓存

选择建议

  1. 结构化数据:优先使用 DataFrame API 处理 CSV/JSON,利用 Catalyst 优化器;
  2. 性能敏感场景:使用 SequenceFiles 或 Parquet(列式存储,后续文章会详细介绍);
  3. 跨语言需求:选择文本、JSON 或 Avro 格式;
  4. 临时数据:使用对象序列化(需配置 Kryo 提升性能)。

高级优化技巧

压缩配置

通过配置压缩算法减少存储开销:

1
2
3
4
5
6
7
8
// 全局启用压缩(影响所有输出格式)  
spark.conf.set("spark.hadoop.mapred.output.compress", "true")
spark.conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")

// 写入时指定压缩(对支持的格式有效)
df.write
.option("compression", "snappy")
.csv("hdfs:///output/compressed_csv")

分区写入

按字段值分区存储,加速后续查询:

1
2
3
4
// 按日期字段分区写入  
df.write
.partitionBy("date")
.parquet("hdfs:///output/partitioned_data")

并行度控制

通过 repartitioncoalesce 调整输出分区数:

1
2
// 调整为 20 个分区写入(避免生成过多小文件)  
df.repartition(20).write.csv("hdfs:///output/adjusted_partitions")

欢迎关注我的其它发布渠道