日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > windows >内容正文

windows

akka使用_使用Akka简化交易系统

發(fā)布時間:2023/12/3 windows 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 akka使用_使用Akka简化交易系统 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

akka使用

我的同事正在開發(fā)一種交易系統(tǒng),該系統(tǒng)可以處理大量的傳入交易。 每筆交易都涵蓋一種Instrument (例如債券或股票),并且具有某些(現(xiàn)在)不重要的屬性。 他們堅持使用Java(<8),所以我們堅持下去:

class Instrument implements Serializable, Comparable<Instrument> {private final String name;public Instrument(String name) {this.name = name;}//...Java boilerplate}public class Transaction {private final Instrument instrument;public Transaction(Instrument instrument) {this.instrument = instrument;}//...Java boilerplate}

Instrument稍后將用作HashMap的鍵,因此將來我們會主動實現(xiàn)Comparable<Instrument> 。 這是我們的領(lǐng)域,現(xiàn)在的要求是:

  • 交易進入系統(tǒng),需要盡快處理(無論如何)
  • 我們可以自由地以任何順序處理它們
  • …但是,同一種工具的交易需要按照進來時完全相同的順序進行處理。
  • 最初的實現(xiàn)很簡單–將所有傳入的事務(wù)放入一個使用者的隊列(例如ArrayBlockingQueue )中。 這滿足了最后一個要求,因為隊列在所有事務(wù)中保留了嚴格的FIFO順序。 但是,這樣的架構(gòu)阻止了針對不同工具的不相關(guān)交易的并發(fā)處理,從而浪費了令人信服的吞吐量提高。 毫無疑問,這種實現(xiàn)盡管很簡單,卻成為了瓶頸。

    第一個想法是以某種方式分別按工具和過程工具拆分傳入的交易。 我們提出了以下數(shù)據(jù)結(jié)構(gòu):

    priavate final ConcurrentMap<Instrument, Queue<Transaction>> queues = new ConcurrentHashMap<Instrument, Queue<Transaction>>();public void accept(Transaction tx) {final Instrument instrument = tx.getInstrument();if (queues.get(instrument) == null) {queues.putIfAbsent(instrument, new LinkedBlockingQueue<Transaction>());}final Queue<Transaction> queue = queues.get(instrument);queue.add(tx); }

    ! 但是最壞的時刻還沒有到來。 您如何確保最多一個線程一次處理每個隊列? 畢竟,否則,兩個線程可以從一個隊列(一種工具)中提取項目,并以相反的順序處理它們,這是不允許的。 最簡單的情況是每個隊列都有一個Thread -這無法擴展,因為我們期望成千上萬種不同的工具。 因此,我們可以說N線程,讓每個線程處理隊列的一個子集,例如instrument.hashCode() % N告訴我們哪個線程負責(zé)處理給定的隊列。 但是由于以下三個原因,它仍然不夠完美:

  • 一個線程必須“觀察”許多隊列(很可能是忙于等待),并始終對其進行遍歷。 或者,隊列可能以某種方式喚醒其父線程
  • 在最壞的情況下,所有工具都將具有沖突的哈希碼,僅針對一個線程-這實際上與我們最初的解決方案相同
  • 這只是該死的復(fù)雜! 漂亮的代碼并不復(fù)雜!
  • 實現(xiàn)這種怪異是可能的,但是困難且容易出錯。 此外,還有另一個非功能性的要求:儀器來來往往,隨著時間的流逝,成千上萬的儀器。 一段時間后,我們應(yīng)刪除代表最近未見過的儀器的地圖條目。 否則我們會發(fā)生內(nèi)存泄漏。

    如果您能提出一些更簡單的解決方案,請告訴我。 同時,讓我告訴你我對同事的建議。 如您所料,它是Akka –結(jié)果非常簡單。 我們需要兩種角色: Dispatcher和Processor 。 Dispatcher有一個實例,并接收所有傳入的事務(wù)。 它的職責(zé)是為每個Instrument找到或生成工作Processor角色,并將交易推向該角色:

