欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 能源 > sparkSQL读入csv文件写入mysql

sparkSQL读入csv文件写入mysql

2025/5/21 6:51:15 来源:https://blog.csdn.net/sxy_1030_/article/details/148078902  浏览:    关键词:sparkSQL读入csv文件写入mysql

下面是一个完整的示例,展示如何使用SparkSQL读取CSV文件并将数据写入MySQL数据库。

1. 准备工作

首先确保你有:
1. 运行中的Spark环境
2. MySQL数据库连接信息
3. 适当的JDBC驱动

2. 示例代码

Scala版本

import org.apache.spark.sql.{SparkSession, SaveMode}object CsvToMysql {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark = SparkSession.builder().appName("CSV to MySQL").master("local[*]") // 本地模式,生产环境去掉这行.getOrCreate()// 读取CSV文件val df = spark.read.option("header", "true") // 第一行作为header.option("inferSchema", "true") // 自动推断数据类型.csv("path/to/your/file.csv") // CSV文件路径// 显示数据df.show()// MySQL连接配置val jdbcUrl = "jdbc:mysql://localhost:3306/your_database"val connectionProperties = new java.util.Properties()connectionProperties.put("user", "your_username")connectionProperties.put("password", "your_password")connectionProperties.put("driver", "com.mysql.jdbc.Driver")// 写入MySQLdf.write.mode(SaveMode.Overwrite) // 如果表存在则覆盖.jdbc(jdbcUrl, "your_table", connectionProperties)spark.stop()}
}

 Python (PySpark)版本

from pyspark.sql import SparkSession# 创建SparkSession
spark = SparkSession.builder \.appName("CSV to MySQL") \.config("spark.jars", "/path/to/mysql-connector-java-8.0.23.jar") \  # MySQL驱动路径.getOrCreate()# 读取CSV文件
df = spark.read \.option("header", "true") \.option("inferSchema", "true") \.csv("path/to/your/file.csv")# 显示数据
df.show()# MySQL连接配置
jdbc_url = "jdbc:mysql://localhost:3306/your_database"
connection_properties = {"user": "your_username","password": "your_password","driver": "com.mysql.jdbc.Driver"
}# 写入MySQL
df.write \.mode("overwrite") \  # 可选: "append", "ignore", "error" (默认).jdbc(jdbc_url, "your_table", properties=connection_properties)spark.stop()

3. 关键点说明

1. CSV读取选项:
   `header`: 是否将第一行作为列名
   `inferSchema`: 是否自动推断数据类型
   其他可选参数:`delimiter`, `quote`, `escape`, `nullValue`等

2. 写入模式:
   `overwrite`: 覆盖现有表
   `append`: 追加数据
   `ignore`: 表存在时不做任何操作
   `error`或`errorifexists`(默认): 表存在时报错

3. MySQL连接:
   需要MySQL JDBC驱动
   驱动可以:
   通过`--jars`参数在spark-submit时指定
     在代码中通过`spark.jars`配置指定
     放在Spark的`jars`目录下

4. 性能优化:
   批量写入:`connectionProperties.put("batchsize", "10000")`
   并行写入:`df.repartition(10).write...` (根据数据量调整分区数)

 4. 运行方式

对于Scala项目,打包后使用spark-submit运行:

spark-submit --class CsvToMysql \--jars /path/to/mysql-connector-java-8.0.23.jar \your_application.jar

对于Python脚本:

spark-submit --jars /path/to/mysql-connector-java-8.0.23.jar \your_script.py

5. 常见问题解决

1. 驱动类找不到:
   ①确保驱动jar包路径正确
   ②检查驱动版本与MySQL版本兼容

2. 连接拒绝:
   ①检查MySQL服务是否运行
   ②检查用户名密码是否正确
   ③检查MySQL是否允许远程连接

3. 权限问题:
   确保数据库用户有创建表和写入数据的权限

4. 数据类型不匹配:
   ①可以在写入前使用`df.printSchema()`检查数据类型
   ②必要时使用`cast()`函数转换数据类型

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词