欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > spark第三篇sql

spark第三篇sql

2025/12/14 23:23:07 来源:https://blog.csdn.net/Emperor_rock/article/details/139436442  浏览:    关键词:spark第三篇sql

spark第三篇sql

        • sparksql概述
        • sparksql四大特性
        • dataframe概述
        • 通过读取数据源创建dataFrame
        • DataFrame常用操作
        • DataSet
        • 将RDD转换为DataFrame代码开发
        • sparksql 操作hivesql
        • sparksql读取mysql表中的数据
        • sparksql将结果数据写入到mysql中

sparksql概述
  • 1、sparksql发展史
    • shark为spark提供了分布式数据仓库系统
    • shark依赖于hive的代码、依赖于spark的版本
  • 2、sparksql是什么
    • sparksql是spark的一个模块,主要用来处理结构化数据。
    • 操作sparksql的方式: sql 、dataframe、dataSet
sparksql四大特性
  • 1、易整合
    • 把sql语句与spark程序进行无缝混合使用
    • 采用4种Api:java/scala/python/R
  • 2、统一的数据源访问方式
    • 可以通过一种相同的方式对接任何外部数据源
    • sparkSession.read.文件格式(文件格式的路径)
  • 3、兼容hivesql
    • 可以在sparksql中写hivesql
  • 4、支持标准的数据库连接
    • JDBC和ODBC
dataframe概述
  • dataframe是什么

    • dataframe前身是schemaRDD,在spark1.3.0之后才出现的
    • DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库的二维表格,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型,但底层做了更多的优化。
  • DataFrame与RDD的区别

    • RDD可看作是分布式的对象的集合,Spark并不知道对象的详细模式信息,DataFrame可看作是分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。

    在这里插入图片描述

  • dataFrame和rdd优缺点

    • rdd优缺点
      • 优点
        • 1、编译时类型安全
        • 2、具备面向对象编程风格
      • 缺点
        • 1、序列化和反序列化性能开销大
        • 2、频繁创建和销毁对象,造成大量的GC
    • dataFrame引入schema和off-heap(使用不在java堆中的内存,直接使用操作系统中的内存),决定了rdd缺点,同时丢失了rdd有点。dataframe不在是类型安全和面向对象编程风格。
通过读取数据源创建dataFrame
  • 1、读取文本文件创建dataFrame
    • sparkSession.read.text(“文本文件格式的路径”)
  • 2、读取json文件创建dataFrame
    • sparkSession.read.json(“json文件格式的路径”)
  • 3、读取parquet文件创建dataFrame
    • sparkSession.read.parquet(“parquet文件格式的路径”)
DataFrame常用操作
  • DSL语法风格

    • 它就是dataFrame自身提供的API

      1、打印DataFrame的schemaprintlnSchema2、查看dataFrame中的数据show3、取出第一位firsthead(N) 取出前N个4、查看某个字段peopleDF.select("name").showpeopleDF.select(col("name")).showpeopleDF.select($"name").showpeopleDF.select(peopleDF("name")).show5、取出多个字段peopleDF.select("name","age").show6、让age字段+1peopleDF.select(col("age")+1).show7、过滤出年龄大于30的人数peopleDF.filter($"age" > 30).count
      
  • SQL风格语法

    • 通过将一个dataFrame注册成一张表,接下来就可以通过sql语句操作dataFrame

      • sparkSession.sql(sql语句)
      1、先需要将DataFrame注册成一张临时表personDF.registerTempTable("t_person")2、然后通过sparkSession.sql(sql语句)操作DataFramesparkSession.sql("select * from t_person").show
      
DataSet
  • 1、DataSet是什么
    • DataSet是分布式的数据集合,Dataset提供了强类型支持,也是在RDD的每行数据加了类型约束。DataSet是在Spark1.6中添加的新的接口。它集中了RDD的优点(强类型和可以用强大lambda函数)以及使用了Spark SQL优化的执行引擎。
  • 2、DataSet特性
    • 继承了RDD的优点
      • 编译时类型安全
      • 面向对象编程风格
  • 3、创建Dataset
    • 1、spark.createDataSet(“已经存在的scala集合”)
    • 2、spark.createDataSet(“已经存在RDD”)
    • 3、已经存在的scala集合调用toDs
    • 4、通过dataFrame转换生成 as[强类型]
  • 4、dataSet与dataFrame互相转换
    • 1、将dataSet转化生成dataFrame
      • dataSet.toDF
    • 2、将dataFrame转换成dataSet
      • dataFrame.as[强类型]
