转换算子用于对 RDD 进行转换操作,生成一个新的 RDD。转换操作是惰性的,即当调用转换算子时,Spark 并不会立即执行计算,而是记录下操作步骤,直到遇到行动算子时才会触发实际的计算。
从格式和用法上来看,它就是集合对象的方法。
以下是一些常见的转换算子:
1.map算子
功能:对 RDD 中的每个元素应用自定义函数,生成新的 RDD。
特点:一对一转换,元素数量不变。
示例代码
val numbers = sc.parallelize(1 to 5)
val squared = numbers.map(x => x * x) // [1, 4, 9, 16, 25]// 对字符串操作
val words = sc.parallelize(Seq("hello", "world"))
val upperWords = words.map(_.toUpperCase()) // ["HELLO", "WORLD"]
应用场景
- 数据格式转换(如 JSON 解析)。
- 数值计算(如单位转换)。
2.filter算子
功能:过滤 RDD 中满足条件的元素,生成新的 RDD。
特点:可能减少元素数量。
示例代码
val numbers = sc.parallelize(1 to 10)
val evenNumbers = numbers.filter(_ % 2 == 0) // [2, 4, 6, 8, 10]// 过滤长度大于 5 的字符串
val words = sc.parallelize(Seq("apple", "banana", "grape"))
val longWords = words.filter(_.length > 5) // ["banana"]
应用场景
- 数据清洗(如过滤空值或无效记录)。
- 条件筛选(如筛选年龄大于 18 的用户)。
3.flatMap算子
功能:先对 RDD 中每个元素应用函数,再将结果展平(一对多转换)。
特点:常用于分词或嵌套结构展开。
示例代码
val lines = sc.parallelize(Seq("Hello World", "Spark is fast"))
val words = lines.flatMap(_.split(" ")) // ["Hello", "World", "Spark", "is", "fast"]// 嵌套结构展开
val nested = sc.parallelize(Seq(List(1, 2), List(3, 4)))
val flattened = nested.flatMap(x => x) // [1, 2, 3, 4]
应用场景
- 文本分析(如 WordCount 中的分词)。
- 嵌套数据处理(如解析多层 JSON)。
4.reduceByKey算子
功能:对键值对 RDD 按 key 分组,使用指定函数聚合 value。
特点:
- 高效:先在本地分区聚合(Combiner),再跨节点聚合,减少数据传输。
- 适用场景:适用于求和、求平均等可交换的聚合操作。
示例代码
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))// 按 key 求和
val sumByKey = pairs.reduceByKey(_ + _) // [("a", 4), ("b", 2)]// 按 key 求最大值
val maxByKey = pairs.reduceByKey(_ max _) // [("a", 3), ("b", 2)]
以下是一个使用 reduceByKey 计算每个单词出现次数的示例:
import org.apache.spark.{SparkConf, SparkContext}
object ReduceByKeyExample {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local[*]")// 创建 SparkContext 对象val sc = new SparkContext(conf)// 创建一个包含单词的 RDDval words = sc.parallelize(List("apple", "banana", "apple", "cherry", "banana", "apple"))// 将每个单词映射为 (单词, 1) 的键值对val wordPairs = words.map(word => (word, 1))// 使用 reduceByKey 计算每个单词的出现次数val wordCounts = wordPairs.reduceByKey(_ + _)// 输出结果wordCounts.collect().foreach(println)// 停止 SparkContextsc.stop()}
}
执行流程
- 本地聚合:每个节点对本地数据按 key 预聚合。
- 跨节点聚合:将预聚合结果传输到对应节点,再次聚合。
对比 groupByKey
reduceByKey
:先本地聚合,再全局聚合,适合大数据量。groupByKey
:直接全局 shuffle,可能导致数据倾斜,性能较差。
总结对比表
算子 | 输入类型 | 输出类型 | 一对一 / 多 | 适用场景 |
---|---|---|---|---|
map | 任意类型 | 任意类型 | 一对一 | 数据转换 |
filter | 任意类型 | 同输入类型 | 可能减少元素 | 数据筛选 |
flatMap | 任意类型 | 展开后的类型 | 一对多 | 分词、嵌套结构展开 |
reduceByKey | 键值对 (K,V) | 键值对 (K,V) | 多对少 | 按 key 聚合统计 |
注意事项
- 惰性执行:所有转换算子(包括上述四个)都不会立即执行,直到遇到行动算子(如
collect()
、count()
)。 - 性能优化:
- 使用
reduceByKey
替代groupByKey
以减少 shuffle 数据量。 - 避免在
map
/flatMap
中执行耗时操作(如数据库连接),可改用mapPartitions
优化。
- 使用