(一)创建数据库和表
接下来,我们去创建一个新的数据库,数据表,并插入一条数据。
参考代码如下:
-- 创建数据库
CREATE DATABASE spark;
-- 使用数据库USE spark;
-- 创建表
create table person(id int, name char(20), age int);
-- 插入示例数据
insert into person values(1, 'jam', 20), (2,'judi', 21);
-- 查看所有数据
select * from person;
-- 退出
quit
提醒:use spark;的作用是使用当前数据库;
(二)Spark连接MySQL数据库
- 新建项目,或者使用之前的项目也可以。
- 修改pom.xml文件。
补充三个依赖:
(1)scala-library 是 Scala 语言的基础库,是编写 Scala 程序的必要条件。
(2)spark-sql_2.12 提供了 Spark SQL 的功能,用于高效的数据处理和分析。
(3)mysql-connector-java 提供了与 MySQL 数据库交互的能力。
<dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.12.15</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.3.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency>
请注意,这里并没没有单独添加spark_core的依赖,因为在spark-sql中已经包含了spark_core。
3.写Spark程序连接mysql
核心步骤:
- 创建Properties对象,设置数据库的用户名和密码
- 使用spark.read.jbdc方法,连接数据库
参考代码如下:
impport org.apache.spark.sql.SparkSession
import java.util.Properties
object SparkMySQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()// 创建properties对象,设置连接mysql的用户名和密码val prop = new Properties()prop.setProperty("user", "root")prop.setProperty("password", "000000")// 读取mysql数据val df = spark.read.jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)df.show()spark.stop()}
(三)Spark添加数据到mysql
前面演示了数据的查询,现在来看看添加数据到mysql。
核心方法:dataFrame.write.mode("append").jdbc()。
import org.apache.spark.sql.SparkSession
import java.util.Properties
object SparkMySQL {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("SparkMySQL").master("local[*]").getOrCreate()// 创建properties对象,设置连接mysql的用户名和密码val prop = new Properties()prop.setProperty("user", "root")prop.setProperty("password", "000000")// 插入一条数据到数据库val data = Seq(("3", "zhangsan", "30"))val df2 = spark.createDataFrame(data).toDF("id", "name", "age")df2.write.mode("append").jdbc("jdbc:mysql://hadoop100:3306/spark", "person", prop)}
}
上面的代码运行完成之后,切换到finalshell中的mysql端,查看效果。