SpringBatch處理數據性能優化

SpringBatch的Step默認使用同步方式批量處理數據,也可以通過配置將讀數改為同步,處理和寫入改為異步方式。

1、同步處理Step

SpringBatch的Step一般由ItemReader、ItemProcessor和ItemWriter組成,其中ItemProcessor是可選的。他的設計思路的通過ItemReader讀取一條數據之后,匯總到inputs中,當達到chunkSize數量時,使用ItemProcessor處理數據,然后使用ItemWriter寫入。

這個過程都是同步的操作,不存在異步的過程。實際業務處理過程中,數據一般來源于數據庫,如果每次只讀取一條數據,效率比較低,可以采用批量讀取數據,單條返回的方式提高效率。如:DataReader通過游標批量讀取數據

public class DataReader implements ItemReader<InputType> {// 游標記錄上個批次讀取的數據private long lastRowId = 0;// 單次數據庫讀取的數據量private int batchSize;// 數據緩存迭代器private Iterator<InputType> cacheIterator;public DataReader(int batchSize) {this.batchSize = batchSize;}@Overridepublic InputType read() throws Exception {if (cacheIterator == null || !cacheIterator.hasNext()) {// 使用游標方式批量查詢數據庫List<InputType> batchList = ....if (batchList == null || batchList.isEmpty()) {return null; // 讀取結束}// 更新lastRowId為當前批次最后一條的rowIdlastRowId = batchList.get(batchList.size() - 1).getRowId();cacheIterator = batchList.iterator();}// 迭代器返回一條數據return cacheIterator.next();}
}

DataProcessor將讀取的數據做業務處理,轉化為OutputType類型數據傳遞給ItemWriter

public class DataProcessor implements ItemProcessor<InputType, OutputType> {@Overridepublic OutputType process(InputType item) throws Exception {// 處理item,轉換為OutputTypereturn output;}
}

DataWriter中寫入OutputType類型數據到數據庫

public class DataWriter implements ItemWriter<OutputType> {@Overridepublic void write(List<? extends OutputType> items) throws Exception {// 寫入數據都數據庫。。。}
}

配置同步Step

return stepBuilderFactory.get("step1").<InputType, OutputType> chunk(100)  // 每100條數據為一個批次,執行processor和writer.reader(new DataReader(1000)) // 數據庫每次讀取1000條.processor(new DataProcessor()).writer(new DataWriter()).build();

2、異步處理Step

在大數據量的批處理系統中,希望盡可能地提高性能,這時可以將ItemProcossor和ItemWriter環節采用異步多線程的方式進行優化,這是需要將ItemProcossor和ItemWriter分別包裝為AsyncItemProcessor和AsyncItemWriter,如下方法可以實現包裝:

private <I, O> AsyncItemProcessor<I, O> wrapAsyncProcessor(ItemProcessor<I, O> processor,TaskExecutor taskExecutor) {AsyncItemProcessor<I, O> asyncItemProcessor = new AsyncItemProcessor<>();asyncItemProcessor.setDelegate(processor);asyncItemProcessor.setTaskExecutor(taskExecutor);return asyncItemProcessor;
}private <O> AsyncItemWriter<O> wrapAsyncWriter(ItemWriter<O> writer) {AsyncItemWriter<O> asyncItemWriter = new AsyncItemWriter<>();asyncItemWriter.setDelegate(writer);return asyncItemWriter;
}

配置異步Step

private Step step2() {AsyncItemProcessor<PayOrderPo, PayOrderPo> asyncItemProcessor =wrapAsyncProcessor(new DataProcessor(), getAsyncExecutor("TestJobPool"));AsyncItemWriter<PayOrderPo> asyncItemWriter = wrapAsyncWriter(new DataWriter());return stepBuilderFactory.get("step2").<PayOrderPo, Future<PayOrderPo>> chunk(500).reader(new DataReader(1000)).processor(asyncItemProcessor).writer(asyncItemWriter).build();
}
private TaskExecutor getAsyncExecutor(String threadPoolName) {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(4);executor.setMaxPoolSize(8);executor.setQueueCapacity(200);executor.setKeepAliveSeconds(60);executor.setThreadNamePrefix(threadPoolName + "-");executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.setAllowCoreThreadTimeOut(true);executor.initialize();return executor;
}

AsyncItemProcessor使用了代理模式,內部代理到ItemProcessor進行實際數據處理,通過taskExecutor線程池異步高性能處理數據

public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean {// 代理的ItemProcessorprivate ItemProcessor<I, O> delegate;private TaskExecutor taskExecutor = new SyncTaskExecutor();public void afterPropertiesSet() throws Exception {Assert.notNull(delegate, "The delegate must be set.");}@Nullablepublic Future<O> process(final I item) throws Exception {final StepExecution stepExecution = getStepExecution();FutureTask<O> task = new FutureTask<>(new Callable<O>() {public O call() throws Exception {if (stepExecution != null) {StepSynchronizationManager.register(stepExecution);}try {// 代理的processor實際處理數據return delegate.process(item);}finally {if (stepExecution != null) {StepSynchronizationManager.close();}}}});// 提交異步任務taskExecutor.execute(task);return task;}
}

處理的過程如下:

