0%

spark敲门砖之WordCount

spark入门实战:WordCount 程序全解析

WordCount(单词计数)是大数据领域的 “Hello World”,通过它可以快速理解 Spark 的核心概念和编程模型。本文以 Scala 为例,从环境准备、代码实现到执行原理,全方位讲解 Spark WordCount 程序,帮助初学者入门 Spark 分布式计算。

环境准备:版本匹配与依赖配置

Spark 对 Scala 版本有严格依赖,错误的版本组合会导致兼容性问题(如类找不到、方法异常),需提前确认版本对应关系。

版本选择原则

  • Spark 2.x 主要支持 Scala 2.11、2.12;
  • Spark 3.x 主要支持 Scala 2.12、2.13(但部分早期 3.x 版本对 2.13 支持不完善);
  • 推荐组合:Spark 3.1.1 + Scala 2.12.x(稳定性好,生态支持完善)。

Maven 依赖配置

pom.xml 中添加 Spark Core 和 Scala 依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- Scala 核心库 -->  
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.12.13</version> <!-- 与 Spark 版本匹配 -->
</dependency>

<!-- Spark 核心依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId> <!-- _2.12 表示适配 Scala 2.12 -->
<version>3.1.1</version>
</dependency>

注意:spark-core artifactId 中的 _2.12 必须与 Scala 版本一致,否则会出现 ClassNotFoundException

WordCount 核心代码实现

WordCount 的核心逻辑是 “读取文本 → 拆分单词 → 计数聚合”,Spark 通过 RDD 算子实现分布式计算。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.spark.{SparkConf, SparkContext}  

object WordCount {
def main(args: Array[String]): Unit = {
// 1. 配置 Spark 应用
val sparkConf = new SparkConf()
.setMaster("local[*]") // 本地运行模式,[*] 表示使用所有可用 CPU 核心
.setAppName("WordCount") // 应用名称

// 2. 创建 Spark 上下文(SparkContext),连接 Spark 集群
val sc = new SparkContext(sparkConf)

try {
// 3. 读取文件:从本地资源目录读取文本文件
val lines: RDD[String] = sc.textFile("src/main/resources/wordcount.txt")
println("===== 原始文本行 =====")
lines.collect().foreach(println) // collect() 将分布式数据拉取到本地打印

// 4. 拆分单词:将每行文本按空格拆分,flatMap 压平为单个单词
val words: RDD[String] = lines.flatMap(_.split(" "))
println("\n===== 拆分后的单词 =====")
words.collect().foreach(println)

// 5. 单词记为 (word, 1):每个单词映射为键值对 (单词, 1)
val wordToOne: RDD[(String, Int)] = words.map(word => (word, 1))
println("\n===== 单词映射为 (word, 1) =====")
wordToOne.collect().foreach(println)

// 6. 聚合计数:按单词分组,对值求和
val wordCounts: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)
println("\n===== 最终单词计数 =====")
wordCounts.collect().foreach(println)

} finally {
// 7. 关闭 Spark 上下文,释放资源
sc.stop()
}
}
}

输入文件准备

在项目 src/main/resources 目录下创建 wordcount.txt,内容示例:

1
2
3
4
5
6
Hadoop Hive Spark  
Hadoop HDFS
Hive Spark
Zookeeper Kafka Flume
Zookeeper Hadoop
Zookeeper

输出结果

程序执行后,控制台输出最终计数结果:

1
2
3
4
5
6
7
8
===== 最终单词计数 =====  
(Hadoop, 3)
(Hive, 2)
(Spark, 2)
(HDFS, 1)
(Zookeeper, 3)
(Kafka, 1)
(Flume, 1)

核心概念解析:从代码到 Spark 原理

SparkConf 与 SparkContext

  • SparkConf:配置 Spark 应用的参数(如运行模式、应用名称)。
    • setMaster("local[*]"):本地运行模式,[*] 表示使用所有可用 CPU 核心(开发调试用);
    • setAppName("WordCount"):指定应用名称,用于集群监控识别。
  • SparkContext:Spark 应用的入口,负责连接集群、创建 RDD、调度任务,是 Spark 核心组件的 “总管”。

RDD:弹性分布式数据集

RDD(Resilient Distributed Dataset)是 Spark 的核心数据结构,代表分布式内存中的不可变数据集。上述代码中涉及的 RDD 操作如下:

步骤 RDD 操作 作用 类型
读取文件 textFile 从文件创建 RDD,每行作为一个元素 输入操作
拆分单词 flatMap(_.split(" ")) 对每个元素拆分后压平(如 ["a b", "c"]["a", "b", "c"] Transformation(转换)
映射为 (word, 1) map(word => (word, 1)) 每个元素映射为新元素(一对一转换) Transformation
聚合计数 reduceByKey(_ + _) 按 Key 分组,对 Value 求和(如 (a,1), (a,1)(a,2) Transformation
打印结果 collect() 将分布式 RDD 数据拉取到本地(仅小数据用) Action(行动)

Transformation 与 Action

Spark 算子分为两类,决定了 RDD 的计算逻辑:

  • Transformation(转换)
    • 特点:惰性执行(仅记录操作,不立即计算),返回新 RDD;
    • 示例:flatMapmapreduceByKey
  • Action(行动)
    • 特点:触发实际计算(执行之前的所有转换),返回本地数据或写入外部存储;
    • 示例:collect()(拉取数据到本地)、count()(计数)、saveAsTextFile()(保存结果)。

分布式执行逻辑

即使在本地模式,Spark 也会模拟分布式计算:

  1. 输入文件被拆分为多个分区(Partition),并行处理;
  2. flatMapmap 等转换在每个分区上独立执行(并行计算);
  3. reduceByKey 会触发 Shuffle(跨分区数据交换),将相同 Key 的数据汇总到同一分区计算;
  4. 最终结果通过 collect() 拉取到 Driver 节点(本地)打印。

Spark Shell 交互式运行(快速验证)

Spark 提供交互式 Shell 工具,无需编写完整程序即可运行代码,适合快速测试。

启动 Spark Shell

在终端执行(需先安装 Spark):

1
spark-shell  # 启动 Scala 版 Shell,自动创建 SparkContext(变量名 sc)  

交互式执行 WordCount

在 Spark Shell 中逐行输入代码:

1
2
3
4
5
6
7
8
9
10
11
// 读取文件(本地路径需用 file:// 前缀)  
val lines = sc.textFile("file:///path/to/wordcount.txt")

// 执行单词计数
val wordCounts = lines
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)

// 打印结果
wordCounts.collect().foreach(println)

常见问题与优化

1. 版本兼容性问题

  • 错误现象:NoClassDefFoundErrorMethodNotFoundError
  • 解决:确保 Scala 版本与 Spark 版本匹配(如 Spark 3.1.1 对应 Scala 2.12.x)。

2. 本地文件路径问题

  • 错误现象:FileNotFoundException
  • 解决:
    • 本地文件需用绝对路径,或在 Spark Shell 中加 file:// 前缀(如 file:///home/user/wordcount.txt);
    • 项目中资源文件通过 ClassLoader.getSystemResource("wordcount.txt").getPath 获取路径。

3. 数据倾斜优化

  • 问题:若某个单词出现频率极高(如 “the”),reduceByKey 会导致单个分区压力过大。
  • 优化:使用 reduceByKeyLocally 先在本地聚合,或通过加盐哈希分散热点 Key。

欢迎关注我的其它发布渠道

表情 | 预览
快来做第一个评论的人吧~
Powered By Valine
v1.3.10