欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 金融 > Airflow量化入门系列:第一章 Apache Airflow 基础

Airflow量化入门系列:第一章 Apache Airflow 基础

2025/5/5 21:13:02 来源:https://blog.csdn.net/weixin_47339916/article/details/146917656  浏览:    关键词:Airflow量化入门系列:第一章 Apache Airflow 基础

Airflow量化入门系列:第一章 Apache Airflow 基础

本教程系统性地讲解了 Apache Airflow 在 A 股量化交易中的应用,覆盖从基础安装到高级功能的完整知识体系。通过六章内容,读者将掌握 Airflow 的核心概念、任务调度、数据处理、技术指标计算、策略回测及工作流监控等关键技能。教程整合 Tushare 数据源、TA-Lib 技术指标库和 VectorBT 策略回测工具,提供丰富实战案例,帮助构建高效、可靠的工作流,助力量化交易实践。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

AirFlow

学习对象

  • 中高级水平的开发者
  • 具备 Python 编程基础,熟悉基本的 ETL 流程和数据分析工具
  • 希望掌握 Airflow 在量化交易场景中的应用

教程目标

  • 系统掌握 Apache Airflow 的核心功能与高级特性
  • 深入理解 Airflow 在 A 股量化交易工作流中的应用
  • 能够独立设计、部署和维护复杂的量化交易工作流

教程目录

第一章:Apache Airflow 基础

1.1 Airflow 简介与安装
1.2 Airflow 核心概念
1.3 Airflow Web UI 使用与管理
1.4 Airflow 配置与环境搭建

第二章:Airflow 任务调度与依赖管理

2.1 DAG 定义与任务依赖关系
2.2 Task 类型
2.3 任务调度策略与优先级
2.4 动态任务生成与条件分支

第三章:A 股市场数据处理与存储

3.1 使用 Tushare 获取 A 股数据
3.2 数据清洗与预处理
3.3 Parquet 文件存储与读取
3.4 数据存储优化与性能提升

第四章:技术指标计算与策略实现

4.1 TA-Lib 基础与常用技术指标
4.2 技术指标计算流程设计
4.3 策略回测与评估(使用 Vector BT)
4.4 策略优化与参数调整

第五章:Airflow 高级功能与最佳实践

5.1 Airflow 与 Docker 集成
5.2 Airflow 任务监控与日志分析
5.3 Airflow 任务容错与重试机制
5.4 Airflow 任务性能优化

第六章:完整案例:A 股量化交易工作流

6.1 数据获取与存储工作流
6.2 技术指标计算工作流
6.3 策略回测与结果存储工作流
6.4 工作流监控与维护

第一章 Apache Airflow 基础

1.1 Airflow 简介与安装

理论详解

Apache Airflow 是一个开源的工作流管理平台,用于编写、调度和监控复杂的工作流。它具有以下核心功能:

  • 任务调度:支持复杂的任务依赖关系和调度策略。
  • 任务监控:通过 Web UI 实时监控任务状态和日志。
  • 可扩展性:支持多种任务类型(如 Bash、Python、HTTP 等)和多种存储后端(如 SQLite、MySQL、PostgreSQL)。

应用场景

  • 数据管道管理
  • ETL 流程自动化
  • 量化交易策略执行与监控

基础架构:Basic Airflow Architecture1

Basic Airflow Architecture

  • Metadata Database:Airflow 使用 SQL 数据库来存储有关正在运行的数据管道的元数据。在上图中,使用 Postgres数据库,它在 Airflow 中非常流行。Airflow 默认数据库为 SQLite。
  • Webserver:Airflow Web 服务器和调度器是单独的进程,在本例中运行在本地机器上,并与上面提到的数据库交互。
  • Executor:在图中,执行器被单独列出,因为它是 Airflow 中经常被讨论的内容,也是教程中提到的关键部分。但实际上,执行器并不是一个单独的进程,而是运行在调度器内部。
  • Worker(s):工作进程是单独的进程,它们也与其他 Airflow 架构组件以及元数据存储库进行交互。
  • DAG(s):包含 Python 代码的文件,用于表示 Airflow 要运行的数据管道。这些文件的位置在 Airflow 配置文件中指定,但它们需要能够被 Web Server、Scheduler 和 Workers 访问。
  • airflow.cfg 是 Airflow 配置文件,由 Web Server、Scheduler和 Worker 访问。

优势

  • 灵活性:支持多种任务类型和调度策略。
  • 可扩展性:支持分布式执行和多种存储后端。
  • 易用性:通过 Web UI 直观管理任务。

实战示例

安装 Airflow

# 安装 Airflow
pip install apache-airflow# 初始化 Airflow 数据库,默认:SQLite
airflow db init# 创建默认用户
airflow users create --role Admin --username admin --email admin@example.com --firstname admin --lastname admin --password admin# 启动 Airflow Web Server,默认端口:8080
airflow webserver # 启动 Airflow Scheduler
airflow scheduler

1.2 Airflow 核心概念

理论详解

