欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 产业 > SparkSQL操作Mysql

SparkSQL操作Mysql

2025/8/18 22:53:44 来源:https://blog.csdn.net/2301_82309776/article/details/147926379  浏览:    关键词:SparkSQL操作Mysql

SparkSQL 提供了强大的功能来连接和操作 MySQL 数据库,支持读取数据、写入数据以及执行 SQL 查询。下面将详细介绍如何使用 SparkSQL 与 MySQL 进行交互,并提供完整的代码示例。

一、环境准备

  1. 安装 MySQL JDBC 驱动
    下载 mysql-connector-java,并将 JAR 文件添加到 Spark 的 classpath 中。

  2. 启动 SparkSession
    在创建 SparkSession 时,通过 config 参数指定 JDBC 驱动路径:

python

运行

from pyspark.sql import SparkSessionspark = SparkSession.builder \.appName("SparkMySQLIntegration") \.config("spark.jars", "/path/to/mysql-connector-java-*.jar") \.getOrCreate()

二、从 MySQL 读取数据

使用 spark.read.jdbc() 方法读取 MySQL 表数据:

python

运行

# MySQL 连接配置
url = "jdbc:mysql://localhost:3306/mydatabase"  # 数据库连接 URL
properties = {"user": "username","password": "password","driver": "com.mysql.cj.jdbc.Driver"  # MySQL 驱动类
}# 读取整张表
df = spark.read.jdbc(url=url, table="mytable", properties=properties)# 读取时指定查询条件(推荐:减少数据传输)
query = "(SELECT * FROM mytable WHERE category = 'electronics') AS subquery"
df = spark.read.jdbc(url=url, table=query, properties=properties)# 查看数据
df.show()
df.printSchema()

三、向 MySQL 写入数据

使用 DataFrame.write.jdbc() 方法将数据写入 MySQL:

python

运行

# 示例 DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)# 写入配置
url = "jdbc:mysql://localhost:3306/mydatabase"
properties = {"user": "username","password": "password","driver": "com.mysql.cj.jdbc.Driver"
}# 写入模式:overwrite(覆盖)、append(追加)、ignore(存在则忽略)、error(默认,存在则报错)
df.write.jdbc(url=url,table="new_table",mode="overwrite",properties=properties
)

四、优化写入性能

  1. 批量写入
    通过 batchsize 参数控制每次写入的行数:

python

运行

properties = {"user": "username","password": "password","driver": "com.mysql.cj.jdbc.Driver","batchsize": 1000  # 每批写入1000行
}

  1. 分区并行写入
    使用 repartition() 增加并行度:

python

运行

df.repartition(10).write.jdbc(url=url,table="new_table",mode="append",properties=properties
)

  1. 禁用自动提交
    通过 isolationLevel 参数减少事务开销:

python

运行

properties = {"user": "username","password": "password","driver": "com.mysql.cj.jdbc.Driver","isolationLevel": "NONE"  # 禁用自动提交
}

五、执行复杂 SQL 查询

  1. 将 DataFrame 注册为临时表

    python

    运行

    df.createOrReplaceTempView("temp_table")
    result = spark.sql("SELECT name, age FROM temp_table WHERE age > 30")
    result.show()
    
  2. 直接执行原生 SQL

    python

    运行

    # 创建数据库连接
    from pyspark.sql import SQLContext
    sqlContext = SQLContext(spark)# 执行原生 SQL
    query = "SELECT * FROM mytable WHERE category = 'books'"
    result = sqlContext.read.jdbc(url=url,table=f"({query}) AS subquery",properties=properties
    )
    result.show()
    

六、处理数据类型映射

Spark 与 MySQL 之间的数据类型映射需注意:

Spark 数据类型MySQL 数据类型
StringTypeVARCHAR, TEXT
IntegerTypeINT
LongTypeBIGINT
DoubleTypeDOUBLE
BooleanTypeTINYINT(1)
TimestampTypeDATETIME, TIMESTAMP
DateTypeDATE

七、完整示例:读写 MySQL

python

运行

from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder \.appName("SparkMySQLExample") \.config("spark.jars", "/path/to/mysql-connector-java-8.0.26.jar") \.getOrCreate()# 读取 MySQL 数据
url = "jdbc:mysql://localhost:3306/mydatabase"
properties = {"user": "root","password": "password","driver": "com.mysql.cj.jdbc.Driver"
}# 读取表
df = spark.read.jdbc(url=url, table="products", properties=properties)
df.show()# 数据转换
from pyspark.sql.functions import col
df_filtered = df.filter(col("price") > 100)
df_new = df_filtered.withColumn("discounted_price", col("price") * 0.9)# 写入新表
df_new.write.jdbc(url=url,table="discounted_products",mode="overwrite",properties=properties
)# 执行 SQL 查询
df_new.createOrReplaceTempView("temp_products")
result = spark.sql("SELECT category, AVG(discounted_price) FROM temp_products GROUP BY category")
result.show()# 停止 SparkSession
spark.stop()

八、常见问题与解决方案

  1. ClassNotFoundException

    • 检查 JDBC 驱动 JAR 文件路径是否正确。
    • 确保所有 Worker 节点都能访问该 JAR 文件。
  2. 数据倾斜

    • 使用 repartition() 或 coalesce() 调整分区数。
    • 对写入表添加合适的索引。
  3. 写入性能低

    • 增大 batchsize 参数。
    • 禁用自动提交(isolationLevel=NONE)。
  4. 连接超时

    • 增加 connectTimeout 参数:

      python

      运行

      properties = {"user": "username","password": "password","driver": "com.mysql.cj.jdbc.Driver","connectTimeout": "300"  # 连接超时时间(秒)
      }

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词