日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

QuickFIX 源码分析

發布時間:2024/3/7 编程问答 58 豆豆
生活随笔 收集整理的這篇文章主要介紹了 QuickFIX 源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

FIX是Financial Information eXchange的簡稱。FIX是一種專門為實時電子證券交易設計的標準消息協議。FIX協議由FIX protocol, Ltd(FPL)所有并維護。FIX協議的網址為http://www.fixprotocol.org
QuickFix/J是實現了FIX協議所有版本及其功能的開源軟件,100%使用JAVA實現。
QuickFix/J的網址為http://www.quickfixj.org
QuickFix/J的源代碼可以從http://sourceforge.net/projects/quickfixj/files/QuickFIX_J 下載,也可以去QuickFix/J的官方網站,進入下載頁面下載源代碼。
那么能用QuickFix/J做什么事情呢?關注股票的兄弟們一定留意過LevelII這個名詞,他是中國股票交易新行情的簡稱。簡單說,可以將QuickFix/J的代碼改造一下,就用來接受深圳證券交易的LevelII行情數據。當然接受行情不是免費的,需要諸多的商務手續,但是本文僅僅討論QuickFix/J的開源代碼的設計和實現,并且側重于QuickFix/J的客戶端實現。服務器端留在以后的文章介紹。關于上海證券交易所的LevelII數據的格式和接受,跟深圳的有諸多的不同,也留在以后討論。
首先QuickFixJ代碼功能主要有兩大部分,一部分是Fix協議數據的解析,另外一部分是客戶端跟服務器端建立連接并維持回話,傳輸數據。第一部分將主要介紹QuickFix/J的傳輸部分的實現。
(一) QuickFix/J傳輸功能部分
QuickFix/J的連接管理和傳輸功能是基于MINA框架實現的。MINA是什么?MINA是Apache旗下的一個網絡應用框架,能夠幫助大家輕松的開發高性能、高擴展性的網絡程序。它使用NIO在傳輸協議(比如TCP/IP,UDP/IP)之上提供了抽象的、事件驅動的、異步處理的API。MINA的網址為http://mina.apache.org。
A). QuickFix/J客戶端用到的主要類的功能說明(V1.5.0)

  • quickfix.Initiator:定義了一些從配置中獲取通信協議、主機、端口及重連接的時間間隔的KEY,僅僅是key而已,沒有其他的。
  • quickfix.mina.SessionConnector:定義了一些helper方法,為initiator和acceptor提供公用的功能,比如獲取Session,創建Session,動態添加/刪除Session,判斷是否已經登陸。最重要的是他定義了SessionTimerTask這個內部類。SessionTimerTask的功能有自動重連登陸、檢查和更新Session時間戳、發送心跳消息。
  • quickfix.mina.initiator.AbstractSocketInitiator:是SocketInitiator的基礎抽象基類,繼承了SessionConnector和Initiator。在QuickFix/J中提供了兩種默認的具體實現,分別是SocketInitiator和ThreadedSocketInitiator。這兩種具體實現的功能都一樣,兩者的區別僅僅是處理消息時使用線程的策略不同,具體請參考7和8。抽象類AbstractSocketInitiator提供的功能有:
    a) 遍歷配置文件取得所有[session]節的配置并創建相應的FixSession (如果[session]中沒有指定ConnectionType或者明確指定了ConnectionType為initiator,則建立FixSession(quickfix.Session),其他類型的ConnectionType無效,如acceptor)。配置文件中可以指定多個[session]。
    b) 通過已經生成的FixSession和傳入的eventHandlingStrategy創建IoSessionInitiator,并保存入initiators(Set類型的緩存)中。
    那么FixSession和IoSessionInitiator有什么區別呢?請參考5、6。
    c) 啟動、關閉客戶端(initiator)。啟動initiator時首先啟動應用層的SessionTimer(請參考2),然后啟動連接層的initiator(IoSessionInitiator)。關閉initiator時,先關閉連接層的initiator(IoSessionInitiator),再關閉應用層的SessionTimer。
  • quickfix.SessionID:是Session的唯一標識。SessionID中包含beginString(必須),senderCompID(必須),senderSubID (可選),senderLocationID(可選),targetCompID(必須),targetSubID(可選),targetSubID(可選),targetLocationID(可選),sessionQualifier(可選)。sessionQualifer用于區分具有相同的targetCompID不同的session,只能用在initiator角色中。SessionID.toString生成的可讀的Session ID字符串組成為:beginString:senderCompID/senderSubID/senderLocationID->targetCompID/targetSubID/targetLocationID/sessionQualifier。如果可選值未設置則在Session ID字符串中默認空字符串。
  • quickfix.Session:Session是FIX消息通訊中最基本的抽象。
    a) fixSession維護Session內部消息的自增序列號、自動錯誤恢復、與通信對方(counterpart)建立通信信道(communication channel)。
    b) Session是獨立于特定的傳輸層協議的。Session被新建時,消息序列號置為1,每次通信序列號自增,直到Session被重置(reset)。每個Session能夠跨越多個傳輸連接(并非同時跨越,而是說第一次網絡連接斷開后,隨后重連,雖然底層的網絡連接已經是新建的了,但是Session還能保持跟斷網之前是同一個Session)。
    c) fixSession中核心邏輯在next方法中。next(message)首先檢查SessionTime,如果超過1秒未刷新則刷新時間戳;如果發現Session不存在,則執行reset,重置Session。然后取到消息的header,msgType進行檢查,首先beginString不正確,拋異常并退出。然后從dataDictionaryProvider取得數據字典,驗證數據字典。然后根據消息類型,分別回調用戶接口。回調用戶函數的入口在驗證完數據字典之后,請注意verify(message)函數,所有的普通消息通過這個函數去回調application的fromApp(message, sessionID)的。verify -> veriry -> fromCallback -> fromAdmin/fromApp。關于消息的解析,其中普通Message是通過quickfix.MessageUtils.parse將String類型的消息解析成Message。
  • quickfix.mina.initiator.IoSessionInitiator:使用MINA提供的傳輸層的API,建立、維護同服務器之間的傳輸層的網絡連接,而不是應用層的網絡連接。這些網絡功能都在一個叫做quickfix.mina.initiator.IoSessionInitiator.ConnectTask的一個私有的TimerTask中實現。具體實現功能有連接(包括普通連接和加密SSL連接)、重連、判斷是否應該重連、處理連接異常、啟動和停止ConnectTask。
  • quickfix.SocketInitiator:使用單獨的線程去為所有的Session處理消息。SocketInitiator提供的功能有:
    a) 初始化,即用eventHandlingStrategy創建Initiator,然后注冊此SocketInitiator所管理的全部Session,然后啟動Initiator,最后調用eventHandlingStrategy.blockInThread()在另外的后臺線程中去處理SessionTimer收到的插入隊列的消息。啟動Initiator做的事情依次是:先啟動SessionTimer去監聽從傳輸層過來的消息,如果沒有Logon則先Logon,然后在收到消息后回調用戶代碼處理消息;啟動reconnectTask去建立和維護傳輸層的網絡連接。
    b) 啟動Initiator,在另外的線程中后臺(Daemon)處理消息。
    c) 阻塞Initiator,在同一線程中處理消息。
    d) 停止Initiator。分為強制停止和非強制停止。強制或者非強制Logout所有FixSession,停止連接層的Initiator,取消注冊所有此SocketInitiator所管理的全部Session。
    e) 關于a) b)如何處理來自底層的消息的邏輯,請參考11和12。因為這里所謂的處理消息實際上是直接或者間接調用了SingleThreadedEventHandlingStrategy的block處理消息。
  • quickfix.ThreadedSocketInitiator:為每一個Session使用一個單獨的線程去處理消息。功能參考7。除了線程工作模式不一樣,功能和7完全一樣。
  • quickfix.SessionState: Session和對方通信過程中使用的helper類。主要功能就是存儲了Session的所有狀態,并且提供了響應API訪問這些狀態。狀態包括heartBeatInterval,heartBeatMillis,是否需要heartBeat,判斷Session所在應用程序的角色(客戶端Initiator or 服務器端Acceptor),lastReceivedTime,lastSentTime,獲取logger,判斷作為客戶端的Initiator登陸消息是否已經發出,判斷作為服務器端的Acceptor登陸消息是否收到,判斷是否需要登陸,判斷登陸是否TimeOut,messageStore,testRequestCounter,判斷是否需要TestRequest,判斷是否處于TimeOut狀態,將收到的Message入隊(enqueue),出隊(dequeue),鎖定和解鎖發送/接受的Sequence Number,獲取、設置自增的下一個Sequence Number,重置(即將Sequence清空,重新從1開始計數),設置、獲取Logout的原因。
  • quickfix.mina.EventHandlingStrategy:用于不同版本FIX協議處理事件的策略的接口,是應用級處理消息回調接口的根源。當傳輸層消息到達時調用此接口onMessage,可以這么理解,onMessage是EventHandlingStrategy的輸入,這個輸入來自底層。getSessionConnector獲取和這個策略相關的SessionConnector,即獲取和這個Session相關的Initiator/Acceptor去處理響應的輸入消息,一般情況下是逐層將消息向上層傳出,回調用戶的函數處理該消息。getQueueSize獲取當前被處理消息隊列的長度。EventHandlingStrategy一般作為AbstractSocketInitiator的成員,建立IoSessionInitiator時傳給IoSessionInitiator,請參考3 b)。目前在QuickFix/J中有兩個具體實現,分別是quickfix.mina.SingleThreadedEventHandlingStrategy和quickfix.mina.ThreadPerSessionEventHandlingStrategy。這兩個具體實現類的說明請參考11,12。
  • quickfix.mina.SingleThreadedEventHandlingStrategy:是QuickFix/J處理消息的核心類。處理消息時即便有多個Session也使用單線程模式。
    a) 為了不阻塞輸入,那么就需要一個eventQueue來臨時快速的存儲收到的所有消息。
    b) onMessage接到底層傳入的消息包裝成SessionMessageEvent,首先將其存入eventQueue。
    c) 那么SessionMessageEvent里面有什么?SessionEvent僅僅是把fixSession和Message包裝到一起,并且提供了處理Message的方法processMessage。可以這樣理解,。
    d) getSessionConnector獲取需要處理應用級的connector以便處理eventQueue中的消息。
    e) getMessage從eventQueue中取出SessionMessageEvent待處理。
    f) block就是應用程序級別處理消息的入口。block判斷HandlingMessage是否應該繼續運新,如果是則從消息隊列中取出SessionMessageEvent,調用其中的processMessage去處理該Message。
    g) processMessage如何處理了收到的消息呢?它會調用fixSession的next方法,將消息傳給Session,由fixSession再接力將消息回調到用戶手中。請參考5。
    h) 也許你會注意到處理消息的block在run中始終被調用,而且沒有任何sleep時間,難道它在沒有消息的時候始終不停的死循環運行且絲毫不休息?CPU會保持100%?實際上效果不是這樣的,其中的秘密在于它使用了BlockingQueue做到了和sleep相同效果的事情。在沒有消息的時候,這個循環會每休息一秒再執行下一次循環。如何做到這樣的效果呢?原因是如果eventQueue中如果沒有消息,而該eventQueue設置了阻塞超時1000毫秒,則取消息的操作會等待最多1000毫秒,如果沒有等到消息則超時退出不再等待,執行完畢本次循環,如果等到了則按照正常流程處理消息。這樣做最大的好處就是,如果eventQueue中有事件,那么就會連續不斷的處理,如果沒有消息,就會休息timeout毫秒再查看。
    i) blockInThread,在新啟動的后臺線程中處理SessionMessageEvent
  • quickfix.mina.ThreadPerSessionEventHandlingStrategy:同樣是QuickFix/J處理消息的核心類。和單線程模式不同的是該策略會為每個session啟動一個新的線程去處理消息。
    a) 由于是每個Session對應一個線程,因此該策略內部需要一個稱之為dispatchersMap作為緩存為每個Session保存響應的處理線程(MessageDispatchingThread)引用。
    b) 當onMessage收到來自底層的輸入消息時,根據輸入的fixSession從dispatchers中取到相應的處理線程,并將該消息加入(enqueue)到該線程內部的消息隊列中待處理。
    c) 每個dispatcher(MessageDispatchingThread類型)內部均維護了自己的消息隊列,和單線程模式不同在于,消息隊列中的消息僅僅是Message,不是SessionMessageEvent。處理Message的邏輯從單線程中的SessionMessageEvent中移出到dispatcher中。
  • quickfix.DataDictionaryProvider
    :是一個接口,為指定的session protocol或者application version提供數據字典。getSessionDataDictionary根據提供的beginString 即協議版本獲取相應的數據字典。getApplicationDataDictionary根據提供的application version ID和custom application ID獲取數據字典。application version ID在FIXT.1.1之前由BeginString字段確定。custom application ID是可選值,不是必須的。
  • quickfix.DefaultDataDictionaryProvider:是QuickFix/J提供的DataDictionaryProvider的默認實現。在DefaultDataDictionaryProvider中,有兩種數據字典,一種是傳輸用的數據字典,一種是應用程序用的數據字典,分別緩存在兩個Map中。這個DefaultDataDictionaryProvider是在創建Session時由默認的DefaultSessionFactory根據beginString創建的。addApplicationDictionary和addTransportDictionary分別用于向DefaultDataDictionaryProvider添加新的數據字典。目前QuickFix/J的實現中,在DefaultSessionFactory中初始化Session時添加字典。對于fixt之前版本的數據字典,每個數據字典會被同時添加進入到傳輸數據字典和應用程序數據字典中。
    B). 網絡數據在QuickFix/J中的流向
    ConnectTask -> ioConnector.connect(sockAddress, ioHandler) -> MINA建立和服務器端的通信。收到網絡數據,ioConnector觸發相應事件,并把事件交給ioHandler(InitiatorIoHandler)的processMessage -> processMessage中調用eventHandlingStrategy.onMessage(quickfixSession, message) ,將消息向外回調 -> SingleThreadedEventHandlingStrategy.onMessage(quickfixSession, message) 將收到的消息入隊(enQueue)到eventQueue -> SingleThreadedEventHandlingStrategy.blockInThread中啟動單獨的后臺線程,依次從eventQueue取出消息處理,向session回調 -> SessionMessageEvent.quickfixSession.next(message) -> quickFixSession.next根據msgType判斷回調 ->逐層回調 (verify -> veriry -> fromCallback -> fromAdmin/fromApp),從fromAdmin/fromApp(msg, sessionID)回調用戶處理邏輯。
  • 總結

    以上是生活随笔為你收集整理的QuickFIX 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。