DAG(Directed Acyclic Graph)

  • 任务流程的定义,表示任务之间的依赖关系。
  • 无环图,确保任务可以按顺序执行。

Task

  • 任务单元,可以是 Bash、Python、HTTP 等类型。
  • 任务可以设置优先级、重试机制等。

Operator

  • 任务的执行逻辑。
  • 常见的 Operator 包括:
    • BashOperator:执行 Bash 命令。
    • PythonOperator:执行 Python 函数。
    • BranchOperator:根据条件分支任务。

Executor

  • 任务调度机制。
  • 常见的 Executor 包括:
    • SequentialExecutor:默认执行器,单进程顺序执行任务,适用于调试和开发环境。
    • LocalExecutor:所有任务都在同一个进程中运行,适用于开发和测试环境。
    • CeleryExecutor:使用 Celery 作为任务队列,支持多台机器上的分布式任务执行,多用于生产场景,使用时需要配置消息队列。
    • KubernetesExecutor:在 Kubernetes 集群中动态创建 Pod 来执行任务,每个任务在一个独立的 Pod 中运行,需要配置 Kubernetes API 和相关资源。
    • DaskExecutor:使用 Dask 分布式计算库,支持并行和分布式计算,适用于数据密集型任务。
    • LocalKubernetesExecutor:对于本地任务使用 LocalExecutor,对于 Kubernetes 任务使用 KubernetesExecutor,适用于混合环境,既可以在本地运行一些任务,又可以在 Kubernetes 上运行其他任务。

任务实例 Task Instances2

任务的一个实例是该任务在特定 DAG(以及特定数据时间间隔)中的一次具体运行。任务实例也是具有状态的任务的表示,反映了它所处的生命周期阶段。

Task Lifecycle Diagram

任务实例的可能状态包括:

  • none:任务尚未被排队执行(其依赖条件尚未满足)。
  • scheduled:调度器已确定任务的依赖条件已满足,可以运行。
  • queued:任务已被分配给执行器,正在等待工作进程。
  • running:任务正在工作进程上运行(或在本地/同步执行器上运行)。
  • success:任务成功完成,没有错误。
  • restarting:任务在运行时被外部请求重新启动。
  • failed:任务在执行过程中出现错误,未能运行。
  • skipped:由于分支、LatestOnly 或类似原因,任务被跳过。
  • upstream_failed:上游任务失败,触发规则要求需要它。
  • up_for_retry:任务失败,但还有重试机会,将被重新调度。
  • up_for_reschedule:任务是一个处于重新调度模式的传感器。
  • deferred:任务已委托给触发器。
  • removed:任务在运行开始后从 DAG 中消失。

理想情况下,任务应从 none 状态开始,依次经过 scheduled、queued、running,最终达到 success 状态。

实战示例

设计一个简单的 DAG

创建一个 DAG,包含多个任务依赖关系。

from datetime import datetimefrom airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperatordef task1(**kwargs):"""任务 1:打印任务开始信息。:param kwargs: Airflow 任务参数:return: None"""print("Task 1 started")def task2(**kwargs):"""任务 2:打印任务执行信息。:param kwargs: Airflow 任务参数:return: None"""print("Task 2 executed")def task3(**kwargs):"""任务 3:打印任务结束信息。:param kwargs: Airflow 任务参数:return: None"""print("Task 3 completed")# 定义 DAG
with DAG("simple_dag",description="A simple DAG with multiple tasks",schedule_interval=None,tags=["quant", "tutorial"],
) as dag:# 定义任务task1 = PythonOperator(task_id="task1", python_callable=task1, provide_context=True)task2 = PythonOperator(task_id="task2", python_callable=task2, provide_context=True)task3 = PythonOperator(task_id="task3", python_callable=task3, provide_context=True)# 设置任务依赖关系task1 >> task2 >> task3

运行 DAG

  1. 将 DAG 文件保存到 Airflow 的 dags 目录。
  2. 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。

1.3 Airflow Web UI 使用与管理

Airflow Web UI

理论详解

Airflow Web UI3 是一个直观的管理工具,支持以下功能:

  • DAGs:查看所有 DAG 的状态和执行历史。
  • Tree:查看 DAG 的任务依赖关系。
  • Graph:以图形化方式展示任务执行状态。
  • Gantt:以甘特图方式展示任务调度计划。
  • Logs:查看任务日志。

实战示例

