使用SynchronousQueue实现生产者/消费者
Java提供了許多用于并發(fā)支持的有用類中,有一個(gè)我想談一談: SynchronousQueue 。 特別是,我想通過(guò)使用方便的SynchronousQueue作為交換機(jī)制來(lái)完成Producer / Consumer實(shí)現(xiàn)。
除非我們了解SynchronousQueue實(shí)現(xiàn)的內(nèi)幕,否則可能不清楚為什么要使用這種類型的隊(duì)列進(jìn)行生產(chǎn)者/消費(fèi)者通信。 事實(shí)證明,這并不是我們過(guò)去通??紤]的隊(duì)列。 這個(gè)類比只是一個(gè)最多包含一個(gè)元素的集合。
為什么有用? 好吧,有幾個(gè)原因。 從生產(chǎn)者的角度來(lái)看,只能將一個(gè)元素(或消息)存儲(chǔ)到隊(duì)列中。 為了繼續(xù)進(jìn)行下一個(gè)元素(或消息),生產(chǎn)者應(yīng)等到消費(fèi)者使用隊(duì)列中的當(dāng)前元素。 從使用者的角度來(lái)看,它只是輪詢隊(duì)列以查找下一個(gè)可用的元素(或消息)。 很簡(jiǎn)單,但是最大的好處是:生產(chǎn)者發(fā)送消息的速度不能超過(guò)消費(fèi)者處理消息的速度。
這是我最近遇到的用例之一:比較兩個(gè)數(shù)據(jù)庫(kù)表(可能只是巨大的),并檢測(cè)其中包含不同數(shù)據(jù)或數(shù)據(jù)是否相同(副本)。 SynchronousQueue是解決此問(wèn)題的便捷工具:它允許在自己的線程中處理每個(gè)表,并在從兩個(gè)不同的數(shù)據(jù)庫(kù)讀取數(shù)據(jù)時(shí)補(bǔ)償可能的超時(shí)/延遲。
讓我們從定義比較功能開(kāi)始,該功能接受源數(shù)據(jù)源和目標(biāo)數(shù)據(jù)源以及表名(進(jìn)行比較)。 我正在使用Spring框架中非常有用的JdbcTemplate類,因?yàn)樗浅:玫爻橄罅颂幚磉B接和準(zhǔn)備好的語(yǔ)句的所有無(wú)聊的細(xì)節(jié)。
public boolean compare( final DataSource source, final DataSource destination, final String table ) {final JdbcTemplate from = new JdbcTemplate( source );final JdbcTemplate to = new JdbcTemplate( destination ); }在進(jìn)行任何實(shí)際數(shù)據(jù)比較之前,最好比較一下源數(shù)據(jù)庫(kù)和目標(biāo)數(shù)據(jù)庫(kù)的表行數(shù):
if( from.queryForLong('SELECT count(1) FROM ' + table ) != to.queryForLong('SELECT count(1) FROM ' + table ) ) {return false; }現(xiàn)在,至少知道表在兩個(gè)數(shù)據(jù)庫(kù)中包含相同數(shù)量的行,我們可以開(kāi)始進(jìn)行數(shù)據(jù)比較。 該算法非常簡(jiǎn)單:
- 為源(生產(chǎn)者)和目標(biāo)(消費(fèi)者)數(shù)據(jù)庫(kù)創(chuàng)建一個(gè)單獨(dú)的線程
- 生產(chǎn)者線程從表中讀取單行并將其放入SynchronousQueue
- 使用者線程還從表中讀取單行,然后向隊(duì)列詢問(wèn)要比較的可用行(必要時(shí)等待),最后比較兩個(gè)結(jié)果集
使用另一大部分Java并發(fā)實(shí)用程序進(jìn)行線程池,讓我們定義一個(gè)具有固定線程數(shù)量的線程池(2)。
final ExecutorService executor = Executors.newFixedThreadPool( 2 ); final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();按照描述的算法,生產(chǎn)者功能可以表示為單個(gè)可調(diào)用項(xiàng):
Callable< Void > producer = new Callable< Void >() {@Overridepublic Void call() throws Exception {from.query( 'SELECT * FROM ' + table,new RowCallbackHandler() {@Overridepublic void processRow(ResultSet rs) throws SQLException {try { List< ? > row = ...; // convert ResultSet to Listif( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {throw new SQLException( 'Having more data but consumer has already completed' );}} catch( InterruptedException ex ) {throw new SQLException( 'Having more data but producer has been interrupted' );}}});return null;} };由于Java語(yǔ)法,該代碼有點(diǎn)冗長(zhǎng),但實(shí)際上并沒(méi)有做很多事情。 從表生成器讀取的每個(gè)結(jié)果集都將轉(zhuǎn)換為一個(gè)列表(由于是樣板,因此省略了實(shí)現(xiàn)),并將其放入隊(duì)列( offer )。 如果隊(duì)列不為空,則生產(chǎn)者將被阻止等待消費(fèi)者完成工作。 使用者可以分別表示為以下可調(diào)用對(duì)象:
Callable< Void > consumer = new Callable< Void >() {@Overridepublic Void call() throws Exception {to.query( 'SELECT * FROM ' + table,new RowCallbackHandler() {@Overridepublic void processRow(ResultSet rs) throws SQLException {try {List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );if( source == null ) {throw new SQLException( 'Having more data but producer has already completed' );} List< ? > destination = ...; // convert ResultSet to Listif( !source.equals( destination ) ) {throw new SQLException( 'Row data is not the same' );}} catch ( InterruptedException ex ) {throw new SQLException( 'Having more data but consumer has been interrupted' );}}});return null;} };使用者對(duì)隊(duì)列執(zhí)行反向操作:與其放入數(shù)據(jù),不如將數(shù)據(jù)從隊(duì)列中拉出( poll )。 如果隊(duì)列為空,則阻止消費(fèi)者,等待生產(chǎn)者發(fā)布下一行。 剩下的部分只是提交那些可調(diào)用對(duì)象以執(zhí)行。 Future的get方法返回的任何異常都表明表不包含相同的數(shù)據(jù)(或者從數(shù)據(jù)庫(kù)獲取數(shù)據(jù)存在問(wèn)題):
List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );for( final Future< Void > future: futures ) {future.get( 5, TimeUnit.MINUTES );}參考: Andriy Redko {devmind}博客中的JCG合作伙伴 Andrey Redko 使用SynchronousQueue實(shí)現(xiàn)了生產(chǎn)者/消費(fèi)者 。
翻譯自: https://www.javacodegeeks.com/2013/01/implementing-producerconsumer-using-synchronousqueue.html
總結(jié)
以上是生活随笔為你收集整理的使用SynchronousQueue实现生产者/消费者的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 投影仪该怎么连接电脑摄像机如何连接电脑
- 下一篇: wr886n路由器怎么设置wr886n路