一步一個腳印,一天一道大數據面試題
博主希望能夠得到大家的點贊收,藏支持!非常感謝~
點贊,收藏是情分,不點是本分。祝你身體健康,事事順心!
我們來看看 Flink SQL
大概流程和樣例:
流程:
1.創建 流處理環境 StreamExecutionEnvironment env
2.創建 表環境 StreamTableEnvironment.create(env);
3.創建 source
表,sink
表
4.用 table API 編寫查詢 SQL(返回 Table
對象)
5.執行 sink executeInsert("sink")
代碼樣例:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import static org.apache.flink.table.api.Expressions.$;public class SqlDemo2 {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 1.創建表環境// 1.1 方法 1
// EnvironmentSettings settings = EnvironmentSettings.newInstance()
// .inStreamingMode()
// .build();
// TableEnvironment tableEnv = TableEnvironment.create(settings);// 1.2 方法 2StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 創建表// 用 datagen 生成隨機數據作為 sourcetableEnv.executeSql("CREATE TABLE source (\n" +" id INT\n" +" ,ts BIGINT\n" +" ,vc INT\n" +") WITH (\n" +" 'connector' = 'datagen'\n" +" ,'rows-per-second'='1'\n" +" ,'fields.id.kind'='random'\n" +" ,'fields.id.min'='1'\n" +" ,'fields.id.max'='10'\n" +" ,'fields.ts.kind'='sequence'\n" +" ,'fields.ts.min'='1'\n" +" ,'fields.ts.max'='1000000'\n" +" ,'fields.vc.kind'='random'\n" +" ,'fields.vc.min'='1'\n" +" ,'fields.vc.max'='100'\n" +");\n");tableEnv.executeSql("CREATE TABLE sink(\n" +" id INT,\n" +" sumVC INT,\n" +") WITH (\n" +"'connector'='print'\n" +");\n");// 執行查詢Table source = tableEnv.from("source");Table select = source.where($("id").isGreater(5)).groupBy($("id")).aggregate($("vc").sum().as("sumVC")).select($("id"), $("sumVC"));// 執行 sinkselect.executeInsert("sink");}
}
運行截圖:
我是近未來,祝你變得更強!