血緣元數據采集開放標準:OpenLineage Guides 使用 Apache Airflow? 和 OpenLineage + Marquez 入門

OpenLineage

OpenLineage 是一個用于元數據和血緣采集的開放標準,專為在作業運行時動態采集數據而設計。它通過統一的命名策略定義了由作業(Job)、運行實例(Run)和數據集(Dataset) 組成的通用模型,并通過可擴展的Facets機制對這些實體進行元數據增強。
該項目是 LF AI & Data 基金會的畢業級項目,處于活躍開發階段,歡迎社區貢獻。

使用 Apache Airflow? 和 OpenLineage + Marquez 入門

Getting Started with Apache Airflow? and OpenLineage+Marquez

本教程將指導你配置 Apache Airflow? 以將 OpenLineage 事件發送到 Marquez,并通過一個真實的故障排查場景進行探索。

目錄

  • 前提條件
  • 獲取并啟動 Marquez
  • 配置 Airflow 將 OpenLineage 事件發送到 Marquez
  • 編寫 Airflow DAG
  • 在 Marquez 中查看已收集的血緣
  • 使用 Marquez 排查失敗的 DAG
  • 后續步驟
  • 反饋

前提條件

開始前,請確保已安裝:

  • Docker 17.05+
  • Apache Airflow 2.7+ 本地運行。

如需在本地輕松安裝并運行 Airflow 以用于開發,請參閱:快速開始。

獲取并啟動 Marquez

  1. 創建 Marquez 目錄,然后通過運行以下命令檢出 Marquez 源碼:

    MacOS/Linux

    git clone https://github.com/MarquezProject/marquez && cd marquez
    

    Windows

    git config --global core.autocrlf false
    git clone https://github.com/MarquezProject/marquez && cd marquez
    
  2. Airflow 和 Marquez 都需要 5432 端口用于其元數據庫,但 Marquez 服務更易于配置。你也可以即時為數據庫服務分配一個新端口。要使用 2345 端口啟動 Marquez,請運行:

    MacOS/Linux

    ./docker/up.sh --db-port 2345
    

    Windows

    驗證 Postgres 和 Bash 是否在 PATH 中,然后運行:

    sh ./docker/up.sh --db-port 2345
    
  3. 要查看 Marquez UI 并驗證其運行狀態,請打開 http://localhost:3000。該 UI 允許你:

    • 查看跨平臺依賴關系,即你可在生態系統中查看生成或消費關鍵表的工具中的作業。
    • 查看當前和先前作業運行的運行級元數據,使你能夠看到作業的最新狀態和數據集的更新歷史。
    • 獲取資源使用情況的高級視圖,使你能夠查看操作中的趨勢。

配置 Airflow 將 OpenLineage 事件發送到 Marquez

  1. 要配置 Airflow 以將 OpenLineage 事件發送到 Marquez,你需要修改本地 Airflow 環境并添加依賴。首先,定義一個 OpenLineage 傳輸。一種方法是使用環境變量。要使用 http 并將事件發送到本地端口 5000 上運行的 Marquez API,請運行:

    MacOS/Linux

    export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    

    Windows

    set AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://localhost:5000", "endpoint": "api/v1/lineage"}'
    
  2. 你還需要為 Airflow 作業定義一個命名空間。它可以是任意字符串。請運行:

    MacOS/Linux

    export AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    

    Windows

    set AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
    
  3. 要將所需的 Airflow OpenLineage Provider 包添加到你的 Airflow 環境,請運行:

    MacOS/Linux

    pip install apache-airflow-providers-openlineage
    

    Windows

    pip install apache-airflow-providers-openlineage
    
  4. 要完成本教程,你還需要在 Airflow 中啟用本地 Postgres 操作。為此,請運行:

    MacOS/Linux

    pip install apache-airflow-providers-postgres
    

    Windows

    pip install apache-airflow-providers-postgres
    
  5. 在本地 Postgres 實例中創建一個數據庫,并使用默認 ID (postgres_default) 創建一個 Airflow Postgres 連接。如需前者幫助,請參閱:Postgres 文檔。如需后者幫助,請參閱:管理連接。

編寫 Airflow DAG

