欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > 【spark】spark列转行操作(json格式)

【spark】spark列转行操作(json格式)

2025/10/28 16:52:20 来源:https://blog.csdn.net/d905133872/article/details/139500701  浏览:    关键词:【spark】spark列转行操作(json格式)

前言:一般我们列转行都是使用concat_ws函数或者concat函数,但是concat一般都是用于字符串的拼接,后续处理数据时并不方便。

需求:将两列数据按照设备id进行分组,每个设备有多个时间点位和对应值,将其一一对应,并以json形式存储。

设备id(device_name)点位值(point)

测量值(value)

key111.12
key121.32
key331.00

实现:

1、依旧需要对数据进行分组后聚合。由于有大量键值对,通过groupByKey进行分组

2、分组后得到(key, value[Iteratable])类型数据,对value进行转换后操作

import spark.implicits._//定义数据源
val seq = Seq(("key1","1","1.12"),("key1","3","1.32"),("key1","3","1.00")
).toDF("device","point","value")//数据处理
seq.as[pointKey].groupByKey(_.device).mapGroups((key, value) => {val list = value.toListval map = new mutable.HashMap[String, String]()list.foreach(elem => map.put(elem.point, elem.value))//此时的数据格式为map格式//map转jsonimplicit val formats: DefaultFormats.type = DefaultFormatsval json = Serialization.write(map)(key, map, json)})//样例类,用于装载
case class pointKey(device: String, point: String, value: String
)

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com