日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spray + Akka高性能异步IO并发

發布時間:2024/4/17 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spray + Akka高性能异步IO并发 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.


http://ju.outofmemory.cn/entry/70913

http://www.jdon.com/concurrent/spray-akka.html


如何使用Java建立像Node.js那樣非堵塞異步事件并發IO服務器呢?Spray是基于NIO2高并發框架,雖然Tomcat 8也是基于NIO2,但是Spary的線程數要低得到,降低CPU上下文切換的負載;Akka和其Mysql庫包都是相同線程執行上下文(?execution context),因為在非堵塞前提下,性能拼的就不是線程數目越多越好,正好相反,線程數目越低,越接近理論理論最佳點。

啟動設置

  為了啟動Spary和Akka,需要一個main啟動函數,在main函數中,我們創建一個Actor系統:

??? ActorSystem system = ActorSystem.create("system");

??? ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor");

  listener是使用Actor用來處理Http請求。然后要設定監聽Http的端口:

????InetSocketAddress endpoint = new InetSocketAddress(3000); ??? int backlog = 100; ??? List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList(); ??? Option<ServerSettings> settings = scala.Option.empty();

  最后,綁定Actor到監聽的Http端口:

??? Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider);

??? IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender());

?

  整個main函數主要代碼如下:

public static final ActorSystem system = ActorSystem.create("system"); ? public static void main(String[] args) { ??? ... ??? ActorRef listener = system.actorOf(Props.create(HttpActor.class), "httpActor"); ???? ????InetSocketAddress endpoint = new InetSocketAddress(3000); ??? int backlog = 100; ??? List<Inet.SocketOption> options = JavaConversions.asScalaBuffer(new ArrayList<Inet.SocketOption>()).toList(); ??? Option<ServerSettings> settings = scala.Option.empty(); ??? ServerSSLEngineProvider sslEngineProvider = null; ??? Bind bind = new Http.Bind(listener, endpoint, backlog, options, settings, sslEngineProvider); ??? IO.apply(spray.can.Http$.MODULE$, system).tell(bind, ActorRef.noSender()); ??? ??? ... }

  設置好啟動函數以后,下面是真正開始在Actor里處理進來Http請求了。

請求處理器

  首先,我們因為使用的是原生Java代碼,不是Scala,因此需要將Scala集成到Java中,可能比較丑陋,可以用專門類包裝一下,引入Scala的Http協議:

HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1();

  Http Actor為了響應Http請求做三件事,第一件是創建一個路由router,這樣能夠根據請求URL:http://xxx/path中不同的/path分別處理:

Router router = partitionAndCreateRouter();

  第二件是處理新的連接,告訴Spray這個actor不僅接受Http連接,也處理實際http連接:

???? ??}).match(Tcp.Connected.class, r ->{ ??????????????? sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());//tell that connection will be handled here! ???????? })

  第三件事就是處理實際的Http連接,將http請求委托給另外一個actor處理。

??????????? .match(HttpRequest.class, r -> { ??????????????? int id = Constants.ID.getAndIncrement(); ??????????????? String path = String.valueOf(r.uri().path()); ?????????????? ?if("/sell".equals(path)){ ??????????????????? ... //邏輯處理 ??????????????? }else if("/buy".equals(path)){ ??????????????????? ... //邏輯處理 ??????????????? }else{ ??????????????????? handleUnexpected(r); ??????????????? } ????????? ??})

?

  整個HttpActor代碼如下,業務邏輯以買賣為模型:

private static class HttpActor extends AbstractActor { ? ??? private static final HttpProtocol HTTP_1_1 = HttpProtocols.HTTP$div1$u002E1(); ? ??? public HttpActor() { ??????? final Router router = partitionAndCreateRouter(); ??????? ????????receive(ReceiveBuilder ??????????? .match(HttpRequest.class, r -> { ??????????????? int id = Constants.ID.getAndIncrement(); ??????????????? String path = String.valueOf(r.uri().path()); ?????????????? ?if("/sell".equals(path)){ ??????????????????? String productId = r.uri().query().get("productId").get(); ??????????????????? ... ??????????????????? SalesOrder so = new SalesOrder(price, productId, quantity, id); ??????????????????? so.setSeller(new Seller(who)); ??????????????????? router.route(so, self()); ??????????????????? replyOK(id); ??????????????? }else if("/buy".equals(path)){ ??????????????????? ... ??????????????? }else{ ??????????????????? handleUnexpected(r); ??????????????? } ????????? ??}).match(Tcp.Connected.class, r ->{ ??????????????? sender().tell(new Http.Register(self(), Http.EmptyFastPath$.MODULE$), self());//tell that connection will be handled here! ??????????? }).build()); ??? }

?

  該案例使用的驅動包有:?Akka,?Spray, and this?Mysql async driver., 整個源碼下載:GitHub

Actor模型教程

NIO原理與應用


總結

以上是生活随笔為你收集整理的Spray + Akka高性能异步IO并发的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。