hazelcast入门教程_Hazelcast入门指南第3部分
hazelcast入門教程
這是從初學(xué)者的角度來看一系列有關(guān)如何使用Hazelcast的文章的延續(xù)。 如果您還沒有閱讀最后兩個,我鼓勵閱讀它們:
- Hazelcast入門指南第1部分
- Hazelcast入門指南第2部分
原始人來了
在上一篇文章中,我提到將ILock與IList和ISet一起使用,因為它們不是線程安全的。 令我驚訝的是,我沒有涵蓋Hazelcast的基本部分,即分布式原語。 他們解決了以分布式方式同步資源使用的問題。 那些執(zhí)行大量線程編程的人將立即識別它們。 對于那些不熟悉線程編程的人,我將解釋每個原語的作用并舉一個例子。
長壽
這是一個分布式原子長。 這意味著所有操作都一次發(fā)生。 例如,可以在一個操作中添加一個數(shù)字并檢索結(jié)果值。 可以獲取值,然后添加值。 對于在此原語上執(zhí)行的每項操作都是如此。 可以想象,它是線程安全的,但不能做到這一點,而且是線程安全的。
atomicLong.addAndGet(2 * atomicLong.get());上面的行創(chuàng)建了一個競爭條件,因為有三個操作,讀取原子long的內(nèi)容,乘以2并將其添加到實例中。 僅在保證一步操作的情況下,線程才安全地存在。 為此,IAtomicLong有一個名為alterAndGet的方法。 AlterAndGet帶有IFunction對象。 這使多步操作成為一步。 IAtomicLong始終有一個同步備份,并且它是不可配置的。
IdGenerator
IAtomicLongs非常適合用來跟蹤一個人有多少。 問題在于,由于該呼叫很可能是遠程呼叫,因此在某些情況下,IAtomicLongs并不是理想的解決方案。 這些情況之一就是生成唯一的ID。 IdGenerator就是為此目的而制作的。 它的工作方式是每個成員要求生成一百萬個ID。 一旦所有這些要求的數(shù)字都被使用,該部門將要求另外一百萬。 因此,由于每個成員都有100萬個ID,所以遠程調(diào)用IdGenerator的機會是100萬個。 這使得生成唯一ID的方法非常快捷。 如果發(fā)生任何重復(fù),可能是因為成員沒有加入。 如果成員在其段用盡之前發(fā)生故障,則ID中將存在間隙。 對于唯一的ID生成,缺少數(shù)字不是問題。 我確實認為成員沒有掛接到集群是一個問題,但是如果發(fā)生這種情況,則還有更大的事情要擔(dān)心。 如果群集重新啟動,則ID將從零開始。 那是因為id不能持久存在。 這是一個內(nèi)存數(shù)據(jù)庫,一個機會。 為了解決這個問題,可以將IdGenerators設(shè)置為以特定數(shù)字開頭,只要其他人沒有聲明它并且還沒有生成ID。 替代方法是創(chuàng)建自己的ID生成器或使用java.util.UUID類。 這可能需要更多空間,但是每個項目都有其自己的要求可以滿足。 IdGenerator始終具有一個同步備份,無法進行配置。
鎖
這是經(jīng)典的同步方法。 它是分發(fā)的排他鎖。 只需調(diào)用方法鎖,線程便會等待或獲得鎖。 一旦建立了鎖定,就可以執(zhí)行關(guān)鍵部分。 工作完成后,將使用解鎖方法。 這項技術(shù)的資深人士會將關(guān)鍵部分放在try finally塊中,在try塊外部建立鎖定,并在finally部分建立解鎖。 這對于在線程安全的結(jié)構(gòu)上執(zhí)行操作非常有用。 獲取鎖的進程擁有該鎖,并且需要調(diào)用解鎖才能使其他進程能夠建立鎖。 當一個人在網(wǎng)絡(luò)上的多個位置都有線程時,這可能會出現(xiàn)問題。 Hazelcast想到了這個問題,并在成員退出時釋放了鎖定。 另一個功能是鎖定方法的超時時間為300秒。 這樣可以防止線程不足。 ILock具有一個同步備份,并且不可配置。
有經(jīng)驗的人的一些建議,使關(guān)鍵部分盡可能小 ; 這有助于提高性能并防止死鎖。 由于線程的執(zhí)行順序未知,因此死鎖難以調(diào)試且難以測試。 漏洞一度表現(xiàn)出來,然后就沒有。 由于鎖放錯了位置,因此可能會持續(xù)一周或更長時間。 然后必須確保它不會再次發(fā)生。 由于線程的執(zhí)行未知,很難證明這一點。 等到一切完成時,老板會因為花費的時間而感到沮喪,并且不知道該錯誤是否已修復(fù)。
ICondition
是否曾經(jīng)想等待事件發(fā)生,但又不想其他人也必須等待事件發(fā)生? 這正是線程編程中的條件。 在Java 1.5之前,這是通過synced-wait-notify技術(shù)完成的。 這可以通過鎖定條件技術(shù)來執(zhí)行。 和我一起旅行,我可以向大家展示這是如何工作的。 想象一下這樣一種情況,即存在一個非線程安全列表,并且有生產(chǎn)者和使用者進行讀寫。 顯然,有些關(guān)鍵部分需要保護。 那落入了鎖。 建立鎖定后,便可以開始關(guān)鍵工作。 唯一的問題是資源處于對線程無用的狀態(tài)。 例如,消費者無法從空列表中提取條目。 生產(chǎn)者無法將條目放入完整列表。 這是條件進入的地方。生產(chǎn)者或消費者將進入while循環(huán),以測試條件是否有利,然后調(diào)用condition.await()。 調(diào)用await之后,該線程將放棄其鎖定,并讓其他線程訪問其關(guān)鍵部分。 等待中的線程將鎖重新測試其條件,并可能等待更多時間或條件已滿足并開始工作。 關(guān)鍵部分完成后,線程可以調(diào)用signal()或signalAll()來告訴其他線程喚醒并檢查其狀況。 條件是由鎖而不是Hazelcast實例創(chuàng)建的。 另一件事是,如果要分發(fā)條件,則必須使用lock.newCondition(String name)方法。 IConditions具有一個同步備份,無法配置。
我無法說出使用這種技術(shù)會發(fā)生多少死鎖。 有時,當線程正在等待并且一切正常時,就會發(fā)出信號。 另一方面是在線程不等待時發(fā)送信號,進入等待狀態(tài)并永遠等待。 因此,我主張在等待時使用超時,以便線程可以每隔一段時間檢查一次是否滿足條件。 這樣,如果信號丟失,則可能發(fā)生的最壞情況是等待時間短而不是永遠等待。 我在示例中使用了超時技術(shù)。 復(fù)制并粘貼所需的代碼。 我寧愿使用正在測試的技術(shù),也不愿使用未經(jīng)測試的代碼入侵互聯(lián)網(wǎng)。
ICountDownLatch
ICountDownLatch是一個同步工具,當其計數(shù)器變?yōu)榱銜r觸發(fā)。 這不是進行協(xié)調(diào)的常用方法,但是在需要時可用。 我認為示例部分提供了有關(guān)其工作原理的更好的解釋。 鎖存器歸零后可以復(fù)位,因此可以再次使用。 如果擁有成員離開,則會發(fā)出所有等待閂鎖到達零的線程的信號,就好像已達到零。 ICountDownLatch在另一個地方同步備份,無法配置。
等量線
是的,有經(jīng)典信號量的分布式版本。 令我激動的是,因為我上次上操作系統(tǒng)課時,信號量需要一些硬件支持。 也許我只是和自己約會,哦,它仍然很酷(再次約會我自己)。 信號量通過限制可以訪問資源的線程數(shù)來工作。 與鎖不同,信號量沒有所有權(quán)感,因此不同的線程可以釋放對資源的聲明。 與其余的原語不同,可以配置ISemaphore。 我在示例中配置一個。 它位于我項目的默認包中的hazelcast.xml中。
例子
這里是例子。 我對上一篇帖子發(fā)表了評論,要求我對代碼進行縮進,以使其更具可讀性。 由于我要發(fā)布的代碼量很大,所以這次我將確保這樣做。 將會看到我以前沒有討論過的幾件事。 一種是IExecutorService。 這是ExecutorService的分布式版本。 一個人實際上可以發(fā)送工作,以由不同的成員完成。 另一件事是,所有定義的Runnable / Callable類都實現(xiàn)了Serializable。 這在分布式環(huán)境中是必需的,因為可以將對象發(fā)送給不同的成員。 最后一件事是HazelcastInstanceAware接口。 它允許類訪問本地 Hazelcast實例。 然后,類可以獲取所需資源的實例(例如ILists)。 事不宜遲,我們開始。
長壽
package hazelcastprimitives.iatomiclong;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IAtomicLong; import com.hazelcast.core.IFunction; import java.io.Serializable;/**** @author Daryl*/ public class IAtomicLongExample {public static class MultiplyByTwoAndSubtractOne implements IFunction, Serializable {@Overridepublic Long apply(Long t) {return (long)(2 * t - 1);}}public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();final String NAME = "atomic";IAtomicLong aLong = instance.getAtomicLong(NAME);IAtomicLong bLong = instance.getAtomicLong(NAME);aLong.getAndSet(1L);System.out.println("bLong is now: " + bLong.getAndAdd(2));System.out.println("aLong is now: " + aLong.getAndAdd(0L));MultiplyByTwoAndSubtractOne alter = new MultiplyByTwoAndSubtractOne();aLong.alter(alter);System.out.println("bLong is now: " + bLong.getAndAdd(0L));bLong.alter(alter);System.out.println("aLong is now: " + aLong.getAndAdd(0L));System.exit(0);} }注意,即使MutilpyAndSubtractOne類也實現(xiàn)了Serializable。
IdGenerator
package hazelcastprimitives.idgenerator;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IdGenerator;/**** @author Daryl*/ public class IdGeneratorExample {public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IdGenerator generator = instance.getIdGenerator("generator");for(int i = 0; i < 10; i++) {System.out.println("The generated value is " + generator.newId());}instance.shutdown();System.exit(0);} }鎖
此ILock示例也可以視為ICondition示例。 我必須使用一個條件,因為ListConsumer始終在ListProducer之前運行,所以我讓ListConsumer等到IList有消耗的東西。
package hazelcastprimitives.ilock;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.ICondition; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.IList; import com.hazelcast.core.ILock; import java.io.Serializable; import java.util.concurrent.TimeUnit;/**** @author Daryl*/ public class ILockExample {static final String LIST_NAME = "to be locked";static final String LOCK_NAME = "to lock with";static final String CONDITION_NAME = "to signal with";/*** @param args the command line arguments*/public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService("service");ListConsumer consumer = new ListConsumer();ListProducer producer = new ListProducer();try {service.submit(producer);service.submit(consumer);Thread.sleep(10000);} catch(InterruptedException ie){System.out.println("Got interrupted");} finally {instance.shutdown();}}public static class ListConsumer implements Runnable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();try {while(list.isEmpty()) {condition.await(2, TimeUnit.SECONDS);}while(!list.isEmpty()) {System.out.println("value is " + list.get(0));list.remove(0);}} catch(InterruptedException ie) {System.out.println("Consumer got interrupted");} finally {lock.unlock();}System.out.println("Consumer leaving");}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}public static class ListProducer implements Runnable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();try {for(int i = 1; i <= 10; i++){list.add(i);}condition.signalAll();} finally {lock.unlock();}System.out.println("Producer leaving");}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}} }ICondition
這是真正的ICondition示例。 注意SpunProducer和SpunConsumer如何共享相同的ICondition并相互發(fā)出信號。 注意我正在使用超時來防止死鎖。
package hazelcastprimitives.icondition;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.ICondition; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.IList; import com.hazelcast.core.ILock; import java.io.Serializable; import java.util.concurrent.TimeUnit;/**** @author Daryl*/ public class IConditionExample {static final String LOCK_NAME = "lock";static final String CONDITION_NAME = "condition";static final String SERVICE_NAME = "spinderella";static final String LIST_NAME = "list";public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService(SERVICE_NAME);service.execute(new SpunConsumer());service.execute(new SpunProducer());try {Thread.sleep(10000);} catch(InterruptedException ie) {System.out.println("Hey we got out sooner than I expected");} finally {instance.shutdown();System.exit(0);}}public static class SpunProducer implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;private long counter = 0;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {if(list.isEmpty()) {populate(list);System.out.println("telling the consumers");condition.signalAll();}for(int i = 0; i < 2; i++) {while(!list.isEmpty()) {System.out.println("Waiting for the list to be empty");System.out.println("list size: " + list.size() );condition.await(2, TimeUnit.SECONDS);} populate(list);System.out.println("Telling the consumers");condition.signalAll();}} catch(InterruptedException ie) {System.out.println("We have a found an interuption");} finally {condition.signalAll();System.out.println("Producer exiting stage left");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}private void populate(IList list) {System.out.println("Populating list");long currentCounter = counter;for(; counter < currentCounter + 10; counter++) {list.add(counter);}}}public static class SpunConsumer implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {for(int i = 0; i < 3; i++) {while(list.isEmpty()) {System.out.println("Waiting for the list to be filled");condition.await(1, TimeUnit.SECONDS);}System.out.println("removing values");while(!list.isEmpty()){System.out.println("value is " + list.get(0));list.remove(0);}System.out.println("Signaling the producer");condition.signalAll();}} catch(InterruptedException ie) {System.out.println("We had an interrupt");} finally {System.out.println("Consumer exiting stage right");condition.signalAll();lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}ICountDownLatch
package hazelcastprimitives.icountdownlatch;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.ICountDownLatch; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.IList; import com.hazelcast.core.ILock; import java.io.Serializable; import java.util.concurrent.TimeUnit;/**** @author Daryl*/ public class ICountDownLatchExample {static final String LOCK_NAME = "lock";static final String LATCH_NAME = "condition";static final String SERVICE_NAME = "spinderella";static final String LIST_NAME = "list";public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService(SERVICE_NAME);service.execute(new SpunMaster());service.execute(new SpunSlave());try {Thread.sleep(10000);} catch(InterruptedException ie) {System.out.println("Hey we got out sooner than I expected");} finally {instance.shutdown();System.exit(0);}}public static class SpunMaster implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;private long counter = 0;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {latch.trySetCount(10);populate(list, latch);} finally {System.out.println("Master exiting stage left");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}private void populate(IList list, ICountDownLatch latch) {System.out.println("Populating list");long currentCounter = counter;for(; counter < currentCounter + 10; counter++) {list.add(counter);latch.countDown();}}}public static class SpunSlave implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {if(latch.await(2, TimeUnit.SECONDS)) {while(!list.isEmpty()){System.out.println("value is " + list.get(0));list.remove(0);}}} catch(InterruptedException ie) {System.out.println("We had an interrupt");} finally {System.out.println("Slave exiting stage right");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}等量線
組態(tài)
這是ISemaphore配置:
<?xml version="1.0" encoding="UTF-8"?> <hazelcast xsi:schemaLocation ="http://www.hazelcast.com/schema/config http://www.hazelcast.com/schema/config/hazelcast-config-3.0.xsd " xmlns ="http://www.hazelcast.com/schema/config " xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"><network><join><multicast enabled="true"/></join></network><semaphore name="to reduce access"><initial-permits>3</initial-permits></semaphore> </hazelcast>范例程式碼
package hazelcastprimitives.isemaphore;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.ISemaphore; import com.hazelcast.core.IdGenerator; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;/**** @author Daryl*/ public class ISemaphoreExample {static final String SEMAPHORE_NAME = "to reduce access";static final String GENERATOR_NAME = "to use";/*** @param args the command line arguments*/public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService("service");List<Future> futures = new ArrayList(10);try {for(int i = 0; i < 10; i++) {futures.add(service.submit(new GeneratorUser(i)));}// so I wait til the last man. No this may not be scalable.for(Future future: futures) {future.get();}} catch(InterruptedException ie){System.out.printf("Got interrupted.");} catch(ExecutionException ee) {System.out.printf("Cannot execute on Future. reason: %s\n", ee.toString());} finally {service.shutdown();instance.shutdown();}}static class GeneratorUser implements Callable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;private final int number;public GeneratorUser(int number) {this.number = number;}@Overridepublic Long call() {ISemaphore semaphore = instance.getSemaphore(SEMAPHORE_NAME);IdGenerator gen = instance.getIdGenerator(GENERATOR_NAME);long lastId = -1;try {semaphore.acquire();try {for(int i = 0; i < 10; i++){lastId = gen.newId();System.out.printf("current value of generator on %d is %d\n", number, lastId);Thread.sleep(1000);}} catch(InterruptedException ie) {System.out.printf("User %d was Interrupted\n", number);} finally {semaphore.release();}} catch(InterruptedException ie) {System.out.printf("User %d Got interrupted\n", number);}System.out.printf("User %d is leaving\n", number);return lastId;}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}結(jié)論
在這篇文章中討論了Hazelcast的原語。 大多數(shù)(如果不是全部的話)都圍繞線程協(xié)調(diào)展開。 分享了原始和個人經(jīng)歷的解釋。 在示例中,顯示了不同類型的協(xié)調(diào)。 可以通過以下版本的http://darylmathisonblog.googlecode.com/svn/trunk/HazelcastPrimitives下載示例。
參考資料
- 《榛樹之書》:可在www.hazelcast.com找到
- Hazelcast文檔:在Hazelcast下載發(fā)現(xiàn)在發(fā)現(xiàn)www.hazelcast.org
翻譯自: https://www.javacodegeeks.com/2014/10/beginners-guide-to-hazelcast-part-3.html
hazelcast入門教程
總結(jié)
以上是生活随笔為你收集整理的hazelcast入门教程_Hazelcast入门指南第3部分的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓手机安全模式怎么解除(安卓安全模式怎
- 下一篇: jpa 实体映射视图_JPA教程:实体映