作者:IvanCodes
日期:2025年5月14日
专栏:Hive教程
Apache Hive 作为一个强大的数据仓库工具,其核心价值在于对存储在分布式系统(如 HDFS)中的大规模数据进行查询和分析。但在进行分析之前,首先需要有效地将数据加载到 Hive 表中或在表之间进行数据流转。Hive 提供了多种数据导入和操纵的方式,主要通过其数据操纵语言 (DML) 来实现。本文将详细介绍这些常用的数据加载与写入方法,重点围绕 LOAD DATA
命令和功能丰富的 INSERT
语句展开。
一、LOAD DATA
命令:从文件系统直接加载
LOAD DATA
是 Hive 中最直接的数据导入命令,用于将文件系统(本地文件系统或 HDFS)中的数据文件加载到 Hive 表中。
语法核心:
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)];
LOCAL
: 若指定,从客户端本地复制文件;否则,从HDFS 移动文件。INPATH 'filepath'
: 源数据文件或目录的路径。OVERWRITE
: 若指定,覆盖目标表或分区数据;否则,追加(行为依赖文件格式)。INTO TABLE tablename
: 目标 Hive 表。PARTITION (partcol1=val1, ...)
: 目标分区(若表为分区表)。
代码案例:
- 从本地加载 CSV 到非分区表(覆盖)
CREATE TABLE employees_load (id INT, name STRING, department STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA LOCAL INPATH '/path/to/local/employees.csv' OVERWRITE INTO TABLE employees_load;
- 从 HDFS 加载文本文件到分区表(追加)
CREATE TABLE web_logs_load (ip STRING, url STRING) PARTITIONED BY (dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
ALTER TABLE web_logs_load ADD IF NOT EXISTS PARTITION (dt='2023-11-20');
LOAD DATA INPATH '/hdfs/path/to/logs/2023-11-20/' INTO TABLE web_logs_load PARTITION (dt='2023-11-20');
二、INSERT
语句:多样化的数据写入与转换
INSERT
语句是 Hive 中更为灵活和强大的数据写入机制,它不仅能加载数据,还能在写入过程中进行数据转换、聚合和筛选。
(一) INSERT ... VALUES
:直接插入少量行
虽然不常用于大规模数据,但 Hive 支持通过 VALUES
子句直接插入少量记录。
语法:
INSERT INTO TABLE tablename [PARTITION (partcol1=val1, ...)] VALUES (value1_row1, value2_row1, ...), (value1_row2, value2_row2, ...), ...;
案例:向配置表中插入几条记录
CREATE TABLE app_config (config_key STRING, config_value STRING, updated_by STRING);
INSERT INTO TABLE app_config VALUES ('timeout', '300', 'admin'), ('retry_count', '3', 'admin');
(二) INSERT ... SELECT
:从查询结果加载数据 (核心用法)
这是最常用的 INSERT
形式,将一个 SELECT
查询的结果写入到目标表中。可以配合 OVERWRITE
或 INTO
(追加)。
语法:
INSERT OVERWRITE TABLE target_table [PARTITION (part_col1=val1, ...)] [IF NOT EXISTS]
SELECT col1, col2, ... FROM source_table WHERE condition;INSERT INTO TABLE target_table [PARTITION (part_col1=val1, ...)]
SELECT col1, col2, ... FROM source_table WHERE condition;
- 动态分区:可以将分区列也作为
SELECT
语句的最后几列,并在PARTITION
子句中只列出分区列名,实现动态分区加载。
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
INSERT OVERWRITE TABLE target_partitioned_table PARTITION (dt, country)
SELECT ..., date_col AS dt, country_col AS country FROM source_table;
案例:
- 从
employees_load
筛选数据覆盖写入active_employees
CREATE TABLE active_employees (id INT, name STRING, department STRING);
INSERT OVERWRITE TABLE active_employees
SELECT id, name, department FROM employees_load WHERE department != 'HR';
- 动态分区加载日志数据
CREATE TABLE daily_logs_partitioned (message STRING, severity STRING) PARTITIONED BY (log_date DATE, log_hour INT);
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- 假设 raw_logs 表有 message, severity, event_time (TIMESTAMP) 列
INSERT OVERWRITE TABLE daily_logs_partitioned PARTITION (log_date, log_hour)
SELECT message, severity, to_date(event_time) AS log_date, hour(event_time) AS log_hour
FROM raw_logs;
(三) 多表插入 (Multiple Inserts):一次查询,多目标写入
Hive 允许从一个源查询 (FROM ... SELECT ...
) 的结果同时插入到多个表或分区中,这能有效减少对源表的扫描次数。
语法:
FROM source_table
INSERT OVERWRITE TABLE target_table1 [PARTITION (part_spec1)]
SELECT_clause1 [WHERE where_clause1]
INSERT OVERWRITE TABLE target_table2 [PARTITION (part_spec2)]
SELECT_clause2 [WHERE where_clause2]
...;
- 每个
INSERT
子句可以有自己独立的SELECT
列列表和WHERE
条件,但它们都作用于同一个FROM source_table
。
案例:根据用户类型将用户数据分发到不同表
CREATE TABLE premium_users (user_id STRING, name STRING, join_date DATE);
CREATE TABLE regular_users (user_id STRING, name STRING, email STRING);
-- 假设 all_users 表有 user_id, name, user_type, join_date, email 列
FROM all_users
INSERT OVERWRITE TABLE premium_users
SELECT user_id, name, join_date WHERE user_type = 'PREMIUM'
INSERT OVERWRITE TABLE regular_users
SELECT user_id, name, email WHERE user_type = 'REGULAR';
(四) INSERT OVERWRITE DIRECTORY
:将查询结果导出到 HDFS 目录
此功能允许将查询结果直接写入 HDFS 文件系统中的指定目录,而不是 Hive 表。这对于数据导出、与其他系统集成或生成报告文件非常有用。
语法:
INSERT OVERWRITE [LOCAL] DIRECTORY 'directory_path'
[ROW FORMAT row_format_for_output_file]
[STORED AS file_format_for_output_file] -- STORED AS TEXTFILE is default if not specified
SELECT_statement;
LOCAL
: 如果指定,目录路径是本地文件系统;否则是 HDFS 路径。ROW FORMAT
: 可以指定输出文件的行格式,如字段分隔符。STORED AS
: 指定输出文件的存储格式 (如 TEXTFILE, ORC, PARQUET)。
案例:将高薪员工信息导出到 HDFS 目录,以 CSV 格式存储
INSERT OVERWRITE DIRECTORY '/output/high_salary_employees_csv'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE -- 可以省略,TEXTFILE 是默认的
SELECT id, name, salary
FROM employees_load
WHERE salary >= 100000;
导出到本地文件系统:
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_employee_report'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
SELECT name, department
FROM employees_load;
(五)、CREATE TABLE AS SELECT (CTAS)
:创建并加载
CTAS 一步完成表的创建和数据填充,非常便捷。
语法核心:
CREATE TABLE [IF NOT EXISTS] new_table_name
[COMMENT table_comment]
[ROW FORMAT row_format] -- 通常不需要,除非想覆盖默认的,但CTAS主要用于数据和结构推断
[STORED AS file_format]
[LOCATION hdfs_path]
[TBLPROPERTIES (...)]
AS
select_statement;
代码案例:创建汇总表
CREATE TABLE department_salary_summary
STORED AS PARQUET
AS
SELECT department, AVG(salary) as avg_salary, COUNT(*) as num_employees
FROM employees_load
GROUP BY department;
结语:掌握 INSERT
,驾驭 Hive 数据流
Hive 的 LOAD DATA
和各种形式的 INSERT
语句共同构成了其强大的数据加载和操纵能力。从简单的文件加载到复杂的查询结果转换、多目标分发乃至数据导出,INSERT
语句几乎无所不能。理解并熟练运用这些命令,结合对静态与动态分区的认知,是每一位 Hive 用户进行高效数据处理的基石。
练习题
-
场景:你有一个简单的日志级别表
log_levels
,包含level_id INT
和level_name STRING
两列。
要求:使用INSERT ... VALUES
语句向log_levels
表中插入以下三条记录:- (1, ‘INFO’)
- (2, ‘WARN’)
- (3, ‘ERROR’)
请编写相应的 HQL 代码(包括建表语句)。
-
场景:你有一个源表
raw_sales_data
包含product_id STRING, sales_amount DECIMAL(10,2), sale_date STRING
(格式 ‘YYYY-MM-DD’)。你希望将2023年10月份的销售数据加载到一个按month_partition STRING
(格式 ‘YYYY-MM’) 分区的表october_sales
中,并覆盖该分区现有数据。
要求:编写创建october_sales
表(包含product_id
,sales_amount
列和分区列)的 HQL,以及加载数据的 HQL。 -
场景:你有一个订单表
all_orders
包含order_id STRING, customer_id STRING, order_status STRING, order_value DECIMAL(12,2)
。你希望根据order_status
将数据分别写入到两个新表:completed_orders
(状态为 ‘COMPLETED’) 和pending_orders
(状态为 ‘PENDING’ 或 ‘PROCESSING’)。
要求:假设completed_orders
和pending_orders
表已创建且只包含order_id
和order_value
列。使用一次FROM
子句和多表插入完成数据分发,覆盖目标表数据。 -
场景:你有一个表
customer_feedback
包含customer_id STRING, feedback_text STRING, rating INT
。你希望将所有评分低于3星(即 rating < 3)的反馈导出到 HDFS 目录/user/reports/low_rating_feedback
中,以制表符分隔的文本文件形式存储。
要求:编写相应的 HQL 导出语句。 -
场景:你有一个原始事件表
event_logs_source
包含event_id STRING, user_id STRING, event_type STRING, event_timestamp TIMESTAMP
。你希望将这些数据加载到一个新的分区表categorized_events
中,该表按event_day DATE
(从event_timestamp
提取) 和event_category STRING
(假设根据event_type
的某种规则映射,例如 ‘TYPE_A’ -> ‘Category1’, ‘TYPE_B’ -> ‘Category2’, 其他 -> ‘OtherCategory’) 进行双重动态分区。
要求:编写创建categorized_events
表(包含event_id
,user_id
列和两个分区列)的 HQL,以及使用动态分区加载数据的 HQL。
练习题答案
- 题目一答案:
CREATE TABLE log_levels (
level_id INT,
level_name STRING
);INSERT INTO TABLE log_levels VALUES (1, 'INFO'), (2, 'WARN'), (3, 'ERROR');
- 题目二答案:
CREATE TABLE october_sales (
product_id STRING,
sales_amount DECIMAL(10,2)
)
PARTITIONED BY (month_partition STRING)
STORED AS ORC;-- 假设要加载到 '2023-10' 分区
ALTER TABLE october_sales ADD IF NOT EXISTS PARTITION (month_partition='2023-10');INSERT OVERWRITE TABLE october_sales PARTITION (month_partition='2023-10')
SELECT
product_id,
sales_amount
FROM
raw_sales_data
WHERE
SUBSTR(sale_date, 1, 7) = '2023-10';
- 题目三答案:
-- 假设表已创建:
-- CREATE TABLE completed_orders (order_id STRING, order_value DECIMAL(12,2));
-- CREATE TABLE pending_orders (order_id STRING, order_value DECIMAL(12,2));FROM all_orders
INSERT OVERWRITE TABLE completed_orders
SELECT order_id, order_value WHERE order_status = 'COMPLETED'
INSERT OVERWRITE TABLE pending_orders
SELECT order_id, order_value WHERE order_status IN ('PENDING', 'PROCESSING');
- 题目四答案:
INSERT OVERWRITE DIRECTORY '/user/reports/low_rating_feedback'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
SELECT
customer_id,
feedback_text,
rating
FROM
customer_feedback
WHERE
rating < 3;
- 题目五答案:
CREATE TABLE categorized_events (
event_id STRING,
user_id STRING
)
PARTITIONED BY (event_day DATE, event_category STRING)
STORED AS PARQUET;SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
-- 假设最大动态分区数也已适当设置,例如 SET hive.exec.max.dynamic.partitions.pernode=1000;INSERT OVERWRITE TABLE categorized_events PARTITION (event_day, event_category)
SELECT
event_id,
user_id,
to_date(event_timestamp) AS event_day_val, -- 对应第一个动态分区列 event_day
CASEWHEN event_type = 'TYPE_A' THEN 'Category1'WHEN event_type = 'TYPE_B' THEN 'Category2'ELSE 'OtherCategory'
END AS event_category_val -- 对应第二个动态分区列 event_category
FROM
event_logs_source;