Java EE 7批处理和魔兽世界–第1部分
這是我在上一個JavaOne上的會議之一。 這篇文章將擴展主題并使用Batch JSR-352 API進入一個實際的應用程序。 此應用程序與MMORPG 魔獸世界集成。
由于JSR-352是Java EE世界中的新規范,所以我認為許多人不知道如何正確使用它。 確定本規范適用的用例也可能是一個挑戰。 希望該示例可以幫助您更好地理解用例。
抽象
《魔獸世界》是一款全球超過800萬玩家玩的游戲。 該服務按地區提供:美國(US) ,歐洲(EU) ,中國和韓國。 每個區域都有一組稱為Realm的服務器,您可以使用這些服務器進行連接以玩游戲。 對于此示例,我們僅研究美國和歐盟地區。
該游戲最有趣的功能之一是允許您使用拍賣行買賣稱為“ 物品”的游戲內商品。 每個領域都有兩個拍賣行 。 平均每個領域交易約70.000 項 。 讓我們計算一些數字:
- 512 境界 ( 美國和歐盟 )
- 每個領域 7萬個 物品
- 整個商品超過3500萬
數據
《魔獸世界》的另一個有趣之處在于,開發人員提供了REST API來訪問大多數游戲內信息,包括拍賣行的數據。 在此處檢查完整的API。
拍賣行的數據分兩步獲得。 首先,我們需要查詢對應的Auction House Realm REST端點,以獲取對JSON文件的引用。 接下來,我們需要訪問該URL并下載包含所有拍賣行 物品信息的文件。 這是一個例子:
http://eu.battle.net/api/wow/auction/data/aggra-portugues
應用程序
我們的目標是建立一個下載拍賣行的應用程序,對其進行處理并提取指標。 這些指標將建立商品價格隨時間變化的歷史記錄。 誰知道? 也許借助這些信息,我們可以預測價格波動并在最佳時間購買或出售商品 。
設置
對于設置,我們將在Java EE 7中使用一些其他功能:
- Java EE 7
- 角JS
- 角度ng-grid
- UI引導程序
- 谷歌圖表
- 野蠅
職位
批處理JSR-352作業將執行主要工作。 作業是封裝整個批處理過程的實體。 作業將通過作業規范語言連接在一起。 使用JSR-352 ,作業只是這些步驟的容器。 它組合了邏輯上屬于流程的多個步驟。
我們將把業務登錄分為三個工作:
- 準備 –創建所需的所有支持數據。 列出領域 ,創建文件夾以復制文件。
- 文件 –查詢領域以檢查是否有新文件要處理。
- 處理 –下載文件,處理數據,提取指標。
編碼
后端–具有Java 8的Java EE 7
大多數代碼將在后端。 我們需要Batch JSR-352 ,但我們還將使用Java EE的許多其他技術:例如JPA , JAX-RS , CDI和JSON-P 。
由于“ 準備工作”僅用于初始化應用程序資源以進行處理,因此我將跳過它,而深入到最有趣的部分。
文件作業
文件作業是AbstractBatchlet的實現。 批處理是批處理規范中可用的最簡單的處理樣式。 這是一個面向任務的步驟,其中任務被調用一次,執行并返回退出狀態。 對于執行各種非面向項目的任務,例如執行命令或執行文件傳輸,此類型最有用。 在這種情況下,我們的Batchlet將在每個Realm上對每個域發出REST請求,以進行迭代,并使用包含要處理的數據的文件檢索URL。 這是代碼:
LoadAuctionFilesBatchlet
@Named public class LoadAuctionFilesBatchlet extends AbstractBatchlet {@Injectprivate WoWBusiness woWBusiness;@Inject@BatchProperty(name = "region")private String region;@Inject@BatchProperty(name = "target")private String target;@Overridepublic String process() throws Exception {List<Realm> realmsByRegion = woWBusiness.findRealmsByRegion(Realm.Region.valueOf(region));realmsByRegion.parallelStream().forEach(this::getRealmAuctionFileInformation);return "COMPLETED";}void getRealmAuctionFileInformation(Realm realm) {try {Client client = ClientBuilder.newClient();Files files = client.target(target + realm.getSlug()).request(MediaType.TEXT_PLAIN).async().get(Files.class).get(2, TimeUnit.SECONDS);files.getFiles().forEach(auctionFile -> createAuctionFile(realm, auctionFile));} catch (Exception e) {getLogger(this.getClass().getName()).log(Level.INFO, "Could not get files for " + realm.getRealmDetail());}}void createAuctionFile(Realm realm, AuctionFile auctionFile) {auctionFile.setRealm(realm);auctionFile.setFileName("auctions." + auctionFile.getLastModified() + ".json");auctionFile.setFileStatus(FileStatus.LOADED);if (!woWBusiness.checkIfAuctionFileExists(auctionFile)) {woWBusiness.createAuctionFile(auctionFile);}} }關于此的一個很酷的事情是Java 8的使用parallelStream()一次調用多個REST請求很容易! 您真的可以注意到其中的區別。 如果您想嘗試一下,只需運行示例,然后用stream()替換parallelStream() stream()并檢出即可。 在我的機器上,使用parallelStream()可使任務執行速度提高約5或6倍。
更新資料
通常,我不會使用這種方法。 我這樣做了,因為部分邏輯涉及調用慢速的REST請求,而parallelStreams確實在這里閃耀。 可以使用批處理分區執行此操作,但是很難實現。 我們還需要每次都在服務器池中收集新數據,因此,如果跳過一個或兩個文件,這并不可怕。 請記住,如果您不想錯過任何一條記錄,塊處理樣式將更適合。 感謝Simon Simonelli引起我的注意。
由于美國和歐盟的領域要求調用不同的REST端點,因此它們非常適合分區。 分區意味著該任務將運行到多個線程中。 每個分區一個線程。 在這種情況下,我們有兩個分區。
要完成作業定義,我們需要提供一個JoB XML文件。 這需要放置在META-INF/batch-jobs目錄中。 這是此作業的files-job.xml :
files-job.xml
<job id="loadRealmAuctionFileJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0"><step id="loadRealmAuctionFileStep"><batchlet ref="loadAuctionFilesBatchlet"><properties><property name="region" value="#{partitionPlan['region']}"/><property name="target" value="#{partitionPlan['target']}"/></properties></batchlet><partition><plan partitions="2"><properties partition="0"><property name="region" value="US"/><property name="target" value="http://us.battle.net/api/wow/auction/data/"/></properties><properties partition="1"><property name="region" value="EU"/><property name="target" value="http://eu.battle.net/api/wow/auction/data/"/></properties></plan></partition></step> </job>在files-job.xml我們需要定義我們Batchlet在batchlet元素。 對于分區,只需定義partition元素并為每個plan分配不同的properties 。 然后,可以使用這些properties使用表達式#{partitionPlan['region']}和#{partitionPlan['target']}將值后期綁定到LoadAuctionFilesBatchlet 。 這是一種非常簡單的表達式綁定機制,僅適用于簡單的屬性和字符串。
處理作業
現在,我們要處理領域拍賣數據文件。 使用上一份工作中的信息,我們現在可以下載文件并對數據進行某些處理。 JSON文件具有以下結構:
item-auctions-sample.json
{"realm": {"name": "Grim Batol","slug": "grim-batol"},"alliance": {"auctions": [{"auc": 279573567, // Auction Id"item": 22792, // Item for sale Id"owner": "Miljanko", // Seller Name"ownerRealm": "GrimBatol", // Realm"bid": 3800000, // Bid Value"buyout": 4000000, // Buyout Value"quantity": 20, // Numbers of items in the Auction"timeLeft": "LONG", // Time left for the Auction"rand": 0,"seed": 1069994368},{"auc": 278907544,"item": 40195,"owner": "Mongobank","ownerRealm": "GrimBatol","bid": 38000,"buyout": 40000,"quantity": 1,"timeLeft": "VERY_LONG","rand": 0,"seed": 1978036736}]},"horde": {"auctions": [{"auc": 278268046,"item": 4306,"owner": "Thuglifer","ownerRealm": "GrimBatol","bid": 570000,"buyout": 600000,"quantity": 20,"timeLeft": "VERY_LONG","rand": 0,"seed": 1757531904},{"auc": 278698948,"item": 4340,"owner": "Celticpala","ownerRealm": "Aggra(Português)","bid": 1000000,"buyout": 1000000,"quantity": 10,"timeLeft": "LONG","rand": 0,"seed": 0}]} }該文件包含從其下載的領域的拍賣列表。 在每個記錄中,我們可以檢查待售物品,價格,賣方和拍賣結束前的剩余時間。 拍賣的算法按拍賣行類型進行匯總: Alliance和Horde 。
對于process-job我們要讀取JSON文件,轉換數據并將其保存到數據庫。 這可以通過塊處理來實現。 塊是一種ETL(提取–轉換–加載)樣式的處理,適合處理大量數據。 塊一次讀取一個數據,并在事務內創建要寫出的塊。 從ItemReader讀入一項,交給ItemProcessor并進行聚合。 一旦讀取的項目數等于提交間隔,就通過ItemWriter寫入整個塊,然后提交事務。
ItemReader
實際文件太大,以致無法將它們完全加載到內存中,否則可能會耗盡它。 相反,我們使用JSON-P API以流方式解析數據。
AuctionDataItemReader
@Named public class AuctionDataItemReader extends AbstractAuctionFileProcess implements ItemReader {private JsonParser parser;private AuctionHouse auctionHouse;@Injectprivate JobContext jobContext;@Injectprivate WoWBusiness woWBusiness;@Overridepublic void open(Serializable checkpoint) throws Exception {setParser(Json.createParser(openInputStream(getContext().getFileToProcess(FolderType.FI_TMP))));AuctionFile fileToProcess = getContext().getFileToProcess();fileToProcess.setFileStatus(FileStatus.PROCESSING);woWBusiness.updateAuctionFile(fileToProcess);}@Overridepublic void close() throws Exception {AuctionFile fileToProcess = getContext().getFileToProcess();fileToProcess.setFileStatus(FileStatus.PROCESSED);woWBusiness.updateAuctionFile(fileToProcess);}@Overridepublic Object readItem() throws Exception {while (parser.hasNext()) {JsonParser.Event event = parser.next();Auction auction = new Auction();switch (event) {case KEY_NAME:updateAuctionHouseIfNeeded(auction);if (readAuctionItem(auction)) {return auction;}break;}}return null;}@Overridepublic Serializable checkpointInfo() throws Exception {return null;}protected void updateAuctionHouseIfNeeded(Auction auction) {if (parser.getString().equalsIgnoreCase(AuctionHouse.ALLIANCE.toString())) {auctionHouse = AuctionHouse.ALLIANCE;} else if (parser.getString().equalsIgnoreCase(AuctionHouse.HORDE.toString())) {auctionHouse = AuctionHouse.HORDE;} else if (parser.getString().equalsIgnoreCase(AuctionHouse.NEUTRAL.toString())) {auctionHouse = AuctionHouse.NEUTRAL;}auction.setAuctionHouse(auctionHouse);}protected boolean readAuctionItem(Auction auction) {if (parser.getString().equalsIgnoreCase("auc")) {parser.next();auction.setAuctionId(parser.getLong());parser.next();parser.next();auction.setItemId(parser.getInt());parser.next();parser.next();parser.next();parser.next();auction.setOwnerRealm(parser.getString());parser.next();parser.next();auction.setBid(parser.getInt());parser.next();parser.next();auction.setBuyout(parser.getInt());parser.next();parser.next();auction.setQuantity(parser.getInt());return true;}return false;}public void setParser(JsonParser parser) {this.parser = parser;} }要打開JSON Parse流,我們需要Json.createParser并傳遞輸入流的引用。 要讀取元素,我們只需要調用hasNext()和next()方法。 這將返回一個JsonParser.Event ,它使我們能夠檢查解析器在流中的位置。 從Batch API ItemReader的readItem()方法中讀取并返回元素。 當沒有更多元素可讀取時,返回null以完成處理。 注意,我們還實現了從ItemReader open和close的方法。 這些用于初始化和清理資源。 它們只執行一次。
ItemProcessor
ItemProcessor是可選的。 它用于轉換讀取的數據。 在這種情況下,我們需要向競價添加其他信息。
AuctionDataItemProcessor
@Named public class AuctionDataItemProcessor extends AbstractAuctionFileProcess implements ItemProcessor {@Overridepublic Object processItem(Object item) throws Exception {Auction auction = (Auction) item;auction.setRealm(getContext().getRealm());auction.setAuctionFile(getContext().getFileToProcess());return auction;} }ItemWriter
最后,我們只需要將數據寫到數據庫中即可:
AuctionDataItemWriter
@Named public class AuctionDataItemWriter extends AbstractItemWriter {@PersistenceContextprotected EntityManager em;@Overridepublic void writeItems(List<Object> items) throws Exception {items.forEach(em::persist);} }在我的計算機上,具有70 k記錄文件的整個過程大約需要20秒。 我確實注意到了一些非常有趣的事情。 在編寫此代碼之前,我使用的是注入的EJB,它通過persist操作來調用方法。 這總共花費了30秒,因此注入EntityManager并執行持久操作可以直接為我節省三分之一的處理時間。 我只能推測該延遲是由于堆棧調用的增加而造成的,其中EJB攔截器位于中間。 這是在Wildfly中發生的。 我將對此進行進一步調查。
要定義塊,我們需要將其添加到process-job.xml文件中:
process-job.xml
<step id="processFile" next="moveFileToProcessed"><chunk item-count="100"><reader ref="auctionDataItemReader"/><processor ref="auctionDataItemProcessor"/><writer ref="auctionDataItemWriter"/></chunk> </step>在item-count屬性中,我們定義每個處理塊中可以容納多少個元素。 這意味著每100個事務就會提交一次。 這對于保持較小的事務大小和檢查數據很有用。 如果我們需要停止然后重新開始操作,我們可以這樣做而不必再次處理每個項目。 我們必須自己編寫邏輯代碼。 該示例中不包括此功能,但以后會做。
跑步
要運行作業,我們需要獲得JobOperator的引用。 JobOperator提供了一個界面來管理作業處理的各個方面,包括操作命令(例如開始,重新啟動和停止),以及與作業存儲庫相關的命令,例如檢索作業和步驟執行。
要運行先前的files-job.xml Job,我們執行:
執行工作
JobOperator jobOperator = BatchRuntime.getJobOperator(); jobOperator.start("files-job", new Properties());請注意,我們使用Job xml文件的名稱,而沒有擴展名到JobOperator 。
下一步
我們仍然需要匯總數據以提取指標并將其顯示在網頁中。 這篇文章已經很長了,因此我將在以后的文章中介紹以下步驟。 無論如何,該部分的代碼已經在Github存儲庫中。 檢查資源部分。
資源資源
您可以從我的github存儲庫中克隆完整的工作副本,然后將其部署到Wildfly。 您可以在此處找到說明進行部署。
翻譯自: https://www.javacodegeeks.com/2014/10/java-ee-7-batch-processing-and-world-of-warcraft-part-1.html
總結
以上是生活随笔為你收集整理的Java EE 7批处理和魔兽世界–第1部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 城市道路路内停车泊位设置规范(城市道路路
- 下一篇: Java开发人员应该知道的5种错误跟踪工