具有Spring Boot和Java配置的Spring Batch教程
我一直在努力將Podcastpedia.org的一些批處理作業遷移到Spring Batch。 以前,這些工作是以我自己的方式開發的,我認為現在是時候使用一種更“標準化”的方法了。 因為我以前從未在Java配置中使用過Spring,所以我認為通過在Java中配置Spring Batch作業,這是學習它的好機會。 而且由于我都在嘗試使用Spring進行新的事物,所以為什么不把Spring Boot扔進船里呢?
注意:
在開始本教程之前,我建議您首先閱讀Spring的入門-創建批處理服務 ,因為此處提供的結構和代碼均基于該原始版本。
1.我要建立的
因此,如前所述,在這篇文章中,我將在配置Spring Batch和為Podcastpedia.org開發一些批處理作業的背景下介紹Spring Batch。 這是Podcastpedia-batch項目當前一部分的兩個工作的簡短描述:
源代碼:
本教程的源代碼可在GitHub- Podcastpedia-batch上獲得。
注意:在開始之前,我還強烈建議您閱讀Batch的域語言 ,以免使“ Jobs”,“ Steps”或“ ItemReaders”等術語聽起來很陌生。
2.你需要什么
- 最喜歡的文本編輯器或IDE
- JDK 1.7或更高版本
- Maven 3.0+
3.設置項目
該項目是使用Maven構建的。 它使用Spring Boot,這使創建可“運行”的基于獨立Spring的應用程序變得容易。 您可以通過訪問項目的網站來了解有關Spring Boot的更多信息。
Maven構建文件
因為它使用Spring Boot,所以它將使用spring-boot-starter-parent作為其父級,另外還有幾個其他spring-boot-starters將為我們提供項目中所需的一些庫:
podcastpedia-batch項目的pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.podcastpedia.batch</groupId><artifactId>podcastpedia-batch</artifactId><version>0.1.0</version><properties><sprinb.boot.version>1.1.6.RELEASE</sprinb.boot.version><java.version>1.7</java.version></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>1.1.6.RELEASE</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.3.5</version></dependency> <dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpcore</artifactId><version>4.3.2</version></dependency><!-- velocity --><dependency><groupId>org.apache.velocity</groupId><artifactId>velocity</artifactId><version>1.7</version> </dependency><dependency><groupId>org.apache.velocity</groupId><artifactId>velocity-tools</artifactId><version>2.0</version><exclusions><exclusion><groupId>org.apache.struts</groupId><artifactId>struts-core</artifactId></exclusion></exclusions> </dependency><!-- Project rome rss, atom --><dependency><groupId>rome</groupId><artifactId>rome</artifactId><version>1.0</version></dependency><!-- option this fetcher thing --><dependency><groupId>rome</groupId><artifactId>rome-fetcher</artifactId><version>1.0</version></dependency><dependency><groupId>org.jdom</groupId><artifactId>jdom</artifactId><version>1.1</version></dependency> <!-- PID 1 --><dependency><groupId>xerces</groupId><artifactId>xercesImpl</artifactId><version>2.9.1</version></dependency><!-- MySQL JDBC connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId> </dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-remote-shell</artifactId> <exclusions><exclusion><groupId>javax.mail</groupId><artifactId>mail</artifactId></exclusion></exclusions> </dependency><dependency><groupId>javax.mail</groupId><artifactId>mail</artifactId><version>1.4.7</version></dependency> <dependency><groupId>javax.inject</groupId><artifactId>javax.inject</artifactId><version>1</version></dependency> <dependency><groupId>org.twitter4j</groupId><artifactId>twitter4j-core</artifactId><version>[4.0,)</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies><build><plugins><plugin> <artifactId>maven-compiler-plugin</artifactId> </plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build> </project> 注意:
使用spring-boot-starter-parent作為項目的父項的一大優勢是,您只需升級父項的版本,它將為您提供“最新”的庫。 當我開始該項目時,spring boot的版本為1.1.3.RELEASE ,而在撰寫本文時,其版本已經是1.1.6.RELEASE 。
項目目錄結構
我以以下方式構造項目:
項目目錄結構
└── src └── main └── java └── org └── podcastpedia └── batch └── common └── jobs └── addpodcast └── notifysubscribers注意:
- org.podcastpedia.batch.jobs軟件包包含子軟件包,這些子軟件包具有針對特定作業的特定類。
- org.podcastpedia.batch.jobs.common包包含所有作業使用的類,例如,當前兩個作業都需要的JPA實體。
4.創建一個批處理作業配置
我將首先介紹第一個批處理作業的Java配置類:
批處理作業配置
package org.podcastpedia.batch.jobs.addpodcast;import org.podcastpedia.batch.common.configuration.DatabaseAccessConfiguration; import org.podcastpedia.batch.common.listeners.LogProcessListener; import org.podcastpedia.batch.common.listeners.ProtocolListener; import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast; import org.springframework.batch.core.Job; import org.springframework.batch.core.Step; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemReader; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.file.FlatFileItemReader; import org.springframework.batch.item.file.LineMapper; import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper; import org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.io.ClassPathResource;import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException;@Configuration @EnableBatchProcessing @Import({DatabaseAccessConfiguration.class, ServicesConfiguration.class}) public class AddPodcastJobConfiguration {@Autowiredprivate JobBuilderFactory jobs;@Autowiredprivate StepBuilderFactory stepBuilderFactory;// tag::jobstep[]@Beanpublic Job addNewPodcastJob(){return jobs.get("addNewPodcastJob").listener(protocolListener()).start(step()).build();} @Beanpublic Step step(){return stepBuilderFactory.get("step").<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read.reader(reader()).processor(processor()).writer(writer()).listener(logProcessListener()).faultTolerant().skipLimit(10) //default is set to 0.skip(MySQLIntegrityConstraintViolationException.class).build();} // end::jobstep[]// tag::readerwriterprocessor[]@Beanpublic ItemReader<SuggestedPodcast> reader(){FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();reader.setLinesToSkip(1);//first line is title definition reader.setResource(new ClassPathResource("suggested-podcasts.txt"));reader.setLineMapper(lineMapper());return reader; }@Beanpublic LineMapper<SuggestedPodcast> lineMapper() {DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();lineTokenizer.setDelimiter(";");lineTokenizer.setStrict(false);lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();fieldSetMapper.setTargetType(SuggestedPodcast.class);lineMapper.setLineTokenizer(lineTokenizer);lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());return lineMapper;}@Beanpublic SuggestedPodcastFieldSetMapper suggestedPodcastFieldSetMapper() {return new SuggestedPodcastFieldSetMapper();}/** configure the processor related stuff */@Beanpublic ItemProcessor<SuggestedPodcast, SuggestedPodcast> processor() {return new SuggestedPodcastItemProcessor();}@Beanpublic ItemWriter<SuggestedPodcast> writer() {return new Writer();}// end::readerwriterprocessor[]@Beanpublic ProtocolListener protocolListener(){return new ProtocolListener();}@Beanpublic LogProcessListener logProcessListener(){return new LogProcessListener();} }@EnableBatchProcessing批注添加了許多支持作業的關鍵bean,并節省了我們的配置工作。 例如,您還可以@Autowired一些有用的東西到您的上下文中:
- JobRepository (bean名稱為“ jobRepository”)
- JobLauncher (bean名稱為“ jobLauncher”)
- JobRegistry (bean名稱為“ jobRegistry”)
- 一個PlatformTransactionManager (bean名稱為“ transactionManager”)
- 一個JobBuilderFactory (bean名稱為“ jobBuilders”)是一種便利,可以防止您不得不將作業存儲庫注入到每個作業中,如上例所示
- StepBuilderFactory (bean名稱為“ stepBuilders”)是一種便利,可防止您不得不將作業存儲庫和事務管理器注入到每個步驟中
第一部分著重于實際的作業配置:
批處理作業和步驟配置
@Bean public Job addNewPodcastJob(){return jobs.get("addNewPodcastJob").listener(protocolListener()).start(step()).build(); } @Bean public Step step(){return stepBuilderFactory.get("step").<SuggestedPodcast,SuggestedPodcast>chunk(1) //important to be one in this case to commit after every line read.reader(reader()).processor(processor()).writer(writer()).listener(logProcessListener()).faultTolerant().skipLimit(10) //default is set to 0.skip(MySQLIntegrityConstraintViolationException.class).build(); }第一種方法定義了一個工作,第二種方法定義了一個步驟。 正如您在“批處理的域語言”中所讀到的一樣 ,作業是從步驟構建的,其中每個步驟都可以涉及閱讀器,處理器和編寫器。
在步驟定義中,您定義一次要寫入多少數據(在本例中,一次要寫入1條記錄)。 接下來,您指定讀取器,處理器和寫入器。
5. Spring Batch處理單元
大部分批處理可描述為讀取數據,對其進行一些轉換,然后將結果寫出。 如果您對此有所了解,這將以某種方式反映提取,轉換,加載(ETL)的過程。 Spring Batch提供了三個關鍵接口來幫助執行批量讀取和寫入: ItemReader , ItemProcessor和ItemWriter 。
讀者群
ItemReader是一種抽象,它提供了從許多不同類型的輸入中檢索數據的方法: 平面文件 , xml文件 , 數據庫 , jms等,一次僅一項。 有關可用項目閱讀器的完整列表, 請參見附錄A. ItemReaders和ItemWriters列表。
在Podcastpedia批處理作業中,我使用以下專用的ItemReader:
5.1.1。 FlatFileItemReader
顧名思義,它從一個平面文件中讀取數據行,這些文件通常描述記錄,這??些記錄的數據字段由文件中的固定位置定義或由某些特殊字符(例如逗號)分隔。 這種類型的ItemReader在第一個批處理作業中使用,addNewPodcastJob。 所使用的輸入文件名為“ suggested-podcasts.in” ,位于類路徑( src / main / resources )中,其外觀類似于以下內容:
FlatFileItemReader的輸入文件
FEED_URL; IDENTIFIER_ON_PODCASTPEDIA; CATEGORIES; LANGUAGE; MEDIA_TYPE; UPDATE_FREQUENCY; KEYWORDS; FB_PAGE; TWITTER_PAGE; GPLUS_PAGE; NAME_SUBMITTER; EMAIL_SUBMITTER http://www.5minutebiographies.com/feed/; 5minutebiographies; people_society, history; en; Audio; WEEKLY; biography, biographies, short biography, short biographies, 5 minute biographies, five minute biographies, 5 minute biography, five minute biography; https://www.facebook.com/5minutebiographies; https://twitter.com/5MinuteBios; ; Adrian Matei; adrianmatei@gmail.com http://notanotherpodcast.libsyn.com/rss; NotAnotherPodcast; entertainment; en; Audio; WEEKLY; Comedy, Sports, Cinema, Movies, Pop Culture, Food, Games; https://www.facebook.com/notanotherpodcastusa; https://twitter.com/NAPodcastUSA; https://plus.google.com/u/0/103089891373760354121/posts; Adrian Matei; adrianmatei@gmail.com如您所見,第一行定義“列”的名稱,隨后幾行包含實際數據(以“;”分隔),需要轉換為上下文中相關的域對象。
現在讓我們看看如何配置FlatFileItemReader :
FlatFileItemReader示例
@Bean public ItemReader<SuggestedPodcast> reader(){FlatFileItemReader<SuggestedPodcast> reader = new FlatFileItemReader<SuggestedPodcast>();reader.setLinesToSkip(1);//first line is title definition reader.setResource(new ClassPathResource("suggested-podcasts.in"));reader.setLineMapper(lineMapper());return reader; }除其他外,您可以指定輸入資源,要跳過的行數和行映射器。
5.1.1.1。 LineMapper
LineMapper是用于將線(字符串)映射到域對象的接口,通常用于將從文件讀取的線映射到每行的域對象。 對于Podcastpedia作業,我使用DefaultLineMapper ,這是兩階段的實現,包括將行的標記化為FieldSet然后映射到item:
LineMapper默認實現示例
@Bean public LineMapper<SuggestedPodcast> lineMapper() {DefaultLineMapper<SuggestedPodcast> lineMapper = new DefaultLineMapper<SuggestedPodcast>();DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();lineTokenizer.setDelimiter(";");lineTokenizer.setStrict(false);lineTokenizer.setNames(new String[]{"FEED_URL", "IDENTIFIER_ON_PODCASTPEDIA", "CATEGORIES", "LANGUAGE", "MEDIA_TYPE", "UPDATE_FREQUENCY", "KEYWORDS", "FB_PAGE", "TWITTER_PAGE", "GPLUS_PAGE", "NAME_SUBMITTER", "EMAIL_SUBMITTER"});BeanWrapperFieldSetMapper<SuggestedPodcast> fieldSetMapper = new BeanWrapperFieldSetMapper<SuggestedPodcast>();fieldSetMapper.setTargetType(SuggestedPodcast.class);lineMapper.setLineTokenizer(lineTokenizer);lineMapper.setFieldSetMapper(suggestedPodcastFieldSetMapper());return lineMapper; }- DelimitedLineTokenizer通過“;”分割輸入字符串 定界符。
- 如果將strict標志設置為false則將容忍具有較少令牌的行并用空列填充,而具有更多令牌的行將被截斷。
- 第一行的列名稱設置為lineTokenizer.setNames(...);
- 并設置了fieldMapper (第14行)
注意:
FieldSet是“接口”,平面文件輸入源使用它來封裝將字符串數組轉換為Java本機類型的擔憂。 就像JDBC中ResultSet扮演的角色一樣,客戶端將知道他們要提取的強類型字段的名稱或位置。”
FieldSetMapper
FieldSetMapper是一個接口,用于將從FieldSet獲得的數據FieldSet到對象中。 這是將fieldSet映射到SuggestedPodcast域對象的實現,該對象將進一步傳遞給處理器:
FieldSetMapper的實現
public class SuggestedPodcastFieldSetMapper implements FieldSetMapper<SuggestedPodcast> {@Overridepublic SuggestedPodcast mapFieldSet(FieldSet fieldSet) throws BindException {SuggestedPodcast suggestedPodcast = new SuggestedPodcast();suggestedPodcast.setCategories(fieldSet.readString("CATEGORIES"));suggestedPodcast.setEmail(fieldSet.readString("EMAIL_SUBMITTER"));suggestedPodcast.setName(fieldSet.readString("NAME_SUBMITTER"));suggestedPodcast.setTags(fieldSet.readString("KEYWORDS"));//some of the attributes we can map directly into the Podcast entity that we'll insert later into the databasePodcast podcast = new Podcast();podcast.setUrl(fieldSet.readString("FEED_URL"));podcast.setIdentifier(fieldSet.readString("IDENTIFIER_ON_PODCASTPEDIA"));podcast.setLanguageCode(LanguageCode.valueOf(fieldSet.readString("LANGUAGE")));podcast.setMediaType(MediaType.valueOf(fieldSet.readString("MEDIA_TYPE")));podcast.setUpdateFrequency(UpdateFrequency.valueOf(fieldSet.readString("UPDATE_FREQUENCY")));podcast.setFbPage(fieldSet.readString("FB_PAGE"));podcast.setTwitterPage(fieldSet.readString("TWITTER_PAGE"));podcast.setGplusPage(fieldSet.readString("GPLUS_PAGE"));suggestedPodcast.setPodcast(podcast);return suggestedPodcast;}}JdbcCursorItemReader
在第二個作業notifyRmailSubscribersJob中 ,在閱讀器中,我僅從單個數據庫表中讀取電子郵件訂閱者,但在處理器中,進一步執行了更詳細的讀取(通過JPA),以檢索用戶訂閱的播客的所有新片段。 。 這是批處理環境中使用的常見模式。 單擊此鏈接以獲取更多常見批處理模式。
對于初始讀取,我選擇了JdbcCursorItemReader ,這是一個簡單的閱讀器實現,它打開JDBC游標并連續檢索ResultSet的下一行:
JdbcCursorItemReader示例
@Bean public ItemReader<User> notifySubscribersReader(){JdbcCursorItemReader<User> reader = new JdbcCursorItemReader<User>();String sql = "select * from users where is_email_subscriber is not null";reader.setSql(sql);reader.setDataSource(dataSource);reader.setRowMapper(rowMapper()); return reader; }注意我必須設置sql ,要讀取的datasource和RowMapper 。
5.2.1。 行映射器
RowMapper是JdbcTemplate使用的接口,用于按行映射Result'set的行。 我對該接口的實現執行將每一行映射到結果對象的實際工作,但是我不必擔心異常處理:
RowMapper的實現
public class UserRowMapper implements RowMapper<User> {@Overridepublic User mapRow(ResultSet rs, int rowNum) throws SQLException {User user = new User();user.setEmail(rs.getString("email"));return user;}}作家
ItemWriter是一種抽象,表示一次Step的輸出,每次一批或大塊的項目。 通常,項目編寫者不知道下一步將要接收的輸入,僅知道在當前調用中傳遞的項目。
提出的兩項工作的作者非常簡單。 他們只是使用外部服務來發送電子郵件通知并在Podcastpedia的帳戶上發布推文。 這是第一個任務的ItemWriter的實現– addNewPodcast :
ItemWriter的Writer實現
package org.podcastpedia.batch.jobs.addpodcast;import java.util.Date; import java.util.List;import javax.inject.Inject; import javax.persistence.EntityManager;import org.podcastpedia.batch.common.entities.Podcast; import org.podcastpedia.batch.jobs.addpodcast.model.SuggestedPodcast; import org.podcastpedia.batch.jobs.addpodcast.service.EmailNotificationService; import org.podcastpedia.batch.jobs.addpodcast.service.SocialMediaService; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Autowired;public class Writer implements ItemWriter<SuggestedPodcast>{@Autowiredprivate EntityManager entityManager;@Injectprivate EmailNotificationService emailNotificationService;@Injectprivate SocialMediaService socialMediaService;@Overridepublic void write(List<? extends SuggestedPodcast> items) throws Exception {if(items.get(0) != null){SuggestedPodcast suggestedPodcast = items.get(0);//first insert the data in the database Podcast podcast = suggestedPodcast.getPodcast();podcast.setInsertionDate(new Date());entityManager.persist(podcast);entityManager.flush();//notify submitter about the insertion and post a twitt about it String url = buildUrlOnPodcastpedia(podcast);emailNotificationService.sendPodcastAdditionConfirmation(suggestedPodcast.getName(), suggestedPodcast.getEmail(),url);if(podcast.getTwitterPage() != null){socialMediaService.postOnTwitterAboutNewPodcast(podcast,url); } }}private String buildUrlOnPodcastpedia(Podcast podcast) {StringBuffer urlOnPodcastpedia = new StringBuffer("http://www.podcastpedia.org");if (podcast.getIdentifier() != null) {urlOnPodcastpedia.append("/" + podcast.getIdentifier());} else {urlOnPodcastpedia.append("/podcasts/");urlOnPodcastpedia.append(String.valueOf(podcast.getPodcastId()));urlOnPodcastpedia.append("/" + podcast.getTitleInUrl());} String url = urlOnPodcastpedia.toString();return url;}}如您所見,這里沒有什么特別之處,除了必須重寫write方法之外,這是注入的外部服務EmailNotificationService和SocialMediaService用于通過電子郵件向播客提交者告知播客目錄添加內容以及Twitter是否可用的地方。提交的頁面上,將有一則推文張貼在播客的墻上 。 您可以在以下文章中找到有關如何通過Velocity發送電子郵件以及如何從Java在Twitter上發布的詳細說明:
- 如何使用Spring和Velocity在Java中編寫HTML電子郵件
- 如何在10分鐘內使用Java從Twitter4J發布到Twittter
處理器
ItemProcessor是代表項目業務處理的抽象。 當ItemReader讀取一個項目,而ItemWriter寫入一個項目時, ItemProcessor提供訪問以轉換或應用其他業務處理。 使用自己的Processors ,必須實現ItemProcessor<I,O>接口,其唯一方法O process(I item) throws Exception ,返回可能被修改的或新的項目以繼續處理。 如果返回的結果為null,則認為該項目的處理不應繼續。
盡管第一項工作的處理器需要更多的邏輯,但是因為我必須設置etag和last-modified標頭屬性,播客的feed屬性,情節,類別和關鍵字:
作業addNewPodcast的ItemProcessor實現
public class SuggestedPodcastItemProcessor implements ItemProcessor<SuggestedPodcast, SuggestedPodcast> {private static final int TIMEOUT = 10;@AutowiredReadDao readDao;@AutowiredPodcastAndEpisodeAttributesService podcastAndEpisodeAttributesService;@Autowiredprivate PoolingHttpClientConnectionManager poolingHttpClientConnectionManager; @Autowiredprivate SyndFeedService syndFeedService;/*** Method used to build the categories, tags and episodes of the podcast*/@Overridepublic SuggestedPodcast process(SuggestedPodcast item) throws Exception {if(isPodcastAlreadyInTheDirectory(item.getPodcast().getUrl())) {return null;}String[] categories = item.getCategories().trim().split("\\s*,\\s*"); item.getPodcast().setAvailability(org.apache.http.HttpStatus.SC_OK);//set etag and last modified attributes for the podcastsetHeaderFieldAttributes(item.getPodcast());//set the other attributes of the podcast from the feed podcastAndEpisodeAttributesService.setPodcastFeedAttributes(item.getPodcast());//set the categoriesList<Category> categoriesByNames = readDao.findCategoriesByNames(categories);item.getPodcast().setCategories(categoriesByNames);//set the tagssetTagsForPodcast(item);//build the episodes setEpisodesForPodcast(item.getPodcast());return item;}...... }第二個工作的處理器使用“驅動查詢”方法 ,在該方法中 ,我用另一個“ JPA讀取”擴展了從閱讀器中檢索的數據,并用情節對播客中的項目進行了分組,以便在我所用的電子郵件中看起來不錯發送給訂戶:
ItemProcessor實現的第二項工作– notifySubscribers
@Scope("step") public class NotifySubscribersItemProcessor implements ItemProcessor<User, User> {@AutowiredEntityManager em;@Value("#{jobParameters[updateFrequency]}")String updateFrequency;@Overridepublic User process(User item) throws Exception {String sqlInnerJoinEpisodes = "select e from User u JOIN u.podcasts p JOIN p.episodes e WHERE u.email=?1 AND p.updateFrequency=?2 AND"+ " e.isNew IS NOT NULL AND e.availability=200 ORDER BY e.podcast.podcastId ASC, e.publicationDate ASC";TypedQuery<Episode> queryInnerJoinepisodes = em.createQuery(sqlInnerJoinEpisodes, Episode.class);queryInnerJoinepisodes.setParameter(1, item.getEmail());queryInnerJoinepisodes.setParameter(2, UpdateFrequency.valueOf(updateFrequency)); List<Episode> newEpisodes = queryInnerJoinepisodes.getResultList();return regroupPodcastsWithEpisodes(item, newEpisodes);}....... } 注意:
如果您想了解更多有關如何使用Apache Http Client,獲取etag和last-modified標頭的信息,可以看一下我的文章– 如何使用新的Apache Http Client進行HEAD請求
6.執行批處理應用程序
批處理可以嵌入到Web應用程序和WAR文件中,但是在一開始我選擇了一種創建獨立應用程序的簡單方法,該方法可以通過Java main()方法啟動:
批處理Java main()方法
package org.podcastpedia.batch; //imports ...;@ComponentScan @EnableAutoConfiguration public class Application {private static final String NEW_EPISODES_NOTIFICATION_JOB = "newEpisodesNotificationJob";private static final String ADD_NEW_PODCAST_JOB = "addNewPodcastJob";public static void main(String[] args) throws BeansException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException, InterruptedException {Log log = LogFactory.getLog(Application.class);SpringApplication app = new SpringApplication(Application.class);app.setWebEnvironment(false);ConfigurableApplicationContext ctx= app.run(args);JobLauncher jobLauncher = ctx.getBean(JobLauncher.class);if(ADD_NEW_PODCAST_JOB.equals(args[0])){//addNewPodcastJobJob addNewPodcastJob = ctx.getBean(ADD_NEW_PODCAST_JOB, Job.class);JobParameters jobParameters = new JobParametersBuilder().addDate("date", new Date()).toJobParameters(); JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);BatchStatus batchStatus = jobExecution.getStatus();while(batchStatus.isRunning()){log.info("*********** Still running.... **************");Thread.sleep(1000);}log.info(String.format("*********** Exit status: %s", jobExecution.getExitStatus().getExitCode()));JobInstance jobInstance = jobExecution.getJobInstance();log.info(String.format("********* Name of the job %s", jobInstance.getJobName()));log.info(String.format("*********** job instance Id: %d", jobInstance.getId()));System.exit(0);} else if(NEW_EPISODES_NOTIFICATION_JOB.equals(args[0])){JobParameters jobParameters = new JobParametersBuilder().addDate("date", new Date()).addString("updateFrequency", args[1]).toJobParameters(); jobLauncher.run(ctx.getBean(NEW_EPISODES_NOTIFICATION_JOB, Job.class), jobParameters); } else {throw new IllegalArgumentException("Please provide a valid Job name as first application parameter");}System.exit(0);}}從源頭獲得的有關SpringApplication -, @ComponentScan @EnableAutoConfiguration和@EnableAutoConfiguration的最佳解釋-入門-創建批處理服務:
“ main()方法SpringApplication helper類,將Application.class作為其run()方法的參數提供。 這告訴Spring從Application讀取注釋元數據,并將其作為Spring應用程序上下文中的組件進行管理。
@ComponentScan批注告訴Spring通過org.podcastpedia.batch包及其子級進行遞歸搜索,以查找直接或間接用Spring的@Component批注標記的@Component 。 該指令確保Spring查找并注冊BatchConfiguration ,因為它被標記為@Configuration ,而@Configuration則是一種@Component注釋。
@EnableAutoConfiguration批注根據您的類路徑的內容打開合理的默認行為。 例如,它將查找實現CommandLineRunner接口并調用其run()方法的任何類。”
執行構建步驟:
- JobLauncher是用于控制作業的簡單界面,是從ApplicationContext中檢索的。 請記住,這是通過@EnableBatchProcessing注釋自動提供的。
- 現在基于應用程序的第一個參數( args[0] ),我將從ApplicationContext檢索相應的Job
- 然后準備JobParameters ,在這里使用當前日期– .addDate("date", new Date()) ,以便作業執行始終是唯一的。
- 一旦一切就緒,就可以執行作業: JobExecution jobExecution = jobLauncher.run(addNewPodcastJob, jobParameters);
- 您可以使用返回的jobExecution來訪問BatchStatus ,退出代碼或作業名稱和ID。
注意:我強烈建議您閱讀和理解Spring Batch的元數據架構 。 它還將幫助您更好地了解Spring Batch Domain對象。
在開發和生產環境中運行應用程序
為了能夠在不同的環境上運行Spring Batch / Spring Boot應用程序,我使用了Spring Profiles功能。 默認情況下,應用程序使用開發數據(數據庫)運行。 但是,如果我想讓工作使用生產數據庫,則必須執行以下操作:
- 提供以下環境參數-Dspring.profiles.active=prod
- 在默認的application.properties文件旁邊,在類路徑的application-prod.properties文件中配置了生產數據庫屬性
摘要
在本教程中,我們學習了如何使用Spring Boot和Java配置來配置Spring Batch項目,如何在批處理中使用一些最普通的閱讀器,如何配置一些簡單的作業,以及如何從A程序啟動Spring Batch作業。主要方法。
翻譯自: https://www.javacodegeeks.com/2014/09/spring-batch-tutorial-with-spring-boot-and-java-configuration.html
總結
以上是生活随笔為你收集整理的具有Spring Boot和Java配置的Spring Batch教程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是ddos大数据测试(什么是ddos
- 下一篇: Java转换难题者,不适合工作(或面试)