在此步驟中,你將創建兩個新的 Airflow DAG,它們執行簡單任務,并將其添加到你現有的 Airflow 實例。counter DAG 每分鐘將列值加 1,而 sum DAG 每五分鐘計算一次總和。這將形成一個包含兩個作業和兩個數據集的簡單管道。

  1. dags/ 目錄下,創建一個名為 counter.py 的文件,并添加以下代碼:

    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.utils.dates import days_ago@dag(schedule='*/1 * * * *',start_date=days_ago(1),catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that generates a new count value equal to 1.'
    )def counter():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS counts (value INTEGER);''',)query2 = PostgresOperator(task_id='inc',postgres_conn_id='postgres_default',sql='''INSERT INTO "counts" (value) VALUES (1);''',)query1 >> query2counter()
  2. dags/ 目錄下,創建一個名為 sum.py 的文件,并添加以下代碼:

    import pendulum
    from airflow.decorators import dag, task
    from airflow.providers.postgres.operators.postgres import PostgresOperator
    from airflow.utils.dates import days_ago@dag(start_date=days_ago(1),schedule='*/5 * * * *',catchup=False,is_paused_upon_creation=False,max_active_runs=1,description='DAG that sums the total of generated count values.'
    )def sum():query1 = PostgresOperator(task_id='if_not_exists',postgres_conn_id='postgres_default',sql='''CREATE TABLE IF NOT EXISTS sums (value INTEGER);''')query2 = PostgresOperator(task_id='total',postgres_conn_id='postgres_default',sql='''INSERT INTO sums (value)SELECT SUM(value) FROM counts;''')query1 >> query2sum()
  3. 重啟 Airflow 以應用更改。然后,取消暫停兩個 DAG。

在 Marquez 中查看已收集的血緣

  1. 要查看 Marquez 從 Airflow 收集的血緣,請訪問 http://localhost:3000 打開 Marquez UI。然后,使用左上角搜索欄搜索 counter.inc 作業。要查看 counter.inc 的血緣元數據,請從下拉列表中點擊該作業:

    image

  2. 查看 counter.inc 的血緣圖,你應看到 <database>.public.counts 作為輸出數據集,sum.total 作為下游作業:

    image

使用 Marquez 排查失敗的 DAG

  1. 在此步驟中,你將模擬由于跨 DAG 依賴項更改導致的管道中斷,并了解來自 OpenLineage + Marquez 的增強血緣如何使架構更改的故障排查變得輕松。

    假設 Team A 擁有 DAG counterTeam A 更新 counter 以將 counts 表中的 values 列重命名為 value_1_to_10,但未將架構更改正確傳達給擁有 sum 的團隊。

    counter 應用以下更改以模擬破壞性更改:

    query1 = PostgresOperator(
    -   task_id='if_not_exists',
    +   task_id='alter_name_of_column',postgres_conn_id='example_db',sql='''
    -   CREATE TABLE IF NOT EXISTS counts (
    -     value INTEGER
    -   );''',
    +   ALTER TABLE "counts" RENAME COLUMN "value" TO "value_1_to_10";
    +   '''
    )
    
    query2 = PostgresOperator(task_id='inc',postgres_conn_id='example_db',sql='''
    -    INSERT INTO counts (value)
    +    INSERT INTO counts (value_1_to_10)VALUES (1)''',
    )
    

    正如 sum 的所有者 Team B 所做的那樣,注意 Marquez 中 DataOps 視圖的失敗運行:

    image

    Team B 只能猜測 DAG 失敗的可能原因,因為 DAG 最近沒有更改。因此,團隊決定檢查 Marquez。

  2. 在 Marquez 中,導航到 Datasets 視圖,并從右上角的命名空間下拉菜單中選擇你的 Postgres 實例。然后,點擊 <database>.public.counts 數據集并檢查圖表。你將在節點上找到架構:

    image

  3. 假設你不認識該列,并希望了解其原始名稱及更改時間。點擊節點將打開詳情抽屜。在那里,使用版本歷史查找架構更改的運行:

    image

  4. 在 Airflow 中,通過更新計算計數總和的任務以使用新列名來修復中斷的下游 DAG:

    query2 = PostgresOperator(task_id='total',postgres_conn_id='example_db',sql='''
    -    INSERT INTO sums (value)
    -       SELECT SUM(value) FROM counts;
    +       SELECT SUM(value_1_to_10) FROM counts;'''
    )
    
  5. 重新運行 DAG。在 Marquez 中,通過查看 DataOps 視圖中最近的運行歷史來驗證修復:

    image

后續步驟

  • 查看用于收集 Airflow DAG 元數據的 Marquez HTTP API,并學習如何使用 OpenLineage 構建自己的集成。
  • 查看可與 Airflow 一起使用的 openlineage-spark 集成。

