引入
上一篇我們了解了Spark,相比起MapReduce來說,它確實已經快了超級多了,但是人類的欲望是沒有止境的,這也是推動人類進步的動力。
Flink就是為了滿足實時響應的場景需求誕生的。
其實在Flink之前,實時處理其實已經有Storm和Spark Streaming了,它們都是老牌流處理引擎,那么為什么Flink會后來居上呢?
流處理場景中常用的兩個標準,分別是數據處理延遲和產出數據質量:
- 數據處理延遲:流處理的核心要求是數據處理快,因此數據處理延遲是衡量流處理引擎優劣的重要指標。
- 產出數據質量:流處理作業一般都是7×24小時運行的,這期間很難保證作業不發生故障,數據質量主要指流處理作業在發生故障后,通過自身的異常容錯功能是否能夠保證數據計算結果的準確性。
我們以上面的兩個標準來看看這三個引擎各自的優缺點:
引擎 | 數據處理延遲 | 產出數據質量 |
---|---|---|
Apache Storm | 處理數據是一條一條進行的,可以做到毫秒級別延遲 | 作業在發生故障時,只能保證數據不會丟失,但是有可能重復計算數據,只能保證至少一次(At-least-once)的數據處理。以一個案例說明至少一次數據處理的結果:流處理作業需要計算輸入數據的條目數,假如有10條輸入數據,理論上得到的正確結果為10,但是作業一旦發生故障,在異常容錯恢復后,計算得到的結果會大于或等于10 |
Spark Streaming | Flink認為數據是以流的形式存在的,批是一種有界流;而Spark Streaming則相反,它認為數據是以批的形式存在的,流只是劃分得非常細的批,流是批的一種特殊形式。基于該理論,Spark Streaming在處理數據時是按照微批進行的,一批數據統一處理一次,因此延遲相對比較高 | 作業在發生故障時,基于Spark Streaming的檢查點機制可以實現數據計算不重不丟,保證精確一次(Exactly-once)的數據處理。以一個案例說明精確一次數據處理的結果:流處理作業計算輸入數據的條目數,有10條輸入數據,在作業發生故障時,異常容錯恢復后算出的結果依然等于10 |
Apache Flink | 和Storm一樣,處理數據是一條條進行的,可以做到毫秒級別延遲 | 作業在發生故障時,基于Flink的檢查點機制可以做到數據計算不重不丟,保證精確一次的數據處理 |
可以看到,Flink在正是通過數據處理延遲低和產出數據質量高的優勢,才得以后來居上的。
Flink的核心概念
先看看Flink官網對Flink的定義:
翻譯:
有狀態的流計算
Apache Flink 是一個用于對有界和無界數據流進行有狀態計算的框架和分布式處理引擎。Flink 被設計用于在所有常見的集群環境中運行,以內存速度執行計算,并且能夠處理任意規模的數據。
下面我們看看里面的一些重點概念
處理無界和有界數據流
什么是流處理?
流處理是對運動中的數據的處理,換句話說,在生成或接收數據時直接計算數據。
任何類型的數據都可以形成一種事件流。信用卡交易、傳感器測量、機器日志、網站或移動應用程序上的用戶交互記錄等等數據,大多都是連續的流,都是隨著時間的推移而創建的。
在流處理之前,這些數據通常存儲在數據庫、文件系統或其他形式的大容量存儲中。應用程序將根據需要查詢數據或計算數據。
流處理改變了這種模式:讓應用邏輯,分析和查詢始終存在,讓數據不斷地流經它們。
在從流中接收到事件時,流處理應用程序對該事件作出反應。它可以觸發動作,更新聚合或其他統計,或“記住”該事件以供將來參考。以及連接多個數據流,并產生數據流。
什么是有界和無界流?
流也擁有著多種特征。這些特征決定了流如何以及何時被處理。
在Flink的設計理念中,數據可以被作為無界或者有界流來處理。(當然本質上來說是不存在有界流,這只是一個邏輯概念,因為流就是無界的。所謂的有界流就是通過人為截取一段數據,這樣所有數據就是確定的,可以被排序了。)
-
無界流?有定義流的開始,但沒有定義流的結束。它們會無休止地產生數據。無界流的數據必須持續處理,即數據被攝取后需要立刻處理。我們不能等到所有數據都到達再處理,因為輸入是無限的,在任何時候輸入都不會完成。處理無界數據通常要求以特定順序攝取事件,例如事件發生的順序,以便能夠推斷結果的完整性。
-
有界流?有定義流的開始,也有定義流的結束。有界流可以在攝取所有數據后再進行計算。有界流所有數據可以被排序,所以并不需要有序攝取。有界流處理通常被稱為批處理
所有的數據都是以流的方式產生,但用戶通常會使用兩種截然不同的方法處理數據。或是在數據生成時進行實時的處理;亦或是先將數據流持久化到存儲系統中——例如文件系統或對象存儲,然后再進行批處理。
簡單理解有界無界數據流處理,可以以老師批改作業為例子,老師考試完統一收起卷子來批改就是有界流處理也叫批處理,老師讓學生做完就交卷,來一張批改一張,這就是無界數據流處理,也就是實時流處理。
流與時間
說到流處理,不得不提到時間的概念。時間是流處理應用另一個重要的組成部分。
真實世界的系統、網絡和通信渠道遠非完美,流數據經常被推遲或無序(亂序)到達。如何在這種條件下提供準確和確定的結果是至關重要的。
處理時間
處理時間是處理流的應用程序的機器的本地時鐘的時間。
事件時間
事件事件是流中的事件實際發生的時間。事件時間基于流中的事件所包含的時間戳。通常情況下,在事件進入流處理程序前,事件數據就已經包含了時間戳。
事件時間使得計算結果的過程不需要依賴處理數據的速度。基于事件時間的操作是可以預測的,而計算結果也是確定的。
無論流處理程序處理流數據的速度快或是慢,事件時間窗口的計算結果都是一樣的。
水位線Watermarks
因為處理時間和事件時間總是不能完全一致,我們如何知道事件是遲到的?我們需要確定一個時間點之前的所有事件都已經到達之前,需要等待多久。
水位線是全局進度的度量標準。系統可以確信在一個時間點之后,不會有早于這個時間點發生的事件到來了。
當一個算子接收到含有時間T的水位線時,這個算子會認為早于時間T的發生的事件已經全部都到達了。
對于事件時間窗口和亂序事件的處理,水位線非常重要。算子一旦接收到水位線,算子會認為一段時間內發生的所有事件都已經觀察到,可以觸發針對這段時間內所有事件的計算了。
水位線提供了一種結果可信度和延時之間的妥協。激進的水位線設置可以保證低延遲,但結果的準確性不夠。如果水位線設置得過于寬松,計算的結果準確性會很高,但可能會增加流處理程序不必要的延時。
時間語義
因為事件總是在特定時間點發生,所以大多數的事件流都擁有事件本身所固有的時間語義。進一步而言,許多常見的流計算都基于時間語義,例如窗口聚合、會話計算、模式檢測和基于時間的 join。流處理的一個重要方面是應用程序如何衡量時間,即區分事件時間(event-time)和處理時間(processing-time)。
Flink 提供了豐富的時間語義支持。
-
事件時間模式:使用事件時間語義的流處理應用根據事件本身自帶的時間戳進行結果的計算。因此,無論處理的是歷史記錄的事件還是實時的事件,事件時間模式的處理總能保證結果的準確性和一致性。
-
Watermark 支持:Flink 引入了 watermark 的概念,用以衡量事件時間進展。Watermark 也是一種平衡處理延時和完整性的靈活機制。
-
遲到數據處理:當以帶有 watermark 的事件時間模式處理數據流時,在計算完成之后仍會有相關數據到達。這樣的事件被稱為遲到事件。Flink 提供了多種處理遲到數據的選項,例如將這些數據重定向到旁路輸出(side output)或者更新之前完成計算的結果。
-
處理時間模式:除了事件時間模式,Flink 還支持處理時間語義。處理時間模式根據處理引擎的機器時鐘觸發計算,一般適用于有著嚴格的低延遲需求,并且能夠容忍近似結果的流處理應用。
有狀態的計算
Flink的有狀態計算,本質就是把流處理需要的額外數據保存成一個“狀態”,然后針對這條數據進行處理,并且更新狀態。
為了加快訪問速度,我們可以直接將狀態保存在本地內存。當應用收到一個新事件時,它可以從狀態中讀取數據,也可以更新狀態。而當狀態是從內存中讀寫的時候,這就和訪問本地變量沒什么區別了,實時性可以得到極大的提升。
另外,數據規模增大時,我們也不需要做重構,只需要構建分布式集群,各自在本地計算就可以了,可擴展性也變得更好。
因為底層是分布式系統,所以還需要保護本地狀態,防止在故障時數據丟失。我們可以定期地將應用狀態的一致性檢查點(checkpoint)存盤,寫入遠程的持久化存儲,遇到故障時再去讀取進行恢復,這樣就保證了更好的容錯性。
有狀態的流處理是一種通用而且靈活的設計架構,可用于許多不同的場景。其處理架構上其實并不復雜,很多用戶基于這種思想開發出了自己的流處理系統,這就是第一代流處理器。Apache Storm就是其中的代表。Storm 提供了低延遲的流處理,但很難實現高吞吐,而且無法保證結果的正確性,所以目前已經被Flink替代了。
只有在每一個單獨的事件上進行轉換操作的應用才不需要狀態,換言之,每一個具有一定復雜度的流處理應用都是有狀態的。任何運行基本業務邏輯的流處理應用都需要在一定時間內存儲所接收的事件或中間結果,以供后續的某個時間點(例如收到下一個事件或者經過一段特定時間)進行訪問并進行后續處理。
有狀態流處理是流處理的子集。此狀態用于存儲從先前看到的事件派生的信息。
大多數的流處理都需要處理狀態:
-
防欺詐應用程序會保留信用卡的最后交易。將新的狀態與保留的狀態中進行比較,標記為有效或欺詐,并更新狀態。
-
在線推薦應用程序將保留描述用戶偏好的參數。
有狀態流處理需要支持狀態管理的流處理器。
狀態是 Flink 中的一等公民,Flink 提供了許多狀態管理相關的特性支持,其中包括:
-
多種狀態基礎類型:Flink 為多種不同的數據結構提供了相對應的狀態基礎類型,例如原子值(value),列表(list)以及映射(map)。開發者可以基于處理函數對狀態的訪問方式,選擇最高效、最適合的狀態基礎類型。
-
插件化的State Backend:State Backend 負責管理應用程序狀態,并在需要的時候進行 checkpoint。Flink 支持多種 state backend,可以將狀態存在內存或者RocksDB。RocksDB 是一種高效的嵌入式、持久化鍵值存儲引擎。Flink 也支持插件式的自定義 state backend 進行狀態存儲。
-
精確一次語義:Flink 的 checkpoint 和故障恢復算法保證了故障發生后應用狀態的一致性。因此,Flink 能夠在應用程序發生故障時,對應用程序透明,不造成正確性的影響。
-
超大數據量狀態:Flink 能夠利用其異步以及增量式的 checkpoint 算法,存儲數 TB 級別的應用狀態。
-
可彈性伸縮的應用:Flink 能夠通過在更多或更少的工作節點上對狀態進行重新分布,支持有狀態應用的分布式的橫向伸縮。
核心特性
下面我們總結一下Flink的核心特性
1.真正的流處理
在Flink的流處理模式下,Flink處理數據的粒度是事件粒度或者說數據粒度,也就是來一條數據就處理一條數據。這個特性反饋到日常的業務中就是數據處理延遲極低,一般是毫秒級別,基于該特性,在實時大屏場景中,我們可以實現大屏指標的快速更新。
2.強大的性能
Flink是分布式的計算引擎,處理數據的吞吐能力能夠輕松達到百萬、千萬級別QPS(Queries Per Second,每秒處理數據的條目)?。Flink處理數據的吞吐量在很多場景下甚至可以做到和物理資源呈線性關系,因此在面對大流量數據時,Flink無所畏懼。
3.時間語義豐富
Flink預置了多種時間語義的API,包括事件時間、處理時間和攝入時間語義,我們可以通過這些預置API實現時間窗口上的高效數據處理。
4.高可用的有狀態計算
Flink不但提供了豐富的狀態類型及狀態操作API,而且提供了Checkpoint、Savepoint這樣的快照機制來保障精確一次的數據處理,即使作業發生異常,我們也無須擔心數據丟失或者重復。同時,Flink支持TB級別的狀態數據存儲能力。
5.流批一體
Flink不僅是流處理的好手,目前在批處理方面也在大力發展,我們通過同一條SQL語句就可以同時完成流處理、批處理,這可以顯著降低開發、維護和資源使用的成本。
Flink的執行原理
由于目前使用最廣泛的是1.13版本,這里我們重點就以1.13版本源碼,來看看FlinkSQL的執行過程。既然是FLinkSQL,自然要看flink-table里面的內容啦,我們先看看這個里面有些什么:
flink-sql-client:提供了 Flink SQL 的命令行客戶端工具,用戶可以通過該客戶端連接到 Flink 集群,執行 SQL 腳本,進行交互式的數據查詢和操作。它封裝了與 Flink 集群通信的邏輯,支持提交 SQL 作業、查看作業狀態等功能,方便用戶在命令行環境下使用 Flink SQL 進行數據處理任務。
?flink-sql-parser:負責解析 Flink SQL 語句,將其轉換為抽象語法樹(AST)。在解析過程中,會對 SQL 語句進行詞法分析和語法分析,檢查語句的語法正確性,識別出 SQL 語句中的關鍵字、標識符、表達式等元素,并構建出相應的 AST 結構,為后續的查詢優化和執行打下基礎。
?flink-sql-parser-hive:提供了對 Hive SQL 語法的支持,使得 Flink 能夠解析和執行 Hive SQL 語句。它擴展了 Flink 的 SQL 解析器,增加了對 Hive 特有語法和函數的解析能力,使得用戶可以使用熟悉的 Hive SQL 語法來操作 Flink,降低了從 Hive 遷移到 Flink 的學習成本和開發成本。
?flink-table-api-java:提供了 Java 語言的 Table API,允許開發者使用 Java 代碼以面向對象的方式進行表數據的操作和處理。通過該 API,開發者可以方便地進行表的創建、查詢、轉換、連接等操作,實現復雜的數據處理邏輯,并且可以與 Flink 的其他 API(如 DataStream API)無縫集成。
?flink-table-api-java-bridge:作為 Java 語言的 Table API 與其他 Flink API 之間的橋梁,使得開發者可以在 Java 代碼中方便地在 Table API 和其他 API(如 DataStream API)之間進行切換和交互。它提供了一些轉換方法和工具類,幫助開發者將 Table 轉換為 DataStream 或將 DataStream 轉換為 Table,從而實現不同 API 之間的數據共享和協同處理。
?flink-table-api-scala:提供了 Scala 語言的 Table API,允許開發者使用 Scala 代碼以面向對象的方式進行表數據的操作和處理。Scala 語言的 Table API 在功能上與 Java 版本類似,但由于 Scala 語言的特性,其語法更加簡潔和靈活,能夠更好地利用 Scala 的函數式編程特性進行數據處理。
?flink-table-api-scala-bridge:作為 Scala 語言的 Table API 與其他 Flink API 之間的橋梁,使得開發者可以在 Scala 代碼中方便地在 Table API 和其他 API(如 DataStream API)之間進行切換和交互。它提供了一些轉換方法和工具類,幫助開發者將 Table 轉換為 DataStream 或將 DataStream 轉換為 Table,從而實現不同 API 之間的數據共享和協同處理。
?flink-table-common:包含了 Flink Table 模塊中的一些通用類和接口,如表結構定義、數據類型定義、函數定義等。這些通用類和接口在 Flink Table 模塊的各個子模塊中被廣泛使用,為 Table API、SQL 解析、查詢優化和執行等提供了基礎的構建塊。
?flink-table-planner:負責 Flink Table 的查詢規劃,包括邏輯查詢計劃的生成和優化。它將 Table API 或 SQL 查詢轉換為邏輯查詢計劃,然后對邏輯查詢計劃進行優化,如應用各種查詢規則、重寫查詢、選擇最優的執行計劃等,以提高查詢的執行效率。
?flink-table-planner-blink:Blink Planner 是 Flink 的一種查詢規劃器,該包實現了 Blink Planner 的相關功能。Blink Planner 引入了一些新的查詢優化技術,如動態分區裁剪、謂詞下推等,能夠生成更高效的執行計劃,提高查詢性能。
?flink-table-runtime-blink:Blink Runtime 是 Flink 的一種運行時環境,該包實現了 Blink Runtime 的相關功能。它負責執行由 Blink Planner 生成的執行計劃,包括任務的調度、數據的傳輸、算子的執行等,提供了高效的運行時支持。
假如我們使用Java開發,編寫一條簡單的SQL查詢語句,就會進入到flink-table-api-java模塊里,由其中的org.apache.flink.table.api.internal.TableEnvironmentImpl類的sqlQuery(String query)方法接收,并返回Table類型的對象,可用于進一步的SQL查詢或變換。
sqlQuery()
sqlQuery()方法對應源碼如下:
/*** 執行單個 SQL 查詢并返回一個 Table 對象。* 此方法僅支持單個 SQL 查詢,且查詢類型必須為 SELECT、UNION、INTERSECT、EXCEPT、VALUES 或 ORDER_BY。** @param query 要執行的 SQL 查詢語句。* @return 包含查詢結果的 Table 對象。* @throws ValidationException 如果查詢不是單個 SQL 查詢,或者查詢類型不被支持。*/public Table sqlQuery(String query) {// 使用解析器,解析 SQL 查詢語句,得到Operation列表List<Operation> operations = getParser().parse(query);// 如果解析出來的Operation多余1個,說明填寫了多個SQL,不支持這樣使用if (operations.size() != 1) {throw new ValidationException("Unsupported SQL query! sqlQuery() only accepts a single SQL query.");}// 獲取解析到的OperationOperation operation = operations.get(0);// 僅支持查詢語句,這里會檢查Operation是否為 QueryOperation 類型,且不是 ModifyOperation 類型if (operation instanceof QueryOperation && !(operation instanceof ModifyOperation)) {// 如果滿足條件,則將Operation轉換為 QueryOperation 類型并創建 Table 對象return createTable((QueryOperation) operation);} else {// 如果不滿足條件,則拋出異常throw new ValidationException("Unsupported SQL query! sqlQuery() only accepts a single SQL query of type "+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.");}}
parse()
其中的parse()方法在1.13以前是如下實現的:
protected final Parser parser;
... ...
List<Operation> operations = parser.parse(query);
1.13迭代主要基于以下幾個方面的考慮
設計模式:由直接訪問變量轉變為通過接口獲取對象,遵循了依賴倒置和接口隔離原則。
靈活性:接口允許更容易地擴展、切換不同的解析器實現或策略。
可測試性:隔離核心邏輯,讓測試更易模擬和介入。
解耦:減少內部依賴,明確職責邊界,使系統更清晰、健壯。
這樣可以更方便集成更多SQL解析規則,并能支撐更多樣化的輸入來源,有助于推動Flink整體架構向更穩定、更高效的分布式數據處理框架發展。
我們通過parse()方法可追溯到 flink-table-planner 模塊里的org.apache.flink.table.planner.ParserImpl類的parse()方法,其源碼注釋如下:
When parsing statement, it first uses {@link ExtendedParser} to parse statements. If {@link ExtendedParser} fails to parse statement, it uses the {@link CalciteParser} to parse statements.
翻譯:在解析語句時,它首先使用擴展解析器(ExtendedParser)來解析語句。如果擴展解析器(ExtendedParser)無法解析語句,它將使用Calcite解析器(CalciteParser)來解析語句。
對應源碼如下:
/*** 解析輸入的 SQL 語句,將其轉換為 Operation 列表。* 首先嘗試使用擴展解析器解析語句,如果失敗則使用 Calcite 解析器。** @param statement 輸入的 SQL 語句。* @return 解析后的操作列表。*/@Overridepublic List<Operation> parse(String statement) {// 獲取 Calcite 解析器實例CalciteParser parser = calciteParserSupplier.get();// 使用 FlinkPlannerImpl 作為 validatorFlinkPlannerImpl planner = validatorSupplier.get();// 解析 SQL 查詢// 對于一些特殊的寫法,例如SET key=value,CalciteParser是不支持這種寫法的// 為了避免在Calcite中引入過多的關鍵字,這里定義了一組擴展解析器,專門用于在CalciteParser 之前,解析這些特殊的語句// 嘗試使用擴展解析器解析語句Optional<Operation> command = EXTENDED_PARSER.parse(statement);// 如果擴展解析器成功解析出操作,則直接返回該操作if (command.isPresent()) {return Collections.singletonList(command.get());}// 使用 Calcite 解析器解析 SQL 為語法樹SqlNode parsed = parser.parse(statement);// 將解析后的 SqlNode 轉換為 OperationOperation operation =SqlToOperationConverter.convert(planner, catalogManager, parsed).orElseThrow(() ->// 如果轉換失敗,拋出不支持的 SQL 查詢異常new TableException("Unsupported SQL query! parse() only accepts SQL queries of type "+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT;"+ "and SQL DDLs of type "+ "CREATE TABLE"));// 返回包含單個操作的列表return Collections.singletonList(operation);}
其中ExtendedParser.parse(statement)是1.13中優化迭代新增的,在1.13之前是沒有的。這個設計,是為了在不增加CalciteParser復雜性的前提下,可以讓Flink SQL支持更多專用的語法。
這里我們可以看一下,ExtendedParser包含的解析策略:
/*** ExtendedParser 用于解析一些 CalciteParser 不支持的特殊命令,例如 SET key=value,其中鍵和值標識符包含特殊字符。* 將一些解析邏輯移到這里也有助于避免引入新的保留關鍵字。* 該類提供了單例模式的實例,可用于解析命令并生成相應的操作。*/
public class ExtendedParser {/*** 單例模式的 ExtendedParser 實例。* 使用單例模式確保在整個應用程序中只有一個 ExtendedParser 實例被創建和使用。*/public static final ExtendedParser INSTANCE = new ExtendedParser();/*** 存儲所有擴展解析策略的列表。* 這些策略用于匹配和轉換特定的命令,每個策略對應一種特殊命令的解析方式。*/private static final List<AbstractRegexParseStrategy> PARSE_STRATEGIES =Arrays.asList(// 清空輸出ClearOperationParseStrategy.INSTANCE,// 打印幫助信息HelpOperationParseStrategy.INSTANCE,// 退出執行環境QuitOperationParseStrategy.INSTANCE,// 重置一個變量的值ResetOperationParseStrategy.INSTANCE,// 設置一個變量的值SetOperationParseStrategy.INSTANCE);
而正常對于標準的SQL語句,則由org.apache.flink.table.parse.CalciteParser類的parse()方法負責解析,其對應源碼如下:
/*** 解析一個 SQL 語句為 {@link SqlNode}。這個 {@link SqlNode} 尚未經過驗證。** @param sql 要解析的 SQL 字符串* @return 解析后的 SQL 節點* @throws SqlParserException 如果在解析語句時拋出異常*/public SqlNode parse(String sql) {try {// 創建一個 SQL 解析器實例,使用傳入的 SQL 字符串和配置SqlParser parser = SqlParser.create(sql, config);// 解析 SQL 語句并返回解析后的 SqlNodereturn parser.parseStmt();} catch (SqlParseException e) {// 如果解析過程中出現異常,拋出 SqlParserException 并附帶詳細錯誤信息throw new SqlParserException("SQL parse failed. " + e.getMessage(), e);}}
getSqlParserConfig()
Flink的SQL方言與標準SQL相比有很大差別,那么Flink是如何借助Calcite實現Flink SQL專用的解析器呢?
我們可以通過SqlParser.Config入手,可以從下面源碼看到,在構造SqlParser類的配置類SqlParser.Config時,需要傳入FlinkSqlParserImpl.FACTORY:
/*** 獲取SQL解析器的配置。** 此方法首先嘗試從表配置對應的Calcite配置中獲取SQL解析器配置。* 如果未找到,則使用默認的構建器創建一個新的配置。* 默認配置使用Java詞法,因為反引號比雙引號在編程中更方便,并且保留大小寫。** @return SQL解析器的配置對象*/public SqlParser.Config getSqlParserConfig() {// 嘗試從Calcite配置中獲取SQL解析器配置return JavaScalaConversionUtil.toJava(calciteConfig(tableConfig).sqlParserConfig()).orElseGet(() ->// 如果Calcite配置中沒有SQL解析器配置,則使用默認配置// 我們使用Java詞法,因為反引號比雙引號在編程中更方便,并且保留大小寫SqlParser.configBuilder()// 設置解析器工廠為Flink SQL解析器實現工廠.setParserFactory(FlinkSqlParserImpl.FACTORY)// 設置SQL兼容性.setConformance(getSqlConformance())// 設置詞法為Java詞法.setLex(Lex.JAVA)// 構建配置對象.build());}
其中的類FlinkSqlParserImpl,是通過編譯Flink SQL的語法描述文件(包含Calcite內置的Parser.jj與Flink定制好的Freemarker模板)生成,最終在generated-sources目錄下生成了FlinkSqlParserImpl及其附屬的類,Calcite會利用它們進行Flink SQL的解析。codegen目錄下則是語法描述文件的本體。
Flink借助了Calcite實現Flink SQL專用的解析器,具體實現過程如下:
- 引入Calcite
Flink通過Maven依賴引入Calcite,并使用Maven插件將Calcite的解析器模板解壓到Flink項目的構建目錄下。- 生成Parser.jj文件
Flink使用FreeMarker模板引擎(FMPP)生成Parser.jj文件,該文件定義了Flink SQL的語法。- 使用JavaCC生成解析器
Flink利用JavaCC工具根據Parser.jj文件生成解析器代碼,生成的解析器能夠將SQL語句解析為SqlNode對象。- 擴展Calcite的解析器
Flink通過擴展Calcite的解析器,實現了對Flink SQL方言的支持。具體來說,Flink在Calcite的基礎上進行了二次開發,增加了對Flink特定語法的支持,例如對SET key=value語句的支持。- 實現FlinkSqlParserImpl
最終,Flink生成了FlinkSqlParserImpl,這是Flink專用的解析器實現。它在解析過程中首先嘗試使用ExtendedParser解析特殊命令,如果失敗,則使用CalciteParser進行解析。
convert()
好了,下面我們回到主線劇情主流程,SQL解析完成后,前面的ParserImpl.parse()方法緊接著就會調用驗證邏輯。SqlToOperationConverter.convert()方法負責校驗SQL語句,并將它轉換為Flink對應的Operation,同時還會將SqlNode轉化成RelNode,從單純的SQL語句轉化為對數據的處理邏輯,即關系代數的具體操作,如Scan、Project、Filter、Join等。
其對應源碼如下:
/*** 將 SQL 節點轉換為對應的操作對象。** 該方法接收一個 FlinkPlannerImpl 對象、一個 CatalogManager 對象和一個 SqlNode 對象,* 首先對 SQL 節點進行驗證,然后根據驗證后的 SQL 節點類型,調用相應的轉換方法將其轉換為操作對象。* 如果 SQL 節點類型不支持,則返回一個空的 Optional 對象。** @param flinkPlanner Flink 規劃器實現,用于將 SQL 節點轉換為關系節點* @param catalogManager 目錄管理器,用于管理目錄和數據庫* @param sqlNode 要執行的 SQL 節點* @return 一個包含操作對象的 Optional 對象,如果 SQL 節點類型不支持,則返回空的 Optional 對象*/public static Optional<Operation> convert(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {// 校驗解析后的SQL語法樹final SqlNode validated = flinkPlanner.validate(sqlNode);// 將SqlNode轉化成OperationSqlToOperationConverter converter =new SqlToOperationConverter(flinkPlanner, catalogManager);// 根據驗證后的 SQL 節點類型進行轉換if (validated instanceof SqlUseCatalog) {// 如果是使用目錄的 SQL 節點,調用 convertUseCatalog 方法進行轉換return Optional.of(converter.convertUseCatalog((SqlUseCatalog) validated));} else if (validated instanceof SqlShowCatalogs) {// 如果是顯示目錄的 SQL 節點,調用 convertShowCatalogs 方法進行轉換return Optional.of(converter.convertShowCatalogs((SqlShowCatalogs) validated));} else if (validated instanceof SqlShowCurrentCatalog) {// 如果是顯示當前目錄的 SQL 節點,調用 convertShowCurrentCatalog 方法進行轉換return Optional.of(converter.convertShowCurrentCatalog((SqlShowCurrentCatalog) validated));}if (validated instanceof SqlCreateDatabase) {// 如果是創建數據庫的 SQL 節點,調用 convertCreateDatabase 方法進行轉換return Optional.of(converter.convertCreateDatabase((SqlCreateDatabase) validated));} else if (validated instanceof SqlDropDatabase) {// 如果是刪除數據庫的 SQL 節點,調用 convertDropDatabase 方法進行轉換return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));} else if (validated instanceof SqlAlterDatabase) {// 如果是修改數據庫的 SQL 節點,調用 convertAlterDatabase 方法進行轉換return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));} else if (validated instanceof SqlShowDatabases) {// 如果是顯示數據庫的 SQL 節點,調用 convertShowDatabases 方法進行轉換return Optional.of(converter.convertShowDatabases((SqlShowDatabases) validated));} else if (validated instanceof SqlShowCurrentDatabase) {// 如果是顯示當前數據庫的 SQL 節點,調用 convertShowCurrentDatabase 方法進行轉換return Optional.of(converter.convertShowCurrentDatabase((SqlShowCurrentDatabase) validated));} else if (validated instanceof SqlUseDatabase) {// 如果是使用數據庫的 SQL 節點,調用 convertUseDatabase 方法進行轉換return Optional.of(converter.convertUseDatabase((SqlUseDatabase) validated));} else if (validated instanceof SqlCreateTable) {// 如果是創建表的 SQL 節點,調用 convertCreateTable 方法進行轉換return Optional.of(converter.convertCreateTable((SqlCreateTable) validated));} else if (validated instanceof SqlDropTable) {// 如果是刪除表的 SQL 節點,調用 convertDropTable 方法進行轉換return Optional.of(converter.convertDropTable((SqlDropTable) validated));} else if (validated instanceof SqlAlterTable) {// 如果是修改表的 SQL 節點,調用 convertAlterTable 方法進行轉換return Optional.of(converter.convertAlterTable((SqlAlterTable) validated));} else if (validated instanceof SqlShowTables) {// 如果是顯示表的 SQL 節點,調用 convertShowTables 方法進行轉換return Optional.of(converter.convertShowTables((SqlShowTables) validated));} else if (validated instanceof SqlCreateView) {// 如果是創建視圖的 SQL 節點,調用 convertCreateView 方法進行轉換return Optional.of(converter.convertCreateView((SqlCreateView) validated));} else if (validated instanceof SqlDropView) {// 如果是刪除視圖的 SQL 節點,調用 convertDropView 方法進行轉換return Optional.of(converter.convertDropView((SqlDropView) validated));} else if (validated instanceof SqlShowViews) {// 如果是顯示視圖的 SQL 節點,調用 convertShowViews 方法進行轉換return Optional.of(converter.convertShowViews((SqlShowViews) validated));} else if (validated instanceof SqlCreateFunction) {// 如果是創建函數的 SQL 節點,調用 convertCreateFunction 方法進行轉換return Optional.of(converter.convertCreateFunction((SqlCreateFunction) validated));} else if (validated instanceof SqlAlterFunction) {// 如果是修改函數的 SQL 節點,調用 convertAlterFunction 方法進行轉換return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated));} else if (validated instanceof SqlDropFunction) {// 如果是刪除函數的 SQL 節點,調用 convertDropFunction 方法進行轉換return Optional.of(converter.convertDropFunction((SqlDropFunction) validated));} else if (validated instanceof SqlShowFunctions) {// 如果是顯示函數的 SQL 節點,調用 convertShowFunctions 方法進行轉換return Optional.of(converter.convertShowFunctions((SqlShowFunctions) validated));} else if (validated instanceof SqlRichExplain) {// 如果是解釋 SQL 語句的 SQL 節點,調用 convertRichExplain 方法進行轉換return Optional.of(converter.convertRichExplain((SqlRichExplain) validated));} else if (validated instanceof SqlRichDescribeTable) {// 如果是描述表的 SQL 節點,調用 convertDescribeTable 方法進行轉換return Optional.of(converter.convertDescribeTable((SqlRichDescribeTable) validated));} else if (validated instanceof RichSqlInsert) {// 如果是插入數據的 SQL 節點,檢查是否為部分插入SqlNodeList targetColumnList = ((RichSqlInsert) validated).getTargetColumnList();if (targetColumnList != null && targetColumnList.size() != 0) {// 如果是部分插入,拋出異常throw new ValidationException("Partial inserts are not supported");}// 如果不是部分插入,調用 convertSqlInsert 方法進行轉換return Optional.of(converter.convertSqlInsert((RichSqlInsert) validated));} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {// 如果是查詢語句的 SQL 節點,調用 convertSqlQuery 方法進行轉換return Optional.of(converter.convertSqlQuery(validated));} else {// 如果 SQL 節點類型不支持,返回空的 Optional 對象return Optional.empty();}}
其中的validate()方法中,會基于FlinkCalciteSqlValidator(繼承了Calcite的默認驗證器SqlValidatorImpl)采用訪問者模式,遞歸訪問每個SqlCall節點,并額外規定了對字面量和Join的驗證邏輯。在這個過程中會同時連接Catalog,主要的功能就是匹配表的Scheme和基本函數信息,例如表的基本定義(列名、數據類型)和函數名等。最后會將語法樹重寫為標準形式,以便其余的驗證邏輯可以更方便地執行。
我們可以從源碼看到,其中使用多個if-else判斷驗證之后的SqlNode屬于何種類型,再分別調用不同的方法觸發轉換為RelNode的操作。
其中對于SELECT語句,會調用 convertSqlQuery() 方法進行轉換,其對應源碼如下:
/*** 將 SQL 查詢節點轉換為查詢操作。** @param node 待轉換的 SQL 節點* @return 轉換后的查詢操作*/private Operation convertSqlQuery(SqlNode node) {return toQueryOperation(flinkPlanner, node);}
繼續查看toQueryOperation()方法源碼如下:
/*** 將經過驗證的 SQL 節點轉換為查詢操作。** @param planner 用于將 SQL 轉換為關系代數的 Flink 規劃器實例。* @param validated 經過驗證的 SQL 節點。* @return 表示查詢操作的 PlannerQueryOperation 對象。*/private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {// transform to a relational tree// 將驗證后的 SQL 節點轉換為關系樹RelRoot relational = planner.rel(validated);// 創建一個新的 PlannerQueryOperation 對象,傳入關系樹的根節點return new PlannerQueryOperation(relational.rel);}
該方法最終生成一個PlannerQueryOperation,將Calcite轉換成的RelNode包裝進去。其中生成RelNodede的過程則是由Calcite的SqlToRelConverter完成,在這個過程中,會基于Flink定制的優化規則以及Calcite自身的一些規則進行優化。
execute()
最后通過org.apache.flink.table.api.internal.TableImpl類的execute()方法執行SQL查詢,返回一個TableResult對象,其源碼如下:
/*** 執行當前表的查詢操作,并返回執行結果。* 該方法調用 TableEnvironment 的 executeInternal 方法,傳入當前表的查詢操作對象。** @return 表示查詢執行結果的 TableResult 對象。*/@Overridepublic TableResult execute() {return tableEnvironment.executeInternal(getQueryOperation());}
而executeInternal()方法的整體邏輯是判斷Operation的類型,不同的Operation類型執行不同的操作,比如創建表、刪除表、修改表、查詢表、創建數據庫、刪除數據庫、創建視圖、刪除視圖、創建函數、刪除函數,分頁操作等。
其源碼如下:
/*** 執行單個操作并返回操作結果。** @param operation 要執行的操作* @return 操作結果* @throws ValidationException 如果操作過程中出現驗證錯誤* @throws TableException 如果操作過程中出現表相關的錯誤*/@Overridepublic TableResult executeInternal(Operation operation) {// 根據操作類型分發到不同的處理邏輯if (operation instanceof ModifyOperation) {// 如果傳入的是ModifyOperation,則調用批量執行方法return executeInternal(Collections.singletonList((ModifyOperation) operation));} else if (operation instanceof CreateTableOperation) {// 處理CreateTableOperation,包括創建臨時表或普通表CreateTableOperation createTableOperation = (CreateTableOperation) operation;if (createTableOperation.isTemporary()) {// 創建臨時表catalogManager.createTemporaryTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());} else {// 創建普通表catalogManager.createTable(createTableOperation.getCatalogTable(),createTableOperation.getTableIdentifier(),createTableOperation.isIgnoreIfExists());}return TableResultImpl.TABLE_RESULT_OK;} else if (operation instanceof DropTableOperation) {// 處理DropTableOperation,包括刪除臨時表或普通表DropTableOperation dropTableOperation = (DropTableOperation) operation;if (dropTableOperation.isTemporary()) {// 刪除臨時表catalogManager.dropTemporaryTable(dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());} else {// 刪除普通表catalogManager.dropTable(dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());}return TableResultImpl.TABLE_RESULT_OK;} else if (operation instanceof AlterTableOperation) {// 處理各種表修改操作(AlterTableOperation),包括重命名表、修改表選項、修改表約束等AlterTableOperation alterTableOperation = (AlterTableOperation) operation;Catalog catalog = getCatalogOrThrowException(alterTableOperation.getTableIdentifier().getCatalogName());String exMsg = getDDLOpExecuteErrorMsg(alterTableOperation.asSummaryString());try {// 根據不同的AlterTableOperation類型執行相應的操作if (alterTableOperation instanceof AlterTableRenameOperation) {AlterTableRenameOperation alterTableRenameOp = (AlterTableRenameOperation) operation;catalog.renameTable(alterTableRenameOp.getTableIdentifier().toObjectPath(),alterTableRenameOp.getNewTableIdentifier().getObjectName(),false);} else if (alterTableOperation instanceof AlterTableOptionsOperation) {AlterTableOptionsOperation alterTablePropertiesOp = (AlterTableOptionsOperation) operation;catalogManager.alterTable(alterTablePropertiesOp.getCatalogTable(),alterTablePropertiesOp.getTableIdentifier(),false);} else if (alterTableOperation instanceof AlterTableAddConstraintOperation) {AlterTableAddConstraintOperation addConstraintOP = (AlterTableAddConstraintOperation) operation;// 更新表結構,添加主鍵約束CatalogTable oriTable = (CatalogTable) catalogManager.getTable(addConstraintOP.getTableIdentifier()).get().getTable();TableSchema.Builder builder = TableSchemaUtils.builderWithGivenSchema(oriTable.getSchema());if (addConstraintOP.getConstraintName().isPresent()) {builder.primaryKey(addConstraintOP.getConstraintName().get(),addConstraintOP.getColumnNames());} else {builder.primaryKey(addConstraintOP.getColumnNames());}CatalogTable newTable = new CatalogTableImpl(builder.build(),oriTable.getPartitionKeys(),oriTable.getOptions(),oriTable.getComment());catalogManager.alterTable(newTable, addConstraintOP.getTableIdentifier(), false);} else if (alterTableOperation instanceof AlterTableDropConstraintOperation) {AlterTableDropConstraintOperation dropConstraintOperation = (AlterTableDropConstraintOperation) operation;// 更新表結構,移除主鍵約束CatalogTable oriTable = (CatalogTable) catalogManager.getTable(dropConstraintOperation.getTableIdentifier()).get().getTable();CatalogTable newTable = new CatalogTableImpl(TableSchemaUtils.dropConstraint(oriTable.getSchema(),dropConstraintOperation.getConstraintName()),oriTable.getPartitionKeys(),oriTable.getOptions(),oriTable.getComment());catalogManager.alterTable(newTable, dropConstraintOperation.getTableIdentifier(), false);} else if (alterTableOperation instanceof AlterPartitionPropertiesOperation) {AlterPartitionPropertiesOperation alterPartPropsOp = (AlterPartitionPropertiesOperation) operation;// 修改分區屬性catalog.alterPartition(alterPartPropsOp.getTableIdentifier().toObjectPath(),alterPartPropsOp.getPartitionSpec(),alterPartPropsOp.getCatalogPartition(),false);} else if (alterTableOperation instanceof AlterTableSchemaOperation) {AlterTableSchemaOperation alterTableSchemaOperation = (AlterTableSchemaOperation) alterTableOperation;catalogManager.alterTable(alterTableSchemaOperation.getCatalogTable(),alterTableSchemaOperation.getTableIdentifier(),false);} else if (alterTableOperation instanceof AddPartitionsOperation) {AddPartitionsOperation addPartitionsOperation = (AddPartitionsOperation) alterTableOperation;// 添加多個分區List<CatalogPartitionSpec> specs = addPartitionsOperation.getPartitionSpecs();List<CatalogPartition> partitions = addPartitionsOperation.getCatalogPartitions();boolean ifNotExists = addPartitionsOperation.ifNotExists();ObjectPath tablePath = addPartitionsOperation.getTableIdentifier().toObjectPath();for (int i = 0; i < specs.size(); i++) {catalog.createPartition(tablePath, specs.get(i), partitions.get(i), ifNotExists);}} else if (alterTableOperation instanceof DropPartitionsOperation) {DropPartitionsOperation dropPartitionsOperation = (DropPartitionsOperation) alterTableOperation;// 刪除多個分區ObjectPath tablePath = dropPartitionsOperation.getTableIdentifier().toObjectPath();boolean ifExists = dropPartitionsOperation.ifExists();for (CatalogPartitionSpec spec : dropPartitionsOperation.getPartitionSpecs()) {catalog.dropPartition(tablePath, spec, ifExists);}}return TableResultImpl.TABLE_RESULT_OK;} catch (TableAlreadyExistException | TableNotExistException e) {throw new ValidationException(exMsg, e);} catch (Exception e) {throw new TableException(exMsg, e);}} else if (operation instanceof CreateViewOperation) {// 處理創建視圖操作CreateViewOperation createViewOperation = (CreateViewOperation) operation;if (createViewOperation.isTemporary()) {catalogManager.createTemporaryTable(createViewOperation.getCatalogView(),createViewOperation.getViewIdentifier(),createViewOperation.isIgnoreIfExists());} else {catalogManager.createTable(createViewOperation.getCatalogView(),createViewOperation.getViewIdentifier(),createViewOperation.isIgnoreIfExists());}return TableResultImpl.TABLE_RESULT_OK;} else if (operation instanceof DropViewOperation) {// 處理刪除視圖操作DropViewOperation dropViewOperation = (DropViewOperation) operation;if (dropViewOperation.isTemporary()) {catalogManager.dropTemporaryView(dropViewOperation.getViewIdentifier(), dropViewOperation.isIfExists());} else {catalogManager.dropView(dropViewOperation.getViewIdentifier(), dropViewOperation.isIfExists());}return TableResultImpl.TABLE_RESULT_OK;} else if (operation instanceof AlterViewOperation) {// 處理視圖修改操作AlterViewOperation alterViewOperation = (AlterViewOperation) operation;Catalog catalog = getCatalogOrThrowException(alterViewOperation.getViewIdentifier().getCatalogName());String exMsg = getDDLOpExecuteErrorMsg(alterViewOperation.asSummaryString());try {if (alterViewOperation instanceof AlterViewRenameOperation) {AlterViewRenameOperation alterTableRenameOp = (AlterViewRenameOperation) operation;catalog.renameTable(alterTableRenameOp.getViewIdentifier().toObjectPath(),alterTableRenameOp.getNewViewIdentifier().getObjectName(),false);} else if (alterViewOperation instanceof AlterViewPropertiesOperation) {AlterViewPropertiesOperation alterTablePropertiesOp = (AlterViewPropertiesOperation) operation;catalogManager.alterTable(alterTablePropertiesOp.getCatalogView(),alterTablePropertiesOp.getViewIdentifier(),false);} else if (alterViewOperation instanceof AlterViewAsOperation) {AlterViewAsOperation alterViewAsOperation = (AlterViewAsOperation) alterViewOperation;catalogManager.alterTable(alterViewAsOperation.getNewView(),alterViewAsOperation.getViewIdentifier(),false);}return TableResultImpl.TABLE_RESULT_OK;} catch (TableAlreadyExistException | TableNotExistException e) {throw new ValidationException(exMsg, e);} catch (Exception e) {throw new TableException(exMsg, e);}} else if (operation instanceof CreateDatabaseOperation) {// 處理創建數據庫操作CreateDatabaseOperation createDatabaseOperation = (CreateDatabaseOperation) operation;Catalog catalog = getCatalogOrThrowException(createDatabaseOperation.getCatalogName());String exMsg = getDDLOpExecuteErrorMsg(createDatabaseOperation.asSummaryString());try {catalog.createDatabase(createDatabaseOperation.getDatabaseName(),createDatabaseOperation.getCatalogDatabase(),createDatabaseOperation.isIgnoreIfExists());return TableResultImpl.TABLE_RESULT_OK;} catch (DatabaseAlreadyExistException e) {throw new ValidationException(exMsg, e);} catch (Exception e) {throw new TableException(exMsg, e);}} else if (operation instanceof DropDatabaseOperation) {// 處理刪除數據庫操作DropDatabaseOperation dropDatabaseOperation = (DropDatabaseOperation) operation;Catalog catalog = getCatalogOrThrowException(dropDatabaseOperation.getCatalogName());String exMsg = getDDLOpExecuteErrorMsg(dropDatabaseOperation.asSummaryString());try {catalog.dropDatabase(dropDatabaseOperation.getDatabaseName(),dropDatabaseOperation.isIfExists(),dropDatabaseOperation.isCascade());return TableResultImpl.TABLE_RESULT_OK;} catch (DatabaseNotExistException | DatabaseNotEmptyException e) {throw new ValidationException(exMsg, e);} catch (Exception e) {throw new TableException(exMsg, e);}} else if (operation instanceof AlterDatabaseOperation) {// 處理修改數據庫操作AlterDatabaseOperation alterDatabaseOperation = (AlterDatabaseOperation) operation;Catalog catalog = getCatalogOrThrowException(alterDatabaseOperation.getCatalogName());String exMsg = getDDLOpExecuteErrorMsg(alterDatabaseOperation.asSummaryString());try {catalog.alterDatabase(alterDatabaseOperation.getDatabaseName(),alterDatabaseOperation.getCatalogDatabase(),false);return TableResultImpl.TABLE_RESULT_OK;} catch (DatabaseNotExistException e) {throw new ValidationException(exMsg, e);} catch (Exception e) {throw new TableException(exMsg, e);}} else if (operation instanceof CreateCatalogFunctionOperation) {// 處理創建目錄函數操作return createCatalogFunction((CreateCatalogFunctionOperation) operation);} else if (operation instanceof CreateTempSystemFunctionOperation) {// 處理創建臨時系統函數操作return createSystemFunction((CreateTempSystemFunctionOperation) operation);} else if (operation instanceof DropCatalogFunctionOperation) {// 處理刪除目錄函數操作return dropCatalogFunction((DropCatalogFunctionOperation) operation);} else if (operation instanceof DropTempSystemFunctionOperation) {// 處理刪除臨時系統函數操作return dropSystemFunction((DropTempSystemFunctionOperation) operation);} else if (operation instanceof AlterCatalogFunctionOperation) {// 處理修改目錄函數操作return alterCatalogFunction((AlterCatalogFunctionOperation) operation);} else if (operation instanceof CreateCatalogOperation) {// 處理創建目錄操作return createCatalog((CreateCatalogOperation) operation);} else if (operation instanceof DropCatalogOperation) {// 處理刪除目錄操作DropCatalogOperation dropCatalogOperation = (DropCatalogOperation) operation;String exMsg = getDDLOpExecuteErrorMsg(dropCatalogOperation.asSummaryString());try {catalogManager.unregisterCatalog(dropCatalogOperation.getCatalogName(), dropCatalogOperation.isIfExists());return TableResultImpl.TABLE_RESULT_OK;} catch (CatalogException e) {throw new ValidationException(exMsg, e);}} else if (operation instanceof LoadModuleOperation) {// 處理加載模塊操作return loadModule((LoadModuleOperation) operation);} else if (operation instanceof UnloadModuleOperation) {// 處理卸載模塊操作return unloadModule((UnloadModuleOperation) operation);} else if (operation instanceof UseModulesOperation) {// 處理使用模塊操作return useModules((UseModulesOperation) operation);} else if (operation instanceof UseCatalogOperation) {// 處理切換當前目錄操作UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());return TableResultImpl.TABLE_RESULT_OK;} else if (operation instanceof UseDatabaseOperation) {// 處理切換當前數據庫操作UseDatabaseOperation useDatabaseOperation = (UseDatabaseOperation) operation;catalogManager.setCurrentCatalog(useDatabaseOperation.getCatalogName());catalogManager.setCurrentDatabase(useDatabaseOperation.getDatabaseName());return TableResultImpl.TABLE_RESULT_OK;} else if (operation instanceof ShowCatalogsOperation) {// 處理顯示所有目錄操作return buildShowResult("catalog name", listCatalogs());} else if (operation instanceof ShowCurrentCatalogOperation) {// 處理顯示當前目錄操作return buildShowResult("current catalog name", new String[] {catalogManager.getCurrentCatalog()});} else if (operation instanceof ShowDatabasesOperation) {// 處理顯示所有數據庫操作return buildShowResult("database name", listDatabases());} else if (operation instanceof ShowCurrentDatabaseOperation) {// 處理顯示當前數據庫操作return buildShowResult("current database name", new String[] {catalogManager.getCurrentDatabase()});} else if (operation instanceof ShowModulesOperation) {// 處理顯示模塊操作,支持簡略和完整格式ShowModulesOperation showModulesOperation = (ShowModulesOperation) operation;if (showModulesOperation.requireFull()) {return buildShowFullModulesResult(listFullModules());} else {return buildShowResult("module name", listModules());}} else if (operation instanceof ShowTablesOperation) {// 處理顯示所有表操作return buildShowResult("table name", listTables());} else if (operation instanceof ShowFunctionsOperation) {// 處理顯示函數操作,支持顯示用戶定義函數和所有函數ShowFunctionsOperation showFunctionsOperation = (ShowFunctionsOperation) operation;String[] functionNames = null;switch (showFunctionsOperation.getFunctionScope()) {case USER:functionNames = listUserDefinedFunctions();break;case ALL:functionNames = listFunctions();break;default:throw new UnsupportedOperationException(String.format("SHOW FUNCTIONS with %s scope is not supported.",showFunctionsOperation.getFunctionScope()));}return buildShowResult("function name", functionNames);} else if (operation instanceof ShowViewsOperation) {// 處理顯示所有視圖操作return buildShowResult("view name", listViews());} else if (operation instanceof ShowPartitionsOperation) {// 處理顯示分區操作String exMsg = getDDLOpExecuteErrorMsg(operation.asSummaryString());try {ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;Catalog catalog = getCatalogOrThrowException(showPartitionsOperation.getTableIdentifier().getCatalogName());ObjectPath tablePath = showPartitionsOperation.getTableIdentifier().toObjectPath();CatalogPartitionSpec partitionSpec = showPartitionsOperation.getPartitionSpec();List<CatalogPartitionSpec> partitionSpecs =partitionSpec == null? catalog.listPartitions(tablePath): catalog.listPartitions(tablePath, partitionSpec);List<String> partitionNames = new ArrayList<>(partitionSpecs.size());for (CatalogPartitionSpec spec : partitionSpecs) {List<String> partitionKVs = new ArrayList<>(spec.getPartitionSpec().size());for (Map.Entry<String, String> partitionKV : spec.getPartitionSpec().entrySet()) {partitionKVs.add(partitionKV.getKey() + "=" + partitionKV.getValue());}partitionNames.add(String.join("/", partitionKVs));}return buildShowResult("partition name", partitionNames.toArray(new String[0]));} catch (TableNotExistException e) {throw new ValidationException(exMsg, e);} catch (Exception e) {throw new TableException(exMsg, e);}} else if (operation instanceof ExplainOperation) {// 處理EXPLAIN操作,返回SQL的執行計劃String explanation =explainInternal(Collections.singletonList(((ExplainOperation) operation).getChild()));return TableResultImpl.builder().resultKind(ResultKind.SUCCESS_WITH_CONTENT).schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING()))).data(Collections.singletonList(Row.of(explanation))).setPrintStyle(TableResultImpl.PrintStyle.rawContent()).setSessionTimeZone(getConfig().getLocalTimeZone()).build();} else if (operation instanceof DescribeTableOperation) {// 處理DESCRIBE TABLE操作,返回表的詳細信息DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;Optional<CatalogManager.TableLookupResult> result =catalogManager.getTable(describeTableOperation.getSqlIdentifier());if (result.isPresent()) {return buildDescribeResult(result.get().getResolvedSchema());} else {throw new ValidationException(String.format("Tables or views with the identifier '%s' doesn't exist",describeTableOperation.getSqlIdentifier().asSummaryString()));}} else if (operation instanceof QueryOperation) {// 處理查詢操作return executeQueryOperation((QueryOperation) operation);} else if (operation instanceof CreateTableASOperation) {// 處理CREATE TABLE AS操作,包括創建表和插入數據executeInternal(((CreateTableASOperation) operation).getCreateTableOperation());return executeInternal(((CreateTableASOperation) operation).getInsertOperation());} else if (operation instanceof NopOperation) {// 處理空操作return TableResultImpl.TABLE_RESULT_OK;} else {// 如果操作類型不支持,則拋出異常throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);}
}
其中會通過executeQueryOperation()來執行查詢操作,其源碼如下:
/*** 執行查詢操作并返回表結果。* 此方法將查詢操作包裝在一個收集修改操作中,并異步執行該操作。* 它創建一個未注冊的收集接收器,將查詢操作轉換為轉換列表,* 然后使用執行環境創建并執行管道。** @param operation 要執行的查詢操作* @return 包含查詢結果的表結果* @throws TableException 如果執行SQL時發生錯誤*/private TableResult executeQueryOperation(QueryOperation operation) {// 創建一個未解析的標識符,用于表示未注冊的收集接收器final UnresolvedIdentifier unresolvedIdentifier =UnresolvedIdentifier.of("Unregistered_Collect_Sink_" + CollectModifyOperation.getUniqueId());// 將未解析的標識符轉換為合格的對象標識符final ObjectIdentifier objectIdentifier =catalogManager.qualifyIdentifier(unresolvedIdentifier);// 創建一個本地收集ModifyOperation結果的OperationCollectModifyOperation sinkOperation =new CollectModifyOperation(objectIdentifier, operation);// 將上一步的 sinkOperation 翻譯為Flink的transformationList<Transformation<?>> transformations =translate(Collections.singletonList(sinkOperation));// 設置作業名稱String jobName = getJobName("collect");// 根據transformation,生成StreamGraphPipeline pipeline = execEnv.createPipeline(transformations, tableConfig, jobName);try {// 代表作業異步執行過程JobClient jobClient = execEnv.executeAsync(pipeline);// 用于幫助jobClient獲取執行結果CollectResultProvider resultProvider = sinkOperation.getSelectResultProvider();resultProvider.setJobClient(jobClient);// 構建TableResultImpl對象return TableResultImpl.builder().jobClient(jobClient).resultKind(ResultKind.SUCCESS_WITH_CONTENT).schema(operation.getResolvedSchema()).data(resultProvider.getResultIterator()).setPrintStyle(TableResultImpl.PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH,PrintUtils.NULL_COLUMN,true,isStreamingMode)).setSessionTimeZone(getConfig().getLocalTimeZone()).build();} catch (Exception e) {// 如果執行過程中發生異常,拋出表異常throw new TableException("Failed to execute sql", e);}}
總結
本文介紹了Flink,并通過源碼梳理了Flink SQL的執行原理。
Flink SQL從提交查詢到任務執行,可以分為以下過程:
- 語法解析
利用Calcite將SQL語句轉換成一棵抽象語法樹,在Calcite中用SqlNode來表示。 - 語法校驗
根據元數據信息進行驗證,例如查詢的表、使用的函數是否存在等,校驗之后仍然是由SqlNode構成的語法樹。 - 查詢計劃優化
首先將SqlNode語法樹轉換成由關系表達式RelNode構成的邏輯樹,然后使用優化器基于規則進行等價變換。 - 物理執行
邏輯查詢計劃翻譯成物理執行計劃,生成對應的可執行代碼并提交運行。