如果對于框架的介紹不感興趣的可以直接跳到Getting Started快速開始
在設計一款數據加載編排框架時,除了任何框架都必須具備的可靠性與穩定性之外,對于本次編排框架的設計,我們把核心目標放在高性能與易用性上。這不僅要求框架能夠快速、高效地完成數據加載任務,更要讓使用者將精力集中在業務邏輯的實現上,而無需為緩存細節或單次調用性能反復操心。受到設計模式的啟發,我采用了一種全新的思路,并行Builder,
設計目標
易用性
框架在易用性上的設計理念是:讓使用者編寫業務步驟時像“施工工人”一樣簡單。用戶只需專注于當前節點的任務及其依賴,其他諸如超時控制、重試策略、降級處理、生命周期鉤子、動態任務提交等通用能力,均由框架自動完成,同時使用者也不需要編寫額外的流程表達式腳本。
為了實現這種體驗,我們采用了隱式約定的方式,給予用戶極大的自由度,同時簡化了依賴數據的獲取與調用鏈路。框架支持 Builder 模式,將整個結果構建過程抽象為“大樓施工”:每個節點是一個工人,節點間的依賴關系就是施工順序。這樣既直觀又易于維護,能覆蓋 90% 以上的使用場景。
對于更復雜、靈活的需求,我們還提供了低層級 API,允許用戶自定義甚至擴展框架功能。框架還會記錄節點運行時的附加信息(如耗時、重試次數等),以便做調用監控或動態決策。
高性能
我們的框架另外一個核心目標就是高性能,把性能做到極致,性能優化貫穿了框架實現的各個細節。
-
依賴就緒即執行:節點在所有依賴完成后立即開始工作,最大化并行度。
-
減少鎖與對象創建:盡可能避免鎖、匿名類與多余對象,盡可能減少用戶態/內核態切換和減輕 JVM 垃圾回收壓力。
-
無腳本性能損耗:不引入額外腳本語言,無需額外的編譯,直接用高性能原生代碼執行。
-
結果共享與懶加載:同一節點只執行一次,其結果可被多個依賴節點復用;無需執行的節點不會創建任務,也不會進入線程池。
-
線程池可定制:用戶可根據調用信息優化線程池配置,精細控制并發性能。
編排框架的“不可能三角”
在通用性、易用性(或易維護性)、復用性這三個維度上,幾乎不存在一個編排框架能三者兼得。
易用性:易用性意味著低學習成本和直觀的 API 設計,無需額外腳本或復雜配置文件即可完成開發。
通用性:通用性要求框架能夠在不同場景、不同流程下正常運行。例如,如果一個流程是固定的(像智能洗衣機洗衣服那樣),那么針對這個場景的專用框架會很簡單。但現實中,業務流程經常變化,甚至同一團隊內部也可能有多個差異很大的流程,因此通用性對框架來說往往是必需的。
復用性:復用性強調組件可以跨流程復用。
根據我在編排框架的學習和設計中,發現在自定義流程的編排框架的設計中,易用性與復用性天然存在沖突,如果要實現高度復用,組件必須對流程無感,這會讓它無法確定數據來源與結果去向,從而需要引入流程層的變量綁定、條件判斷等配置語言。這類腳本復雜度隨著流程增長而急劇上升,不僅維護困難,還缺乏 IDE 的智能支持。
因此,我們在設計時選擇部分放棄組件復用性,換取簡單易用的開發體驗:
-
每個組件知道自己處在什么位置、需要什么數據、產出什么結果;
-
組件邏輯更單純,減少為了兼顧多場景而加入的復雜判斷;
-
代碼可維護性更高,減少迭代中的邏輯腐化風險。
對于重復邏輯,我們通過公共方法封裝來消除冗余。同時,借助 Builder 模式,我們仍保留了一定的復用能力,通過抽象初始化參數與返回結果來實現組件的共享化。
富節點與窮節點
在對于節點設計上,參考DDD理論,我區分了兩種模式:
-
窮節點:只關心自身業務邏輯,其他的依賴關系、兜底策略、超時、重試等全部交給框架。這種方式節點解耦好,但數據傳遞與流程表達復雜度高,尤其在總流程龐大時維護成本極高。比較適合在強調組件復用的框架中使用。
-
富節點:節點同時管理自身的依賴、容錯、超時、重試等邏輯。雖然單個節點更重、復用性降低,但整體流程更直觀,數據傳遞簡單,易于可視化和性能優化,尤其在流程本身很龐大復雜的時候更能體現其優勢。
本框架采用富節點設計思路,因為它不僅優化了可讀性與性能,還能輕松實現運行時的動態流程調整。
框架組件
我們的框架設計本身就很簡單,很多復雜性已經封裝在了框架內部,并且采用了大量默認配置,只有在真正需要的時候才去設置他們,使用上非常方便,核心組件也很簡單。這里我們只介紹框架的應用層組件,底層組件一般用于做二次拓展時才會用到這里不做介紹,
ParallelDataBuilder:負責管理和執行整個流程,主要加入流程中的各個節點,需要時候要能設置超時時間,節點鉤子函數,自定義執行線程池等等,還提供檢查節點是否有循環依賴的方法,提高代碼安全性,ParallelDataBuilder是可以復用并且并發安全的,添加了節點和進行了必要配置后就能傳入初始參數調用它的構建方法,開啟整個流程
ProcessNode:執行每個步驟的節點,主要需要定義名稱,執行的工作內容,依賴的節點名稱等,如果有需要還可以定義其重試次數,自定義重試判斷,是否需要加載判斷(用于實現分支的效果)等等,為了進一步簡化開發者使用,框架在processNode的工作參數暴露了流程初始化參數,最終返回結果,盡管這些都可以在LoadContext中獲取,
LoadContext:context是一個自動化的組件,用戶無需自己顯式地創建,在ProcessNode直接使用就可以了,LoadContext提供了幾乎ProcessNode可能需要的所有功能,包括其他節點的運行結果,流程的啟動參數,讀寫本次流程的共享變量,動態提交其他任務等等
框架結構和流程設計
我們的框架會按照各個節點的依賴關系自動生成最終的任務圖,并且每個任務的依賴都完成后都會盡可能快地去執行,大體流程如下
每個節點都會在所有依賴節點執行完成后執行,直到所有節點都完成后整個流程才返回。
其中上下文存儲了這個流程的幾乎所有信息,包括每個節點的執行結果,執行信息,節點的共享變量等等,每個節點都可以在上下文中獲取到自己想要的信息,設置共享變量,以及提交任務等。
Builder模式下,會暴露一個構造目標給到所有節點,這個構造目標是不可修改的,每個節點都參與對這個目標的構建,主要就是修改它的字段內容,最終一起完成這個目標的構建,這個構造目標需要在流程開始時傳入。
Builder可以設置鉤子鏈,鉤子鏈由一個個鉤子函數組成,他們會在這些節點進入某些特定生命周期時候運行,包括執行前,執行完成,拋出異常,重試等等。主要用于對節點做統一管理,減少重復代碼。
額外能力支持
重試:支持自動化重試和自定義重試,節點可以編寫最大重試次數(1為不重試),也可以自定義重試判斷邏輯。
超時:總體流程支持超時設置,超時后拋出異常。
降級:支持節點編寫兜底方法,執行異常并且重試失敗后進入降級邏輯。
鉤子:支持為Builder加入鉤子,在流程或者各個節點進入相應生命周期時候執行。
動態提交任務:框架支持節點在執行時候動態提交異步任務,并且可以選擇堵塞式和非堵塞式,堵塞式異步任務雖然不會堵塞當前節點的工作,但是總流程會得到這個任務完成后才返回。
循環依賴檢測:框架也為builder提供自查循環依賴的能力,檢查是否有循環依賴的節點,避免造成系統性風險。
嵌套:框架提供了對流程之間嵌套的支持。
支持spring:builder提供了對spring的支持,可以通過bean name直接加入節點,
詳細信息:框架提供了這流程和每個節點的具體運行詳情,比如運行時間,重試次數等
作品信息
github:https://github.com/oraen/oraen-box
maven:?Maven Central: com.oraen.box:oraen-box-loader
docs:this
Getting Started
完成了介紹后,讓我開始使用吧,這里會通過一個demo展示怎么使用這個框架。
demo目標
本次簡單demo的假設需要從獲取用戶訂單詳情列表,需要調用四個接口,分別用于獲取用戶信息,用戶訂單,訂單詳情(不支持批量調用),定位接口。流程如下。
傳參對象
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Param{String token;String lat;String lng;
}
返回對象
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Resp{String cityId;String userName;Long userId;String lat;String lng;List<OrderDetail> orderList;@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class OrderDetail{Long orderId;Long orderCreateTime;}
}
需求分析
根據demo的需求,我們需要根據token獲取到用戶具體信息,同時通過經緯度獲取定位城市,在獲取到用戶信息后需要通過訂單接口獲取到用戶的所有訂單,然后每個訂單都需要單獨調用訂單詳情接口獲取到訂單詳情。前面都可以設置單獨的節點完成,但是我們不知道訂單的數量,最后一步只能通過動態提交任務的方式來完成(相當于動態創建節點),我們這里通過builder的方式來完成這個目標
引入依賴:目前可以通過maven直接引入,各個版本的優化可以在github的commit信息查看
<dependency><groupId>com.oraen.box</groupId><artifactId>oraen-box-loader</artifactId><version>1.2.1-RELEASE</version> </dependency>
編寫獲取用戶信息節點
編寫一個節點,負責調用服務解析token,獲取用戶信息,名字可以取名為getUserInfo,不依賴其他節點,繼承ProcessNode<Param, Resp>并且實現主要方法。
class GetUserInfoNode implements ProcessNode<Param, Resp> {@Overridepublic Object process(Param param, Resp resp, LoadContext context) {String token = param.getToken();//mock解析token操作//直接給最終要返回的結果設置值resp.setUserId(20L);resp.setUserName("corki");//builder模式下一般用不到節點的返回結果,可以返回nullreturn null;}@Overridepublic String name() {return "getUserInfo";}@Overridepublic List<String> dependencies() {return Collections.emptyList();}
}
編寫用戶定位節點
同樣編寫一個節點,獲取用戶所在地,名字可以取名為getMapInfo,不依賴其他節點
class GetMapInfoNode implements ProcessNode<Param, Resp> {@Overridepublic Object process(Param param, Resp resp, LoadContext context) {String lat = param.getLat();String lng = param.getLng();//mock解析經緯度...resp.setCityId("211");return null;}@Overridepublic String name() {return "getMapInfo";}@Overridepublic List<String> dependencies() {return Collections.emptyList();}
}
編寫獲取用戶訂單節點
編寫一個負責獲取用戶訂單的節點,他需要依賴獲取用戶信息節點getUserInfo完成才能執行,并且獲取到用戶訂單后還需要獲取所有訂單的訂單詳情
class GetUserOrderNode implements ProcessNode<Param, Resp> {@Overridepublic Object process(Param param, Resp resp, LoadContext context) {//mock獲取用戶的所有訂單List<Resp.OrderDetail> orderDetails = new ArrayList<>();for(int i = 0; i < 4; i ++){orderDetails.add(Resp.OrderDetail.builder().orderId(i + 1000L).build());}resp.setOrderList(orderDetails);//異步獲取各訂單的詳情,不堵塞當前節點,但是需全部完成后主流程才能完成for(Resp.OrderDetail orderDetail : orderDetails){//調用context的submitTask的方法用于提交任務,true標識堵塞主流程context.submitTask(() -> {orderDetail.setOrderCreateTime(1000000 + orderDetail.getOrderId());}, true);}return null;}@Overridepublic String name() {return "getUserOrder";}@Overridepublic List<String> dependencies() {return ListUtil.of("getUserInfo");}
}
創建Builder
現在我們已經完成了所有節點的編碼了,現在就需要創建一個Builder,并且吧這些節點全部加入這個Builder上
ParallelDataBuilder<Param, Resp> builder = new ParallelDataBuilder<Param, Resp>()//添加工作節點,可以根據自己編碼習慣一行加入單個或者多個.addNodes(new GetUserInfoNode(), new GetMapInfoNode()).addNodes(new GetUserOrderNode())//設置超時時間.setExecTimeout(1000L)//確保節點之間沒出現循環依賴,.ensure();
初始化參數和初始化構建目標對象,執行后打印結果
Param initParam = Param.builder().token("asdasdasdasd").lat("18.444369").lng("-97.3794933").build();Resp resp = Resp.builder().lat(initParam.lat).lng(initParam.lng).build();builder.buildResp(initParam, resp);System.out.println(JSONUtil.toJson(resp));
打印結果
{
? ? "cityId": "211",
? ? "userName": "corki",
? ? "userId": 20,
? ? "lat": "18.444369",
? ? "lng": "-97.3794933",
? ? "orderList":
? ? [
? ? ? ? {
? ? ? ? ? ? "orderId": 1000,
? ? ? ? ? ? "orderCreateTime": 1001000
? ? ? ? },
? ? ? ? {
? ? ? ? ? ? "orderId": 1001,
? ? ? ? ? ? "orderCreateTime": 1001001
? ? ? ? },
? ? ? ? {
? ? ? ? ? ? "orderId": 1002,
? ? ? ? ? ? "orderCreateTime": 1001002
? ? ? ? },
? ? ? ? {
? ? ? ? ? ? "orderId": 1003,
? ? ? ? ? ? "orderCreateTime": 1001003
? ? ? ? }
? ? ]
}
整體demo代碼
package test.oraen.box.loader.loader;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import oraen.box.common.util.JSONUtil;
import oraen.box.common.util.ListUtil;
import oraen.box.loader.LoadContext;
import oraen.box.loader.extend.ParallelDataBuilder;
import oraen.box.loader.extend.ProcessNode;
import org.junit.jupiter.api.Test;import java.util.ArrayList;
import java.util.Collections;
import java.util.List;public class SimTest {@Testpublic void test() throws Exception {ParallelDataBuilder<Param, Resp> builder = new ParallelDataBuilder<Param, Resp>()//添加工作節點,可以根據自己編碼習慣一行加入單個或者多個.addNodes(new GetUserInfoNode(), new GetMapInfoNode()).addNodes(new GetUserOrderNode())//設置超時時間.setExecTimeout(1000L)//確保節點之間沒出現循環依賴,.ensure();Param initParam = Param.builder().token("asdasdasdasd").lat("18.444369").lng("-97.3794933").build();Resp resp = Resp.builder().lat(initParam.lat).lng(initParam.lng).build();builder.buildResp(initParam, resp);System.out.println(JSONUtil.toJson(resp));}}@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Param{String token;String lat;String lng;
}@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
class Resp{String cityId;String userName;Long userId;String lat;String lng;List<OrderDetail> orderList;@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic static class OrderDetail{Long orderId;Long orderCreateTime;}
}class GetUserInfoNode implements ProcessNode<Param, Resp> {@Overridepublic Object process(Param param, Resp resp, LoadContext context) {String token = param.getToken();//mock解析token操作//直接給最終要返回的結果設置值resp.setUserId(20L);resp.setUserName("corki");//builder模式下一般用不到節點的返回結果,可以返回nullreturn null;}@Overridepublic String name() {return "getUserInfo";}@Overridepublic List<String> dependencies() {return Collections.emptyList();}
}class GetMapInfoNode implements ProcessNode<Param, Resp> {@Overridepublic Object process(Param param, Resp resp, LoadContext context) {String lat = param.getLat();String lng = param.getLng();//mock解析經緯度...resp.setCityId("211");return null;}@Overridepublic String name() {return "getMapInfo";}@Overridepublic List<String> dependencies() {return Collections.emptyList();}
}class GetUserOrderNode implements ProcessNode<Param, Resp> {@Overridepublic Object process(Param param, Resp resp, LoadContext context) {//mock獲取用戶的所有訂單List<Resp.OrderDetail> orderDetails = new ArrayList<>();for(int i = 0; i < 4; i ++){orderDetails.add(Resp.OrderDetail.builder().orderId(i + 1000L).build());}resp.setOrderList(orderDetails);//異步獲取各訂單的詳情,不堵塞當前節點,但是需全部完成后主流程才能完成for(Resp.OrderDetail orderDetail : orderDetails){//調用context的submitTask的方法用于提交任務,true標識堵塞主流程context.submitTask(() -> {orderDetail.setOrderCreateTime(1000000 + orderDetail.getOrderId());}, true);}return null;}@Overridepublic String name() {return "getUserOrder";}@Overridepublic List<String> dependencies() {return ListUtil.of("getUserInfo");}
}
最后
至此已經完成了框架的介紹,如果有其他問題可以聯系1543493541@qq.com或者oraen1998@gmail.com