日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

java中使用akka手记三 cluster详例

發(fā)布時(shí)間:2024/4/17 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java中使用akka手记三 cluster详例 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.


http://www.tuicool.com/articles/m2muui

原文? http://2014.54chen.com/blog/2014/04/17/how-to-use-akka-in-java-3/

一個(gè)例子

  • 同樣是typesafe的經(jīng)典例子。
  • 例子提供的服務(wù)是傳輸文本。當(dāng)文本發(fā)給frontend節(jié)點(diǎn),它會(huì)委派backend節(jié)點(diǎn),backend執(zhí)行轉(zhuǎn)化任務(wù),把結(jié)果返回給原來(lái)的客戶(hù)端。
  • 新的backend節(jié)點(diǎn)和frontend節(jié)點(diǎn),都可以動(dòng)態(tài)地在cluster上增減。

message

? public interface TransformationMessages { public static class TransformationJob implements Serializable { private final String text; //...... } public static class TransformationResult implements Serializable { private final String text; //..... } public static class JobFailed implements Serializable { private final String reason; private final TransformationJob job; //.... } public static final String BACKEND_REGISTRATION = "BackendRegistration"; }

backend處理邏輯

? public class TransformationBackend extends UntypedActor { Cluster cluster = Cluster.get(getContext().system()); //... @Override public void onReceive(Object message) { if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { register(member); } } } else if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; register(mUp.member()); } else { unhandled(message); } } void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); } }
  • backend訂閱了cluster的事件,檢測(cè)frontend節(jié)點(diǎn),還會(huì)發(fā)一條消息告訴fontend可以使用了。
  • frontend節(jié)點(diǎn)接收用戶(hù)的任務(wù),扔給注冊(cè)好的backend節(jié)點(diǎn)。

frontend節(jié)點(diǎn)

? public class TransformationFrontend extends UntypedActor { List<ActorRef> backends = new ArrayList<ActorRef>(); int jobCounter = 0; @Override public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } } }
  • frontend用List 保存了backend的actor位置,有需要的時(shí)候就輪循發(fā)給backend。
  • getSender 本次收到消息的上游,一般用來(lái)回復(fù)消息。
  • getContext 本actor的上下文。
  • getContext().watch DeathWatch,相當(dāng)于watch了誰(shuí),誰(shuí)有啥公開(kāi)動(dòng)作就會(huì)告訴我,包括掛了之類(lèi)的。
  • ActorRef.forward與tell、ask的區(qū)別,性能最好的是tell,發(fā)完就走。ask是發(fā)完等Future,要等的話(huà)性能是個(gè)問(wèn)題。forward用于從一個(gè)actor轉(zhuǎn)發(fā)消息給另一個(gè)actor,原始的sender信息會(huì)被保留,在做路由、負(fù)載均衡、備份時(shí)非常有用。

運(yùn)行TransformationApp

  • sample.cluster.transformation.TransformationApp 啟動(dòng)三個(gè)backend 2551 2552 0為一個(gè)cluster,啟動(dòng)一個(gè)fronend。
  • frontend每5秒會(huì)收到一次任務(wù),接收成功后print代碼,代碼如下:
? system.scheduler().schedule(interval, interval, new Runnable() { public void run() { ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet()), timeout).onSuccess(new OnSuccess<Object>() { public void onSuccess(Object result) { System.out.println(result); } }, ec); } }, ec);
  • frontend節(jié)點(diǎn)中,收到j(luò)ob的時(shí)候會(huì)去檢查backend注冊(cè)數(shù)是否可用了,如果有可用的就forward任務(wù)。
? public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } }
  • 在backend中有一句代碼如下:
? void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); }
  • 解析:backend訂閱了memberUp事件,所以在cluster中如果有memberUp了,都會(huì)執(zhí)行上述代碼。
  • actorSelection是根據(jù)地址進(jìn)行l(wèi)ookup,返回一個(gè)ActorSelection,可以當(dāng)成本地的actor一樣tell。

代碼

  • 本文提及代碼在 https://github.com/54chen/akka_cluster_learn


原創(chuàng)文章如轉(zhuǎn)載,請(qǐng)注明:轉(zhuǎn)載自五四陳科學(xué)院[ http://www.54chen.com ]

總結(jié)

以上是生活随笔為你收集整理的java中使用akka手记三 cluster详例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。