Spray + Akka高性能异步IO并发
http://ju.outofmemory.cn/entry/70913
http://www.jdon.com/concurrent/spray-akka.html
如何使用Java建立像Node.js那樣非堵塞異步事件并發IO服務器呢?Spray是基于NIO2高并發框架,雖然Tomcat 8也是基于NIO2,但是Spary的線程數要低得到,降低CPU上下文切換的負載;Akka和其Mysql庫包都是相同線程執行上下文(?execution context),因為在非堵塞前提下,性能拼的就不是線程數目越多越好,正好相反,線程數目越低,越接近理論理論最佳點。
啟動設置
為了啟動Spary和Akka,需要一個main啟動函數,在main函數中,我們創建一個Actor系統:
??? ActorSystem system = ActorSystem.create("system");
??? ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");
listener是使用Actor用來處理Http請求。然后要設定監聽Http的端口:
????InetSocketAddress endpoint = new InetSocketAddress(3000); ??? int backlog = 100; ??? List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList(); ??? Option<ServerSettings> settings = scala.Option.empty();最后,綁定Actor到監聽的Http端口:
??? Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);
??? IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());
?
整個main函數主要代碼如下:
public static final ActorSystem system = ActorSystem.create("system"); ? public static void main(String[] args) { ??? ... ??? ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor"); ???? ????InetSocketAddress endpoint = new InetSocketAddress(3000); ??? int backlog = 100; ??? List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList(); ??? Option<ServerSettings> settings = scala.Option.empty(); ??? ServerSSLEngineProvider sslEngineProvider = null; ??? Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider); ??? IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender()); ??? ??? ... }設置好啟動函數以后,下面是真正開始在Actor里處理進來Http請求了。
請求處理器
首先,我們因為使用的是原生Java代碼,不是Scala,因此需要將Scala集成到Java中,可能比較丑陋,可以用專門類包裝一下,引入Scala的Http協議:
HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();
Http Actor為了響應Http請求做三件事,第一件是創建一個路由router,這樣能夠根據請求URL:http://xxx/path中不同的/path分別處理:
Router router = partitionAndCreateRouter();
第二件是處理新的連接,告訴Spray這個actor不僅接受Http連接,也處理實際http連接:
???? ??}).match(Tcp.Connected.class, r ->{ ??????????????? sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());//tell that connection will be handled here! ???????? })第三件事就是處理實際的Http連接,將http請求委托給另外一個actor處理。
??????????? .match(HttpRequest.class, r -> { ??????????????? int id = Constants.ID.getAndIncrement(); ??????????????? String path = String.valueOf(r.uri().path()); ?????????????? ?if("/sell".equals(path)){ ??????????????????? ... //邏輯處理 ??????????????? }else if("/buy".equals(path)){ ??????????????????? ... //邏輯處理 ??????????????? }else{ ??????????????????? handleUnexpected(r); ??????????????? } ????????? ??})?
整個HttpActor代碼如下,業務邏輯以買賣為模型:
private static class HttpActor extends AbstractActor { ? ??? private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1(); ? ??? public HttpActor() { ??????? final Router router = partitionAndCreateRouter(); ??????? ????????receive(ReceiveBuilder ??????????? .match(HttpRequest.class, r -> { ??????????????? int id = Constants.ID.getAndIncrement(); ??????????????? String path = String.valueOf(r.uri().path()); ?????????????? ?if("/sell".equals(path)){ ??????????????????? String productId = r.uri().query().get("productId").get(); ??????????????????? ... ??????????????????? SalesOrder so = new SalesOrder(price, productId, quantity, id); ??????????????????? so.setSeller(new Seller(who)); ??????????????????? router.route(so, self()); ??????????????????? replyOK(id); ??????????????? }else if("/buy".equals(path)){ ??????????????????? ... ??????????????? }else{ ??????????????????? handleUnexpected(r); ??????????????? } ????????? ??}).match(Tcp.Connected.class, r ->{ ??????????????? sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());//tell that connection will be handled here! ??????????? }).build()); ??? }?
該案例使用的驅動包有:?Akka,?Spray, and this?Mysql async driver., 整個源碼下載:GitHub
Actor模型教程
NIO原理與應用
總結
以上是生活随笔為你收集整理的Spray + Akka高性能异步IO并发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java中使用akka手记三 clust
- 下一篇: 解决Eclipse Debug 的sou