相關資料
| 文檔內容 | 鏈接地址 |
| --- | --- |
| datagen 生成器(source) | https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/datagen/ |
| print 打印器(sink) | https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/table/print/ |
準備工作
優點:只需下載 IDEA 即可體驗,無需額外配置外部環境(如數據源等)。
1、idea 開發工具
2、創建 maven 項目 – archetype 選擇 maven-archetype-quickstart(即普通的 Java 項目模板)
java代碼
代碼邏輯
1、采用datagen 生成器,作為數據 source
2、采用print 作為打印器,作為sink 直接輸出
package org.example;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkSqlDemo {public static void main(String[] args) throws Exception {// 創建執行環境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 創建輸入表(使用DataGen生成測試數據)String sourceDDL = "CREATE TABLE user_behavior (\n" +" user_id BIGINT,\n" +" behavior STRING,\n" +" ts TIMESTAMP(3)\n" +") WITH (\n" +" 'connector' = 'datagen',\n" +" 'rows-per-second' = '1',\n" +" 'fields.user_id.kind' = 'random',\n" +" 'fields.user_id.min' = '1',\n" +" 'fields.user_id.max' = '2',\n" +" 'fields.behavior.length' = '2'\n" +")";// 創建輸出表(打印結果)String sinkDDL = "CREATE TABLE print_table (\n" +" behavior STRING,\n" +" cnt BIGINT\n" +") WITH (\n" +" 'connector' = 'print'\n" +")";// 執行DDLtableEnv.executeSql(sourceDDL);tableEnv.executeSql(sinkDDL);// 執行查詢并插入結果Table resultTable = tableEnv.sqlQuery("SELECT behavior, COUNT(*) AS cnt " +"FROM user_behavior " +"GROUP BY behavior");// 插入到輸出表resultTable.executeInsert("print_table").await();// 執行任務(流式任務需要保持運行)env.execute("Flink SQL Demo");}
}
pom依賴配置
`
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flinklearn</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.17.1</flink.version>
</properties>

<dependencies>
    <!-- Flink Core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink streaming (DataStream) API -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink Table -->
    <!-- Declared explicitly because the demo imports StreamTableEnvironment
         from this module; do not rely on it arriving transitively. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>2.0.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- Must match the package declared in FlinkSqlDemo.java (org.example). -->
                                <mainClass>org.example.FlinkSqlDemo</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>