多线程导出excel高并发_大牛带你深入java多线程与高并发:JMH与Disruptor,确定能学会?...
前言
今天我們講兩個內容,第一個是JMH,第二個是Disruptor。這兩個內容是給大家做更進一步的這種多線程和高并發的一些專業上的處理。生產環境之中我們很可能不自己定義消息隊列,而是使用
Disruptor。我們生產環境做測試的時候也不是像我說的那樣寫一個start寫一個end就測試完了。在這里給大家先介紹專業的JMH測試工具,在給大家介紹Disruptor號稱最快的消息隊列。
JMH -java Microbenchmark Harness
微基準測試,它是測的某一個方法的性能到底是好或者不好,換了方法的實現之后他的性能到底好還是不好。
這個測試的框架是2013年發出來的,由JLT的開發人員開發,后來歸到了OpenJDK下面。
官網:http://openjdk.java.net/projects/code-tools/jmh/
下面我們來介紹什么是一個JMH,他是用來干什么的,我們來看到底怎么使用,給大家一個簡單的介紹肯定是了解不了jmh是個什么東西,已經把這個步驟給大家總結一篇文檔,官網在哪里,怎么樣去創建一個JMH的測試,創建一共大致有七個步驟,還有他的一些基本概念,什么叫預熱,什么叫Mesurement等等的,還有進一步了解的官方地址。
JMH Java準測試工具套件
什么是JMH
官網:http://openjdk.java.net/projects/code-tools/jmh/
創建JMH測試
1. 創建Maven項目,添加依賴,我們需要添加兩個依賴:
1.1:jmh-core (jmh的核心)
1.2:jmh-generator-annprocess(注解處理包)
<?xml version="1.0" encoding="UTF-8"?>4.0.0UTF-8UTF-81.81.81.8mashibing.comHelloJMH21.0-SNAPSHOTorg.openjdk.jmhjmh-core1.21org.openjdk.jmhjmh-generator-annprocess1.21test2. idea安裝JMH插件 JMH plugin v1.0.3
JMH這個東西你要想真正的安安靜靜的去運行,就不會去影響我們正常程序的執行,最好的方式就是按照官網的說法是命令行的方式,比方說你要測試某一個包里面的類的話你應該把這個類和其他的依賴類打成一個jar包,然后單獨的把這個jar包放到某一個機器上,在這個機器上對這個jar包進行微基準的測試,如果對它進行測試的比較好,那說明最后的結果還可以,如果說邊開發邊進行這種微基準的測試實際上他非常的不準,因為你的開發環境會對結果產生影響。
只不過我們自己開發人員來說平時你要想進行一些微基準的測試的話,你要是每次打個包來進行正規一個從頭到尾的測試 ,完了之后發現問題不對再去重新改,效率太低了。所以在這里教大家的是怎么樣在IDE里面來進行微基準的測試。idea安裝JMH插件:fifile->Settings->Plugins->JMH-plugin。它運行的時候需要這個plugin的支持,如果你用命令行是不需要這些東西的。
3. 由于用到了注解,打開運行程序注解配置
因為JMH在運行的時候他用到了注解,注解這個東西你自己得寫一個程序得解釋他,所以你要把這個給設置上允許JMH能夠對注解進行處理:
compiler -> Annotation Processors -> Enable Annotation Processing
4. 定義需要測試類PS (ParallelStream)看這里,寫了一個類,并行處理流的一個程序,定義了一個list集合,然后往這個集合里扔了1000個數。寫了一個方法來判斷這個數到底是不是一個質數。寫了兩個方法,第一個是用forEach來判斷我們這1000個數里到底有誰是質數;第二個是使用了并行處理流,這個forEach的方法就只有單線程里面執行,挨著牌從頭拿到尾,從0拿到1000,但是并行處理的時候會有多個線程采用ForkJoin的方式來把里面的數分成好幾份并行的進行處理。一種是串行處理,一種是并行處理,都可以對他們進行測試,但需要注意這個基準測試并不是對比測試的,你只是測試一下你這方法寫出這樣的情況下他的吞吐量到底是多少,這是一個非常專業的測試的工具。嚴格的來講這部分是測試開發專業的。
package com.mashibing.jmh;import java.util.ArrayList;import java.util.List;import java.util.Random;public class PS {static List nums = new ArrayList<>();static {Random r = new Random();for (int i = 0; i < 10000; i++) nums.add(1000000 +r.nextInt(1000000));}static void foreach() {nums.forEach(v->isPrime(v));}static void parallel() {nums.parallelStream().forEach(PS::isPrime);}static boolean isPrime(int num) {for(int i=2; i<=num/2; i++) {if(num % i == 0) return false;}return true;}}5. 寫單元測試
這個測試類一定要在test package下面
我對這個方法進行測試testForEach,很簡單我就調用PS這個類的foreach就行了,對它測試最關鍵的是我加了這個注解@Benchmark,這個是JMH的注解,是要被JMH來解析處理的,這也是我們為么要把那個Annotation Processing給設置上的原因,非常簡單,你只要加上注解就可以對這個方法進行微基準測試了,點擊右鍵直接run。
package com.mashibing.jmh;import org.openjdk.jmh.annotations.Benchmark;import static org.junit.jupiter.api.Assertions.*;public class PSTest {@Benchmark@Warmup(iteration=1, time=3)//在專業測試里面首先要進行預熱,預熱多少次,預熱多少時間@Fork(5)//意思是用多少個線程去執行我們的程序@BenchmarkMode(Mode.Throughput)//是對基準測試的一個模式,這個模式用的最多的是Throughput吞吐量@Measurement(iteration=1, time=3)//是整個測試要測試多少遍,調用這個方法要調用多少次public void testForEach() {PS.foreach();}}6. 運行測試類,如果遇到下面的錯誤:
ERROR: org.openjdk.jmh.runner.RunnerException: ERROR: Exception while trying
to acquire the JMH lock (C:WINDOWS/jmh.lock): C:WINDOWSjmh.lock (拒絕訪問。), exiting. Use -Djmh.ignoreLock=true to forcefully continue.
at org.openjdk.jmh.runner.Runner.run(Runner.java:216)
at org.openjdk.jmh.Main.main(Main.java:71)
這個錯誤是因為JMH運行需要訪問系統的TMP目錄,解決辦法是:
打開RunConfifiguration -> Environment Variables -> include system environment viables
7. 閱讀測試報告
JMH中的基本概念
1. Warmup
預熱,由于JVM中對于特定代碼會存在優化(本地化),預熱對于測試結果很重要
2. Mesurement
總共執行多少次測試
3. Timeout
4. Threads
線程數,由fork指定
5. Benchmark mode
基準測試的模式
6. Benchmark
測試哪一段代碼
next
做個是JMH的一個入門,嚴格來講這個和我們的關系其實并不大,這個是測試部門干的事兒,但是你了解一下沒有特別多的壞處,你也知道你的方法最后效率高或者底,可以通過一個簡單的JMH插件來幫你完成,你不要在手動的去寫這件事兒了。
如果說大家對JMH有興趣,你們在工作中可能會有用的上大家去讀一下官方的例子,官方大概有好幾十個例子程序,你可以自己一個一個的去研究。
官方樣例:
http://hg.openjdk.java.net/code-tools/jmh/fifile/tip/jmh-samples/src/main/java/org/openjdk/jmh/s
amples/
Disruptor按照英文翻譯的話,Disruptor應該是分裂、瓦解。這個Disruptor是一個做金融的、做股票的這樣一個公司交易所來開發的,為自己來開發的這么一個底層的框架,開發出來之后受到了很多的認可,開源之后,2011年獲得Duke獎。如果你想把它用作MQ的話,單機最快的MQ。性能非常的高,主要是它里面用的全都是cas,另外把各種各樣的性能開發到了極致,所以他單機支持很高的一個并發。
Disruptor不是平時我們學的這個redis、不是平時我們所學的kafka,他可以跟他們一樣有類似的用途,但他是單機,redis、kafka也可以用于集群。redis他有這種序列化的機制,就是你可以把它存儲到硬盤上或數據庫當中是可以的,kafka當然也有,Disruptor沒有,Disruptor就是在內存里,Disruptor簡單理解就是內存里用于存放元素的一個高效率的隊列。
介紹
關于Disruptor的一些資料,給大家列在這里。
主頁:http://imax-exchange.github.io/disruptor/
源碼:https://github.com/LMAX-Exchange/disruptor
GettingStarted:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
api:http://imax-exchange.github.io/disruptor/docs/index.html
maven:https://mvnrepository.com/artifact/com.imax/disruptor
Disruptor叫無鎖、高并發、環形Buffffer,直接覆蓋(不用清除)舊的數據,降低GC頻率,用于生產者消費者模式(如果說按照設計者角度來講他就是觀察者模式)。什么叫觀察者模式,想象一下,我們在前面學各種各樣的隊列的時候,隊列就是個容器,好多生產者往里頭扔東西,好多消費者從里頭往外拿東西。所謂的生產者消費者就是這個意思,為什么我們可以叫他觀察者呢,因為這些消費者正在觀察著里面有沒有新東西,如果有的話我馬上拿過來消費,所以他也是一種觀察者模式。Disruptor實現的就是這個容器
Disruptor核心與特點
Disruptor也是一個隊列,和其他隊列不一樣的是他是一個環形隊列,環形的Buffffer。一般情況下我們的容器是一個隊列,不管你是用鏈表實現還是用數組實現的,它會是一個隊列,那么這個隊列生產者這邊使勁往里塞,消費者這邊使勁往外拿,但Disruptor的核心是一個環形的buffffer。
對比ConcurrentLinkedQueue:鏈表實現這種環形的buffffer速度就是更快,同學們可以去查一下JDK自帶的容器,你會發現效率比較高的有各種各樣的隊列,如果不想阻塞就可以用Concurrent相關的,ConcurrentLinkedQueue是并發的用鏈表實現的隊列,它里面大量的使用了cas,因此它的效率相對比較高,可是對于遍歷來講鏈表的效率一定會比數組低。
JDK中沒有ConcurrentArrayQueue
因為數組的大小的固定的,如果想擴展的話就要把原來的數組拷貝到新數組里,每次加都要拷貝這個效率相當底,所以他并沒有給大家加這個叫ConcurrentArrayQueue,但是Disruptor就非常牛X,想到了這樣一個辦法,就是把數組的頭尾相連。
Disruptor是用數組實現的這樣的一個隊列,你可以認為Disruptor就是用數組實現的ConcurrentArrayQueue,另外這個Queue是首尾相連的.
那Disruptor用數組實現的環形的就比上面兩個都牛嗎,牛在哪?為啥呢?如果我們用ConcurrentLinkedQueue這里面就是一個一個鏈表,這個鏈表遍歷起來肯定沒有數組快,這個是一點。還有第二點就是這個鏈表要維護一個頭指針和一個尾指針,我往頭部加的時候要加鎖,往尾部拿的時候也要加鎖。另外鏈表本身效率就偏低,還要維護兩個指針。關于環形的呢,環形本身就維護一個位置,這個位置稱之為sequence序列,這個序列代表的是我下一個有效的元素指在什么位置上,就相當于他只有一個指針來回轉。加在某個位置上怎么計算:直接用那個數除以我們整個的容量求余就可以了。
RingBuffffer是一個環形隊列
RingBuffffer的序號,指向下一個可用的元素
采用數組實現,沒有首尾指針
對比ConcurrentLinkedQueue,用數組實現的速度更快
假如長度為8,當添加到第12個元素的時候在哪個序號上呢?用12%8決定
當Buffffer被填滿的時候到底是覆蓋還是等待,由Produce決定長度設為2的n次冪,利于二進制計算,例如:12%8=12&(8-1)
如果大家對于位運算有疑問的,在咱們網站上有一個菜鳥預習,里面有一部分是二進制,大家去翻看一下。
由于它會采用覆蓋的方式,所以他沒有必要記頭指針,沒有必要記尾指針。我只要記一個指針放在這就可以了。在這點上依然要比ConcurrentLinkedQueue要快。
那我生產者線程生產的特別多,消費者沒來得及消費那我在往后覆蓋的話怎么辦?不會那么輕易的讓你覆蓋的,我們是有策略的,我生產者生產滿了,要在生產一個的話就馬上覆蓋這個位置上的數了。這時候是不能覆蓋的,指定了一個策略叫等待策略,這里面有8種等待策略,分情況自己去用。最常見的是BlockingWait,滿了我就在這等著,什么時候你空了消費者來喚醒一下就繼續。
Disruptor開發步驟
開發步驟是比較固定的一個開發步驟。
1:定義Event-隊列中需要處理的元素。
在Disruptor他是每一個消息都認為是一個事件,在他這個概念里就是一個事件,所以在這個環形隊列里面存的是一個一個的Event。
2:定義Event工廠,用于填充隊列
那這個Event怎么產生,就需要指定Event的工廠。3:定義EventHandler(消費者),處理容器中的元素那這個Event怎么消費呢,就需要指定Event的消費者EventHandler。
下面我們直接看程序,先看來自官網的幾個輔助程序:LongEvent這個事件里面或者說消息里面裝的什么值,我只裝了一個long值,但這里面可以裝任何值,任何類型的都可以往里裝,這個long類型的值我們可以指定它set,官網上沒有toString方法,我給大家加了一段主要是為了打印消息讓大家看的更清楚。
package com.mashibing.disruptor;public class LongEvent{private long value;public void set(long value){this.value = value;}@Overridepublic String toString(){return "LongEvent{" +"value=" + value +"}";}}然后呢,我需要一個EventFactory就是怎么產生這些個事件,這個Factory非常簡單,
LongEventFactory去實現EventFactiry的接口,去重寫它的newInstance方法直接new LongEvent。構建這個環的時候為什么要指定一個產生事件的工廠,我直接new這個事件不可以嗎?但是有的事件里面的構造方法不讓你new呢,產生事件工廠的話你可以靈活的指定一些 ,這里面也是牽扯到效率的。底層比較深,我給大家解釋一下:
這里牽扯效率問題,因為Disruptor初始化的時候會調用Event工廠,對ringBuffffer進行內存的提前分配,GC頻率會降低。
package com.mashibing.disruptor;import com.lmax.disruptor.EventFactory;public class LongEventFactory implements EventFactiry{@Overridepublic LongEvent newInstance(){return new LongEvent();}}在看第三個叫LongEventHandler,Handler就是我拿到這個事件之后該怎么樣進行處理,所以這里是消息的消費者,怎么處理呢,很簡單,我處理完這個消息之后呢就記一個數,總共記下來我一共處理了多少消息了,處理消息的時候默認調用的是onEvent方法,這個方法里面有三個參數,第一個是你要處理的那個消息,第二個是你處理的是哪個位置上的消息,第三個是整體的消息結束沒結束,是不是處理完了。你可以判斷他如果是true的話消費者就可以退出了,如果是false的話說明后面還有繼續消費。
package com.mashibing.disruptor;import com.lmax.disruptor.EventHandler;public class LongEventHandler implements EventHandler{/****@param event*@param sequence RingBuffer的序號*@param endOfBatch 是否為最后一個元素*@throws Exception**/public static long count = 0;@Overridepublic void onEvent(LongEvent event,long sequence,boolean endOfBatch) throwsException{count++;System.out.println("["+Thread.currentThread().getName()+"]"+event+"序號:"+sequence);}}所以我們定義了這三個類,關于這三個類在給大家解釋一下,我們現在有一個環,然后這個環上每一個位置裝LongEvent,怎么產生這個LongEvent通過這個LongEventFactory的newInstance方法來產生,當我拿到這個Event之后通過LongEventHandler進行處理。
到現在我們把這三個輔助類都已經定義好了,定義好的情況下我們怎么才能比較有機的結合在一起,讓他在Disruptor進行處理呢,看第一個小例子程序,首先把EvenFactory給他初始化了new
LongEventFactory,我們這個環應該是2的N次方1024,然后new一個Disruptor出來,需要指定這么幾個參數:factory產生消息的工廠;bufffferSize是指定這個環大小到底是多少;defaultThreadFactory線程工廠,指的是當他要產生消費者的時候,當要調用這個消費者的時候他是在一個特定的線程里執行的,這個線程就是通過defaultThreadFactory來產生;
繼續往下看,當我們拿到這個消息之后怎么進行處理啊,我們就用這個LongEventHandler來處理。然后start,當start之后一個環起來了,每個環上指向的這個LongEvent也得初始化好,內存分配好了,整個就安安靜靜的等待著生產者的到來。
看生產者的代碼,long sequence = ringBuffffer.next(),通過next找到下一個可用的位置,最開始這個環是空的,下一個可用的位置是0這個位置,拿到這個位置之后直接去ringBuffffer里面get(0)這個位置上的event。如果說你要是追求效率的極致,你應該是一次性全部初始化好,你get的時候就不用再去判斷,如果你想做一個延遲,很不幸的是你每次都要做判斷是不是初始化了。get的時候就是拿到一個event,這個是我們new出來的默認的,但是我們可以改里面的event.set( 值...),填好數據之后ringBuffffer.publish發布生產。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main01{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,Executors.defaultThreadFactory());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//官方例程long sequence = ringBuffer.next();//Grab the next sequencetry{LongEvent event=ringBuffer.get(sequence);//Get the entry in theDisruptor//for the sequenceevent.set(8888L);//Fill with data}finally{ringBuffer.publish(sequence);}}}disruptor在后面提供了一些Lambda表達式的寫法,為了支持這種寫法對整個消息的構建過程做了改進,讀下面02小程序使用translator,就是怎么樣構建這個消息,原來我們都是用消息的factory,但是下面這次我們用translator對他進行構建,就是把某一些數據翻譯成消息。前面產生event工廠還是一樣,然后bufffferSize,后面再扔的是DaemonThreadFactory就是后臺線程了,new LongEventHandler然后start拿到他的ringBuffffer,前面都一樣。只有一個地方叫EventTranslator不一樣,我們在main01里面的代碼是要寫try catch然后把里面的值給設好,相當于把這個值轉換成event對象。相對簡單的寫法,它會把某些值轉成一個LongEvent,通過EventTranslator。new出來后實現了translateTo方法,EventTranslator它本身是一個接口,所以你要new的時候你又要實現它里面沒有實現的方法,translateTo的意思是你給我一個Event,我會把這個Event給你填好。
ringBuffffer.publishEvent(translator1) 你只要把translator1交給ringBuffffer就可以了。這個translator就是為了迎合Lambda表達式的寫法(為java8的寫法做準備)
另外translator有很多種用法:
EventTranslatorOneArg只有帶一個參數的EventTranslator。我帶有一個參數,這個參數會通過我的translateTo方法轉換成一個LongEvent;既然有EventTranslatorOneArg就有EventTranslatorTwoArg、EventTranslatorThreeArg,還有EventTranslatorVararg多了去了Vararg就是有好多個值,我把里面的值全都給你加起來最后把結果set到event里面。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main02{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================EventTranslator translator1 = new EventTranslator(){@Overridepublic void translateTo(LongEvent event,long sequence){event.set(8888L); }};ringBuffer.publishEvent(translator1);//========================================================================EventTranslatorOneArg translator2 = newEventTranslatorOneArg(){@Overridepublic void translateTo(LongEvent event,long sequence,Long l){event.set(l); }};ringBuffer.publishEvent(translator2,7777L);//========================================================================EventTranslatorTwoArg translator3 = newEventTranslatorTwoArg(){@Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Long l2){ event.set(l); }};ringBuffer.publishEvent(translator3,10000L,10000L);//========================================================================EventTranslatorThreeArg translator4 = newEventTranslatorThreeArg(){@Overridepublic void translateTo(LongEvent event,long sequence,Long l1,Longl2,Long l3){ event.set(l1+ l2+ l3); }};ringBuffer.publishEvent(translator4,10000L,10000L,10000L);//========================================================================EventTranslatorVararg translator5 = newEventTranslatorThreeArg(){@Overridepublic void translateTo(LongEvent event,long sequence,Object...objects){long result = 0;for(Object o : objects){long l =(Long)o;result +=l;}}};ringBuffer.publishEvent(translator5,10000L,10000L,10000L,10000L);}}有了上面Translator之后呢,下面看Lambda表達式怎么寫,這個是比較簡潔的寫法,連factory都省了,直接指定一個Lambda表達式LongEvent::new。繼續handleEventsWith把三個參數傳進來后面寫好Lambda表達式直接打印,然后start, 接著RingBuffffer,publishEvent原來我們還有寫try...catch,現在簡單了直接ringBuffffer.publishEvent(第一個是lambda表達式,表達式后是你指定的幾個參
數),所以現在的這種寫法就不定義各種各樣的EventTranslator了。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main03{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(LongEvent::new,bufferSize,DaemonThreadFactory.INSTANCE);//Connect the handlerdisruptor.handleEventsWith((event,sequence,endOfBatch)->System.out.println("Event:"+event));//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();ringBuffer.publishEvent((event, sequence)-> event.set(10000L));System.in.read();}}下面我們叫一些細節,這些個細節也不難,講給大家。第一個細節是我們生產者的時候默認會有好多種生產方式,默認的是多線程生產者,但是假如你確定你整個程序里頭只有一個生產者的話那你還能提高效率,就是在你指定Disruptor生產者的線程的方式是SINGLE,生產者的類型ProducerType。
ProducerType生產者線程模式
ProducerType有兩種模式ProducerMULTI和Producer.SINGLE
默認是MULTI,表示在多線程模式下產生sequence
如果確認是單線程生產者,那么可以指定SINGLE,效率會提升如果是多個生產者(多線程),但模式指定為SINGLE,會出什么問題?
假如你的程序里頭只有一個生產者還用ProducerMULTI的話,我們對序列來進行多線程訪問的時候肯定是要加鎖的,所以MULTI里面默認是有鎖定處理的,但是假如你只有一個線程這個時候應該把生產者指定為SINGLE,他的效率更高,因為它里面不加鎖。
下面這個小程序,我這里指定的是Producer.SINGLE,但是我生產的時候用的是一堆線程,當我制定了Producer.SINGLE之后相當于內部對于序列的訪問就沒有鎖了,它會把性能發揮到極致,它不會報錯,它會把你的消息靜悄悄的覆蓋了,因此你要小心一點。我這里這個寫法是我有50 個線程然后每個線程生產100個數,最后結果正常的話應該是有5000個消費產生。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main04_ProducerType{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the Disruptor//Disruptor disruptor = new Disruptor<>(factory,bufferSize,Executors.defaultThreadFactory());Disruptor disruptor = new Disruptor<>(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.SINGLE,newBlockingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 50;CycliBarrier barrier=new CycliBarrier(threadCount);ExecutorService service = Executors.newCachedThreadPool();for(long i=0; i{System.out.printf("Thread %s ready to start!",threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j=0; j<100;j++){ringBuffer.publishEvent((event,sequence)->{event.set(threadNum);System.out.println("生產了"+threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}我們再來聊一下等待策略WaitStrategy,有好多種方法,看下面等待策略(常用)BlockingWaitStrategy:通過線程堵塞的方式,等待生產者喚醒,被喚醒后,再循環檢查依賴的sequence是否已經消費。
BusySpinWaitStrategy:線程一直自旋等待,可能比較耗cpu
LiteBlockingWaitStrategy:線程阻塞等待生產者喚醒,與BlockingWaitStrategy相比,區別在signalNeeded.getAndSet,如果兩個線程同時訪問一個訪問waitfor,一個訪問signalAll時,可以
減少lock加鎖次數LiteTimeoutBlockingWaitStrategy:與LiteBlockingWaitStrategy相比,設置了阻塞時間,超過時
間后拋出異常PhasedBackoffffWaitStrategy:根據時間參數和傳入的等待策略來決定使用那種等待策略TimeoutBlockingWaitStrategy:相對于BlockingWaitStrategy來說,設置了等待時間,超過后拋出異常
(常用)YieldingWaitStrategy:嘗試100次,然后Thread.yield()讓出cpu
(常用)SleepingWaitStrategy:sleep
我們常用的BlockingWaitStrategy滿了就等著;SleepingWaitStrategy滿了就睡一覺,睡醒了看看能不能繼續執行了;YieldingWaitStrategy讓出cpu,讓你消費者趕緊消費,消費完了之后我又回來看看我是不是又能生產了;一般YieldingWaitStrategy效率是最高的,但也要看實際情況適用不適用。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main05_WaitStrategy{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlerdisruptor.handleEventsWith(new LongEventHandler());//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 50;CycliBarrier barrier=new CycliBarrier(threadCount);ExecutorService service = Executors.newCachedThreadPool();for(long i=0; i{System.out.printf("Thread %s ready to start!",threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j=0; j<100;j++){ringBuffer.publishEvent((event,sequence)->{event.set(threadNum);System.out.println("生產了"+threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}我們來看多個消費者怎么指定,默認的情況下只有一個消費者,你想要有多個消費者的時候也非常簡單,看下面代碼我定義了兩個消費者h1、h2,disruptor.handleEventsWith(h1,h2)這里面是一個可變參數,所以你要想有多個消費者的時候就往里裝,多個消費者是位于多個線程里面的。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main06_MultiConsumer{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersLongEventHandler h1 = new LongEventHandler();LongEventHandler h2 = new LongEventHandler();disruptor.handleEventsWith(h1,h2);//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 10;CycliBarrier barrier=new CycliBarrier(threadCount);ExecutorService service = Executors.newCachedThreadPool();for(long i=0; i{System.out.printf("Thread %s ready to start!",threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j=0; j<10;j++){ringBuffer.publishEvent((event,sequence)->{event.set(threadNum);System.out.println("生產了"+threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}還有disruptor最后一個問題,出了異常怎么處理
消費者異常處理
默認:disruptor.setDefaultExceptionHandler()
覆蓋:disruptor.handleExceptionFor().with()
看下面代碼,這這里方法里寫了一個EventHandler是我們的消費者,在消費者里打印了event之后馬上拋出了異常,當我們消費者出現異常之后你不能讓整個線程停下來,有一個消費者出了異常那其他的消費者就不干活了,肯定不行。handleExceptionsFor為消費者指定Exception處理器 (h1).with后面是我們的ExceptionHandler出了異常之后該怎么辦進行處理,重寫三個方法,第一個是當產生異常的時候在這很簡單直接打印出來了;第二個是handleOnStart如果啟動的時候出異常;第三個handleOnShutdown你該怎么處理。
package com.mashibing.disruptor;import java.util.concurrent.Executor;import java.util.concurrent.Executors;import com.lmax.disruptor.dsl.Disruptor;import com.lmax.disruptor.RingBuffer;import com.lmax.disruptor.util.DaemonThreadFactory;import java.nio.ByteBuffer;public class Main07_ExceptionHandler{public static void main(String[] args) thrwos Exception{//the factory for the eventLongEvenFactory factory = new LongEventFactory();//Specify the of the ring buffer,must be power of 2.int bufferSize = 1024;//Construct the DisruptorDisruptor disruptor = new Disruptor<>(factory,bufferSize,Executors.defaultThreadFactory(),ProducerType.MULTI,new SleepingWaitStrategy());//Connect the handlersEventHandler h1 = (event,sequence,end)->{System.out.println("消費者出異常");};disruptor.handleEventsWith(h1);disruptor.handleExceptionsFor(h1).with(newExceptionHandler(){@Overridepublic void handleEventException(Throwable throwable,longl,LongEvent longEvent){throwable.printStackTrace();}@Overridepublic void handleOnStartException(Throwable throwable){System.out.println("Exception Start to Handle!");}@Overridepublic void handleOnShutdownException(Throwable throwable){System.out.println("Exception End to Handle!");}});//Start the Disruptor,start all threads runningdisruptor.start();//Get the ring buffer form the Disruptor to be used for publishing.RingBuffer ringBuffer = disruptor.getRingBuffer();//========================================================================final int threadCount = 1;CycliBarrier barrier=new CycliBarrier(threadCount);ExecutorService service = Executors.newCachedThreadPool();for(long i=0; i{System.out.printf("Thread %s ready to start!",threadNum);try{barrier.await();}catch(InterruptedException e){e.printStackTrace();}catch(BrokenBarrierException e){e.printStackTrace();}for(int j=0; j<10;j++){ringBuffer.publishEvent((event,sequence)->{event.set(threadNum);System.out.println("生產了"+threadNum);});}});}service.shutdown();//disruptor.shutdown();TimeUnit.SECONDS.sleep(3);System.out.println(LongEventHandler.count);}}disruptor是一個環,然后這個環有多個生產者可以往里頭生產,由于它是環形的設計效率會非常的高,我們寫程序的時候是這樣寫的,首先你自己定義好Event消息的格式,然后定義消息工廠,消息工廠是用來初始化整個環的時候相應的一些位置上各種各樣不同的消息先把它new出來,new出來之后先占好空間,我們在生產的時候只需要把這個位置上這個默認的這塊空間拿出來往里頭填值,填好值之后
消費者就可以往里頭消費了,消費完了生產者就可以繼續往里頭生產了,如果說你生產者消費的比較快,消費者消費的比較慢,滿了怎么辦,就是用各種各樣的等待策略,消費者出了問題之后可以用ExceptionHandler來進行處理。
覺得文章內容不錯的話,可以轉發關注一下小編~ 之后持續更新干貨好文~~
總結
以上是生活随笔為你收集整理的多线程导出excel高并发_大牛带你深入java多线程与高并发:JMH与Disruptor,确定能学会?...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第五人格服务器维修中怎么进,第五人格进不
- 下一篇: birt什么意思中文翻译_ECTN是什么