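By default, a Spring Batch Step processes data in chunks synchronously. It can also be configured so that reading stays synchronous while processing and writing run asynchronously.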
1. Synchronous Step
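A Spring Batch Step usually consists of an ItemReader, an ItemProcessor, and an ItemWriter, of which the ItemProcessor is optional. The design works like this: the ItemReader reads one item at a time and each item is collected into inputs; once chunkSize items have been collected, the ItemProcessor processes them and the ItemWriter then writes them.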
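The read, process, write cycle described above can be modeled in a few lines of plain Java. This is only a simplified sketch, not Spring Batch's actual implementation: the class `ChunkLoopSketch`, the method `run`, and the use of `Iterator` and `Function` in place of ItemReader and ItemProcessor are assumptions made for illustration, and the "writer" simply records each chunk it receives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Simplified model of chunk-oriented processing; these are not Spring Batch classes. */
final class ChunkLoopSketch {

    /**
     * Reads up to chunkSize items, processes each one, then hands the whole
     * chunk to the "writer" in a single call. Repeats until the source is empty.
     * Returns every chunk that was written, in order.
     */
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int chunkSize) {
        List<List<O>> written = new ArrayList<>();
        while (true) {
            // Read phase: collect items one at a time until the chunk is full.
            List<I> inputs = new ArrayList<>();
            while (inputs.size() < chunkSize && reader.hasNext()) {
                inputs.add(reader.next());
            }
            if (inputs.isEmpty()) {
                break; // nothing left to read
            }
            // Process phase: transform each item; a null result filters the item out.
            List<O> outputs = new ArrayList<>();
            for (I item : inputs) {
                O out = processor.apply(item);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // Write phase: the whole chunk is written at once.
            written.add(outputs);
        }
        return written;
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3, 4, 5);
        // With chunkSize 2, five items end up in three chunks.
        System.out.println(run(source.iterator(), n -> "item-" + n, 2));
    }
}
```

Every phase runs on the calling thread, one after another, which is why a slow processor or writer directly limits the throughput of the whole Step.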
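The entire process is synchronous; nothing in it runs asynchronously. In real workloads the data usually comes from a database, and fetching a single row per query is inefficient. Throughput improves if the reader fetches rows in batches and still returns them one at a time. For example, DataReader below reads data in batches using a cursor: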
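import java.util.Iterator;
import java.util.List;
import org.springframework.batch.item.ItemReader;

public class DataReader implements ItemReader<InputType> {
    // Cursor: rowId of the last record read in the previous batch
    private long lastRowId = 0;
    // Number of records fetched per database query
    private int batchSize;
    // Iterator over the cached batch
    private Iterator<InputType> cacheIterator;

    public DataReader(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public InputType read() throws Exception {
        if (cacheIterator == null || !cacheIterator.hasNext()) {
            // Query the database for the next batch, starting after lastRowId
            List<InputType> batchList = ....
            if (batchList == null || batchList.isEmpty()) {
                return null; // End of data
            }
            // Advance lastRowId to the rowId of the last record in this batch
            lastRowId = batchList.get(batchList.size() - 1).getRowId();
            cacheIterator = batchList.iterator();
        }
        // Return one record from the iterator
        return cacheIterator.next();
    }
}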
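To make the cursor behavior concrete, here is a minimal in-memory sketch of the same read logic. Everything in it is hypothetical: the `Row` type, the `fetchBatch` helper, and the list standing in for a database table are assumptions for illustration, and the SQL in the comment is only one way such a query might be written. The elided query in DataReader above is not reproduced here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** In-memory stand-in for a cursor-style batch reader; all names are hypothetical. */
final class KeysetReaderSketch {

    /** Minimal row type with an increasing id. */
    static final class Row {
        final long rowId;
        final String payload;

        Row(long rowId, String payload) {
            this.rowId = rowId;
            this.payload = payload;
        }
    }

    private final List<Row> table;  // stands in for a table sorted by rowId
    private final int batchSize;    // rows fetched per "query"
    private long lastRowId = 0;     // cursor: highest rowId fetched so far
    private Iterator<Row> cacheIterator;
    int queryCount = 0;             // number of batch queries issued

    KeysetReaderSketch(List<Row> table, int batchSize) {
        this.table = table;
        this.batchSize = batchSize;
    }

    /** Roughly: SELECT ... WHERE row_id > ? ORDER BY row_id LIMIT ? (assumed query shape). */
    private List<Row> fetchBatch(long afterRowId, int limit) {
        queryCount++;
        List<Row> batch = new ArrayList<>();
        for (Row row : table) {
            if (row.rowId > afterRowId && batch.size() < limit) {
                batch.add(row);
            }
        }
        return batch;
    }

    /** Returns one row per call and refills the cache when it runs dry; null means end of data. */
    Row read() {
        if (cacheIterator == null || !cacheIterator.hasNext()) {
            List<Row> batch = fetchBatch(lastRowId, batchSize);
            if (batch.isEmpty()) {
                return null;
            }
            lastRowId = batch.get(batch.size() - 1).rowId;
            cacheIterator = batch.iterator();
        }
        return cacheIterator.next();
    }

    public static void main(String[] args) {
        List<Row> table = new ArrayList<>();
        for (long id = 1; id <= 5; id++) {
            table.add(new Row(id, "row-" + id));
        }
        KeysetReaderSketch reader = new KeysetReaderSketch(table, 2);
        for (Row row = reader.read(); row != null; row = reader.read()) {
            System.out.println(row.rowId + " " + row.payload);
        }
        System.out.println("queries issued: " + reader.queryCount);
    }
}
```

The number of queries grows with the row count divided by batchSize rather than with the row count itself, which is the point of caching a batch behind a one-item-at-a-time read() method.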
DataProcessor applies the business logic to each item read, converts it to OutputType, and passes it on to the ItemWriter:
import org.springframework.batch.item.ItemProcessor;

public class DataProcessor implements ItemProcessor<InputType, OutputType> {
    @Override
    public OutputType process(InputType item) throws Exception {
        // Process the item and convert it to OutputType
        return output;
    }
}
DataWriter writes the OutputType items to the database:
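import java.util.List;
import org.springframework.batch.item.ItemWriter;

public class DataWriter implements ItemWriter<OutputType> {
    @Override
    public void write(List<? extends OutputType> items) throws Exception {
        // Write the items to the database ...
    }
}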
Configuring the synchronous Step:
return stepBuilderFactory.get("step1")
        .<InputType, OutputType> chunk(100) // Every 100 items form one chunk that goes through the processor and writer
        .reader(new DataReader(1000)) // Each database query fetches 1000 records
        .processor(new DataProcessor())
        .writer(new DataWriter())
        .build();
2. Asynchronous Step
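In batch systems that handle large data volumes, where throughput matters most, the ItemProcessor and ItemWriter stages can be optimized with asynchronous multithreading. To do so, wrap the ItemProcessor and ItemWriter in an AsyncItemProcessor and an AsyncItemWriter respectively (both are provided by the spring-batch-integration module). The following methods perform the wrapping: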
private <I, O> AsyncItemProcessor<I, O> wrapAsyncProcessor(ItemProcessor<I, O> processor,
        TaskExecutor taskExecutor) {
    AsyncItemProcessor<I, O> asyncItemProcessor = new AsyncItemProcessor<>();
    asyncItemProcessor.setDelegate(processor);
    asyncItemProcessor.setTaskExecutor(taskExecutor);
    return asyncItemProcessor;
}

private <O> AsyncItemWriter<O> wrapAsyncWriter(ItemWriter<O> writer) {
    AsyncItemWriter<O> asyncItemWriter = new AsyncItemWriter<>();
    asyncItemWriter.setDelegate(writer);
    return asyncItemWriter;
}
Configuring the asynchronous Step:
private Step step2() {
    // The type parameters must match DataProcessor (InputType -> OutputType) and DataWriter (OutputType)
    AsyncItemProcessor<InputType, OutputType> asyncItemProcessor =
            wrapAsyncProcessor(new DataProcessor(), getAsyncExecutor("TestJobPool"));
    AsyncItemWriter<OutputType> asyncItemWriter = wrapAsyncWriter(new DataWriter());
    return stepBuilderFactory.get("step2")
            // The processor now emits Future<OutputType>, which the async writer resolves
            .<InputType, Future<OutputType>> chunk(500)
            .reader(new DataReader(1000))
            .processor(asyncItemProcessor)
            .writer(asyncItemWriter)
            .build();
}
private TaskExecutor getAsyncExecutor(String threadPoolName) {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(8);
    executor.setQueueCapacity(200);
    executor.setKeepAliveSeconds(60);
    executor.setThreadNamePrefix(threadPoolName + "-");
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setAllowCoreThreadTimeOut(true);
    executor.initialize();
    return executor;
}
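AsyncItemProcessor uses the delegate (proxy) pattern: internally it hands each item to the wrapped ItemProcessor for the actual processing, and runs that call on the taskExecutor thread pool so that items are processed asynchronously and in parallel.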
public class AsyncItemProcessor<I, O> implements ItemProcessor<I, Future<O>>, InitializingBean {
    // The wrapped ItemProcessor
    private ItemProcessor<I, O> delegate;
    private TaskExecutor taskExecutor = new SyncTaskExecutor();

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "The delegate must be set.");
    }

    @Nullable
    public Future<O> process(final I item) throws Exception {
        final StepExecution stepExecution = getStepExecution();
        FutureTask<O> task = new FutureTask<>(new Callable<O>() {
            public O call() throws Exception {
                if (stepExecution != null) {
                    StepSynchronizationManager.register(stepExecution);
                }
                try {
                    // The delegate processor does the actual work
                    return delegate.process(item);
                }
                finally {
                    if (stepExecution != null) {
                        StepSynchronizationManager.close();
                    }
                }
            }
        });
        // Submit the task for asynchronous execution
        taskExecutor.execute(task);
        return task;
    }
}
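The processing flow is as follows:
- Create a FutureTask whose body calls the delegate's process(item) method to do the actual processing.
- Execute the FutureTask asynchronously through the TaskExecutor.
- Return the Future object to the Writer so that it can track the asynchronous result.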
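The three steps above can be tried out with plain java.util.concurrent types. This is a minimal sketch rather than the Spring Batch code itself: `AsyncProcessSketch` and `processAsync` are hypothetical names, a `Function` stands in for the delegate ItemProcessor, and a `java.util.concurrent.Executor` stands in for Spring's TaskExecutor (which extends that interface). The StepExecution registration is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.function.Function;

/** Plain-Java sketch of the submit-and-return-a-Future flow; names are hypothetical. */
final class AsyncProcessSketch {

    /** Wraps the delegate call in a FutureTask, hands it to the executor, and returns at once. */
    static <I, O> Future<O> processAsync(I item, Function<I, O> delegate, Executor executor) {
        // Step 1: the task body is the actual processing of one item.
        FutureTask<O> task = new FutureTask<>(() -> delegate.apply(item));
        // Step 2: the executor decides which thread runs the task.
        executor.execute(task);
        // Step 3: the caller gets a handle to the pending result.
        return task;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                futures.add(processAsync(i, n -> n * n, pool));
            }
            // Resolving the futures in submission order keeps the output ordered.
            for (Future<Integer> future : futures) {
                System.out.println(future.get());
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing `Runnable::run` as the executor runs each task on the calling thread, which mirrors the SyncTaskExecutor default shown in the class above: without an explicitly configured thread pool, wrapping a processor this way gains no parallelism.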
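AsyncItemWriter also uses the delegate (proxy) pattern, forwarding to the ItemWriter that actually writes the data. It works in two steps:
1. Retrieve the asynchronous results produced by the processor stage.
2. Collect the results and pass them to the actual ItemWriter to be written.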
public class AsyncItemWriter<T> implements ItemStreamWriter<Future<T>>, InitializingBean {
    // The wrapped ItemWriter
    private ItemWriter<T> delegate;

    public void write(List<? extends Future<T>> items) throws Exception {
        // Holds the resolved asynchronous results
        List<T> list = new ArrayList<>();
        // Resolve each asynchronous result
        for (Future<T> future : items) {
            try {
                T item = future.get();
                if(item != null) {
                    list.add(future.get());
                }
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if(cause != null && cause instanceof Exception) {
                    logger.debug("An exception was thrown while processing an item", e);
                    throw (Exception) cause;
                }
                else {
                    throw e;
                }
            }
        }
        // Delegate to the actual ItemWriter to write the data
        delegate.write(list);
    }
}
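The result-collection step can also be exercised on its own with plain JDK types. The sketch below is an illustration under assumptions, not the library code: `AsyncWriteSketch` and `collect` are hypothetical names, and already-completed `CompletableFuture` instances stand in for the futures an AsyncItemProcessor would return.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/** Plain-Java sketch of resolving a chunk of futures before writing; names are hypothetical. */
final class AsyncWriteSketch {

    /**
     * Blocks on each future in order, drops null results (filtered items),
     * and rethrows the original processing exception instead of the wrapper.
     */
    static <T> List<T> collect(List<? extends Future<T>> futures) throws Exception {
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            try {
                T item = future.get();
                if (item != null) {
                    results.add(item);
                }
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof Exception) {
                    throw (Exception) cause; // surface the processor's own exception
                }
                throw e;
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<CompletableFuture<String>> chunk = new ArrayList<>();
        chunk.add(CompletableFuture.completedFuture("a"));
        chunk.add(CompletableFuture.completedFuture(null)); // a filtered item
        chunk.add(CompletableFuture.completedFuture("b"));
        System.out.println(collect(chunk));
    }
}
```

Because the futures are resolved in list order, the items reach the delegate writer in the same order they were read, and the write for a chunk starts only after every item in that chunk has finished processing.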