日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

SpringBatch 自定义ItemReader和可重新启动Reader(十五)

發布時間:2025/1/21 javascript 82 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SpringBatch 自定义ItemReader和可重新启动Reader(十五) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • 一、自定義CustomItemReader
    • 二、job 監聽器
    • 三、配置job
    • 四、改造CustomItemReader,發生異常批處理作業從停止的地方重新啟動

前言:在一些業務場景中,可能現有的reader不符合我們的要求,SpringBatch提供自定義reader,實現ItemReader接口,滿足我們業務場景。

SpringBatch其它文章直通車:

  • SpringBatch讀單個文件(FlatFileItemReader)和寫單個文件(FlatFileItemWriter)(一)
  • SpringBatch順序讀取多文件(MultiResourceItemReader)和順序寫文件(MultiResourceItemWriter)(二)
  • SpringBatch讀數據庫(MyBatisPagingItemReader)(三)
  • SpringBatch讀文件(FlatFileItemReader)寫據庫(MyBatisBatchItemWriter)(四)
  • SpringBatch 監聽器之Job監聽器(JobExecutionListener)和Step監聽器(StepExecutionListener)(五)
  • SpringBatch 監聽器之Chunk監聽器(ChunkListener)和Skip監聽器(SkipListener)(六)
  • SpringBatch 多線程(TaskExecutor)啟動Job詳解 (七)
  • SpringBatch 配置并行啟動Job詳解 (八)
  • SpringBatch 批處理分區(Partitioner )分片(九)
  • SpringBatch tasklet實現和用法(十)
  • SpringBatch 讀取JSON(JsonItemReader)用法(十一)
  • SpringBatch 寫文件JSON(JsonFileItemWriter)用法(十二)
  • SpringBatch 讀取xml文件(StaxEventItemReader)用法(十三)
  • SpringBatch 寫xml文件(StaxEventItemWriter)用法(十四)

代碼已上傳GitHub上面地址:git源碼地址

一、自定義CustomItemReader

創建了一個簡單的ItemReader實現,它從提供的列表中讀取數據。我們首先實現ItemReader最基本的read方法

package com.sl.common;import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.NonTransientResourceException; import org.springframework.batch.item.ParseException; import org.springframework.batch.item.UnexpectedInputException;import java.util.List;/*** 自定義reader* @author shuliangzhao* @Title: CustomItemReader* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:49*/ public class CustomItemReader<T> implements ItemReader<T> {private List<String> list;public CustomItemReader(List<String> list) {this.list = list;}@Overridepublic T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {if (list.size() > 0 && !list.isEmpty()) {return (T)list.remove(0);}return null;} }

二、job 監聽器

job執行之前加載數據供reader使用

package com.sl.listener;import com.sl.common.CommonConstants; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; import org.springframework.stereotype.Component;/*** @author shuliangzhao* @Title: CustomJobListener* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:59*/ @Component public class CustomJobListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {CommonConstants.getList().add("hello");CommonConstants.getList().add("springbatch");}@Overridepublic void afterJob(JobExecution jobExecution) {} }

三、配置job

package com.sl.config;import com.sl.common.CommonConstants; import com.sl.common.CustomItemReader; import com.sl.listener.CustomJobListener; import com.sl.writer.CustomItemWriter; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** 自定義重啟reader* @author shuliangzhao* @Title: CustomConfiguration* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:55*/ @Configuration @EnableBatchProcessing public class CustomConfiguration {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate CustomJobListener customJobListener;@Autowiredprivate CustomItemWriter customItemWriter;@Beanpublic Job customJob() {return jobBuilderFactory.get("customJob").listener(customJobListener).start(customStep()).build();}@Beanpublic Step customStep() {return stepBuilderFactory.get("customStep").chunk(10).reader(customItemReader()).writer(customItemWriter).build();}@Bean@StepScopepublic CustomItemReader customItemReader() {return new CustomItemReader(CommonConstants.getList());} }

敲黑板:如果這個job在執行過程中發生錯誤,是不能重啟。接下來我們講解怎么改進reader可以讓我們的job在發生異常,批處理作業從停止的地方重新啟動。

四、改造CustomItemReader,發生異常批處理作業從停止的地方重新啟動

目前,如果處理被中斷并重新開始,ItemReader必須從頭開始。在許多場景中,這實際上是有效的,但有時更可取的做法是,批處理作業從停止的地方重新啟動。關鍵的區別通常是閱讀器是有狀態的還是無狀態的。無狀態讀取器不需要擔心可重啟性,但是有狀態讀取器必須嘗試在重啟時重新構建其最后一個已知狀態。出于這個原因,我們建議盡可能保持自定義讀取器處于無狀態,因此不必擔心可重啟性。

CustomItemReader還需要實現接口ItemStream

package com.sl.common;import org.springframework.batch.item.*;import java.util.List;/*** 自定義reader* @author shuliangzhao* @Title: CustomItemReader* @ProjectName spring-boot-learn* @Description: TODO* @date 2019/9/21 14:49*/ public class CustomRestaItemReader<T> implements ItemReader<T> , ItemStream {private List<T> list;int currentIndex = 0;private static final String CURRENT_INDEX = "current.index";public CustomRestaItemReader(List<T> list) {this.list = list;}@Overridepublic T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {if (currentIndex < list.size()) {return list.get(currentIndex++);}return null;}@Overridepublic void open(ExecutionContext executionContext) throws ItemStreamException {if(executionContext.containsKey(CURRENT_INDEX)){currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();}else{currentIndex = 0;}}@Overridepublic void update(ExecutionContext executionContext) throws ItemStreamException {executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());}@Overridepublic void close() throws ItemStreamException {} }

實現reader重啟關鍵方法是open方法和update方法

總結

以上是生活随笔為你收集整理的SpringBatch 自定义ItemReader和可重新启动Reader(十五)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。