日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > java >内容正文

java

玩转java并发工具_玩Java并发

發(fā)布時(shí)間:2023/12/3 java 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 玩转java并发工具_玩Java并发 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

玩轉(zhuǎn)java并發(fā)工具

最近,我需要將一些文件(每個(gè)文件都有JSON格式的對(duì)象列表(數(shù)組))轉(zhuǎn)換為每個(gè)文件都具有相同數(shù)據(jù)(對(duì)象)的分隔行的文件。 這是一次性的任務(wù),很簡(jiǎn)單。 我使用Java nio的某些功能進(jìn)行了讀寫(xiě)。 我以最簡(jiǎn)單的方式使用了GSON。 一個(gè)線程在文件上運(yùn)行,轉(zhuǎn)換和寫(xiě)入。 整個(gè)操作在幾秒鐘內(nèi)完成。 但是,我想并發(fā)一點(diǎn)。 因此,我增強(qiáng)了可同時(shí)工作的工具。



線程數(shù)

可運(yùn)行以讀取文件。

閱讀器線程將提交到ExecutorService。 輸出是對(duì)象列表(示例中為User),將被放入BlockingQueue。

可運(yùn)行以寫(xiě)入文件。

每個(gè)可運(yùn)行對(duì)象將從阻塞隊(duì)列中輪詢。 它將數(shù)據(jù)行寫(xiě)入文件。 我沒(méi)有將編寫(xiě)器Runnable添加到ExecutorService,而是僅使用它啟動(dòng)了一個(gè)線程。 Runnable具有while(some boolen is true) {...}模式。 下面的更多內(nèi)容...

同步一切

BlockingQueue是這兩種線程的接口。 由于writer的runnable在while循環(huán)(消費(fèi)者)中運(yùn)行,我希望能夠使其停止,以便該工具終止。 因此,我為此使用了兩個(gè)對(duì)象:

信號(hào)

讀取輸入文件的循環(huán)會(huì)增加一個(gè)計(jì)數(shù)器。 完成遍歷輸入文件并提交編寫(xiě)器后,我在主線程中初始化了一個(gè)信號(hào)燈: semaphore.acquire(numberOfFiles);

在每個(gè)可運(yùn)行的閱讀器中,我釋放了信號(hào)量: semaphore.release();

原子布爾

作者的while循環(huán)使用AtomicBoolean。 只要AtomicBoolean == true,編寫(xiě)器將繼續(xù)。 在主線程中,在獲取信號(hào)量之后,我將AtomicBoolean設(shè)置為false。 這使編寫(xiě)器線程可以終止。

使用Java NIO

為了掃描,讀取和寫(xiě)入文件系統(tǒng),我使用了Java NIO的某些功能。

掃描: Files.newDirectoryStream(inputFilesDirectory, "*.json");
開(kāi)始之前刪除輸出目錄: Files.walkFileTree...
BufferedReader和BufferedWriter: Files.newBufferedReader(filePath); Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());

一注。 為了生成此示例的隨機(jī)文件,我使用了apache commons lang: RandomStringUtils.randomAlphabetic
GitHub中的所有代碼。

public class JsonArrayToJsonLines {private final static Path inputFilesDirectory = Paths.get("src\\main\\resources\\files");private final static Path outputDirectory = Paths.get("src\\main\\resources\\files\\output");private final static Gson gson = new Gson();private final BlockingQueue<EntitiesData> entitiesQueue = new LinkedBlockingQueue<>();private AtomicBoolean stillWorking = new AtomicBoolean(true);private Semaphore semaphore = new Semaphore(0);int numberOfFiles = 0;private JsonArrayToJsonLines() {}public static void main(String[] args) throws IOException, InterruptedException {new JsonArrayToJsonLines().process();}private void process() throws IOException, InterruptedException {deleteFilesInOutputDir();final ExecutorService executorService = createExecutorService();DirectoryStream<Path> directoryStream = Files.newDirectoryStream(inputFilesDirectory, "*.json");for (int i = 0; i < 2; i++) {new Thread(new JsonElementsFileWriter(stillWorking, semaphore, entitiesQueue)).start();}directoryStream.forEach(new Consumer<Path>() {@Overridepublic void accept(Path filePath) {numberOfFiles++;executorService.submit(new OriginalFileReader(filePath, entitiesQueue));}});semaphore.acquire(numberOfFiles);stillWorking.set(false);shutDownExecutor(executorService);}private void deleteFilesInOutputDir() throws IOException {Files.walkFileTree(outputDirectory, new SimpleFileVisitor<Path>() {@Overridepublic FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {Files.delete(file);return FileVisitResult.CONTINUE;}});}private ExecutorService createExecutorService() {int numberOfCpus = Runtime.getRuntime().availableProcessors();return Executors.newFixedThreadPool(numberOfCpus);}private void shutDownExecutor(final ExecutorService executorService) {executorService.shutdown();try {if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {executorService.shutdownNow();}if (!executorService.awaitTermination(120, TimeUnit.SECONDS)) {}} catch (InterruptedException ex) {executorService.shutdownNow();Thread.currentThread().interrupt();}}private static final class OriginalFileReader implements Runnable {private final Path filePath;private final BlockingQueue<EntitiesData> entitiesQueue;private OriginalFileReader(Path filePath, BlockingQueue<EntitiesData> entitiesQueue) {this.filePath = filePath;this.entitiesQueue = entitiesQueue;}@Overridepublic void run() {Path fileName = filePath.getFileName();try {BufferedReader br = Files.newBufferedReader(filePath);User[] entities = gson.fromJson(br, User[].class);System.out.println("---> " + fileName);entitiesQueue.put(new EntitiesData(fileName.toString(), entities));} catch (IOException | InterruptedException e) {throw new RuntimeException(filePath.toString(), e);}}}private static final class JsonElementsFileWriter implements Runnable {private final BlockingQueue<EntitiesData> entitiesQueue;private final AtomicBoolean stillWorking;private final Semaphore semaphore;private JsonElementsFileWriter(AtomicBoolean stillWorking, Semaphore semaphore,BlockingQueue<EntitiesData> entitiesQueue) {this.stillWorking = stillWorking;this.semaphore = semaphore;this.entitiesQueue = entitiesQueue;}@Overridepublic void run() {while (stillWorking.get()) {try {EntitiesData data = entitiesQueue.poll(100, TimeUnit.MILLISECONDS);if (data != null) {try {String fileOutput = outputDirectory.toString() + File.separator + data.fileName;Path fileOutputPath = Paths.get(fileOutput);BufferedWriter writer = Files.newBufferedWriter(fileOutputPath, Charset.defaultCharset());for (User user : data.entities) {writer.append(gson.toJson(user));writer.newLine();}writer.flush();System.out.println("=======================================>>>>> " + data.fileName);} catch (IOException e) {throw new RuntimeException(data.fileName, e);} finally {semaphore.release();}}} catch (InterruptedException e1) {}}}}private static final class EntitiesData {private final String fileName;private final User[] entities;private EntitiesData(String fileName, User[] entities) {this.fileName = fileName;this.entities = entities;}} }

翻譯自: https://www.javacodegeeks.com/2014/12/playing-with-java-concurrency.html

玩轉(zhuǎn)java并發(fā)工具

總結(jié)

以上是生活随笔為你收集整理的玩转java并发工具_玩Java并发的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。