欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > 详解 Spark 核心编程之 RDD 序列化

详解 Spark 核心编程之 RDD 序列化

2025/5/14 6:30:11 来源:https://blog.csdn.net/weixin_44480009/article/details/139349311  浏览:    关键词:详解 Spark 核心编程之 RDD 序列化

一、问题引出

object TestRDDSerializable {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ser")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4), 2)val user = new User()rdd.foreach(num => {println("age = " + (user.age + num))})/*结果:程序执行抛出异常 NotSerializableException分析:1.foreach 算子外部的执行是在 Driver 端,内部的操作是在 Executor 端执行2.foreach 算子的内部操作使用到了 user 对象的属性,所以 user 对象需要从 Driver 发送到 Executor,涉及到网络传输3.由于 User 类没有混入序列化特质,所以抛出异常4.解决方法:class User extends Serializable {} 或 case class User {},样例类在编译时会自动混入序列化*/println("=================")val rdd1 = sc.makeRDD(List[Int](), 2)val user1 = new User()rdd1.foreach(num => {println("age = " + (user1.age + num))})/*期望:由于rdd1没有数据,foreach 算子不会实际执行,即使 User 没有混入序列化也不会报错结果:程序执行抛出异常 NotSerializableException分析:1.RDD 算子中如果传递的是函数参数,则会涉及到闭包操作,内部会调用 sc.clean(f)2.clean 方法底层会进行闭包检测,其中就包含序列化的检测,如果检测到使用的对象没有混入序列化特质,就会抛出异常*/}
}class User {val age: Int = 30
}

二、Kryo 序列化框架

  • 参考地址:https://github.com/EsotericSoftware/kryo

  • 与 Java 序列化的对比:

    • Java 的序列化比较重,生成的文件字节比较多,而 Kryo 序列化是轻量级的,产生的字节较少,所以 Kryo 速度是 Serializable 的 10 倍
    • Java 的序列化中可以通过 transient 关键字限制不参与序列化的属性,而 transient 关键字在 Kryo 序列化中不产生作用
  • 自定义 Kryo 序列化

    /*
    简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化
    */
    object TestKryoSerializable {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("Ser")// 替换默认的序列化机制conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册需要使用  kryo  序列化的自定义类,该类必须混入 Serializable 特质conf.registerKryoClasses(Array(classOf[Searcher]))val sc = new SparkContext(conf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "kafka", "hive"), 2)val searcher = new Searcher("h")val result: RDD[String] = searcher.getMatchedRDD1(rdd)result.collect.foreach(println)}
    }case class Searcher(val query: String) { def isMatch(s: String) = {s.contains(query) // this.query}def getMatchedRDD1(rdd: RDD[String]) = {rdd.filter(isMatch) }def getMatchedRDD2(rdd: RDD[String]) = {val q = queryrdd.filter(_.contains(q))}
    }
    

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词