欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > Spark处理过程-案例数据清洗

Spark处理过程-案例数据清洗

2025/5/12 21:25:37 来源:https://blog.csdn.net/2401_83374563/article/details/147797182  浏览:    关键词:Spark处理过程-案例数据清洗
需求说明

准备十条符合包含用户信息的文本文件,每行格式为 姓名,年龄,性别,需要清洗掉年龄为空或者非数字的行

例如:

张三,25,男

李四,,女

王五,30,男

赵六,a,女

孙七,35,男

周八,40,女

吴九,abc,男

郑十,45,女

王十,50,男

李二,55,女

思路分析

  1. 读入文件
  2. 对每一行数据进行分析
    1. 字段拆分,拆分出年龄这个字段
    2. 判断
      • 如果它不是数字或者缺失,则忽略这条数据
      • 否则保存

(三) 代码展示

import org.apache.spark.{SparkConf, SparkContext}

object DataCleaning {

  def main(args: Array[String]): Unit = {

    // 创建 SparkConf 对象

    val conf = new SparkConf().setAppName("DataCleaning").setMaster("local[*]")

    // 创建 SparkContext 对象

    val sc = new SparkContext(conf)

 

    // 读取文本文件,创建 RDD

    val inputFile = "input/file.txt"

    val lines = sc.textFile(inputFile)

 

    // 数据清洗操作

    val cleanedLines = lines.filter(line => { // 使用filter算子

      val fields = line.split(",")

      if (fields.length == 3) {

        val age = fields(1).trim

        age.matches("\\d+")

      } else {

        false

      }

    })
      // 输出清洗后的数据
       cleanedLines.collect().foreach(println)

 

    // 停止 SparkContext

    sc.stop()

  }

}

拓展:如何把清洗之后的数据保存到一个文件中。

可以使用coalesce(1)这个方法可以让结果全部保存在一个文件中。

代码如下:

val singlePartitionRDD = cleanedLines.coalesce(1)

    // 保存清洗后的数据到文件

    val outputPath = "path/to/your/output/file.txt"

    singlePartitionRDD.saveAsTextFile(outputPath)

    // 停止 SparkContext

    sc.stop()

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词