将RDD转换为DataFrame代码开发
  • 导包

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.0.2</version>
    </dependency>
    
  • 1、 通过定义case class样例类利用反射机制推断Schema

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Column, DataFrame, SparkSession}//todo:利用反射机制(case class )指定dataFrame的schemacase class Person(id:Int,name:String,age:Int)
    object CaseClassSchema {def main(args: Array[String]): Unit = {//1、创建SparkSessionval spark: SparkSession = SparkSession.builder().appName("CaseClassSchema").master("local[2]").getOrCreate()//2、获取sparkContextval sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//3、读取数据文件val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(x=>x.split(" "))//4、将rdd与样例类关联val personRDD: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))//5、获取DataFrame//手动导入隐式转换import spark.implicits._val personDF: DataFrame = personRDD.toDF//---------------DSL语法-------------start//1、打印dataframe的schema元信息personDF.printSchema()//2、 显示dataFrame结果数据personDF.show()personDF.show(2)//3、显示第一条数据println(personDF.head())//4、查询name字段结果数据personDF.select("name").show()personDF.select($"name").show()personDF.select(new Column("name")).show()//5、把age字段结果加1personDF.select($"id",$"name",$"age",$"age"+1).show()//6、把age 大于30的人过滤出来personDF.filter($"age" > 30).show()//7、按照age进行分组personDF.groupBy("age").count().show()//---------------DSL语法-------------end//--------------SQL语法--------------start//把dataFrame注册成一张表personDF.createTempView("t_person")//通过sparksession调用sql方法spark.sql("select * from t_person").show()spark.sql("select * from t_person where name='lisi'").show()spark.sql("select * from t_person order by age desc ").show()//--------------SQL语法--------------end//关闭操作sc.stop()spark.stop()}
    }
  • 2、通过StructType指定schema

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}//todo:将rdd转换成dataFrame,通过StructType来指定schema
    object SparkSqlSchema {def main(args: Array[String]): Unit = {//1、创建sparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()//2、获取得到sparkContextval sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//3、读取数据文件val data: RDD[Array[String]] = sc.textFile("d:\\person.txt").map(_.split(" "))//4、将rdd与Row类型关联val rowRDD: RDD[Row] = data.map(x=>Row(x(0).toInt,x(1),x(2).toInt))//5、指定schemaval schema:StructType=StructType(StructField("id", IntegerType, true) ::StructField("name", StringType, false) ::StructField("age", IntegerType, false) :: Nil)val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)//打印schemapersonDF.printSchema()//显示数据personDF.show()//dataframe注册成一张表personDF.createTempView("t_person")spark.sql("select * from t_person").show()//关闭sc.stop()spark.stop()}
    }
sparksql 操作hivesql
  • 导包

            <!--引入 spark-hive依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.0.2</version></dependency>
    
  • 代码开发

    import org.apache.spark.sql.SparkSession//todo:利用sparksql操作hivesql
    object HiveSupport {def main(args: Array[String]): Unit = {//1、创建sparkSessionval spark: SparkSession = SparkSession.builder().appName("HiveSupport").master("local[2]").enableHiveSupport()   //开启对hivesql的支持.getOrCreate()//2、利用sparkSession操作hivesql//2.1 创建hive表//spark.sql("create table if not exists student(id int,name string,age int) row format delimited fields terminated by ',' ")//2.2 加载数据到hive表中//spark.sql("load data local inpath './data/student.txt' into table student")//2.3 查询表中数据spark.sql("select * from student").show()//3、关闭spark.stop()}
    }
sparksql读取mysql表中的数据
  • 代码开发

    import java.util.Properties
    import org.apache.spark.sql.{DataFrame, SparkSession}//todo:利用sparksql从mysql表中读取数据
    object DataFromMysql {def main(args: Array[String]): Unit = {//1、创建sparkSessionval spark: SparkSession = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()//2、通过sparksession读取mysql表中的数据//定义urlval url="jdbc:mysql://192.168.200.100:3306/spark"//定义表名val tableName="iplocation"//定义相关的属性val properties=new Propertiesproperties.setProperty("user","root")properties.setProperty("password","123456")val jdbcDataFrame: DataFrame = spark.read.jdbc(url,tableName,properties)//显示schemajdbcDataFrame.printSchema()//打印结果数据jdbcDataFrame.show()//关闭spark.stop()}
    }
  • spark-shell 操作读取mysql表中

    • 启动spark-shell脚本

      spark-shell \
      --master spark://hdp-node-01:7077 \
      --executor-memory 1g \
      --total-executor-cores  2 \
      --jars /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar \
      --driver-class-path /opt/bigdata/hive/lib/mysql-connector-java-5.1.35.jar
      
    • 执行代码

      val mysqlDF = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://192.168.200.100:3306/spark", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "iplocation", "user" -> "root", "password" -> "123456")).load()
      
sparksql将结果数据写入到mysql中
  • 代码开发

    import java.util.Properties
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SparkSession}//todo:利用sparksql将结果数据写入到mysql表中case class Student(id:Int,name:String,age:Int)
    object Data2Mysql {def main(args: Array[String]): Unit = {//1、创建sparkSessionval spark: SparkSession = SparkSession.builder().appName("Data2Mysql").getOrCreate()//2、获取sparkcontextval sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//3、读取数据文件val rdd: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(" "))//4、将样例类与rdd进行关联val studentRDD: RDD[Student] = rdd.map(x=>Student(x(0).toInt,x(1),x(2).toInt))//5、将rdd转换成dataframeimport spark.implicits._val studentDF: DataFrame = studentRDD.toDF//打印结果数据studentDF.show()//dataFrame注册成一张表studentDF.createTempView("t_student")//通过sparkSession操作这个表val result: DataFrame = spark.sql("select * from t_student order by age desc")//把结果数据写入到mysql表中//定义urlval url="jdbc:mysql://192.168.200.100:3306/spark"//定义表名val tableName=args(1)//定义相关的属性val properties=new Propertiesproperties.setProperty("user","root")properties.setProperty("password","123456")//mode:指定数据插入模式//overwrite: 覆盖(事先会创建一张表)//append: 追加(事先会创建一张表)//ignore:忽略(如果当前这个表已经存在,不执行操作)//error:如果当前这个表存在,这个时候就报错result.write.mode("append").jdbc(url,tableName,properties)//关闭sc.stop()spark.stop()}  }
    
  • 把程序打成jar包 提交到集群中运行

    
    spark-submit --master spark://node1:7077 --class cn.包名.sql.Data2Mysql --executor-memory 1g --total-executor-cores 2 --jars /export/servers/hive/lib/mysql-connector-java-5.1.35.jar --driver-class-path /export/servers/hive/lib/mysql-connector-java-5.1.35.jar original-spark_class06-2.0.jar /person.txt student100
    

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com