你好,我是 shengjk1,多年大廠經驗,努力構建 通俗易懂的、好玩的編程語言教程。 歡迎關注!你會有如下收益:
- 了解大廠經驗
- 擁有和大廠相匹配的技術等
希望看什么,評論或者私信告訴我!
文章目錄
- 一、前言
- 二、Flink 代碼優化
- 2.0 問題發現
- 2.1 原有代碼
- 2.2 CompletableFuture 優化
- 三、avatorscript 使用的簡單介紹
- 3.1 自定義函數
- 3.2 從 Map 中取值
- 3.3 使用 Java 的工具類
- 3.4 AviatorScript 函數
- 四、總結
一、前言
目前 Flink 利用 avatorscript 腳本語言,來做到規則的自動化更新。avatorscript將表達式直接翻譯成對應的 java 字節碼執行,所以在大數據量的情況下,自然而然這里就成為了瓶頸
二、Flink 代碼優化
2.0 問題發現
通過 Flink UI 發現 window 算子是瓶頸,而 window 算子的核心就是 avatorscript 表達式
2.1 原有代碼
xxx
AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
xxx
經過測試平均執行時間在1毫秒以內,但經不住數據量大,所以Flink QPS一直在 11w 左右
2.2 CompletableFuture 優化
xxx
List<CompletableFuture> executeFutures=new ArrayList<>();CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);});
executeFutures.add(executeFuture);for (int i = 0; i < executeFutures.size(); i++) {executeFutures.get(i).get()xxxx
}
修改完上線后,Flink QPS 有原來 11W 增加到 17W 左右
三、avatorscript 使用的簡單介紹
為了讓你更容易理解 avatorscript,這里我們也可以先簡單的介紹一下:
3.1 自定義函數
class AddFunction extends AbstractFunction {@Overridepublic AviatorObject call(Map<String, Object> env,AviatorObject arg1, AviatorObject arg2) {Number left = FunctionUtils.getNumberValue(arg1, env);Number right = FunctionUtils.getNumberValue(arg2, env);return new AviatorDouble(left.intValue() + right.intValue());}public String getName() {return "add" ;}
}public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注冊函數AviatorEvaluator.addFunction(new AddFunction());System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
}
3.2 從 Map 中取值
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {//注冊函數AviatorEvaluator.addFunction(new AddFunction());HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "testId1" , 1);stringObjectHashMap.put( "testId2" , 2);Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);
3.3 使用 Java 的工具類
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {HashMap<String, Object> stringObjectHashMap = new HashMap<>();stringObjectHashMap.put( "ip" , "a1111" );// stringObjectHashMap.put("result", "a&B&C&d");stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );stringObjectHashMap.put( "testId1" , "sku" );stringObjectHashMap.put( "a" , "123" );stringObjectHashMap.put( "a1" , "null" );stringObjectHashMap.put( "b1" , 123);AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);System.out.println(execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);System.out.println( "###" + execute2);execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);
3.4 AviatorScript 函數
## examples/function.av
fn add(x, y) {return x + y;
}
p(add(1,2))
public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {String function = "## examples/function.av\n" +"\n" +"fn add(x, y) {\n" +" return x + y;\n" +"}" ;AviatorEvaluator.defineFunction( "add" , function);System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
}
四、總結
本文主要介紹了 Flink 中使用 avatorscript 腳本語言的問題,以及如何通過 CompletableFuture 優化代碼來提高 Flink QPS。同時,還介紹了 avatorscript 的使用方法,包括自定義函數、從 Map 中取值、使用 Java 工具類和 AviatorScript 函數。通過本文的介紹,讀者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何優化代碼來提高 Flink QPS。