java中使用akka手记三 cluster详例
生活随笔
收集整理的這篇文章主要介紹了
java中使用akka手记三 cluster详例
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
http://www.tuicool.com/articles/m2muui
原文? http://2014.54chen.com/blog/2014/04/17/how-to-use-akka-in-java-3/
一個(gè)例子
- 同樣是typesafe的經(jīng)典例子。
- 例子提供的服務(wù)是傳輸文本。當(dāng)文本發(fā)給frontend節(jié)點(diǎn),它會(huì)委派backend節(jié)點(diǎn),backend執(zhí)行轉(zhuǎn)化任務(wù),把結(jié)果返回給原來(lái)的客戶(hù)端。
- 新的backend節(jié)點(diǎn)和frontend節(jié)點(diǎn),都可以動(dòng)態(tài)地在cluster上增減。
message
| ? | public interface TransformationMessages { public static class TransformationJob implements Serializable { private final String text; //...... } public static class TransformationResult implements Serializable { private final String text; //..... } public static class JobFailed implements Serializable { private final String reason; private final TransformationJob job; //.... } public static final String BACKEND_REGISTRATION = "BackendRegistration"; } |
backend處理邏輯
| ? | public class TransformationBackend extends UntypedActor { Cluster cluster = Cluster.get(getContext().system()); //... @Override public void onReceive(Object message) { if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf()); } else if (message instanceof CurrentClusterState) { CurrentClusterState state = (CurrentClusterState) message; for (Member member : state.getMembers()) { if (member.status().equals(MemberStatus.up())) { register(member); } } } else if (message instanceof MemberUp) { MemberUp mUp = (MemberUp) message; register(mUp.member()); } else { unhandled(message); } } void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); } } |
- backend訂閱了cluster的事件,檢測(cè)frontend節(jié)點(diǎn),還會(huì)發(fā)一條消息告訴fontend可以使用了。
- frontend節(jié)點(diǎn)接收用戶(hù)的任務(wù),扔給注冊(cè)好的backend節(jié)點(diǎn)。
frontend節(jié)點(diǎn)
| ? | public class TransformationFrontend extends UntypedActor { List<ActorRef> backends = new ArrayList<ActorRef>(); int jobCounter = 0; @Override public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } } } |
- frontend用List 保存了backend的actor位置,有需要的時(shí)候就輪循發(fā)給backend。
- getSender 本次收到消息的上游,一般用來(lái)回復(fù)消息。
- getContext 本actor的上下文。
- getContext().watch DeathWatch,相當(dāng)于watch了誰(shuí),誰(shuí)有啥公開(kāi)動(dòng)作就會(huì)告訴我,包括掛了之類(lèi)的。
- ActorRef.forward與tell、ask的區(qū)別,性能最好的是tell,發(fā)完就走。ask是發(fā)完等Future,要等的話(huà)性能是個(gè)問(wèn)題。forward用于從一個(gè)actor轉(zhuǎn)發(fā)消息給另一個(gè)actor,原始的sender信息會(huì)被保留,在做路由、負(fù)載均衡、備份時(shí)非常有用。
運(yùn)行TransformationApp
- sample.cluster.transformation.TransformationApp 啟動(dòng)三個(gè)backend 2551 2552 0為一個(gè)cluster,啟動(dòng)一個(gè)fronend。
- frontend每5秒會(huì)收到一次任務(wù),接收成功后print代碼,代碼如下:
| ? | system.scheduler().schedule(interval, interval, new Runnable() { public void run() { ask(frontend, new TransformationJob("hello-" + counter.incrementAndGet()), timeout).onSuccess(new OnSuccess<Object>() { public void onSuccess(Object result) { System.out.println(result); } }, ec); } }, ec); |
- frontend節(jié)點(diǎn)中,收到j(luò)ob的時(shí)候會(huì)去檢查backend注冊(cè)數(shù)是否可用了,如果有可用的就forward任務(wù)。
| ? | public void onReceive(Object message) { if ((message instanceof TransformationJob) && backends.isEmpty()) { TransformationJob job = (TransformationJob) message; getSender().tell( new JobFailed("Service unavailable, try again later", job), getSender()); } else if (message instanceof TransformationJob) { TransformationJob job = (TransformationJob) message; jobCounter++; backends.get(jobCounter % backends.size()) .forward(job, getContext()); } else if (message.equals(BACKEND_REGISTRATION)) { getContext().watch(getSender()); backends.add(getSender()); } else if (message instanceof Terminated) { Terminated terminated = (Terminated) message; backends.remove(terminated.getActor()); } else { unhandled(message); } } |
- 在backend中有一句代碼如下:
| ? | void register(Member member) { if (member.hasRole("frontend")) getContext().actorSelection(member.address() + "/user/frontend").tell( BACKEND_REGISTRATION, getSelf()); } |
- 解析:backend訂閱了memberUp事件,所以在cluster中如果有memberUp了,都會(huì)執(zhí)行上述代碼。
- actorSelection是根據(jù)地址進(jìn)行l(wèi)ookup,返回一個(gè)ActorSelection,可以當(dāng)成本地的actor一樣tell。
代碼
- 本文提及代碼在 https://github.com/54chen/akka_cluster_learn
原創(chuàng)文章如轉(zhuǎn)載,請(qǐng)注明:轉(zhuǎn)載自五四陳科學(xué)院[ http://www.54chen.com ]
總結(jié)
以上是生活随笔為你收集整理的java中使用akka手记三 cluster详例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Java Web应用程序:Oozie及其
- 下一篇: Spray + Akka高性能异步IO并