通过 Web UI 触发 DAG

  1. 打开 Airflow Web UI(默认地址:http://localhost:8080)。
  2. 在 DAG 列表中找到 fetch_stock_data DAG。
  3. 点击 “Trigger DAG” 按钮,输入任务参数(如股票代码、日期范围)。
  4. 查看任务状态和日志。

查看任务日志

  1. 在 DAG 的 “Graph” 视图中找到任务实例。
  2. 点击任务实例,进入任务详情页面。
  3. 在 “Logs” 标签页中查看任务日志。

1.4 Airflow 配置与环境搭建

理论详解

Airflow 的配置文件 airflow.cfg 和环境变量用于管理 Airflow 的运行环境。常见的配置包括:

  1. 基本配置

    • [core] 部分:

      • dags_folder: 指定存放 DAG 文件的目录路径。
      • load_examples: 设置是否加载示例 DAG(默认为 True)。生产环境中建议设为 False。
      • executor: 定义使用的执行器类型,如 LocalExecutor, CeleryExecutor 或 KubernetesExecutor。
      • sql_alchemy_conn: 数据库连接字符串,指定 Airflow 使用哪个数据库来存储元数据。
    • [webserver] 部分:

      • web_server_host: Web 服务器监听的 IP 地址。
      • web_server_port: Web 服务器监听的端口号,默认为 8080。
      • base_url: 如果需要的话,可以设置 Web 界面的基础 URL。
    • [scheduler] 部分:

      • scheduler_heartbeat_sec: 调度器心跳间隔时间(秒)。
      • min_file_process_interval: 文件处理最小间隔时间(秒),防止频繁检查文件变化。
      • dag_dir_list_interval: DAG 目录列表刷新频率(秒)。
  2. 安全性相关配置

    • [webserver] 部分:

      • authenticate: 是否启用身份验证功能。
      • auth_backend: 自定义认证后端模块的位置。
      • secret_key: 用于加密会话的密钥。
    • [api] 部分:

      • auth_backend: API 认证后端的选择。
  3. 性能调优

    • [celery] 部分 (当使用 CeleryExecutor 时):

      • broker_url: 消息队列服务的地址。
      • result_backend: 结果存储的位置。
      • worker_concurrency: 工作者进程的数量。
    • [kubernetes] 部分 (当使用 KubernetesExecutor 时):

      • kube_config: Kubernetes 配置文件路径。
      • namespace: 运行任务所在的命名空间。
      • delete_worker_pods: 完成后是否删除 worker pods。
  4. 邮件通知

    • [smtp] 部分:
      • smtp_host, smtp_starttls, smtp_ssl, smtp_user, smtp_password, smtp_port, smtp_mail_from: 邮件发送相关的配置参数。
  5. 日志管理

    • [logging] 部分:
      • logging_level: 日志级别。
      • base_log_folder: 存放日志的基本目录。
      • remote_logging: 是否启用远程日志记录。
  6. 其他重要配置

    • [operators][hooks] 部分: 可以自定义特定操作符或钩子的行为。
    • 环境变量: 除了直接修改配置文件外,许多配置项也可以通过环境变量来设置,这在容器化部署中非常有用。

实战示例

配置 Airflow 使用 Tushare API

  1. airflow.cfg 文件中添加 Tushare API 配置:

    [core]
    load_examples = False
    [tushare]
    api_token = your_tushare_token
    data_folder = /data
    
  2. 在 DAG 中读取配置:

    from airflow.configuration import conftushare_token = conf.get("tushare", "api_token")
    ts.set_token(tushare_token)
    

完整示例

import os
from datetime import datetimeimport tushare as tsfrom airflow import DAG
from airflow.configuration import conf
from airflow.operators.python import PythonOperator# 配置 Tushare API
tushare_token = conf.get("tushare", "api_token")
ts.set_token(tushare_token)
pro = ts.pro_api()def fetch_stock_data(**kwargs):"""获取特定股票的历史数据并存储为 Parquet 文件。:param kwargs: Airflow 任务参数:return: None"""stock_code = kwargs["dag_run"].conf.get("stock_code", "000001.SZ")start_date = kwargs["dag_run"].conf.get("start_date", "20230101")end_date = kwargs["dag_run"].conf.get("end_date", "20241231")print(f"Fetch data for {stock_code} from {start_date} to {end_date}")# 获取股票数据df = pro.daily(ts_code=stock_code, start_date=start_date, end_date=end_date)# 数据存储目录data_dir = conf.get("tushare", "data_folder")if not os.path.exists(data_dir):os.makedirs(data_dir)# 存储为 Parquet 文件file_path = os.path.join(data_dir, f"stock_data_{stock_code}.parquet")df.to_parquet(file_path, index=False)print(f"Data saved to {file_path}")# 定义 DAG
with DAG("fetch_stock_data",description="Fetch stock data from Tushare and save as Parquet",schedule_interval=None,start_date=datetime(2025, 1, 1),tags=["quant", "stock"],
) as dag:# 定义任务fetch_task = PythonOperator(task_id="fetch_stock_data",python_callable=fetch_stock_data,provide_context=True,)# 设置任务依赖关系
fetch_task

运行 DAG

  1. 将 DAG 文件保存到 Airflow 的 dags 目录。
  2. 在 Airflow Web UI 中触发 DAG,查看任务状态和日志。

总结

通过本章的学习,您已经掌握了 Apache Airflow 的基础概念和基本操作。在接下来的章节中,我们将深入探讨 Airflow 的任务调度、依赖管理以及在 A 股量化交易中的应用。

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。


  1. https://airflow.apache.org/docs/apache-airflow/2.0.1/concepts.html ↩︎

  2. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html ↩︎

  3. https://airflow.apache.org/docs/apache-airflow/stable/ui.html ↩︎

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词