40 Kafka Streams與其他流處理平臺的差異在哪里?
什么是流處理平臺?
“Streaming Systems”一書是這么定義“流處理平臺”的:流處理平臺(Streaming System)是處理無限數據集(Unbounded Dataset)的數據處理引擎,而流處理是與批處理(Batch Processing)相對應的。
所謂的無限數據,是指數據永遠沒有盡頭。流處理平臺是專門處理這種數據集的系統或框架。當然,這并不是說批處理系統不能處理這種無限數據集,只是通常情況下,它更擅長處理有限數據集(Bounded Dataset)。
那流處理和批處理究竟該如何區分呢?下面這張圖應該能幫助你快速且直觀地理解它們的區別。
現在我來詳細解釋一下流處理和批處理的區別。
長期以來,流處理給人的印象通常是低延時,但是結果不準確。每來一條消息,它就能計算一次結果,但由于它處理的大多是無界數據,可能永遠也不會結束,因此在流處理中,我們很難精確描述結果何時是精確的。理論上,流處理的計算結果會不斷地逼近精確結果。
但是,它的競爭對手批處理則正好相反。批處理能提供準確的計算結果,但往往延時很高。
因此,業界的大神們揚長避短,將兩者結合在一起使用。一方面,利用流處理快速地給出不那么精確的結果;另一方面,依托于批處理,最終實現數據一致性。這就是所謂的Lambda架構。
延時低是個很好的特性,但如果計算結果不準確,流處理是無法完全替代批處理的。所謂計算結果準確,在教科書或文獻中有個專屬的名字,叫正確性(Correctness)。可以這么說,目前難以實現正確性是流處理取代批處理的最大障礙,而實現正確性的基石是精確一次處理語義
(Exactly Once Semantics,EOS)。
這里的精確一次是流處理平臺能提供的一類一致性保障。常見的一致性保障有三類:
- 至多一次(At most once)語義:消息或事件對應用狀態的影響最多只有一次。
- 至少一次(At least once)語義:消息或事件對應用狀態的影響最少一次。
- 精確一次(Exactly once)語義:消息或事件對應用狀態的影響有且只有一次。
注意,我這里說的都是對應用狀態的影響。對于很多有副作用(Side Effect)的操作而言,實現精確一次語義幾乎是不可能的。舉個例子,假設流處理中的某個步驟是發送郵件操作,當郵件發送出去后,倘若后面出現問題要回滾整個流處理流程,已發送的郵件是沒法追回的,這就是所謂的副作用。當你的流處理邏輯中存在包含副作用的操作算子時,該操作算子的執行是無法保證精確一次處理的。因此,我們通常只是保證這類操作對應用狀態的影響精確一次罷了。后面我們會重點討論Kafka Streams是如何實現EOS的。
我們今天討論的流處理既包含真正的實時流處理,也包含微批化(Microbatch)的流處理。所謂的微批化,其實就是重復地執行批處理引擎來實現對無限數據集的處理。典型的微批化實現平臺就是Spark Streaming。
Kafka Streams的特色
相比于其他流處理平臺,Kafka Streams最大的特色就是它不是一個平臺,至少它不是一個具備完整功能的平臺,比如其他框架中自帶的調度器和資源管理器,就是Kafka Streams不提供的。
Kafka官網明確定義Kafka Streams是一個Java客戶端庫。你可以使用這個庫來構建高伸縮性、高彈性、高容錯性的分布式應用以及微服務。
使用Kafka Streams API構建的應用就是一個普通的Java應用程序。你可以選擇任何熟悉的技術或框架對其進行編譯、打包、部署和上線。
在我看來,這是Kafka Streams與Storm、Spark Streaming或Flink最大的區別。
Java客戶端庫的定位既可以說是特色,也可以說是一個缺陷。目前Kafka Streams在國內推廣緩慢的一個重要原因也在于此。畢竟,很多公司希望它是一個功能完備的平臺,既能提供流處理應用API,也能提供集群資源管理域調度方面的能力。所以,這個定位到底是特色還是缺陷,仁者見仁、智者見智吧。
Kafka Streams與其他框架的差異
接下來,我從應用部署、上下游數據源、協調方式和消息語義保障4個方面,詳細分析一下Kafka Streams與其他框架的差異。
應用部署
首先,我們從流處理應用部署方式上對Kafka Streams及其他框架進行區分。
我們剛剛提到過,Kafka Streams應用需要開發人員自行打包和部署,你甚至可以將Kafka Streams應用嵌入到其他Java應用中。因此,作為開發者的你,除了要開發代碼之外,還要自行管理Kafka Streams應用的生命周期,要么將其打包成獨立的jar包單獨運行,要么將流處理邏輯嵌入到微服務中,開放給其他服務調用。
但不論是哪種部署方式,你需要自己處理,不要指望Kafka Streams幫你做這些事情。
相反地,其他流處理平臺則提供了完整的部署方案。我以Apache Flink為例來解釋一下。在Flink中,流處理應用會被建模成單個的流處理計算邏輯,并封裝進Flink的作業中。類似地,Spark中也有作業的概念,而在Storm中則叫拓撲。作業的生命周期由框架來管理,特別是在Flink中,Flink框架自行負責管理作業,包括作業的部署和更新等。這些都無需應用開發人員干預。
另外,Flink這類框架都存在資源管理器的角色。一個作業所需的資源完全由框架層的資源管理器來支持。常見的資源管理器,如YARN、Kubernetes、Mesos等,比較新的流處理框架(如Spark、Flink等)都是支持的。像Spark和Flink這樣的框架,也支持Standalone集群的方式,即不借助于任何已有的資源管理器,完全由集群自己來管理資源。這些都是Kafka Streams無法提供的。
因此,從應用部署方面來看,Kafka Streams更傾向于將部署交給開發人員來做,而不是依賴于框架自己實現。
上下游數據源
簡單來說,Kafka Streams目前只支持從Kafka讀數據以及向Kafka寫數據。在沒有Kafka Connect組件的支持下,Kafka Streams只能讀取Kafka集群上的主題數據,在完成流處理邏輯后也只能將結果寫回到Kafka主題上。
反觀Spark Streaming和Flink這類框架,它們都集成了豐富的上下游數據源連接器(Connector),比如常見的連接器MySQL、ElasticSearch、HBase、HDFS、Kafka等。如果使用這些框架,你可以很方便地集成這些外部框架,無需二次開發。
當然,由于開發Connector通常需要同時掌握流處理框架和外部框架,因此在實際使用過程中,Connector的質量參差不齊,在具體使用的時候,你可以多查查對應的jira官網,看看有沒有明顯的“坑”,然后再決定是否使用。
在這個方面,我是有前車之鑒的。曾經,我使用過一個Connector,我發現它在讀取Kafka消息向其他系統寫入的時候似乎總是重復消費。費了很多周折之后,我才發現這是一個已知的Bug,而且早就被記錄在jira官網上了。因此,我推薦你多逛下jira,也許能提前避開一些“坑”。
總之,目前Kafka Streams只支持與Kafka集群進行交互,它沒有提供開箱即用的外部數據源連接器。
協調方式
在分布式協調方面,Kafka Streams應用依賴于Kafka集群提供的協調功能,來提供高容錯性和高伸縮性。
Kafka Streams應用底層使用了消費者組機制來實現任意的流處理擴縮容。應用的每個實例或節點,本質上都是相同消費者組下的獨立消費者,彼此互不影響。它們之間的協調工作,由Kafka集群Broker上對應的協調者組件來完成。當有實例增加或退出時,協調者自動感知并重新分配負載。
我畫了一張圖來展示每個Kafka Streams實例內部的構造,從這張圖中,我們可以看出,每個實例都由一個消費者實例、特定的流處理邏輯,以及一個生產者實例組成,而這些實例中的消費者實例,共同構成了一個消費者組。
通過這個機制,Kafka Streams應用同時實現了高伸縮性和高容錯性,而這一切都是自動提供的,不需要你手動實現。
而像Flink這樣的框架,它的容錯性和擴展性是通過專屬的主節點(Master Node)全局來協調控制的。
Flink支持通過ZooKeeper實現主節點的高可用性,避免單點失效:某個節點出現故障會自動觸發恢復操作。這種全局性協調模型對于流處理中的作業而言非常實用,但不太適配單獨的流處理應用程序。原因就在于它不像Kafka Streams那樣輕量級,應用程序必須要實現特定的API來開啟檢查點機制(checkpointing),同時還需要親身參與到錯誤恢復的過程中。
應該這樣說,在不同的場景下,Kafka Streams和Flink這種重量級的協調模型各有優劣。
消息語義保障
精確一次處理語義(Exactly Once Semantics,EOS)
我們剛剛提到過EOS,目前很多流處理框架都宣稱它們實現了EOS,也包括Kafka Streams本身。關于精確一次處理語義,有一些地方需要澄清一下。
實際上,當把Spark、Flink與Kafka結合使用時,如果不使用Kafka在0.11.0.0版本引入的冪等性Producer和事務型Producer,這些框架是無法實現端到端的EOS的。
因為這些框架與Kafka是相互獨立的,彼此之間沒有任何語義保障機制。但如果使用了事務機制,情況就不同了。這些外部系統利用Kafka的事務機制,保障了消息從Kafka讀取到計算再到寫入Kafka的全流程EOS。這就是所謂的端到端精確一次處理語義。
之前Spark和Flink宣稱的EOS都是在各自的框架內實現的,無法實現端到端的EOS。只有使用了Kafka的事務機制,它們對應的Connector才有可能支持端到端精確一次處理語義。
Spark官網上明確指出了用戶若要實現與Kafka的EOS,必須自己確保冪等輸出和位移保存在同一個事務中。如果你不能自己實現這套機制,那么就要依賴于Kafka提供的事務機制來保證。
而Flink在Kafka 0.11之前也宣稱提供EOS,不過是有前提條件的,即每條消息對Flink應用狀態的影響有且只有一次。
舉個例子,如果你使用Flink從Kafka讀取消息,然后不加任何處理直接寫入到MySQL,那么這個操作就是無狀態的,此時Flink無法保證端到端的EOS。
換句話說,Flink最后寫入到MySQL的Kafka消息可能有重復的。當然,Flink社區自1.4版本起正式實現了端到端的EOS,其基本設計思想正是基于Kafka 0.11冪等性Producer的兩階段提交機制。
兩階段提交(2PC)機制是一種分布式事務機制,用于實現分布式系統上跨多個節點事務的原子性提交。下面這張圖來自于神書“Designing Data-Intensive Applications”中關于2PC講解的章節。它清晰地描述了一次成功2PC的過程。在這張圖中,兩個數據庫參與到分布式事務的提交過程中,它們各自做了一些變更,現在需要使用2PC來保證兩個數據庫的變更被原子性地提交。如圖所示,2PC被分為兩個階段:Prepare階段和Commit階段。只有完整地執行了這兩個階段,這個分布式事務才算是提交成功。
分布式系統中的2PC常見于數據庫內部實現或以XA事務的方式供各種異質系統使用。Kafka也借鑒了2PC的思想,在Kafka內部實現了基于2PC的事務機制。
但是,對于Kafka Streams而言,情況就不同了。它天然支持端到端的EOS,因為它本來就是和Kafka緊密相連的。
下圖展示了一個典型的Kafka Streams應用的執行邏輯。
通常情況下,一個Kafka Streams需要執行5個步驟:
- 讀取最新處理的消息位移;
- 讀取消息數據;
- 執行處理邏輯;
- 將處理結果寫回到Kafka;
- 保存位置信息。
這五步的執行必須是原子性的,否則無法實現精確一次處理語義。
在設計上,Kafka Streams在底層大量使用Kafka事務機制和冪等性Producer來實現多分區的原子性寫入,又因為它只能讀寫Kafka,因此Kafka Streams很容易地就實現了端到端的EOS。
總之,雖然Flink自1.4版本也提供與Kafka的EOS,但從適配性來考量的話,應該說Kafka Streams與Kafka的適配性是最好的。
41 Kafka Streams DSL開發實例
DSL,也就是Domain Specific Language,意思是領域特定語言。它提供了一組便捷的API幫助我們實現流式數據處理邏輯。今天,我就來分享一些Kafka Streams中的DSL開發方法以及具體實例。
Kafka Streams背景介紹
流處理平臺是專門處理無限數據集的引擎。就Kafka Streams而言,它僅僅是一個客戶端庫。所謂的Kafka Streams應用,就是調用了Streams API的普通Java應用程序。只不過在Kafka Streams中,流處理邏輯是用拓撲來表征的。
一個拓撲結構本質上是一個有向無環圖(DAG),它由多個處理節點(Node)和連接節點的多條邊組成,如下圖所示:
圖中的節點也稱為處理單元或Processor,它封裝了具體的事件處理邏輯。Processor在其他流處理平臺也被稱為操作算子。常見的操作算子包括轉換(map)、過濾(filter)、連接(join)和聚合(aggregation)等。后面我會詳細介紹幾種常見的操作算子。
大體上,Kafka Streams開放了兩大類API供你定義Processor邏輯。
第1類就是我剛剛提到的DSL,它是聲明式的函數式API,使用起來感覺和SQL類似,你不用操心它的底層是怎么實現的,你只需要調用特定的API告訴Kafka Streams你要做什么即可。
舉個簡單的例子,你可以看看下面這段代碼,嘗試理解下它是做什么的。
movies.filter((title, movie) -> movie.getGenre().equals("動作片")).xxx()...
這段代碼雖然用了Java 8的Lambda表達式,但從整體上來看,它要做的事情應該還是很清晰的:它要從所有Movie事件中過濾出影片類型是“動作片”的事件。這就是DSL聲明式API的實現方式。
第2類則是命令式的低階API,稱為Processor API。比起DSL,這組API提供的實現方式更加靈活。你可以編寫自定義的算子來實現一些DSL天然沒有提供的處理邏輯。事實上,DSL底層也是用Processor API實現的。
目前,Kafka Streams DSL提供的API已經很豐富了,基本上能夠滿足我們大部分的處理邏輯需求,我今天重點介紹一下DSL的使用方法。
不論是用哪組API實現,所有流處理應用本質上都可以分為兩類:有狀態的(Stateful)應用和無狀態的(Stateless)應用。
有狀態的應用指的是應用中使用了類似于連接
、聚合
或時間窗口(Window)
的API。一旦調用了這些API,你的應用就變為有狀態的了,也就是說你需要讓Kafka Streams幫你保存應用的狀態。
無狀態的應用是指在這類應用中,某條消息的處理結果不會影響或依賴其他消息的處理。常見的無狀態操作包括事件轉換以及剛剛那個例子中的過濾等。
關鍵概念
了解了這些背景之后,你還需要掌握一些流處理領域內的關鍵概念,即流、表以及流表二元性,還有時間和時間窗口。
流表二元性
流就是一個永不停止(至少理論上是這樣的)的事件序列,而表和關系型數據庫中的概念類似,是一組行記錄。在流處理領域,兩者是有機統一的:流在時間維度上聚合之后形成表
,表在時間維度上不斷更新形成流
,這就是所謂的流表二元性。流表二元性在流處理領域內的應用是Kafka框架賴以成功的重要原因之一。
下面這張圖展示了表轉換成流,流再轉換成表的全過程。
剛開始時,表中只有一條記錄“張三:1”。將該條記錄轉成流,變成了一條事件。接著,表增加了新記錄“李四:1”。針對這個變更,流中也增加了對應的新事件。之后,表中張三的對應值,從1更新為2,流也增加了相應的更新事件。最后,表中添加了新數據“王五:1”,流也增加了新記錄。至此,表轉換成流的工作就完成了。
從這個過程中我們可以看出,流可以看作是表的變更事件日志(Changelog)。與之相反的是,流轉換成表的過程,可以說是這個過程的逆過程:我們為流中的每條事件打一個快照(Snapshot),就形成了表。
流和表的概念在流處理領域非常關鍵。在Kafka Streams DSL中,流用KStream表示,而表用KTable表示。
Kafka Streams還定義了GlobalKTable。本質上它和KTable都表征了一個表,里面封裝了事件變更流,但是它和KTable的最大不同在于,當Streams應用程序讀取Kafka主題數據到GlobalKTable時,它會讀取主題所有分區的數據,而對KTable而言,Streams程序實例只會讀取部分分區的數據,這主要取決于Streams實例的數量。
時間
在流處理領域內,精確定義事件時間是非常關鍵的:一方面,它是決定流處理應用能否實現正確性的前提;另一方面,流處理中時間窗口等操作依賴于時間概念才能正常工作。
常見的時間概念有兩類:事件發生時間(Event Time)和事件處理時間(Processing Time)。理想情況下,我們希望這兩個時間相等,即事件一旦發生就馬上被處理,但在實際場景中,這是不可能的,Processing Time永遠滯后于Event Time,,而且滯后程度又是一個高度變化,無法預知,就像“Streaming Systems”一書中的這張圖片所展示的那樣:
該圖中的45°虛線刻畫的是理想狀態,即Event Time等于Processing Time,而粉色的曲線表征的是真實情況,即Processing Time落后于Event Time,而且落后的程度(Lag)不斷變化,毫無規律。
如果流處理應用要實現結果的正確性,就必須要使用基于Event Time的時間窗口,而不能使用基于Processing Time的時間窗口。
時間窗口
所謂的時間窗口機制,就是將流數據沿著時間線切分的過程。常見的時間窗口包括:固定時間窗口(Fixed Windows)、滑動時間窗口(Sliding Windows)和會話窗口(Session Windows)。Kafka Streams同時支持這三類時間窗口。在后面的例子中,我會詳細介紹如何使用Kafka Streams API實現時間窗口功能。
運行WordCount實例
關于Kafka Streams及其DSL的基本概念我都闡述完了,下面我給出大數據處理領域的Hello World實例:WordCount程序。
每個大數據處理框架第一個要實現的程序基本上都是單詞計數。我們來看下Kafka Streams DSL如何實現WordCount。我先給出完整代碼,稍后我會詳細介紹關鍵部分代碼的含義以及運行它的方法。
package kafkalearn.demo.wordcount;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public final class WordCountDemo {public static void main(final String[] args) {final Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-stream-demo");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");final StreamsBuilder builder = new StreamsBuilder();final KStream<String, String> source = builder.stream("wordcount-input-topic");final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))).groupBy((key, value) -> value).count();counts.toStream().to("wordcount-output-topic", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("wordcount-stream-demo-jvm-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0)}
}
在程序開頭,我構造了一個Properties對象實例,對Kafka Streams程序的關鍵參數進行了賦值,比如application id、bootstrap servers和默認的KV序列化器(Serializer)和反序列化器(Deserializer)。其中,application id是Kafka Streams應用的唯一標識,必須要顯式地指定。默認的KV序列化器、反序列化器是為消息的Key和Value進行序列化和反序列化操作的。
接著,我構造了一個StreamsBuilder對象,并使用該對象實例創建了一個KStream,這個KStream從名為wordcount-input-topic的Kafka主題讀取消息。該主題消息由一組單詞組成,單詞間用空格分割,比如zhangsan lisi wangwu。
由于我們要進行單詞計數,所以就需要將消息中的單詞提取出來。有了前面的概念介紹,你應該可以猜到,KTable是很合適的存儲結構,因此,下一步就是將剛才的這個KStream轉換成KTable。
我們先對單詞進行分割,這里我用到了flatMapValues方法,代碼中的Lambda表達式實現了從消息中提取單詞的邏輯。由于String.split()方法會返回多個單詞,因此我們使用flatMapValues而不是mapValues。原因是,前者能夠將多個元素“打散”成一組單詞,而如果使用后者,我們得到的就不是一組單詞,而是多組單詞了。
這些都做完之后,程序調用groupBy方法對單詞進行分組。由于是計數,相同的單詞必須被分到一起,然后就是調用count方法對每個出現的單詞進行統計計數,并保存在名為counts的KTable對象中。
最后,我們將統計結果寫回到Kafka中。由于KTable是表,是靜態的數據,因此這里要先將其轉換成KStream,然后再調用to方法寫入到名為wordcount-output-topic的主題中。此時,counts中事件的Key是單詞,而Value是統計個數,因此我們在調用to方法時,同時指定了Key和Value的序列化器,分別是字符串序列化器和長整型序列化器。
至此,Kafka Streams的流計算邏輯就編寫完了,接下來就是構造KafkaStreams實例并啟動它了。通常來說,這部分的代碼都是類似的,即調用start方法啟動整個流處理應用,以及配置一個JVM關閉鉤子(Shutdown Hook)實現流處理應用的關閉等。
總體來說,Kafka Streams DSL實現WordCount的方式還是很簡單的,僅僅調用幾個操作算子就輕松地實現了分布式的單詞計數實時處理功能。事實上,現在主流的實時流處理框架越來越傾向于這樣的設計思路,即通過提供豐富而便捷的開箱即用操作算子,簡化用戶的開發成本,采用類似于搭積木的方式快捷地構建實時計算應用。
待啟動該Java程序之后,你需要創建出對應的輸入和輸出主題,并向輸入主題不斷地寫入符合剛才所說的格式的單詞行,之后,你需要運行下面的命令去查看輸出主題中是否正確地統計了你剛才輸入的單詞個數:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic wordcount-output-topic \--from-beginning \--formatter kafka.tools.DefaultMessageFormatter \--property print.key=true \--property print.value=true \--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
開發API
介紹了具體的例子之后,我們來看下Kafka Streams還提供了哪些功能強大的API。我們可以重點關注兩個方面,一個是常見的操作算子,另一個是時間窗口API。
常見操作算子
操作算子的豐富程度和易用性是衡量流處理框架受歡迎程度的重要依據之一。Kafka Streams DSL提供了很多開箱即用的操作算子,大體上分為兩大類:無狀態算子和有狀態算子。下面我就向你分別介紹幾個經常使用的算子。
在無狀態算子中,filter的出場率是極高的。它執行的就是過濾的邏輯。依然拿WordCount為例,假設我們只想統計那些以字母s開頭的單詞的個數,我們可以在執行完flatMapValues后增加一行代碼,代碼如下:
.filter(((key, value) -> value.startsWith("s")))
另一個常見的無狀態算子當屬map一族了。Streams DSL提供了很多變體,比如map、mapValues、flatMap和flatMapValues。我們已經見識了flatMapValues的威力,其他三個的功能也是類似的,只是所有帶Values的變體都只對消息體執行轉換,不觸及消息的Key,而不帶Values的變體則能修改消息的Key。
舉個例子,假設當前消息沒有Key,而Value是單詞本身,現在我們想要將消息變更成這樣的KV對,即Key是單詞小寫,而Value是單詞長度,那么我們可以調用map方法,代碼如下:
KStream<String, Integer> transformed = stream.map((key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
最后,我再介紹一組調試用的無狀態算子:print和peek。Streams DSL支持你使用這兩個方法查看你的消息流中的事件。這兩者的區別在于,print是終止操作,一旦你調用了print方法,后面就不能再調用任何其他方法了,而peek則允許你在查看消息流的同時,依然能夠繼續對其進行處理,比如下面這兩段代碼所示:
stream.print(Printed.toFile("streams.out").withLabel("debug"));
stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value)).map(...);
常見的有狀態操作算子主要涉及聚合(Aggregation)方面的操作,比如計數、求和、求平均值、求最大最小值等。Streams DSL目前只提供了count方法用于計數,其他的聚合操作需要你自行使用API實現。
假設我們有個消息流,每條事件就是一個單獨的整數,現在我們想要對其中的偶數進行求和,那么Streams DSL中的實現方法如下:
final KTable<Integer, Integer> sumOfEvenNumbers = input.filter((k, v) -> v % 2 == 0).selectKey((k, v) -> 1).groupByKey().reduce((v1, v2) -> v1 + v2);
我簡單解釋一下selectKey調用。由于我們要對所有事件中的偶數進行求和,因此需要把這些消息的Key都調整成相同的值,因此這里我使用selectKey指定了一個Dummy Key值,即上面這段代碼中的數值1。它沒有任何含義,僅僅是讓所有消息都賦值上這個Key而已。真正核心的代碼在于reduce調用,它是執行求和的關鍵邏輯。
時間窗口實例
前面說過,Streams DSL支持3類時間窗口。前兩類窗口通過TimeWindows.of方法來實現,會話窗口通過SessionWindows.with來實現。
假設在剛才的WordCount實例中,我們想每一分鐘統計一次單詞計數,那么需要在調用count之前增加下面這行代碼:
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
同時,你還需要修改counts的類型,此時它不再是KTable了,而變成了KTable,因為引入了時間窗口,所以,事件的Key也必須要攜帶時間窗口的信息。除了這兩點變化,WordCount其他部分代碼都不需要修改。
可見,Streams DSL在API封裝性方面還是做得很好的,通常你只需要增加或刪減幾行代碼,就能實現處理邏輯的修改了。
小結
- 流表二元性:流就是一個永不停止(至少理論上是這樣的)的事件序列,而表是一組行記錄。流在時間維度上聚合之后形成表,表在時間維度上不斷更新形成流。
- 時間概念:常見的有兩類,分別是事件發生時間和事件處理時間。在實際場景中,事件處理時間永遠滯后于事件發生時間。
- 時間窗口機制:將流數據沿著時間線切分的過程常見的時間窗口包括固定時間窗口、滑動時間窗口和會話窗口。
- 常見操作算子:無狀態算子中的filter、map一族、print和peek;有狀態算子中涉及聚合方面的操作。