  1. 創建FutureTask,在其線程中實際調用 process(item) 方法進行數據處理。
  2. 通過 TaskExecutor 異步執行任務FutureTask
  3. 返回 Future 對象給Writer來跟蹤異步結果。

AsyncItemWriter同樣使用了代理模式,代理到實際處理數據的ItemWriter,主要通過兩個步驟進行:

1、獲取processor環境的異步處理結果

2、匯總結果到實際的ItemWriter進行數據寫入

public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean {// 代理的ItemWriterprivate ItemWriter<T> delegate;public void write(List<? extends Future<T>> items) throws Exception {// 用于保存異步結果List<T> list = new ArrayList<>();// 獲取異步結果for (Future<T> future : items) {try {T item = future.get();if(item != null) {list.add(future.get());}}catch (ExecutionException e) {Throwable cause = e.getCause();if(cause != null && cause instanceof Exception) {logger.debug("An exception was thrown while processing an item", e);throw (Exception) cause;}else {throw e;}}}// 代理到實際的ItemWriter進行數據寫入delegate.write(list);}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/bicheng/86814.shtml
繁體地址,請注明出處:http://hk.pswp.cn/bicheng/86814.shtml
英文地址,請注明出處:http://en.pswp.cn/bicheng/86814.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【機器學習深度學習】前饋神經網絡(單隱藏層)

目錄 一、什么是前饋神經網絡&#xff1f; 二、數學表達式是什么&#xff1f; 三、為什么需要“非線性函數”&#xff1f; 四、NumPy 實現前饋神經網絡代碼示例 五、 運行結果 六、代碼解析 6.1 初始化部分 6.2 前向傳播 6.3 計算損失&#xff08;Loss&#xff09; 6…

設計模式系列(08):創建型模式 - 原型模式

系列導讀&#xff1a;完成創建型模式的學習&#xff0c;我們來看最后一個創建型模式——原型模式。它通過復制已有對象來創建新對象&#xff0c;是一種獨特的創建方式。 解決什么問題&#xff1a;通過復制現有對象來創建新對象&#xff0c;而不是重新實例化。適用于對象創建成本…

區塊鏈到底是什么?

區塊鏈本質上是一種去中心化的分布式賬本技術&#xff0c;具有以下核心特點&#xff1a; - 去中心化&#xff1a;沒有中央管理機構&#xff0c;數據由網絡中的多個節點共同維護&#xff0c;比如比特幣網絡中各個節點都保存著完整賬本。 - 分布式存儲&#xff1a;數據不是存在一…

系統架構設計師論文分享-論ATAM的使用

我的軟考歷程 摘要 2023年2月&#xff0c;我司通過了研發紗線MES系統的立項&#xff0c;該系統為國內紗線工廠提供SAAS服務&#xff0c;旨在提高紗線工廠的數字化和智能化水平。我在本項目中擔任系統架構設計師&#xff0c;負責整個項目的架構設計工作。本文結合我在該項目中…

vue-28(服務器端渲染(SSR)簡介及其優勢)

服務器端渲染&#xff08;SSR&#xff09;簡介及其優勢 服務器端渲染&#xff08;SSR&#xff09;是現代網絡應用的關鍵技術&#xff0c;特別是使用 Vue.js 等框架構建的應用。它通過在服務器上渲染初始應用狀態來彌補傳統單頁應用&#xff08;SPA&#xff09;的局限性&#x…

工業電子 | 什么是SerDes,為何工業和汽車應用需要它?

重點內容速覽&#xff1a; 1. 什么是SerDes&#xff1f; 2. ADI&#xff1a;私有協議的GMSL將向公有協議轉變 3. TI&#xff1a;工業和汽車有兩套SerDes解決方案 4. Microchip&#xff1a;推出通用協議SerDes芯片 5. 羅姆&#xff1a;主要針對汽車領域 6. 國產SerDes芯…

大事件項目記錄4-用戶接口開發-更新用戶基本信息

4&#xff09;更新用戶基本信息。 UserController.java&#xff1a; UserMapper.java&#xff1a; Update("update user set nickname #{nickname},email #{email},update_time #{updateTime} where id #{id}")void update(User user); UserServiceInterface…

Transformer結構--輸入編碼(BPE,PE)

在Transformer結構中&#xff0c;輸入編碼是模型處理文本數據的關鍵步驟&#xff0c;其中**BPE&#xff08;Byte Pair Encoding&#xff0c;字節對編碼&#xff09;和PE&#xff08;Positional Encoding&#xff0c;位置編碼&#xff09;**是兩種重要的編碼方式&#xff0c;它們…

Confluence-測試用例設計指導方法

測試經驗知識庫 典型的測試場景驗證點各個項目有價值的經驗和測試點 測試經驗知識庫 - 草稿測試用例執行量化指導建議 何時需要進行全量測試和如何定義和執行測試用例量的一些建議和標準 端對端&#xff08;E2E&#xff09;測試用例設計指導方案 在測試行業中&#xff0c;端到端…

淺析JVM

一、JVM運行流程 如圖&#xff1a; JVM由四個部分構成&#xff1a; 1.類加載器 加載類文件到內存2.運行時數據區 寫的程序需要加載到這里才能運行3.執行引擎 負責解釋命令&#xff0c;提交操作系統執行4.本地接口 融合不同編程語言為java所用&#xff0c;如Java程序驅動打印…

多個 Job 并發運行時共享配置文件導致上下文污染,固化 Jenkins Job 上下文

基于 context.py 固化 Jenkins Job 上下文的完整方案&#xff0c;適用于你當前的工作流&#xff08;Python Jenkins Pipeline&#xff09;&#xff0c;解決&#xff1a; 多個 Job 并發運行時共享配置文件導致上下文污染&#xff1b;讀取環境變量或 JSON 文件時被其他 Job 修改…

簡木易支付系統 功能齊全,對接接口超多

簡木易支付系統&#xff0c;作為一款引領行業潮流的卓越支付解決方案&#xff0c;依托先進的 PHP MySQL 技術架構精心打造。在開發過程中&#xff0c;它巧妙運用了功能強大的 ThinkPHP8 框架&#xff0c;完美融合前端主流技術 Vue、Element 以及 Layuiadmin&#xff0c;共同鑄…

【軟考高項論文】信息系統項目的人力資源管理

摘要 本文圍繞信息系統項目的人力資源管理展開論述。以我在2024年參與的為大型國有企業構建供應鏈管理系統項目為例&#xff0c;闡述了項目人力資源管理的主要流程&#xff0c;包括規劃、組建、建設和管理團隊四個過程&#xff0c;以及所運用的工具和理論。同時&#xff0c;分…

【EI會議征稿】東北大學主辦第三屆機器視覺、圖像處理與影像技術國際會議(MVIPIT 2025)

一、會議信息 大會官網&#xff1a;www.mvipit.org 官方郵箱&#xff1a;mvipit163.com 會議地點&#xff1a;遼寧沈陽 主辦單位&#xff1a;東北大學 會議時間&#xff1a;2025 年 9 月 27 日-9 月 29 日 二、征稿主題 集中但不限于“機器視覺、圖像處理與影像技術”等其…

從零開始的云計算生活——第二十三天,稍作休息,Tomcat

目錄 一.故事背景 二.Tomcat概述 1、Tomcat介紹 2、Tomcat歷史 二、Tomcat原理分析 1、Http工作原理 2、Tomcat整體架構 3、Coyote連接器架構 4、Catalina容器架構 5、Jasper處理流程 6、JSP編譯過程 7、Tomcat啟動流程 8、Tomcat請求處理流程 三、Tomcat安裝與配…

幾種基于Doherty結構的GAN氮化鎵功放設計方法介紹

功率放大器是現代無線通信系統中最重要的組件之一。理想情況下&#xff0c;它們能夠以高線性度和高效率提供高輸出功率。但通常在這三個關鍵的功率放大器性能參數之間需要進行權衡取舍&#xff0c;而且具有最高輸出功率和線性度的放大器往往會犧牲效率。 在支持寬帶寬和高數據…

前端打印計算單位 cm、mm、px

A4 縱向 寬&#xff1a;21cm&#xff0c;210mm&#xff0c;793.698px 高&#xff1a;29.7cm&#xff0c;297mm&#xff0c;1122.520px A4 橫向 寬&#xff1a;29.7cm&#xff0c;297mm&#xff0c;1122.520px 高&#xff1a;21cm&#xff0c;210mm&#xff0c;793.698px …

c# sugersql 獲取子表數據排序

在C#中使用Sugar ORM&#xff08;一個流行的.NET ORM框架&#xff09;獲取子表數據并進行排序&#xff0c;可以通過以下幾種方式實現&#xff1a; 1. 使用HasMany或HasOne配置 首先&#xff0c;確保你在配置實體時已經正確設置了HasMany或HasOne關系。例如&#xff0c;假設你…

【nRF52832】【環境搭建 3】【如何新建一個純單片機開發的工程】

1. 前言 笨叔&#xff0c;又要開始扯淡了!!! 不感興趣的同學&#xff0c;可以跳過了!!! 笨叔之前在大學里面&#xff0c; 剛接觸單片機時。就被 windows 平臺 例如 keill 5 、IAR 等一堆開會環境差點勸退。 當時也是堅持咬牙一點點摸索過來的。剛摸索明白&#xff0c;覺得單片…

Spring-loC與DI

目錄 1 loC控制反轉思想 2 DI依賴注入 3 loC詳解 3.1 存儲Bean &#xff08;1&#xff09;Controller &#xff08;2&#xff09;Service &#xff08;3&#xff09;Repository &#xff08;4&#xff09;Component &#xff08;5&#xff09;Configuration &#xf…