以下是 Scala 中使用 Spark SQL 读取 CSV 文件的核心步骤和代码示例(纯文本):
1. 创建 SparkSession
scala
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("Spark SQL Read CSV")
.master("local[*]") // 单机模式,集群改为 "yarn" 等
.getOrCreate()
2. 基础读取(默认配置)
scala
// 读取 CSV 文件(默认:逗号分隔,无表头,字段类型为 String)
val dfDefault = spark.read.csv("file:///path/to/your/data.csv")
// 示例输出:显示前 20 行数据
dfDefault.show()
3. 指定表头和自动推断类型
scala
val dfWithSchema = spark.read
.option("header", "true") // 首行作为表头
.option("inferSchema", "true") // 自动推断数据类型(如 Int、Double)
.csv("file:///path/to/your/data_with_header.csv")
// 查看表结构
dfWithSchema.printSchema()
4. 自定义分隔符和空值处理
scala
val dfCustom = spark.read
.option("header", "true")
.option("delimiter", ";") // 分隔符改为分号(适用于欧洲格式 CSV)
.option("nullValue", "NA") // 将 "NA" 识别为空值
.csv("file:///path/to/your/custom_separator.csv")
5. 读取带引号的复杂文本
scala
val dfQuoted = spark.read
.option("header", "true")
.option("quote", "\"") // 文本字段引号(默认双引号)
.option("escape", "\"") // 转义符(处理嵌套引号)
.csv("file:///path/to/your/quoted_data.csv")
6. 读取远程文件(如 HDFS/S3)
scala
// HDFS 路径示例
val dfHdfs = spark.read.csv("hdfs://namenode:8020/user/data.csv")
// S3 路径示例(需配置 AWS 凭证)
val dfS3 = spark.read.csv("s3a://bucket-name/path/data.csv")
7. 读取多个文件或目录
scala
// 读取目录下所有 CSV 文件
val dfDirectory = spark.read.csv("file:///path/to/csv_directory/")
// 按通配符匹配文件(如读取以 "part-" 开头的文件)
val dfWildcard = spark.read.csv("file:///path/to/part-*.csv")
关键参数说明
表格
参数名 说明
header 是否包含表头( true / false ,默认 false )
inferSchema 是否自动推断数据类型(需 header=true ,默认 false )
delimiter 字段分隔符(默认 , ,可改为 ; 、 \t 等)
quote 文本字段引号(默认 " ,用于包裹含分隔符的文本)
escape 转义符(处理引号内的特殊字符)
nullValue 空值标识(如 NA 、 NULL ,默认空字符串视为 null)
完整代码示例
scala
import org.apache.spark.sql.SparkSession
object SparkReadCSV {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("Read CSV with Spark SQL")
.master("local[*]")
.getOrCreate()
// 读取带表头的 CSV 并推断类型
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("file:///user/data/products.csv")
// 打印数据和结构
println("Data:")
df.show(false)
println("\nSchema:")
df.printSchema()
spark.stop()
}
}
执行时直接运行代码即可,Spark 会自动处理 CSV 解析(无需额外依赖包)。