Back in 2019, when we were building our data platform, we started with Hadoop 2.8 and Apache Hive, managing our own HDFS. Managing workflows, whether data pipelines (i.e. ETLs), machine learning prediction pipelines, or general-purpose pipelines, was a core requirement, and each task is different.
Some tasks just need to run on GCP at a scheduled time, such as creating a Cloud Run instance or a Dataproc Spark cluster, while others can only be scheduled based on data availability or Hive partition availability.
To sum up, the data platform requirements at the time that called for a scheduling system were:
- ETL pipelines
- Machine learning workflows
- Maintenance: database backups
- API calls: for example, managing Kafka Connect connectors
- Naive cron-based jobs where the task is to spin up some infrastructure in a public cloud, for example to create a new cluster or scale up an existing Dataproc cluster, stop instances, scale Cloud Run, etc.
- Deployments: Git -> code deployments
ETL pipelines consist of a complex network of dependencies that are not just data-dependent, and these dependencies can vary with the use case: computing metrics, creating canonical forms of datasets, model training, and so on.
To create immutable datasets (no update, upsert, or delete), we started with a stack of Apache Hive, Apache Hadoop (HDFS), Apache Druid, Apache Kafka, and Apache Spark, with the following requirements or goals:
- Reproducible pipelines: as in functional programming, pipeline output needs to be deterministic, so if a task is retried or retriggered the outcome should be the same; in other words, pipelines and tasks need to be idempotent (see the sketch after this list).
- Backfilling is a must, since data can evolve.
- Robustness: easy changes to the configuration.
- Versioning of configuration, data, and tasks, i.e. easily adding or removing tasks over time, or updating existing DAG code.
- Transparency of data flow: we discussed that something similar to Jaeger tracing for the data platform would be tremendous, and looked at possible options like Atlas and Falcon.
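To make the idempotency and backfilling requirements concrete, here is a minimal sketch in plain Python (the dataset name and path are made up, and the real transformation would be a Spark or Hive job): the output location is a pure function of the logical run date, so a retry or a backfill overwrites exactly one partition instead of appending to it.

```python
from datetime import date


def build_daily_rides_partition(run_date: date) -> str:
    """Hypothetical idempotent task: the output partition is derived only from
    the logical run date, never from wall-clock time, so re-running it for the
    same date always produces the same result."""
    partition = f"hdfs://warehouse/rides/dt={run_date.isoformat()}"

    # Placeholder for the actual transformation (e.g. a Spark or Hive job).
    # Writes go to this single partition in overwrite mode, never append,
    # so repeated runs cannot duplicate data.
    print(f"(re)building {partition} in overwrite mode")
    return partition


if __name__ == "__main__":
    # Backfilling is just invoking the same task for older dates.
    for d in (date(2019, 6, 1), date(2019, 6, 2)):
        build_daily_rides_partition(d)
```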
Journey with Cloud Scheduler (managed cron)
We started using Cloud Scheduler with shell and Python scripts, since it is fully managed by Google Cloud and setting up cron jobs takes just a few clicks. Cloud Scheduler is a fully managed cron job scheduler.
The main reason to go with Cloud Scheduler was that, unlike a self-managed cron instance, there is no single point of failure, and it is designed to provide "at least once" delivery of jobs, from plain cron tasks to automated resource creation; for example, we used it to run jobs that created virtual machines, Cloud Run services, and so on.


Cloud Scheduler (or cron) doesn't offer dependency management, so we had to "hope dependent pipelines finished in the correct order". Each pipeline or task had to be built from scratch, which doesn't scale. Although Cloud Scheduler supports time zones, we hit a few timezone problems in Druid ingestions and their downstream tasks, and because jobs were submitted manually via the UI, human error could creep into pipelines. Cloud Scheduler can retry on failure, which reduces manual toil and intervention, but there is no task dependency management, and complex workflows and backfilling are not available out of the box. So within a few weeks we decided it was not suitable for running data and ML pipelines, since these involve a lot of backfilling, cross-DAG dependencies, and sometimes data sharing between tasks.
Then we started looking into the popular open-source workflow management platforms that can handle hundreds of tasks with failure handling and callback strategies, coded a few tasks, and deployed them on GCP to complete a POC. The projects considered were Apache Airflow, Apache Oozie, Luigi, Argo, and Azkaban.
Both Apache Oozie and Azkaban were top-level projects with stable releases at the time. Oozie is a reliable workflow scheduler system for managing Apache Hadoop jobs. It is integrated with the rest of the Hadoop stack and supports several types of Hadoop jobs out of the box (such as Java MapReduce, streaming MapReduce, Pig, Hive, Sqoop, and DistCp) as well as system-specific tasks (such as Java programs and shell scripts).

Still, with Oozie we had to deal with XML definitions, or zip up a directory containing the task-related dependencies, and the development workflow wasn't as convincing as Airflow's. Instead of managing multiple directories of XML configs and worrying about the dependencies between them, writing Python code means the workflow can be tested, and since it is code, all the usual software best practices apply to it.
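For contrast with Oozie's XML workflows, a minimal Airflow DAG is just Python. The sketch below is illustrative rather than one of our production pipelines, written against the Airflow 1.10-style API we were using at the time; the DAG id and commands are placeholders.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Illustrative ETL DAG: the dependency graph is plain Python, so the module can
# be imported in a unit test and checked like any other code.
with DAG(
    dag_id="example_etl",                # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    transform = BashOperator(task_id="transform", bash_command="echo transforming")
    load = BashOperator(task_id="load", bash_command="echo loading")

    # Dependencies are declared in code rather than XML.
    extract >> transform >> load
```

A test can simply import this module and assert on dag.tasks or on each task's upstream and downstream relations, which is exactly the kind of check XML workflow definitions make awkward.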

Azkaban has a distributed multiple-executor mode and beautiful UI visualizations, an option to retry failed jobs, good alerting options, and it can track user actions, i.e. auditing is available. But since workflows are defined using property files, in the end we didn't consider this option.
Luigi was promising, since deploying it on Kubernetes was very simple, and it handles dependency resolution, workflow management, visualization, failure handling, command line integration, and much more.

But re-running old tasks or pipelines was not clear, i.e. there is no option to retrigger tasks. Since Luigi relies on cron for scheduling, after updating the code we would have to wait for the Luigi scheduler to pick the tasks up again, whereas Airflow has its own scheduler, so we can retrigger a DAG using the Airflow CLI or UI.
Because Luigi is cron-scheduled, scaling it seemed difficult, whereas Airflow's Kubernetes executor was very promising. Creating a task was not as simple as in Airflow, and maintaining a DAG is a bit harder since there were no resources for DAG versioning.

Finally, it was down to Airflow and Argo:
- Both are designed for batch workflows involving a directed acyclic graph (DAG) of tasks.
- Both provide flow control for error handling and conditional logic based on the output of upstream steps.
- Both have a great community and are actively maintained by a community of contributors.
為什么選擇Airflow而不是Argo? (Why choose Airflow over Argo?)
The main points at decision time were that Airflow is tightly coupled to the Python ecosystem, and it is all about DAG code. Argo, on the other hand, provides flexibility in scheduling steps, which is very useful since anything that can run in a container can be used with Argo. Still, the trade-off is a longer development time, since we would have to prepare a Docker container for each task and push it via CI/CD to Google Container Registry, our private Docker repository.

Argo natively schedules steps to run in a Kubernetes cluster, potentially across several hosts or nodes. Airflow also has the KubernetesPodOperator and the Kubernetes executor, which sounded exciting since a new pod is created for every task instance, with no need to worry about scaling Celery pods.
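As a rough illustration of that pod-per-task model, here is a hedged sketch using the KubernetesPodOperator with the Airflow 1.10 contrib import path; the DAG id, namespace, image, and task name are placeholders, not our actual setup.

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG(
    dag_id="example_pod_per_task",       # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
) as dag:
    # The task runs in its own pod, so anything that can be packaged as a
    # container image can be scheduled without scaling long-lived Celery workers.
    score_rides = KubernetesPodOperator(
        task_id="score_rides",                        # hypothetical task
        name="score-rides",
        namespace="data-platform",                    # placeholder namespace
        image="gcr.io/example-project/score:latest",  # placeholder image
        cmds=["python", "score.py"],
        get_logs=True,
        is_delete_operator_pod=True,                  # remove the pod once the task finishes
    )
```

With the Kubernetes executor the same idea applies to every operator, since the executor itself launches one pod per task instance.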
The Airflow and Argo CLIs are equally good. Airflow DAGs are expressed in a Python-based DSL, while Argo DAGs are expressed in Kubernetes YAML syntax, with Docker containers packing all the task code.

Airflow has colossal adoption; hence there is a massive list of "operators" and "hooks" with support for other runtimes like Bash, Spark, Hive, Druid, Pinot, etc. So Airflow was the clear winner.
To sum up, Airflow provides:
- Reliability: Airflow provides retries, i.e. it can handle task failures by retrying them; if upstream dependencies succeed, downstream tasks can retry when they fail (a combined sketch follows this list).
- Alerting: Airflow can report when a DAG fails or misses an SLA, and notify us of any failure.
- Priority-based queue management, which ensures the most critical tasks are completed first.
- Resource pools, which can limit the parallelism of arbitrary sets of tasks.
- Centralized configuration.
- Centralized task metrics.
- Centralized performance views: with views like the Gantt chart we can look at the actual performance of a DAG and see, for example, whether a specific DAG spent five or ten minutes waiting for some data to land before triggering the Spark job that aggregates that data. These views help us analyze performance over time.
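Here is a rough sketch of how those knobs appear in DAG code, assuming a hypothetical callback, pool name, and SLA (none of these values are from our production DAGs):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def notify_failure(context):
    # Illustrative failure hook; in practice this might page on-call or post to a channel.
    print("task {} failed".format(context["task_instance"].task_id))


default_args = {
    "retries": 3,                           # reliability: retry failed tasks
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": notify_failure,  # alerting on failure
    "sla": timedelta(hours=1),              # flag tasks that miss their SLA
}

with DAG(
    dag_id="example_controls",              # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
) as dag:
    critical_load = BashOperator(
        task_id="critical_load",
        bash_command="echo loading",
        pool="hive_pool",        # resource pool (created via the UI or CLI) caps parallelism
        priority_weight=10,      # higher weights are scheduled ahead of lower ones
    )
```

The pool referenced here has to exist in the Airflow metadata database beforehand; tasks that share it then compete for its fixed number of slots.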
Future of Airflow:
- Since we already have hundreds of DAGs running in production, and with Fluidity (our in-house Airflow DAG generator) we expect that number to double or triple in the next few months, one of the most-watched features of Airflow 2.0 is the separation of DAG parsing from DAG scheduling, which can cut the time wasted waiting (time when no tasks are running) and shorten task turnaround by letting workers pick up follow-on Airflow tasks faster.
- Improved DAG versioning, to avoid manually creating versioned DAGs (i.e. to add a new task we currently go from DAG_SCALE_UP to DAG_SCALE_UP_V1).
- High availability of the scheduler, for performance, scalability, and resiliency, ideally with an active-active model.
- Development with Airflow is generally slow and involves a steep learning curve, so we built Fluidity (full blog coming soon), which spawns a dev replica that mirrors the production environment using Docker and Minikube.
- Work on data evaluation, reports, and data lineage with Airflow.
If you enjoyed this blog post, check out what we've posted so far over here, and keep an eye on this space for some really cool upcoming blogs in the near future. If you have any questions about the problems we face on the Data Platform at Rapido, or about anything else, please reach out to chethan@rapido.bike; we look forward to answering them!
*Special Thanks to the Rapido Data Team for making this blog possible.
Originally published at https://medium.com/rapido-labs/why-is-airflow-a-great-fit-for-rapido-d8438ca0d1ab