日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java中使用akka手记三 cluster详例

發(fā)布時間:2024/4/17 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java中使用akka手记三 cluster详例 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


http://www.tuicool.com/articles/m2muui

原文? http://2014.54chen.com/blog/2014/04/17/how-to-use-akka-in-java-3/

一個例子

  • 同樣是typesafe的經典例子。
  • 例子提供的服務是傳輸文本。當文本發(fā)給frontend節(jié)點,它會委派backend節(jié)點,backend執(zhí)行轉化任務,把結果返回給原來的客戶端。
  • 新的backend節(jié)點和frontend節(jié)點,都可以動態(tài)地在cluster上增減。

message

? public interface TransformationMessages { public static class TransformationJob implements Serializable { private final String text; //...... } public static class TransformationResult implements Serializable { private final String text; //..... } public static class JobFailed implements Serializable { private final String reason; private final TransformationJob job; //.... } public static final String BACKEND_REGISTRATION = "BackendRegistration"; }

backend處理邏輯

? public class TransformationBackend extends UntypedActor { Cluster cluster = Cluster.get(getContext().system()); //... @Override public void onReceive(Object message) { if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { register(member); } } } else if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; register(mUp.member()); } else { unhandled(message); } } void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); } }
  • backend訂閱了cluster的事件,檢測frontend節(jié)點,還會發(fā)一條消息告訴fontend可以使用了。
  • frontend節(jié)點接收用戶的任務,扔給注冊好的backend節(jié)點。

frontend節(jié)點

? public class TransformationFrontend extends UntypedActor { List<ActorRef> backends = new ArrayList<ActorRef>(); int jobCounter = 0; @Override public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } } }
  • frontend用List 保存了backend的actor位置,有需要的時候就輪循發(fā)給backend。
  • getSender 本次收到消息的上游,一般用來回復消息。
  • getContext 本actor的上下文。
  • getContext().watch DeathWatch,相當于watch了誰,誰有啥公開動作就會告訴我,包括掛了之類的。
  • ActorRef.forward與tell、ask的區(qū)別,性能最好的是tell,發(fā)完就走。ask是發(fā)完等Future,要等的話性能是個問題。forward用于從一個actor轉發(fā)消息給另一個actor,原始的sender信息會被保留,在做路由、負載均衡、備份時非常有用。

運行TransformationApp

  • sample.cluster.transformation.TransformationApp 啟動三個backend 2551 2552 0為一個cluster,啟動一個fronend。
  • frontend每5秒會收到一次任務,接收成功后print代碼,代碼如下:
? system.scheduler().schedule(interval, interval, new Runnable() { public void run() { ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet()), timeout).onSuccess(new OnSuccess<Object>() { public void onSuccess(Object result) { System.out.println(result); } }, ec); } }, ec);
  • frontend節(jié)點中,收到job的時候會去檢查backend注冊數是否可用了,如果有可用的就forward任務。
? public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } }
  • 在backend中有一句代碼如下:
? void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); }
  • 解析:backend訂閱了memberUp事件,所以在cluster中如果有memberUp了,都會執(zhí)行上述代碼。
  • actorSelection是根據地址進行l(wèi)ookup,返回一個ActorSelection,可以當成本地的actor一樣tell。

代碼

  • 本文提及代碼在 https://github.com/54chen/akka_cluster_learn


原創(chuàng)文章如轉載,請注明:轉載自五四陳科學院[ http://www.54chen.com ]

總結

以上是生活随笔為你收集整理的java中使用akka手记三 cluster详例的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。