Hazelcast入门指南第3部分
這是從初學者的角度來看一系列有關如何使用Hazelcast的文章的延續。 如果您還沒有閱讀最后兩個,我鼓勵您閱讀它們:
- Hazelcast入門指南第1部分
- Hazelcast入門指南第2部分
原始人來了
在上一篇文章中,我提到將ILock與IList和ISet一起使用,因為它們不是線程安全的。 令我驚訝的是,我沒有涵蓋Hazelcast的基本部分,即分布式原語。 他們解決了以分布式方式同步資源使用的問題。 那些執行大量線程編程的人將立即識別它們。 對于那些不熟悉線程編程的人,我將解釋每個原語的作用并舉一個例子。
長壽
這是一個分布式原子長。 這意味著所有操作都一次發生。 例如,可以在一個操作中添加一個數字并檢索結果值。 可以獲取值,然后添加值。 對于在此原語上執行的每項操作都是如此。 可以想象,它是線程安全的,但不能做到這一點,而且是線程安全的。
atomicLong.addAndGet(2 * atomicLong.get());上面的行創建了一個競爭條件,因為有三個操作,讀取原子long的內容,乘以2并將其添加到實例中。 僅在保證一步操作的情況下,線程才安全地存在。 為此,IAtomicLong有一個名為alterAndGet的方法。 AlterAndGet帶有IFunction對象。 這使多步操作成為一步。 IAtomicLong始終只有一個同步備份,并且它是不可配置的。
IdGenerator
IAtomicLongs非常適合用來跟蹤一個人有多少。 問題在于,由于呼叫很可能是遠程呼叫,因此在某些情況下,IAtomicLongs并不是理想的解決方案。 這些情況之一就是生成唯一的ID。 IdGenerator就是為此目的而制作的。 它的工作方式是每個成員都要求生成一百萬個ID。 一旦所有這些要求的數字都被使用,該部門將要求另外一百萬。 因此,由于每個成員都有100萬個ID,所以遠程調用IdGenerator的機會是100萬分之一。 這使得生成唯一ID的方法非常快捷。 如果發生任何重復,可能是因為成員沒有加入。 如果成員在其段用盡之前發生故障,則ID中將存在間隙。 對于唯一的ID生成,缺少數字不是問題。 我確實認為成員沒有掛接到集群是一個問題,但是如果發生這種情況,則有更大的事情要擔心。 如果群集重新啟動,則ID將從零再次開始。 那是因為id不能持久存在。 這是一個內存數據庫,一個機會。 為了解決這個問題,可以將IdGenerators設置為以特定數字開頭,只要其他人沒有聲明它并且還沒有生成ID。 替代方法是創建一個自己的ID生成器或使用java.util.UUID類。 這可能需要更多的空間,但是每個項目都有自己的要求可以滿足。 IdGenerator始終具有一個同步備份,無法進行配置。
鎖
這是經典的同步方法。 它是分發的排他鎖。 只需調用方法鎖,線程便會等待或獲得鎖。 一旦建立了鎖定,就可以執行關鍵部分。 工作完成后,將使用解鎖方法。 這項技術的資深人士會將關鍵部分放在try finally塊中,在try塊外部建立鎖定,并在finally部分建立解鎖。 這對于在線程安全的結構上執行操作非常有用。 獲取鎖的進程擁有鎖,并且需要調用該鎖才能使其他進程能夠建立鎖。 當一個人在網絡上的多個位置都有線程時,這可能會出現問題。 Hazelcast想到了這個問題,并在成員退出時釋放了鎖定。 另一個功能是鎖定方法的超時時間為300秒。 這樣可以防止線程不足。 ILock具有一個同步備份,并且不可配置。
有經驗的人的一些建議,使關鍵部分盡可能小 ; 這有助于提高性能并防止死鎖。 由于線程的執行順序未知,因此死鎖很難調試且難以測試。 漏洞一度表現出來,然后就沒有。 由于鎖放錯了位置,因此可能會持續一周或更長時間。 然后必須確保它不會再次發生。 由于線程的執行未知,很難證明這一點。 等到一切完成時,老板會因為花費的時間而感到沮喪,并且不知道該錯誤是否已修復。
ICondition
是否曾經想等待事件發生,但又不想其他人也必須等待事件發生? 這正是線程編程中的條件。 在Java 1.5之前,這是通過synced-wait-notify技術完成的。 這可以通過鎖定條件技術來執行。 和我一起旅行,我可以向大家展示這是如何工作的。 想象一下這樣一種情況,其中存在一個非線程安全列表,并且有生產者和使用者編寫和讀取該清單。 顯然,有一些關鍵部分需要保護。 那落入了鎖。 建立鎖定后,便可以開始關鍵工作。 唯一的問題是資源處于對線程無用的狀態。 例如,使用者無法從空列表中提取條目。 生產者無法將條目放入完整列表。 這是條件進入的地方。生產者或消費者將進入while循環,以測試條件是否有利,然后調用condition.await()。 調用await之后,線程將放棄其鎖定,并讓其他線程訪問其關鍵部分。 等待中的線程將重新獲得鎖以對其條件進行測試,并且可以等待更多時間或條件得到滿足并開始執行工作。 關鍵部分完成后,線程可以調用signal()或signalAll()來告訴其他線程喚醒并檢查其條件。 條件是由鎖而不是Hazelcast實例創建的。 另一件事是,如果要分發條件,則必須使用lock.newCondition(String name)方法。 IConditions具有一個同步備份,無法配置。
我無法說出使用這種技術會發生多少死鎖。 有時,當線程正在等待并且一切正常時,就會發出信號。 另一方面是在線程不等待時發送信號,進入等待狀態并永遠等待。 出于這個原因,我主張在等待時使用超時,以便線程可以每隔一段時間檢查一次是否滿足條件。 這樣,如果信號丟失,則可能發生的最壞情況是等待時間短而不是永遠等待。 我在示例中使用了超時技術。 復制并粘貼所需的代碼。 我寧愿使用正在使用的測試技術,也不愿使用未經測試的代碼入侵互聯網。
ICountDownLatch
ICountDownLatch是一個同步工具,當其計數器變為零時觸發。 這不是進行協調的常用方法,但是在需要時可用。 我認為示例部分提供了有關其工作原理的更好的解釋。 鎖存器歸零后可以復位,因此可以再次使用。 如果擁有成員離開,則會發出所有等待閂鎖到達零的線程的信號,就好像已達到零。 ICountDownLatch在另一個地方同步備份,無法配置。
等量線
是的,有經典信號量的分布式版本。 這讓我很興奮,因為上次我上操作系統課時,信號燈需要一點硬件支持。 也許我只是和自己約會,哦,它仍然很酷(再次與自己約會)。 信號量通過限制可以訪問資源的線程數來工作。 與鎖不同,信號量沒有所有權感,因此不同的線程可以釋放對資源的聲明。 與其余的原語不同,可以配置ISemaphore。 我在示例中配置一個。 它位于我項目的默認包中的hazelcast.xml中。
例子
這里是例子。 我對上一篇帖子發表了評論,要求我對代碼進行縮進,以使其更具可讀性。 由于我要發布的代碼量很大,所以我這次肯定會這樣做。 將會看到我以前沒有討論過的幾件事。 一種是IExecutorService。 這是ExecutorService的分布式版本。 一個人實際上可以發送工作,以由不同的成員完成。 另一件事是,所有定義的Runnable / Callable類都實現了Serializable。 這在分布式環境中是必需的,因為可以將對象發送給不同的成員。 最后一件事是HazelcastInstanceAware接口。 它允許類訪問本地 Hazelcast實例。 然后,類可以獲取所需資源的實例(例如ILists)。 事不宜遲,我們開始。
長壽
package hazelcastprimitives.iatomiclong;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IAtomicLong; import com.hazelcast.core.IFunction; import java.io.Serializable;/**** @author Daryl*/ public class IAtomicLongExample {public static class MultiplyByTwoAndSubtractOne implements IFunction, Serializable {@Overridepublic Long apply(Long t) {return (long)(2 * t - 1);}}public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();final String NAME = "atomic";IAtomicLong aLong = instance.getAtomicLong(NAME);IAtomicLong bLong = instance.getAtomicLong(NAME);aLong.getAndSet(1L);System.out.println("bLong is now: " + bLong.getAndAdd(2));System.out.println("aLong is now: " + aLong.getAndAdd(0L));MultiplyByTwoAndSubtractOne alter = new MultiplyByTwoAndSubtractOne();aLong.alter(alter);System.out.println("bLong is now: " + bLong.getAndAdd(0L));bLong.alter(alter);System.out.println("aLong is now: " + aLong.getAndAdd(0L));System.exit(0);} }注意,即使MutilpyAndSubtractOne類也實現了Serializable。
IdGenerator
package hazelcastprimitives.idgenerator;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IdGenerator;/**** @author Daryl*/ public class IdGeneratorExample {public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IdGenerator generator = instance.getIdGenerator("generator");for(int i = 0; i < 10; i++) {System.out.println("The generated value is " + generator.newId());}instance.shutdown();System.exit(0);} }鎖
此ILock示例也可以視為ICondition示例。 我必須使用一個條件,因為ListConsumer始終在ListProducer之前運行,所以我讓ListConsumer等到IList有消耗的東西。
package hazelcastprimitives.ilock;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.ICondition; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.IList; import com.hazelcast.core.ILock; import java.io.Serializable; import java.util.concurrent.TimeUnit;/**** @author Daryl*/ public class ILockExample {static final String LIST_NAME = "to be locked";static final String LOCK_NAME = "to lock with";static final String CONDITION_NAME = "to signal with";/*** @param args the command line arguments*/public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService("service");ListConsumer consumer = new ListConsumer();ListProducer producer = new ListProducer();try {service.submit(producer);service.submit(consumer);Thread.sleep(10000);} catch(InterruptedException ie){System.out.println("Got interrupted");} finally {instance.shutdown();}}public static class ListConsumer implements Runnable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();try {while(list.isEmpty()) {condition.await(2, TimeUnit.SECONDS);}while(!list.isEmpty()) {System.out.println("value is " + list.get(0));list.remove(0);}} catch(InterruptedException ie) {System.out.println("Consumer got interrupted");} finally {lock.unlock();}System.out.println("Consumer leaving");}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}public static class ListProducer implements Runnable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock();try {for(int i = 1; i <= 10; i++){list.add(i);}condition.signalAll();} finally {lock.unlock();}System.out.println("Producer leaving");}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}} }ICondition
這是真正的ICondition示例。 請注意,SpunProducer和SpunConsumer如何共享相同的ICondition并相互發出信號。 注意我正在使用超時來防止死鎖。
package hazelcastprimitives.icondition;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.ICondition; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.IList; import com.hazelcast.core.ILock; import java.io.Serializable; import java.util.concurrent.TimeUnit;/**** @author Daryl*/ public class IConditionExample {static final String LOCK_NAME = "lock";static final String CONDITION_NAME = "condition";static final String SERVICE_NAME = "spinderella";static final String LIST_NAME = "list";public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService(SERVICE_NAME);service.execute(new SpunConsumer());service.execute(new SpunProducer());try {Thread.sleep(10000);} catch(InterruptedException ie) {System.out.println("Hey we got out sooner than I expected");} finally {instance.shutdown();System.exit(0);}}public static class SpunProducer implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;private long counter = 0;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {if(list.isEmpty()) {populate(list);System.out.println("telling the consumers");condition.signalAll();}for(int i = 0; i < 2; i++) {while(!list.isEmpty()) {System.out.println("Waiting for the list to be empty");System.out.println("list size: " + list.size() );condition.await(2, TimeUnit.SECONDS);} populate(list);System.out.println("Telling the consumers");condition.signalAll();}} catch(InterruptedException ie) {System.out.println("We have a found an interuption");} finally {condition.signalAll();System.out.println("Producer exiting stage left");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}private void populate(IList list) {System.out.println("Populating list");long currentCounter = counter;for(; counter < currentCounter + 10; counter++) {list.add(counter);}}}public static class SpunConsumer implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICondition condition = lock.newCondition(CONDITION_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {for(int i = 0; i < 3; i++) {while(list.isEmpty()) {System.out.println("Waiting for the list to be filled");condition.await(1, TimeUnit.SECONDS);}System.out.println("removing values");while(!list.isEmpty()){System.out.println("value is " + list.get(0));list.remove(0);}System.out.println("Signaling the producer");condition.signalAll();}} catch(InterruptedException ie) {System.out.println("We had an interrupt");} finally {System.out.println("Consumer exiting stage right");condition.signalAll();lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}ICountDownLatch
package hazelcastprimitives.icountdownlatch;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.ICountDownLatch; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.IList; import com.hazelcast.core.ILock; import java.io.Serializable; import java.util.concurrent.TimeUnit;/**** @author Daryl*/ public class ICountDownLatchExample {static final String LOCK_NAME = "lock";static final String LATCH_NAME = "condition";static final String SERVICE_NAME = "spinderella";static final String LIST_NAME = "list";public static final void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService(SERVICE_NAME);service.execute(new SpunMaster());service.execute(new SpunSlave());try {Thread.sleep(10000);} catch(InterruptedException ie) {System.out.println("Hey we got out sooner than I expected");} finally {instance.shutdown();System.exit(0);}}public static class SpunMaster implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;private long counter = 0;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {latch.trySetCount(10);populate(list, latch);} finally {System.out.println("Master exiting stage left");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}private void populate(IList list, ICountDownLatch latch) {System.out.println("Populating list");long currentCounter = counter;for(; counter < currentCounter + 10; counter++) {list.add(counter);latch.countDown();}}}public static class SpunSlave implements Serializable, Runnable, HazelcastInstanceAware {private transient HazelcastInstance instance;@Overridepublic void run() {ILock lock = instance.getLock(LOCK_NAME);ICountDownLatch latch = instance.getCountDownLatch(LATCH_NAME);IList list = instance.getList(LIST_NAME);lock.lock(); try {if(latch.await(2, TimeUnit.SECONDS)) {while(!list.isEmpty()){System.out.println("value is " + list.get(0));list.remove(0);}}} catch(InterruptedException ie) {System.out.println("We had an interrupt");} finally {System.out.println("Slave exiting stage right");lock.unlock();}}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}等量線
組態
這是ISemaphore配置:
<?xml version="1.0" encoding="UTF-8"?> <hazelcast xsi:schemaLocation ="http://www.hazelcast.com/schema/config http://www.hazelcast.com/schema/config/hazelcast-config-3.0.xsd " xmlns ="http://www.hazelcast.com/schema/config " xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"><network><join><multicast enabled="true"/></join></network><semaphore name="to reduce access"><initial-permits>3</initial-permits></semaphore> </hazelcast>范例程式碼
package hazelcastprimitives.isemaphore;import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstanceAware; import com.hazelcast.core.IExecutorService; import com.hazelcast.core.ISemaphore; import com.hazelcast.core.IdGenerator; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future;/**** @author Daryl*/ public class ISemaphoreExample {static final String SEMAPHORE_NAME = "to reduce access";static final String GENERATOR_NAME = "to use";/*** @param args the command line arguments*/public static void main(String[] args) {HazelcastInstance instance = Hazelcast.newHazelcastInstance();IExecutorService service = instance.getExecutorService("service");List<Future> futures = new ArrayList(10);try {for(int i = 0; i < 10; i++) {futures.add(service.submit(new GeneratorUser(i)));}// so I wait til the last man. No this may not be scalable.for(Future future: futures) {future.get();}} catch(InterruptedException ie){System.out.printf("Got interrupted.");} catch(ExecutionException ee) {System.out.printf("Cannot execute on Future. reason: %s\n", ee.toString());} finally {service.shutdown();instance.shutdown();}}static class GeneratorUser implements Callable, Serializable, HazelcastInstanceAware {private transient HazelcastInstance instance;private final int number;public GeneratorUser(int number) {this.number = number;}@Overridepublic Long call() {ISemaphore semaphore = instance.getSemaphore(SEMAPHORE_NAME);IdGenerator gen = instance.getIdGenerator(GENERATOR_NAME);long lastId = -1;try {semaphore.acquire();try {for(int i = 0; i < 10; i++){lastId = gen.newId();System.out.printf("current value of generator on %d is %d\n", number, lastId);Thread.sleep(1000);}} catch(InterruptedException ie) {System.out.printf("User %d was Interrupted\n", number);} finally {semaphore.release();}} catch(InterruptedException ie) {System.out.printf("User %d Got interrupted\n", number);}System.out.printf("User %d is leaving\n", number);return lastId;}@Overridepublic void setHazelcastInstance(HazelcastInstance hazelcastInstance) {instance = hazelcastInstance;}}}結論
在這篇文章中討論了Hazelcast的原語。 大多數(如果不是全部)都圍繞線程協調展開。 分享了原始和個人經歷的解釋。 在示例中,顯示了不同類型的協調。 可以通過以下版本的http://darylmathisonblog.googlecode.com/svn/trunk/HazelcastPrimitives下載示例。
參考文獻
- 《榛樹之書》:可在www.hazelcast.com上找到
- Hazelcast文檔:在Hazelcast下載發現在發現www.hazelcast.org
翻譯自: https://www.javacodegeeks.com/2014/10/beginners-guide-to-hazelcast-part-3.html
總結
以上是生活随笔為你收集整理的Hazelcast入门指南第3部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 1360x768电脑壁纸(2706x27
- 下一篇: 实用程序类的OOP替代