javascript
全网最详细SpringBatch批处理读取分区(Paratition)文件讲解
文章目錄
- 一、分區Step
- 1、數據分區
- 2、分區處理
- 二、實現分區關鍵接口
- 1、Partitioner
- 2、StepExecutionSplitter
- 3、PartitionHandler
- 三、基本配置和屬性說明
- 1、基本配置
- 2、屬性說明
- 四、文件分區
- 1、定義分區文件Partitioner
- 2、定義文件讀
- 3、定義分區job配置
- 4、定義processor
- 4、定義writer
- 4、定義step監聽器
- 6、運行job
寫在前面: 我是「境里婆娑」。我還是從前那個少年,沒有一絲絲改變,時間只不過是考驗,種在心中信念絲毫未減,眼前這個少年,還是最初那張臉,面前再多艱險不退卻。
寫博客的目的就是分享給大家一起學習交流,如果您對 Java感興趣,可以關注我,我們一起學習。
前言:為什么要寫這篇文章,在網上很難找到一篇關于SpringBatch批處理讀取分區文件基于JavaBean配置的文章,因此我決定寫一篇關于SpringBatch讀取分區文件基于javaBean配置的文章,希望這篇文章可以幫助新手的你或者你有一定經驗的可以加深印象。
一、分區Step
何為分區Step:
通過將任務進行分區,不同的Step處理不同任務數據達到提高Job效率功能。
分區作業可以分區兩個處理階段,數據分區、分區處理;
1、數據分區
數據分區:根據特殊的規則,將數據進行合理分片,為不同的數據切片生成數據執行上下文Execution Context、作業執行器Step Execution。可以通過接口Partitioner生成自定義分區邏輯,SpringBatch批處理框架默認對多文件實現MultiResourcePartititoner;也可以自行擴展接口Partitioner實現自定義分區邏輯。
2、分區處理
分區處理:通過數據分區后,不同的數據已經被分配到不同的作業執行器中,接下來需要交給分區處理器進行作業,分區處理器可以在本地或遠程執行被劃分的作業。接口PartitionHandler定義了分區處理邏輯,SpringBatch批處理框架默認實現了本地分區處理TaskExecutorPartitionHandler;也可以自行擴展接口PartitionHandler來實現自定義分區邏輯。
分區作業邏輯結構圖:
二、實現分區關鍵接口
實現分區關鍵接口有如下:PartitionHandler、StepExecutionSplitter、Partitioner。
1、Partitioner
Partitoner接口定義了如何根據給定的分區規則進行創建作業執行分區的上下文。
Partitioner接口定義如下:
public interface Partitioner {Map<String, ExecutionContext> partition(int gridSize); }gridSize含義:根據給定的gridSize大小進行執行上下文劃分。
2、StepExecutionSplitter
StepExecutionSplitter接口定義了如何根據給定的分區規則進行創建作業執行分區的執行器。
StepExecutionSplitter接口定義如下
public interface StepExecutionSplitter {String getStepName();Set<StepExecution> split(StepExecution stepExecution, int gridSize) throws JobExecutionException; }getStepName:獲取當前定義的分區作業的名稱。
split:根據給定的分區規則為每個分區生成對應的分區執行器。
3、PartitionHandler
PartitionHandler接口定義了分區處理的邏輯,根據給定的StepExecutionSplitter進行分區并執行,最后將執行的結果進行收集,反饋給前端。
PartitionHandler接口定義如下
public interface PartitionHandler {Collection<StepExecution> handle(StepExecutionSplitter stepSplitter, StepExecution stepExecution) throws Exception; }三、基本配置和屬性說明
上面兩節基本知識已經介紹完畢,下面我們將講一個例子來鞏固之前知識。
1、基本配置
一個典型分區Job配置
@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}2、屬性說明
在配置分區Step之前,我們先看下分區Step的主要屬性定義和元素定義
| step | 用于指定分區step名稱 |
| handler(屬性) | 屬性handler指定分區執行器,需要實現接口PartitionHandler |
| handler(子元素) | 用于定義默認實現:TaskExecutorPartitionHandler |
| task-executor | 生命使用的線程池 |
| grid-size | 聲明分區的HashMap的初始值大小 |
四、文件分區
SpringBatch框架提供了對文件分區的支持,實現類:MultiResourcePartitioner提供了對文件分區的默認支持,根據文件名將不同文件處理進行分區,提升處理速度和效率。本文將按照此例子給出如何配置多文件分區實現。
讀取文件如下:
本節實例由于文件多,我們對文件進行分區,然后將文件的內容寫入DB,邏輯示意圖如下:
1、定義分區文件Partitioner
定義文件分區,將不同的文件分配到不同的作業中,使用自定義MyMultiResourcePartitioner分區。
自定義分區MyMultiResourcePartitioner如下:
/*** @author shuliangzhao* @date 2020/12/4 23:14*/ public class MyMultiResourcePartitioner implements Partitioner {private static final String DEFAULT_KEY_NAME = "fileName";private static final String PARTITION_KEY = "partition";private Resource[] resources = new Resource[0];private String keyName = DEFAULT_KEY_NAME;public void setResources(Resource[] resources) {this.resources = resources;}public void setKeyName(String keyName) {this.keyName = keyName;}@Overridepublic Map<String, ExecutionContext> partition(int gridSize) {Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(gridSize);int i = 0;for (Resource resource : resources) {ExecutionContext context = new ExecutionContext();Assert.state(resource.exists(), "Resource does not exist: "+resource);try {context.putString(keyName, resource.getURI().getPath());}catch (IOException e) {throw new IllegalArgumentException("File could not be located for: "+resource, e);}map.put(PARTITION_KEY + i, context);i++;}return map;} }屬性keyName:用于指定作業上文中屬性名字,作用是在不同的作業上下文中可以獲取設置的對于屬性值。可以在讀寫階段通過@Value("#{stepExecutionContext[fileName]}"方式獲取。
2、定義文件讀
配置好分區實現,需要在每個分區作業中讀入不同文件,進而提供文件處理效率。
PartitionMultiFileReader 實現
public class PartitionMultiFileReader extends FlatFileItemReader {public PartitionMultiFileReader(Class clz,String fileName) {setResource(new FileSystemResource(fileName.substring(1)));Field[] declaredFields = clz.getDeclaredFields();List<String> list = new ArrayList<>();for (Field field:declaredFields) {list.add(field.getName());}String[] names = new String[list.size()];DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();delimitedLineTokenizer.setDelimiter(",");delimitedLineTokenizer.setNames(list.toArray(names));DefaultLineMapper defaultLineMapper = new DefaultLineMapper();defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);CommonFieldSetMapper commonFieldSetMapper = new CommonFieldSetMapper();commonFieldSetMapper.setTargetType(clz);defaultLineMapper.setFieldSetMapper(commonFieldSetMapper);setLineMapper(defaultLineMapper);setName(clz.getSimpleName());} }3、定義分區job配置
基于javabean方式實現job配置
package com.sl.config; //包導入省略/*** @author shuliangzhao* @Title: PartitionFileConfiguration* @ProjectName spring-boot-learn* @Description: TODO* @date 2020/12/4 21:09*/ @Configuration @EnableBatchProcessing public class PartitionMultiFileConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate PartitonMultiFileProcessor partitonMultiFileProcessor;@Autowiredprivate PartitionMultiFileWriter partitionMultiFileWriter;@Beanpublic Job partitionMultiFileJob() {return jobBuilderFactory.get("partitionMultiFileJob").start(partitionMasterMultiFileStep()).build();}@Beanpublic Step partitionMasterMultiFileStep() {return stepBuilderFactory.get("partitionMasterMultiFileStep").partitioner(partitionSlaveMultiFileStep().getName(),multiResourcePartitioner()).partitionHandler(multiFilePartitionHandler()).build();}@Beanpublic PartitionHandler multiFilePartitionHandler() {TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();handler.setGridSize(2);handler.setStep(partitionSlaveMultiFileStep());handler.setTaskExecutor(new SimpleAsyncTaskExecutor());return handler;}@Beanpublic Step partitionSlaveMultiFileStep() {return stepBuilderFactory.get("partitionSlaveMultiFileStep").<CreditBill,CreditBill>chunk(1).reader(partitionMultiFileReader(null)).processor(partitonMultiFileProcessor).writer(partitionMultiFileWriter).build();}@Bean@StepScopepublic PartitionMultiFileReader partitionMultiFileReader(@Value("#{stepExecutionContext[fileName]}")String fileName) {return new PartitionMultiFileReader(CreditBill.class,fileName);}@Beanpublic MyMultiResourcePartitioner multiResourcePartitioner() {MyMultiResourcePartitioner multiResourcePartitioner = new MyMultiResourcePartitioner();multiResourcePartitioner.setKeyName("fileName");multiResourcePartitioner.setResources(getResource());return multiResourcePartitioner;}private Resource[] getResource() {String filePath = "D:\\aplus\\bill\\";File file = new File(filePath);List<Resource> resourceList = new ArrayList<>();if (file.isDirectory()) {String[] list = file.list();if (list != null) {for (String str : list) {String resource = file.getPath() + "\\" + str;FileSystemResource fileSystemResource = new FileSystemResource(resource);resourceList.add(fileSystemResource);}}}Resource[] resources = new Resource[resourceList.size()];return resourceList.toArray(resources);}}4、定義processor
定義processor
/*** @author shuliangzhao* @date 2020/12/4 22:11*/ @Component @StepScope public class PartitonMultiFileProcessor implements ItemProcessor<CreditBill,CreditBill> {@Overridepublic CreditBill process(CreditBill item) throws Exception {CreditBill creditBill = new CreditBill();creditBill.setAcctid(item.getAcctid());creditBill.setAddress(item.getAddress());creditBill.setAmout(item.getAmout());creditBill.setDate(item.getDate());creditBill.setName(item.getName());return creditBill;} }4、定義writer
/*** @author shuliangzhao* @date 2020/12/4 22:29*/ @Component @StepScope public class PartitionMultiFileWriter implements ItemWriter<CreditBill> {@Autowiredprivate CreditBillMapper creditBillMapper;@Overridepublic void write(List<? extends CreditBill> items) throws Exception {if (items != null && items.size() > 0) {items.stream().forEach(item -> {creditBillMapper.insert(item);});}} }4、定義step監聽器
定義step監聽器目的是在處理作業之前打印線程名字和讀取文件名字
@Component public class PartitionStepListener implements StepExecutionListener {private static final Logger logger = LoggerFactory.getLogger(PartitionStepListener.class);@Overridepublic void beforeStep(StepExecution stepExecution) {logger.info("ThreadName={},steName={},FileName={}",Thread.currentThread().getName(),stepExecution.getStepName(),stepExecution.getExecutionContext().getString("fileName"));}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {return null;} }6、運行job
執行job查看結果,可以看出不同的文件有不同的線程來處理,并且被分配到不同的分區作業步中執行
2020-12-05 15:58:34.100 INFO 13208 --- [cTaskExecutor-1] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-1,steName=partitionSlaveMultiFileStep:partition1,FileName=/D:/aplus/bill/bill2.csv 2020-12-05 15:58:34.114 INFO 13208 --- [cTaskExecutor-3] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-3,steName=partitionSlaveMultiFileStep:partition0,FileName=/D:/aplus/bill/bill1.csv 2020-12-05 15:58:34.122 INFO 13208 --- [cTaskExecutor-2] com.sl.listener.PartitionStepListener : ThreadName=SimpleAsyncTaskExecutor-2,steName=partitionSlaveMultiFileStep:partition2,FileName=/D:/aplus/bill/bill3.csv至此,我們完成了對文件分區的處理。
如果想更詳細查看以上所有代碼請移步到github:文件分區詳細代碼
總結
以上是生活随笔為你收集整理的全网最详细SpringBatch批处理读取分区(Paratition)文件讲解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一篇文章教你弄懂 SpringMvc中的
- 下一篇: 全网最详细SpringBatch读(Re