    public class Dispatcher extends UntypedActor {private final Map<Instrument, ActorRef> instrumentProcessors = new HashMap<Instrument, ActorRef>();@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {dispatch(((Transaction) message));} else {unhandled(message);}}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}private ActorRef findOrCreateProcessorFor(Instrument instrument) {final ActorRef maybeActor = instrumentProcessors.get(instrument);if (maybeActor != null) {return maybeActor;} else {final ActorRef actorRef = context().actorOf(Props.create(Processor.class), instrument.getName());instrumentProcessors.put(instrument, actorRef);return actorRef;}} }

    這很簡單。 由于我們的Dispatcher actor實際上是單線程的,因此不需要同步。 我們幾乎沒有收到Transaction ,查找或創(chuàng)建Processor并進一步傳遞Transaction 。 這是Processor實現(xiàn)的樣子:

    public class Processor extends UntypedActor {private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);@Overridepublic void onReceive(Object message) throws Exception {if (message instanceof Transaction) {process(((Transaction) message));} else {unhandled(message);}}private void process(Transaction tx) {log.info("Processing {}", tx);} }

    而已! 有趣的是,我們的Akka實現(xiàn)幾乎與我們第一個使用隊列映射的想法相同。 畢竟,參與者只是一個隊列,還有一個(邏輯)線程處理該隊列中的項目。 區(qū)別在于:Akka管理有限的線程池,并可能在成千上萬的參與者之間共享它。 而且,由于每個工具都有其專用(和“單線程”)執(zhí)行器,因此可以保證每個工具的事務(wù)的順序處理。

    還有一件事。 如前所述,有大量的樂器,我們不想讓演員們出現(xiàn)一段時間了。 假設(shè)如果Processor在一個小時內(nèi)未收到任何交易,則應(yīng)停止并收集垃圾。 如果以后我們收到此類工具的新交易,則可以隨時重新創(chuàng)建它。 這是一個非常棘手的問題–我們必須確保,如果在處理器決定刪除自身時交易到達,我們就不能放棄該交易。 Processor沒有停止自身,而是向其父Processor發(fā)出空閑時間過長的信號。 然后, Dispatcher將發(fā)送PoisonPill到它。 因為ProcessorIdle和Transaction消息都按順序處理,所以沒有交易發(fā)送到不再存在的參與者的風(fēng)險。

    每個setReceiveTimeout通過使用setReceiveTimeout安排超時來獨立地管理其生命周期:

    public class Processor extends UntypedActor {@Overridepublic void preStart() throws Exception {context().setReceiveTimeout(Duration.create(1, TimeUnit.HOURS));}@Overridepublic void onReceive(Object message) throws Exception {//...if (message instanceof ReceiveTimeout) {log.debug("Idle for two long, shutting down");context().parent().tell(ProcessorIdle.INSTANCE, self());} else {unhandled(message);}}}enum ProcessorIdle {INSTANCE }

    顯然,當(dāng)Processor在一個小時內(nèi)未收到任何消息時,它會向其父級( Dispatcher )輕輕發(fā)出信號。 但是演員仍然活著,并且只要一個小時后發(fā)生交易就可以處理交易。 Dispatcher作用是殺死給定的Processor并將其從地圖中刪除:

    public class Dispatcher extends UntypedActor {private final BiMap<Instrument, ActorRef> instrumentProcessors = HashBiMap.create();public void onReceive(Object message) throws Exception {//...if (message == ProcessorIdle.INSTANCE) {removeIdleProcessor(sender());sender().tell(PoisonPill.getInstance(), self());} else {unhandled(message);}}private void removeIdleProcessor(ActorRef idleProcessor) {instrumentProcessors.inverse().remove(idleProcessor);}private void dispatch(Transaction tx) {final ActorRef processor = findOrCreateProcessorFor(tx.getInstrument());processor.tell(tx, self());}//...}

    不便之處。 instrumentProcessors過去是Map<Instrument, ActorRef> 。 事實證明這是不夠的,因為我們突然不得不按值刪除此映射中的條目。 換句話說,我們需要找到一個映射到給定ActorRef ( Processor )的鍵( Instrument )。 有多種處理方法(例如,空閑Processor可以發(fā)送它處理的Instrumnt ),但是我改用了BiMap<K, V> 。 之所以可以使用它,是因為指定的Instrument和ActorRef都是唯一的(每個演員actor)。 使用BiMap我可以簡單地對地圖進行inverse() (從BiMap<Instrument, ActorRef>到BiMap<ActorRef, Instrument>并將ActorRef視為鍵。

    這個Akka例子只不過是“ hello,world ”。 但是與復(fù)雜的解決方案相比,我們必須使用并發(fā)隊列,鎖和線程池進行編寫,這是完美的。 我的隊友非常興奮,以至于最終他們決定將整個應(yīng)用程序重寫為Akka。

    翻譯自: https://www.javacodegeeks.com/2014/06/simplifying-trading-system-with-akka.html

    akka使用

    總結(jié)

    以上是生活随笔為你收集整理的akka使用_使用Akka简化交易系统的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 韩国精品av| 欧美影视一区二区三区 | 欧美日韩国产a | 久久免费的精品国产v∧ | 欧美性猛交xxx乱久交 | 九一国产在线 | 久热欧美| 国产亚洲精久久久久久无码苍井空 | 色欲一区二区三区精品a片 在线观看黄网站 | 97精品国产97久久久久久春色 | 国产情侣第一页 | 日韩一级片免费 | 男人和女人做爽爽视频 | 国产露脸无套对白在线播放 | 99久久婷婷国产综合精品电影 | 国产精品呦呦 | 国产偷国产偷av亚洲清高 | 亚洲免费成人网 | 亚洲AV无码乱码国产精品牛牛 | 激情视频在线观看免费 | 亚洲天堂av一区二区三区 | 亚洲国产精品人人爽夜夜爽 | www.看毛片| jizz色| 日本色视| 日本中文字幕不卡 | 羞羞影院体验区 | 污视频在线网站 | 欧美一级做a爰片免费视频 成人激情在线观看 | 日韩不卡一区二区三区 | 久久精品色妇熟妇丰满人妻 | 国精品无码一区二区三区 | 国产一级片网站 | 青青草原成人网 | 亚洲乱熟 | 欧美色资源 | 少妇久久久久久被弄高潮 | 国产欧美日韩精品在线观看 | 蜜臀av性久久久久蜜臀aⅴ四虎 | 亚洲熟妇av一区二区三区漫画 | 久久噜噜色综合一区二区 | 天天草夜夜| 国产在线网 | 久久日韩| 成人激情免费视频 | 欧美一区二区日韩一区二区 | 在线高清av | 中国字幕一色哟哟 | 日韩最新中文字幕 | 日韩色影院 | 中文字幕被公侵犯的漂亮人妻 | 国产精品电影一区二区三区 | 天堂av2018| 日韩精品福利视频 | 国产吞精囗交免费视频网站 | 亚洲av毛片基地 | 超碰成人97| 色婷婷导航 | 永久免费看片在线播放 | 伊人网色 | 午夜免费大片 | 女仆裸体打屁屁羞羞免费 | 亚洲伊人天堂 | 99国产精品99 | 国产亚洲自拍一区 | 国产精品影院在线观看 | 亚洲精品一级二级 | 欧美日韩不卡在线 | 巨胸挤奶视频www网站 | 少妇免费毛片久久久久久久久 | 日韩高清不卡 | 国产ts丝袜人妖系列视频 | 国产男男chinese网站 | 亚洲精品男人的天堂 | 精品日韩一区二区三区四区 | 国产奶头好大揉着好爽视频 | 国产稀缺精品盗摄盗拍 | 欧美黑吊大战白妞欧美大片 | 爱爱色图 | 好吊操这里有精品 | 日本免费一区二区三区四区 | 五月激情在线观看 | 日本波多野结衣在线 | 久久人人人 | 精品国产91久久久久久久妲己 | 久久亚洲美女 | 国产乱码77777777 | 我要看免费毛片 | 色婷婷在线影院 | 国产黄色精品 | 日韩在线欧美 | 午夜精品视频在线观看 | 九色porny自拍视频 | 国产一区二区三区视频在线播放 | 欧美日一区二区三区 | 越南少妇做受xxx片 亚洲av综合色区无码一二三区 | 怡红院院av| 538在线精品视频 | av免费播放 |