带有Kafka和ZeroMQ的分布式类星体演员
因此,您已經(jīng)有了使用actor的精美設(shè)計(jì),選擇了JVM和Quasar在該主題上的強(qiáng)大而忠實(shí)的觀點(diǎn)。 所有明智的決定,但是在集群上進(jìn)行分配時(shí)您有什么選擇呢?
星系
Galaxy是一個(gè)非常酷的選擇:快速的內(nèi)存中數(shù)據(jù)網(wǎng)格,針對(duì)數(shù)據(jù)局部性進(jìn)行了優(yōu)化,具有復(fù)制,可選的持久性,分布式參與者注冊(cè)表,甚至參與者之間的參與者遷移! 只有一個(gè)警告:要發(fā)布正式版的生產(chǎn)質(zhì)量的銀河版,還需要幾個(gè)月的時(shí)間。 不建議將當(dāng)前版本的Galaxy用于生產(chǎn)。
如果我們需要在那之前上線怎么辦?
幸運(yùn)的是,Quasar Actors的阻塞編程模型非常簡(jiǎn)單,以至于將其與大多數(shù)消息傳遞解決方案集成起來都是輕而易舉的,并且為了證明讓我們用兩種快速,流行且截然不同的模型來做到這一點(diǎn): Apache Kafka和?MQ 。
代碼和計(jì)劃
可以在GitHub上找到以下所有示例,只需快速閱讀簡(jiǎn)短的README ,即可立即運(yùn)行它們。
Kafka和?MQ分別有兩個(gè)示例:
- 快速而骯臟的人直接進(jìn)行發(fā)布/投票或發(fā)送/接收演員的呼叫。
- 詳細(xì)介紹了代理角色,這些代理角色將您的代碼與消息傳遞API隔離開。 為了證明我沒有在說謊,該程序?qū)煞N技術(shù)使用了相同的生產(chǎn)者和消費(fèi)者參與者類 ,并且?guī)缀跏褂昧讼嗤囊龑?dǎo)程序。
卡夫卡
Apache Kafka的采用率急劇上升,這是由于其基于持久性提交日志和用于并行消息使用的使用者組的獨(dú)特設(shè)計(jì):這種結(jié)合形成了快速,可靠,靈活和可擴(kuò)展的代理。
該API包括兩種類型的生產(chǎn)者:sync和async;一種消費(fèi)者(僅sync); Comsat包括社區(qū)貢獻(xiàn)的,對(duì)光纖友好的Kafka生產(chǎn)商集成 。
Kafka生產(chǎn)者句柄是線程安全的,在共享時(shí)表現(xiàn)最佳,并且可以像這樣在actor主體(或其他任何地方)中輕松獲得和使用:
final Properties producerConfig = new Properties(); producerConfig.put("bootstrap.servers", "localhost:9092"); producerConfig.put("client.id", "DemoProducer"); producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");try (final FiberKafkaProducer<Integer, byte[]> producer = new FiberKafkaProducer<>(new KafkaProducer<>(producerConfig))) {final byte[] myBytes = getMyBytes(); // ...final Future<RecordMetaData> res = producer.send(new ProducerRecord<>("MyTopic", i, myBytes));res.get(); // Optional, blocks the fiber until the record is persisted; thre's also `producer.flush()` }我們用Comsat的FiberKafkaProducer包裝了KafkaProducer對(duì)象,以便找回光纖阻塞的未來。
但是,使用者句柄不是線程安全的1,并且僅是線程阻塞的:
final Properties producerConfig = new Properties(); consumerConfig = new Properties(); consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP); consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "DemoConsumer"); consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); consumerConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); consumerConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {consumer.subscribe(Collections.singletonList(TOPIC));final ConsumerRecords<Integer, byte[]> records = consumer.poll(1000L);for (final ConsumerRecord<Integer, byte[]> record : records) {final byte[] v = record.value();useMyBytes(v); // ...} }由于我們不想阻塞光纖的基礎(chǔ)線程池(除了卡夫卡在doRun的線程池,我們無法對(duì)其做太多的事情),因此在我們的actor的doRun我們將使用FiberAsync.runBlocking代替FiberAsync.runBlocking來喂入固定的FiberAsync.runBlocking具有異步任務(wù)的size執(zhí)行程序,該任務(wù)將阻塞光纖直到poll (將在給定的池中執(zhí)行)返回之前:
final ExecutorService e = Executors.newFixedThreadPool(2);try (final Consumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {consumer.subscribe(Collections.singletonList(TOPIC));final ConsumerRecords<Integer, byte[]> records = call(e, () -> consumer.poll(1000L));for (final ConsumerRecord<Integer, byte[]> record : records) {final byte[] v = record.value();useMyBytes(v); // ...} }其中call是一個(gè)定義如下的實(shí)用程序方法(如果不是此Java編譯器bug,則沒有必要):
@Suspendable public static <V> V call(ExecutorService es, Callable<V> c) throws InterruptedException, SuspendExecution {try {return runBlocking(es, (CheckedCallable<V, Exception>) c::call);} catch (final InterruptedException | SuspendExecution e) {throw e;} catch (final Exception e) {throw new RuntimeException(e);} }在第一個(gè)完整的示例中,我們將從生產(chǎn)者角色向消費(fèi)者發(fā)送一千個(gè)序列化消息。
?MQ
?MQ(或ZeroMQ)不是集中的代理解決方案,而更多地是各種通信模式(請(qǐng)求/答復(fù),發(fā)布/訂閱等)的套接字的一般化。 在我們的示例中,我們將使用最簡(jiǎn)單的請(qǐng)求-答復(fù)模式。 這是我們的新生產(chǎn)者代碼:
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {trgt.connect("tcp://localhost:8000");final byte[] myBytes = getMyBytes(); // ...trgt.send(baos.toByteArray(), 0 /* flags */)trgt.recv(); // Reply, e.g. ACK }如您所見,上下文充當(dāng)套接字工廠,并傳遞了要使用的I / O線程數(shù):這是因?yàn)?MQ套接字不是連接綁定的OS句柄,而是用于處理的機(jī)器的簡(jiǎn)單前端重試連接,多個(gè)連接,高效的并發(fā)I / O甚至為您排隊(duì)。 這就是為什么send調(diào)用幾乎永遠(yuǎn)不會(huì)阻塞,而recv調(diào)用不是連接上的I / O調(diào)用,而是線程與專門的I / O任務(wù)之間的同步的原因,該任務(wù)將從一個(gè)或多個(gè)連接中傳入字節(jié)。
不過,我們將在角色中阻塞光纖,而不是線程,因此讓我們?cè)趓ead調(diào)用上使用FiberAsync.runBlocking ,以防萬一它阻塞甚至在send時(shí)阻塞:
final ExecutorService ep = Executors.newFixedThreadPool(2);try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);final ZMQ.Socket trgt = zmq.socket(ZMQ.REQ)) {exec(e, () -> trgt.connect("tcp://localhost:8000"));final byte[] myBytes = getMyBytes(); // ...call(e, trgt.send(myBytes, 0 /* flags */));call(e, trgt::recv); // Reply, e.g. ACK }這是消費(fèi)者:
try (final ZMQ.Context zmq = ZMQ.context(1 /* IO threads */);final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {exec(e, () -> src.bind("tcp://*:8000"));final byte[] v = call(e, src::recv);exec(e, () -> src.send("ACK"));useMyBytes(v); // ... }exec是另一個(gè)實(shí)用程序函數(shù),類似于call :
@Suspendable public static void exec(ExecutorService es, Runnable r) throws InterruptedException, SuspendExecution {try {runBlocking(es, (CheckedCallable<Void, Exception>) () -> { r.run(); return null; });} catch (final InterruptedException | SuspendExecution e) {throw e;} catch (final Exception e) {throw new RuntimeException(e);} }這是完整的第一個(gè)示例 。
在不改變邏輯的情況下進(jìn)行分配:與救援人員的松散耦合
很簡(jiǎn)單,不是嗎? 但是,有些令人討厭的事情:我們與網(wǎng)絡(luò)另一端的參與者打交道的方式與本地參與者不同。 無論他們位于何處或如何連接,這些都是我們?cè)敢鈱懙难輪T:
public final class ProducerActor extends BasicActor<Void, Void> {private final ActorRef<Msg> target;public ProducerActor(ActorRef<Msg> target) {this.target = target;}@Overrideprotected final Void doRun() throws InterruptedException, SuspendExecution {for (int i = 0; i < MSGS; i++) {final Msg m = new Msg(i);System.err.println("USER PRODUCER: " + m);target.send(m);}System.err.println("USER PRODUCER: " + EXIT);target.send(EXIT);return null;} }public final class ConsumerActor extends BasicActor<Msg, Void> {@Overrideprotected final Void doRun() throws InterruptedException, SuspendExecution {for (;;) {final Msg m = receive();System.err.println("USER CONSUMER: " + m);if (EXIT.equals(m))return null;}} }幸運(yùn)的是,每個(gè)演員,無論做什么,都具有相同的非常基本的接口:傳入消息隊(duì)列,稱為信箱 。 這意味著我們可以在兩個(gè)通信參與者之間插入任意數(shù)量的中間參與者或代理 ,尤其是我們希望有一個(gè)發(fā)送代理,它將通過中間件將消息獲取到目標(biāo)主機(jī),并在其中接收接收代理,以捕獲傳入的消息。并將它們放入目標(biāo)目的地的郵箱中。
因此,在我們的主程序中,我們將為我們的ProducerActor提供合適的發(fā)送代理,然后讓ConsumerActor從合適的接收代理接收:
final ProducerActor pa = Actor.newActor(ProducerActor.class, getSendingProxy()); // ... final ConsumerActor ca = Actor.newActor(ConsumerActor.class); pa.spawn(); System.err.println("USER PRODUCER started"); subscribeToReceivingProxy(ca.spawn()); // ... System.err.println("USER CONSUMER started"); pa.join(); System.err.println("USER PRODUCER finished"); ca.join(); System.err.println("USER CONSUMER finished");讓我們看看如何首先使用Kafka然后使用?MQ來實(shí)現(xiàn)這些代理。
卡夫卡男演員代理
代理參與者的工廠將與特定的Kafka主題相關(guān)聯(lián):這是因?yàn)榭梢詫?duì)主題進(jìn)行分區(qū) ,以使多個(gè)使用者可以同時(shí)讀取該主題。 我們希望能夠最佳地利用每個(gè)主題的最大級(jí)別或并發(fā)性:
/* ... */ KafkaProxies implements AutoCloseable {/* ... */ KafkaProxies(String bootstrap, String topic) { /* ... */ }// ... }當(dāng)然,我們希望對(duì)多個(gè)參與者使用一個(gè)主題,因此發(fā)送代理將指定接收者參與者ID,接收代理將僅將消息轉(zhuǎn)發(fā)給綁定到該ID的用戶參與者:
/* ... */ <M> ActorRef<M> create(String actorID) { /* ... */ } /* ... */ void drop(ActorRef ref) throws ExecutionException, InterruptedException { /* ... */ } /* ... */ <M> void subscribe(ActorRef<? super M> consumer, String actorID) { /* ... */ } /* ... */ void unsubscribe(ActorRef<?> consumer, String actorID) { /* ... */ }關(guān)閉AutoClosable工廠將告訴所有代理終止,并清理簿記參考:
/* ... */ void close() throws Exception { /* ... */ }生產(chǎn)者實(shí)現(xiàn)是非常簡(jiǎn)單和無趣的,同時(shí)給消費(fèi)者帶來了更多的樂趣,因?yàn)樗鼘⑹褂肣uasar Actors的選擇性接收將傳入消息保留在其郵箱中,直到至少有一個(gè)訂閱的用戶actor可以使用它們?yōu)橹?#xff1a;
@Override protected Void doRun() throws InterruptedException, SuspendExecution {//noinspection InfiniteLoopStatementfor (;;) {// Try extracting from queuefinal Object msg = tryReceive((Object m) -> {if (EXIT.equals(m))return EXIT;if (m != null) {//noinspection uncheckedfinal ProxiedMsg rmsg = (ProxiedMsg) m;final List<ActorRef> l = subscribers.get(rmsg.actorID);if (l != null) {boolean sent = false;for (final ActorRef r : l) {//noinspection uncheckedr.send(rmsg.payload);sent = true;}if (sent) // Someone was listening, remove from queuereturn m;}}return null; // No subscribers (leave in queue) or no messages});// Something from queueif (msg != null) {if (EXIT.equals(msg)) {return null;}continue; // Go to next cycle -> precedence to queue}// Try receiving//noinspection Convert2Lambdafinal ConsumerRecords<Void, byte[]> records = call(e, () -> consumer.get().poll(100L));for (final ConsumerRecord<Void, byte[]> record : records) {final byte[] v = record.value();try (final ByteArrayInputStream bis = new ByteArrayInputStream(v);final ObjectInputStream ois = new ObjectInputStream(bis)) {//noinspection uncheckedfinal ProxiedMsg rmsg = (ProxiedMsg) ois.readObject();final List<ActorRef> l = subscribers.get(rmsg.actorID);if (l != null && l.size() > 0) {for (final ActorRef r : l) {//noinspection uncheckedr.send(rmsg.payload);}} else {ref().send(rmsg); // Enqueue}} catch (final IOException | ClassNotFoundException e) {e.printStackTrace();throw new RuntimeException(e);}} }由于我們還需要處理郵箱,因此我們以足夠小的超時(shí)來輪詢Kafka。 還要注意,許多參與者可以訂閱相同的ID,傳入的消息將廣播給所有參與者。 每個(gè)主題創(chuàng)建的接收actor代理(即光纖)的數(shù)量,以及池線程和Kafka使用者句柄( consumer是本地線程,因?yàn)镵afka使用者不是線程安全的)的數(shù)量將等于每個(gè)主題的分區(qū)數(shù):這使接收吞吐量達(dá)到最大。
目前,此實(shí)現(xiàn)使用Java序列化在字節(jié)之間來回轉(zhuǎn)換消息,但是當(dāng)然可以使用其他框架,例如Kryo 。
?MQ演員代理
?MQ模型是完全去中心化的:既沒有經(jīng)紀(jì)人,也沒有話題,因此我們可以簡(jiǎn)單地將?MQ地址/端點(diǎn)與一組參與者等同,而無需使用額外的參與者ID:
/* ... */ ZeroMQProxies implements AutoCloseable {/* ... */ ZeroMQProxies(int ioThreads) { /* ... */ }/* ... */ <M> ActorRef<M> to(String trgtZMQAddress) { /* ... */ }/* ... */ void drop(String trgtZMQAddress)/* ... */ void subscribe(ActorRef<? super M> consumer, String srcZMQEndpoint) { /* ... */ }/* ... */ void unsubscribe(ActorRef<?> consumer, String srcZMQEndpoint) { /* ... */ }/* ... */ void close() throws Exception { /* ... */ } }同樣,在這種情況下,并且由于與以前相同的原因,使用者有點(diǎn)有趣,但幸運(yùn)的是,線程安全性方面的任何問題都因?yàn)?MQ套接字在多個(gè)線程中可以正常工作:
@Override protected Void doRun() throws InterruptedException, SuspendExecution {try(final ZMQ.Socket src = zmq.socket(ZMQ.REP)) {System.err.printf("PROXY CONSUMER: binding %s\n", srcZMQEndpoint);Util.exec(e, () -> src.bind(srcZMQEndpoint));src.setReceiveTimeOut(100);//noinspection InfiniteLoopStatementfor (;;) {// Try extracting from queuefinal Object m = tryReceive((Object o) -> {if (EXIT.equals(o))return EXIT;if (o != null) {//noinspection uncheckedfinal List<ActorRef> l = subscribers.get(srcZMQEndpoint);if (l != null) {boolean sent = false;for (final ActorRef r : l) {//noinspection uncheckedr.send(o);sent = true;}if (sent) // Someone was listening, remove from queuereturn o;}}return null; // No subscribers (leave in queue) or no messages});// Something processable is thereif (m != null) {if (EXIT.equals(m)) {return null;}continue; // Go to next cycle -> precedence to queue}System.err.println("PROXY CONSUMER: receiving");final byte[] msg = Util.call(e, src::recv);if (msg != null) {System.err.println("PROXY CONSUMER: ACKing");Util.exec(e, () -> src.send(ACK));final Object o;try (final ByteArrayInputStream bis = new ByteArrayInputStream(msg);final ObjectInputStream ois = new ObjectInputStream(bis)) {o = ois.readObject();} catch (final IOException | ClassNotFoundException e) {e.printStackTrace();throw new RuntimeException(e);}System.err.printf("PROXY CONSUMER: distributing '%s' to %d subscribers\n", o, subscribers.size());//noinspection uncheckedfor (final ActorRef s : subscribers.getOrDefault(srcZMQEndpoint, (List<ActorRef>) Collections.EMPTY_LIST))//noinspection uncheckeds.send(o);} else {System.err.println("PROXY CONSUMER: receive timeout");}}} }更多功能
這篇簡(jiǎn)短的文章有望使人們一眼就可以看出,由于Quasar的Actor具有順暢的順序流程的特性,因此可以無縫地將Quasar的Actor與消息傳遞解決方案進(jìn)行接口連接。 當(dāng)然,可以更進(jìn)一步,例如:
- 演員查找和發(fā)現(xiàn) :我們?nèi)绾翁峁┤蜓輪T命名/發(fā)現(xiàn)服務(wù)? 例如,Kafka使用ZooKeeper,因此可能值得利用,但?MQ大量下注于去中心化,故意不提供預(yù)先打包的基礎(chǔ)。
- Actor故障管理 :我們?nèi)绾沃С衷诓煌?jié)點(diǎn)中運(yùn)行的actor之間的故障管理鏈接和監(jiān)視?
- 消息路由 :如何在不更改參與者內(nèi)部邏輯的情況下動(dòng)態(tài)調(diào)整節(jié)點(diǎn)與參與者之間的消息流?
- 角色移動(dòng)性 :我們?nèi)绾螌⒔巧苿?dòng)到其他節(jié)點(diǎn),例如,使其更靠近消息源,以提高性能或移動(dòng)到具有不同安全性的位置?
- 可伸縮性和容錯(cuò)性 :如何管理參與者節(jié)點(diǎn)的添加,刪除,死亡和分區(qū)? 像Galaxy這樣的分布式IMDG和像Kafka這樣的基于代理的解決方案通常已經(jīng)做到了,但是像?MQ這樣的結(jié)構(gòu)級(jí)解決方案通常不這樣做。
- 安全性 :我們?nèi)绾沃С窒嚓P(guān)的信息安全性屬性?
- 測(cè)試,記錄,監(jiān)視 :我們?nèi)绾畏奖愕卣w測(cè)試,跟蹤和監(jiān)視分布式參與者集合?
這些主題尤其是分布式系統(tǒng)設(shè)計(jì)的“硬核”,尤其是分布式參與者,因此,有效地解決它們可能需要大量的精力。 Galaxy解決了所有這些問題,但Quasar參與者提供了一個(gè)SPI ,涵蓋了上述一些主題,并允許與發(fā)行技術(shù)更緊密地集成。 您可能還對(duì)Akka與Quasar + Galaxy之間的比較感興趣,該比較涵蓋了許多此類方面。
就是這樣,請(qǐng)與您分布的Quasar演員一起玩樂,并在Quasar-Pulsar用戶組中留下有關(guān)您的旅程的注釋!
翻譯自: https://www.javacodegeeks.com/2016/05/distributed-quasar-actors-kafka-zeromq.html
總結(jié)
以上是生活随笔為你收集整理的带有Kafka和ZeroMQ的分布式类星体演员的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑writer怎么搞中文版(write
- 下一篇: 最常见的Java异常及其对Java开发人