1. 添加依赖
在项目的 `pom.xml`(Maven)中添加以下依赖:
```xml
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.3.0</version>
</dependency>
<!-- MySQL Connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
代码
import org.apache.spark.sql.{SparkSession, SaveMode}
object SparkMySQLDemo {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("SparkMySQLDemo")
.master("local[*]") // 生产环境需改为集群模式,如 yarn
.config("spark.sql.shuffle.partitions", "5") // 优化分区数
.getOrCreate()
// 设置 MySQL 连接参数
val jdbcUrl = "jdbc:mysql://localhost:3306/your_database"
val jdbcUsername = "your_username"
val jdbcPassword = "your_password"
try {
// 从 MySQL 读取数据
val df = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "source_table") // 要读取的表名
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.load()
// 执行计算(示例:按 category 分组求和)
val resultDF = df.groupBy("category")
.agg(
sum("amount").alias("total_amount"),
count("*").alias("record_count")
)
// 打印计算结果(调试用)
resultDF.show()
// 将结果写入 MySQL
resultDF.write
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "result_table") // 目标表名
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.mode(SaveMode.Append) // 写入模式:覆盖/追加
.save()
println("数据写入 MySQL 成功!")
} catch {
case e: Exception => e.printStackTrace()
} finally {
spark.stop()
}
}
}