Apache Flink是一個在有界數據流和無界數據流上進行有狀態計算分布式處理引擎和框架。Flink 設計旨在所有常見的集群環境中運行,以任意規模和內存級速度執行計算。
一、主要特點和功能
1. 實時流處理:
- 低延遲: Flink 能夠以亞秒級的延遲處理數據流,非常適合對時間敏感的應用,如實時分析、監控和告警系統。
- 狀態管理: Flink 提供了對狀態的內置支持,使得開發有狀態的流式處理應用變得容易,如窗口操作、復雜事件處理等。
2. 批處理和流處理的統一:
-
Flink 既可以用于流處理,也可以用于批處理,允許用戶在一個框架中編寫應用程序,而不必在批處理和流處理之間切換。
-
事件時間處理: Flink 支持事件時間語義,可以基于數據本身的時間戳進行處理,而不是數據到達的時間,這對于處理無序數據流非常重要。
// scala table api // 引入 Flink 的批處理環境 val env = ExecutionEnvironment.getExecutionEnvironment//批處理: 讀取文本文件 env.readTextFile("data/words.txt")// 處理數據: 切換、轉換、分組、聚合.flatMap(_.replaceAll("[^a-zA-Z ]", "").split("\\s+")).map((_, 1)).groupBy(0).sum(1)// 輸出.print()
// 引入 Flink 的流處理環境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 有界流處理: 文件數據 env.readTextFile("data/words.txt").flatMap(_.replaceAll("[^a-zA-Z ]", "").split("\\s+")).map((_, 1)).keyBy(_._1).sum(1).print()// 無界流處理: Socket env.socketTextStream("nodeIp", 9999).flatMap(_.replaceAll("[^a-zA-Z ]", "").split("\\s+")).map((_, 1)).keyBy(_._1).sum(1).print()// 執行 Flink 作業,并給它命名 env.execute("Word Count Example")
3. 高度可擴展性:
- Flink 能夠在大規模分布式集群上運行,處理從幾千到上百萬個事件每秒的數據流。
- 彈性和容錯: Flink 使用檢查點和保存點機制來提供容錯能力,確保在發生故障時可以從之前的狀態恢復,減少數據丟失。
4. 支持多種數據源和接收器:
- Flink 能夠與多種數據源和接收器集成,如 Kafka、HDFS、Cassandra、Elasticsearch 等,使其可以輕松地處理和存儲來自不同系統的數據。
5. 豐富的 API 和庫:
- DataStream API: 用于流處理,允許開發者定義復雜的數據流處理邏輯。
- DataSet API: 用于批處理,提供了豐富的操作符來處理靜態數據集(將在 Flink 2.0 版本被刪除如何遷移 DataSet 到 DataStream | Apache Flink)
- Table API 和 SQL: 提供了一個更高級別的 API,允許開發者使用 SQL 查詢來處理數據流和數據集。
- 機器學習和圖處理庫: Flink 提供了機器學習庫(FlinkML)和圖處理庫(Gelly),適用于高級分析任務。
6. 部署靈活性:
- Flink 可以部署在多種環境中,如獨立集群、YARN、Kubernetes、Mesos 以及本地環境中。
- 流批一體: Flink 支持將批處理和流處理集成到同一個應用程序中,簡化了部署和管理。
7. 社區與生態系統:
- Flink 由一個活躍的開源社區維護和發展,生態系統日益壯大,支持越來越多的第三方工具和集成。
典型應用場景
- 實時數據分析: Flink 可用于處理實時事件流,提供實時分析、告警和監控。
- 復雜事件處理: Flink 能夠處理和識別復雜事件模式,用于金融監控、欺詐檢測等。
- 日志處理: 可以實時處理和分析來自各種系統的日志數據,提取有價值的信息。
- 機器學習: Flink 的流處理能力可以用于實時更新機器學習模型,或在流數據上直接進行預測。
Apache Flink 適用于各種需要實時和批處理的應用程序,尤其是在處理大規模數據流時表現出色。·
二、Flink下載,集群安裝配置
官方下載地址:Downloads | Apache Flink
1. 下載、解壓、配置環境變量
wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgztar -zxvf flink-1.20.0 -C /opt/softwarevim /etc/profile.d/myenv.sh
# FLINK_HOME=...
# PAHT=$PATH:$FLINK_HOME/bin
2. 進入 Flink 配置目錄:
cd $FLINK_HOME/conf
3. 編輯 masters
文件:
在文件中指定 JobManager 的主機名或 IP 地址。如果有多個 JobManager(用于高可用性),每個 JobManager 使用一行。
vim masters
# 格式如下
<JobManager1>:<port>
<JobManager2>:<port> # 如果有高可用性設置# 示例
master01:8081
master02:8081
4. 編輯 workers
文件:
在文件中列出所有 TaskManager 的主機名或 IP 地址,每個 TaskManager 使用一行
vim workers
# 格式如下
<TaskManager1>
<TaskManager2># 示例
worker01
worker02
5. 編輯 flink-conf.yaml
文件:
flink-conf.yaml
是 Flink 的主要配置文件,用于配置各種集群參數。
vim flink-conf.yaml
# 指定 JobManager 的 RPC 服務監聽的地址
jobmanager.rpc.address:
# 指定 JobManager 在所有網絡接口上進行綁定
jobmanager.bind-host: 0.0.0.0# 指定 TaskManager 進程的外部地址
taskmanager.host: master01 # 每臺機器這里不同,其他相同
# 指定 TaskManager 綁定的網絡接口
taskmanager.bind-host: 0.0.0.0# 指定 Flink 集群中 REST API 服務的外部地址
rest.address: master01
# 指定 REST API 服務在所有網絡接口上進行綁定。
rest.bind-address: 0.0.0.0
以下看需配置
# TaskManager 內存:
taskmanager.memory.process.size: 1024m# TaskManager 的槽位數:
taskmanager.numberOfTaskSlots: 4# 高可用性配置(如果需要高可用性):
high-availability.type: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs:///flink/recovery
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints# 并行度:
parallelism.default: 4# 日志配置:
taskmanager.log.dir: /var/log/flink# HDFS 配置:
fs.default-scheme: hdfs://namenode:9000
6. 啟動 Flink 集群
$FLINK_HOME/bin/stop-cluster.sh
$FLINK_HOME/bin/start-cluster.sh