欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > Spark处理过程—转换算子

Spark处理过程—转换算子

2025/5/17 0:04:11 来源:https://blog.csdn.net/Betty_at/article/details/147915256  浏览:    关键词:Spark处理过程—转换算子

转换算子用于对 RDD 进行转换操作,生成一个新的 RDD。转换操作是惰性的,即当调用转换算子时,Spark 并不会立即执行计算,而是记录下操作步骤,直到遇到行动算子时才会触发实际的计算。

从格式和用法上来看,它就是集合对象的方法。

以下是一些常见的转换算子:

1.map算子

功能:对 RDD 中的每个元素应用自定义函数,生成新的 RDD。
特点:一对一转换,元素数量不变。

示例代码
val numbers = sc.parallelize(1 to 5)
val squared = numbers.map(x => x * x)  // [1, 4, 9, 16, 25]// 对字符串操作
val words = sc.parallelize(Seq("hello", "world"))
val upperWords = words.map(_.toUpperCase())  // ["HELLO", "WORLD"]
应用场景
  • 数据格式转换(如 JSON 解析)。
  • 数值计算(如单位转换)。

2.filter算子

功能:过滤 RDD 中满足条件的元素,生成新的 RDD。
特点:可能减少元素数量。

示例代码
val numbers = sc.parallelize(1 to 10)
val evenNumbers = numbers.filter(_ % 2 == 0)  // [2, 4, 6, 8, 10]// 过滤长度大于 5 的字符串
val words = sc.parallelize(Seq("apple", "banana", "grape"))
val longWords = words.filter(_.length > 5)  // ["banana"]
应用场景
  • 数据清洗(如过滤空值或无效记录)。
  • 条件筛选(如筛选年龄大于 18 的用户)。

3.flatMap算子

功能:先对 RDD 中每个元素应用函数,再将结果展平(一对多转换)。
特点:常用于分词或嵌套结构展开。

示例代码
val lines = sc.parallelize(Seq("Hello World", "Spark is fast"))
val words = lines.flatMap(_.split(" "))  // ["Hello", "World", "Spark", "is", "fast"]// 嵌套结构展开
val nested = sc.parallelize(Seq(List(1, 2), List(3, 4)))
val flattened = nested.flatMap(x => x)  // [1, 2, 3, 4]
应用场景
  • 文本分析(如 WordCount 中的分词)。
  • 嵌套数据处理(如解析多层 JSON)。

4.reduceByKey算子

功能:对键值对 RDD 按 key 分组,使用指定函数聚合 value。
特点

  • 高效:先在本地分区聚合(Combiner),再跨节点聚合,减少数据传输。
  • 适用场景:适用于求和、求平均等可交换的聚合操作。
示例代码
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))// 按 key 求和
val sumByKey = pairs.reduceByKey(_ + _)  // [("a", 4), ("b", 2)]// 按 key 求最大值
val maxByKey = pairs.reduceByKey(_ max _)  // [("a", 3), ("b", 2)]

以下是一个使用 reduceByKey 计算每个单词出现次数的示例:

import org.apache.spark.{SparkConf, SparkContext}
object ReduceByKeyExample {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象val conf = new SparkConf().setAppName("ReduceByKeyExample").setMaster("local[*]")// 创建 SparkContext 对象val sc = new SparkContext(conf)// 创建一个包含单词的 RDDval words = sc.parallelize(List("apple", "banana", "apple", "cherry", "banana", "apple"))// 将每个单词映射为 (单词, 1) 的键值对val wordPairs = words.map(word => (word, 1))// 使用 reduceByKey 计算每个单词的出现次数val wordCounts = wordPairs.reduceByKey(_ + _)// 输出结果wordCounts.collect().foreach(println)// 停止 SparkContextsc.stop()}
}
执行流程
  1. 本地聚合:每个节点对本地数据按 key 预聚合。
  2. 跨节点聚合:将预聚合结果传输到对应节点,再次聚合。
对比 groupByKey
  • reduceByKey:先本地聚合,再全局聚合,适合大数据量。
  • groupByKey:直接全局 shuffle,可能导致数据倾斜,性能较差。

总结对比表

算子输入类型输出类型一对一 / 多适用场景
map任意类型任意类型一对一数据转换
filter任意类型同输入类型可能减少元素数据筛选
flatMap任意类型展开后的类型一对多分词、嵌套结构展开
reduceByKey键值对 (K,V)键值对 (K,V)多对少按 key 聚合统计

注意事项

  1. 惰性执行:所有转换算子(包括上述四个)都不会立即执行,直到遇到行动算子(如 collect()count())。
  2. 性能优化
    • 使用 reduceByKey 替代 groupByKey 以减少 shuffle 数据量。
    • 避免在 map/flatMap 中执行耗时操作(如数据库连接),可改用 mapPartitions 优化。 

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词