反饋

你覺得本指南如何?請在 OpenLineage Slack 或 Marquez Slack 中告訴我們。你也可以通過 提交拉取請求 直接提出更改。

風險提示與免責聲明
本文內容基于公開信息研究整理,不構成任何形式的投資建議。歷史表現不應作為未來收益保證,市場存在不可預見的波動風險。投資者需結合自身財務狀況及風險承受能力獨立決策,并自行承擔交易結果。作者及發布方不對任何依據本文操作導致的損失承擔法律責任。市場有風險,投資須謹慎。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/95435.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/95435.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/95435.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

FPGA|Quartus II 中使用TCL文件進行引腳一鍵分配

在FPGA設計過程中&#xff0c;合理的引腳分配是確保硬件功能正確實現的關鍵步驟之一。Quartus II 提供了通過 TCL&#xff08;Tool Command Language&#xff09;腳本自動化引腳分配的功能&#xff0c;這不僅可以大大提高設計效率&#xff0c;還能夠確保引腳分配的精確性和可重…

【Docker/Redis】服務端高并發分布式結構演進之路

目錄 概述 常見概念 基本概念 應用&#xff08;Application&#xff09;/ 系統&#xff08;System&#xff09; 模塊&#xff08;Module&#xff09;/ 組件&#xff08;Component&#xff09; 分布式&#xff08;Distributed&#xff09; 集群&#xff08;Cluster&#x…

【Excel】將一個單元格內??的多行文本,??拆分成多個單元格,每個單元格一行??

??所有文本都堆積在“prefix”列頂部的同一個單元格里&#xff08;很可能是B10單元格&#xff09;&#xff0c;并且它們是用空格分隔的&#xff0c;而不是換行符。??因此&#xff0c;您不需要處理換行符&#xff0c;而是需要??按“空格”進行分列&#xff0c;并且將分列后…

新手SEO操作第一步

內容概要 網站優化對于新手而言&#xff0c;常常感覺無從下手。別擔心&#xff0c;這篇文章就是為你量身打造的入門指南。我們將從最基礎也是最重要的關鍵詞研究開始講起&#xff0c;手把手教你如何精準找到目標用戶搜索的詞。掌握了關鍵詞&#xff0c;接下來就是如何創作出搜索…

【高階數據結構】秘法(一)——并查集:探索如何高效地管理集合

前言&#xff1a; 前面我們已經學習了簡單的數據結構&#xff0c;包括棧與隊列、二叉樹、紅黑樹等等&#xff0c;今天我們繼續數據結構的學習&#xff0c;但是難度上會逐漸增大&#xff0c;在高階數據結構中我們要學習的重點是圖等 目錄 一、并查集的原理 二、并查集的基本操作…

spring boot 整合AI教程

1、pom.xml配置<?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4…

基于SpringBoot2+Vue2開發的儲物柜管理系統

角色 管理員&#xff1a;管理系統、用戶&#xff0c;管理儲物柜用戶&#xff1a;借用、歸還儲物柜&#xff0c;報修故障 技術棧 后端&#xff1a;Springboot2, JWT, PageHelper前端&#xff1a;Vue2數據庫&#xff1a;MySQL 核心功能 提供智能儲物柜管理&#xff0c;包括用戶注…

uniapp中輸入金額的過濾(只允許輸入數字和小數點)

一、完整代碼&#xff1a; <template><view class"numberIndex" :style"{ paddingTop: navbarHeight px }"><view class"custom-navbar" :style"{ paddingTop: statusBarHeight px }"><view class"navbar…

系統科學核心概念辨析及其在人工智能領域的應用研究:一個整合性分析框架

摘要&#xff1a;本文旨在系統性地梳理和辨析系統科學中的核心概念——結構、功能與層級。文章首先追溯系統思想的理論源流&#xff0c;確立其作為一種超越還原論的整體性研究范式。在此基礎上&#xff0c;深度剖析系統結構的內在構成&#xff08;組分、框架、動態性&#xff0…

Ubuntu環境下刪除Docker鏡像與容器、配置靜態IP地址

刪除Docker鏡像與容器刪除容器&#xff1a;要刪除特定的Docker容器&#xff0c;首先需要停止該容器&#xff1a;docker stop <container_id_or_name>然后可以使用以下命令刪除它&#xff1a;docker rm <container_id_or_name>如果要強制刪除正在運行的容器&#xf…

零樣本視覺模型(DINOv3)

