Spray + Akka高性能异步IO并发
http://ju.outofmemory.cn/entry/70913
http://www.jdon.com/concurrent/spray-akka.html
如何使用Java建立像Node.js那樣非堵塞異步事件并發(fā)IO服務(wù)器呢?Spray是基于NIO2高并發(fā)框架,雖然Tomcat 8也是基于NIO2,但是Spary的線程數(shù)要低得到,降低CPU上下文切換的負(fù)載;Akka和其Mysql庫(kù)包都是相同線程執(zhí)行上下文(?execution context),因?yàn)樵诜嵌氯疤嵯?#xff0c;性能拼的就不是線程數(shù)目越多越好,正好相反,線程數(shù)目越低,越接近理論理論最佳點(diǎn)。
啟動(dòng)設(shè)置
為了啟動(dòng)Spary和Akka,需要一個(gè)main啟動(dòng)函數(shù),在main函數(shù)中,我們創(chuàng)建一個(gè)Actor系統(tǒng):
??? ActorSystem system = ActorSystem.create("system");
??? ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");
listener是使用Actor用來(lái)處理Http請(qǐng)求。然后要設(shè)定監(jiān)聽Http的端口:
????InetSocketAddress endpoint = new InetSocketAddress(3000); ??? int backlog = 100; ??? List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList(); ??? Option<ServerSettings> settings = scala.Option.empty();最后,綁定Actor到監(jiān)聽的Http端口:
??? Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);
??? IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());
?
整個(gè)main函數(shù)主要代碼如下:
public static final ActorSystem system = ActorSystem.create("system"); ? public static void main(String[] args) { ??? ... ??? ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor"); ???? ????InetSocketAddress endpoint = new InetSocketAddress(3000); ??? int backlog = 100; ??? List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList(); ??? Option<ServerSettings> settings = scala.Option.empty(); ??? ServerSSLEngineProvider sslEngineProvider = null; ??? Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider); ??? IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender()); ??? ??? ... }設(shè)置好啟動(dòng)函數(shù)以后,下面是真正開始在Actor里處理進(jìn)來(lái)Http請(qǐng)求了。
請(qǐng)求處理器
首先,我們因?yàn)槭褂玫氖窃鶭ava代碼,不是Scala,因此需要將Scala集成到Java中,可能比較丑陋,可以用專門類包裝一下,引入Scala的Http協(xié)議:
HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();
Http Actor為了響應(yīng)Http請(qǐng)求做三件事,第一件是創(chuàng)建一個(gè)路由router,這樣能夠根據(jù)請(qǐng)求URL:http://xxx/path中不同的/path分別處理:
Router router = partitionAndCreateRouter();
第二件是處理新的連接,告訴Spray這個(gè)actor不僅接受Http連接,也處理實(shí)際http連接:
???? ??}).match(Tcp.Connected.class, r ->{ ??????????????? sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());//tell that connection will be handled here! ???????? })第三件事就是處理實(shí)際的Http連接,將http請(qǐng)求委托給另外一個(gè)actor處理。
??????????? .match(HttpRequest.class, r -> { ??????????????? int id = Constants.ID.getAndIncrement(); ??????????????? String path = String.valueOf(r.uri().path()); ?????????????? ?if("/sell".equals(path)){ ??????????????????? ... //邏輯處理 ??????????????? }else if("/buy".equals(path)){ ??????????????????? ... //邏輯處理 ??????????????? }else{ ??????????????????? handleUnexpected(r); ??????????????? } ????????? ??})?
整個(gè)HttpActor代碼如下,業(yè)務(wù)邏輯以買賣為模型:
private static class HttpActor extends AbstractActor { ? ??? private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1(); ? ??? public HttpActor() { ??????? final Router router = partitionAndCreateRouter(); ??????? ????????receive(ReceiveBuilder ??????????? .match(HttpRequest.class, r -> { ??????????????? int id = Constants.ID.getAndIncrement(); ??????????????? String path = String.valueOf(r.uri().path()); ?????????????? ?if("/sell".equals(path)){ ??????????????????? String productId = r.uri().query().get("productId").get(); ??????????????????? ... ??????????????????? SalesOrder so = new SalesOrder(price, productId, quantity, id); ??????????????????? so.setSeller(new Seller(who)); ??????????????????? router.route(so, self()); ??????????????????? replyOK(id); ??????????????? }else if("/buy".equals(path)){ ??????????????????? ... ??????????????? }else{ ??????????????????? handleUnexpected(r); ??????????????? } ????????? ??}).match(Tcp.Connected.class, r ->{ ??????????????? sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());//tell that connection will be handled here! ??????????? }).build()); ??? }?
該案例使用的驅(qū)動(dòng)包有:?Akka,?Spray, and this?Mysql async driver., 整個(gè)源碼下載:GitHub
Actor模型教程
NIO原理與應(yīng)用
總結(jié)
以上是生活随笔為你收集整理的Spray + Akka高性能异步IO并发的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java中使用akka手记三 clust
- 下一篇: 解决Eclipse Debug 的sou