DINOv3是Meta于2025年8月14日發布的第三代自監督視覺基礎模型&#xff0c;通過17億張無標注圖像訓練&#xff0c;參數規模最大達70億&#xff0c;首次在密集預測任務中全面超越弱監督模型&#xff0c;成為計算機視覺領域的里程碑。其核心突破在于無需人工標注即可生成高分辨率密…

【機器學習入門】5.2 回歸的起源——從身高遺傳到線性模型的百年演變

提到 “回歸”&#xff0c;很多剛入門的同學會覺得它是個抽象的數學概念&#xff0c;但你可能想不到&#xff0c;這個術語的誕生&#xff0c;竟然源于 19 世紀一位生物學家對 “身高遺傳” 的研究。回歸分析從 “觀察生物現象” 出發&#xff0c;逐步發展成機器學習中預測連續值…

輕型載貨汽車變速器設計cad+設計說明書

摘 要 變速器是汽車重要的傳動系組成&#xff0c;在較大范圍內改變汽車行駛速度的大小和汽車驅動輪上扭矩的大小。變速器能在發動機旋轉方向不變的前提下&#xff0c;使汽車倒退行駛&#xff0c;而且利用擋位可以中斷動力的傳遞。所以變速器的結構設計的合理性直接影響到汽車動…

如何對嵌入式軟件進行單元測試

ceedling就是一款嵌入式軟件測試框架。ceedling是一個用ruby語言編寫的C語言自動化測試框架&#xff0c;它集成了Cmock、Unity和Cexception等多個開源項目。在整個ceedling框架中&#xff0c;使用unity進行代碼測試&#xff0c;使用CMock生成模擬函數&#xff0c;使用CExceptio…

通義萬相Wan2.2-S2V-14B:AI視頻生成的革命性突破與實踐指南

一張圖片+一段音頻=電影級數字人視頻?這不是魔法,是開源AI技術帶來的現實。 近日,阿里巴巴通義萬相團隊開源了Wan2.2-S2V-14B模型,僅在短短幾天內就引發了AI視頻生成領域的震動。這個僅需**一張靜態圖片**和**一段音頻**就能生成影視級質量視頻的模型,正在改變我們對AI視…

基于 HTML、CSS 和 JavaScript 的智能圖像銳化系統

目錄 1 前言 2 技術實現 2.1 HTML&#xff1a;構建系統骨架? 2.2 CSS&#xff1a;打造視覺與交互體驗? 2.3 JavaScript&#xff1a;實現核心銳化邏輯? 3 代碼解析 3.1 數據存儲與初始化 3.2 圖像加載流程 3.3 銳化算法核心&#xff1a;卷積計算? 3.4 下載功能實現…

(MySQL)分布式鎖

在分布式系統中&#xff0c;多個進程可能會同時對同一資源進行操作&#xff0c;如果沒有同步機制&#xff0c;就會造成數據不一致問題。為了避免這種情況&#xff0c;需要分布式鎖。Redis 是常見的實現方式&#xff0c;但在某些場景下&#xff0c;我們也可以使用 MySQL 來實現分…

基于RS-485接口的芯片的FPGA驅動程序

1.簡介ADM3485E 是一款 3.3V 低功耗數據收發器&#xff0c;具有 15kV 的 ESD&#xff08;靜電放電&#xff09; 保護&#xff0c;專為多點總線傳輸線上的半雙工通信設計。它支持平衡數據傳輸&#xff0c;符合 TIA/EIA 標準 RS-485 和 RS-422 的要求。作為一款半雙工收發器&…

SQLSERVER關鍵字:N

在 SQL Server 中&#xff0c;單獨的 N 并不是一個 “關鍵字”&#xff0c;但它作為前綴有特殊含義 —— 用于標識字符串為 Unicode 字符串&#xff08;對應 NVARCHAR、NCHAR 等 Unicode 數據類型&#xff09;。具體作用當字符串前加 N 前綴時&#xff0c;SQL Server 會將該字符…

【MySQL基礎】MySQL核心操作全解析

【MySQL基礎】MySQL核心操作全解析前言一、數據庫操作&#x1f636;?&#x1f32b;?1.1 查看數據庫&#x1f50d;1.2 創建數據庫? 1.3 選擇數據庫&#x1f4cc; 1.4 刪除數據庫? 二、數據表操作&#x1f4cb; 2.1 創建數據表? 2.2 查看數據表&#x1f50d; 2.3 查看表結構…