Tendermint mempool分析
概述
在Tendermint/mempool/doc.go文件里,有對mempool詳細的解釋。
- 當客戶端提交一筆交易時,這筆交易首先會被mempool進行打包。而mempool的主要作用之一就是保存從其他peer或者自己收到的交易。以便其它模塊的使用。
- 交易進入到mempool之前還會調用一次客戶端的CheckTx() 函數。并把檢查通過的交易放入到交易池中。
- 交易被放到交易池保證了交易的順序性。
- 共識引擎通過調用Update() 和 ReapMaxBytesMaxGas() 方法來更新內存池中的交易。
- mempool的底層使用的是鏈表
mempool 接口
我們先看一下在mempool/mempool.go 文件里定義了的mempool功能的接口。
// 對內存池的更新需要與提交區塊同步,這樣應用程序就可以在提交時重置它們的瞬間狀態。 type Mempool interface {// CheckTx對應用程序執行一個新事務,來確定它的有效性,以及是否應該將它添加到內存池。CheckTx(tx types.Tx, callback func(*abci.Response), txInfo TxInfo) error// ReapMaxBytesMaxGas從內存池中獲取最多為maxBytes字節總數的事務,條件是總gasWanted必須小于maxGas。// 如果兩個最大值都是負數,則所有返回交易的規模沒有上限,返回所有可用的交易ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs// ReapMaxTxs從內存池中獲取最多的事務。// 如果兩個最大值都是負數,則所有返回交易的規模沒有上限,返回所有可用的交易ReapMaxTxs(max int) types.Txs// Lock 鎖定mempool。共識必須能持鎖才能安全更新。Lock()// 解鎖mempoolUnlock()// Update通知內存池,給定的txs已經提交,可以被丟棄。// Update 應該在區塊被共識提交后調用Update(blockHeight int64,blockTxs types.Txs,deliverTxResponses []*abci.ResponseDeliverTx,newPreFn PreCheckFunc,newPostFn PostCheckFunc,) error// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are// done. E.g. from CheckTx.// NOTE: Lock/Unlock must be managed by callerFlushAppConn() error// Flush removes all transactions from the mempool and cacheFlush()// TxsAvailable returns a channel which fires once for every height,// and only when transactions are available in the mempool.// NOTE: the returned channel may be nil if EnableTxsAvailable was not called.TxsAvailable() <-chan struct{}// EnableTxsAvailable initializes the TxsAvailable channel, ensuring it will// trigger once every height when transactions are available.EnableTxsAvailable()// Size returns the number of transactions in the mempool.Size() int// TxsBytes returns the total size of all txs in the mempool.TxsBytes() int64// InitWAL creates a directory for the WAL file and opens a file itself. If// there is an error, it will be of type *PathError.InitWAL() error// CloseWAL closes and discards the underlying WAL file.// Any further writes will not be relayed to disk.CloseWAL() }ClistMempool
ClistMempool.go 實現了mempool中定義的接口,并添加了一些其他的功能。其主要作用就是檢查交易的合法性并判斷是否加入交易池。
NewCListMempool
NewClistMempool 使用給定的配置和連接的application等參數,創建一個新的mempool。
func NewCListMempool(config *cfg.MempoolConfig,proxyAppConn proxy.AppConnMempool,height int64,options ...CListMempoolOption, ) *CListMempool {// 初始化相關的成員變量mempool := &CListMempool{// 給定的配置config: config,// 應用層連接proxyAppConn: proxyAppConn,// 創建一個雙向鏈表,用來保存交易txs: clist.New(),height: height,recheckCursor: nil,recheckEnd: nil,logger: log.NewNopLogger(),metrics: NopMetrics(),}if config.CacheSize > 0 {// 創建內存池緩存mempool.cache = newMapTxCache(config.CacheSize)} else {mempool.cache = nopTxCache{}}// 設置了代理連接的回調函數為globalCb(req *abci.Request, res *abci.Response)// 因為交易池在收到交易后會把交易提交給APP,根據APP的返回來決定后續如何這個交易// 如何處理,所以在APP處理完后的交易后回調mempool.globalCb 進而讓mempool來繼續決定當前交易如何處理proxyAppConn.SetResponseCallback(mempool.globalCb)for _, option := range options {option(mempool)}return mempool }globalCb
Global callback 將會在每次ABCI 響應后進行調用。
func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) {if mem.recheckCursor == nil {// 符合要求,什么也不做return}mem.metrics.RecheckTimes.Add(1)// 也是檢測交易是否符合要求,符合就什么也不做mem.resCbRecheck(req, res)// update metricsmem.metrics.Size.Set(float64(mem.Size())) }CheckTx
func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo TxInfo) error {// 并發控制,也就是說當共識引擎在進行交易池讀取和更新的時候,此函數應該是阻塞的mem.updateMtx.RLock()// use defer to unlock mutex because application (*local client*) might panicdefer mem.updateMtx.RUnlock()txSize := len(tx)// 判斷交易池是否滿了if err := mem.isFull(txSize); err != nil {return err}// 如果已經超過了設置的內存池則放棄加入if txSize > mem.config.MaxTxBytes {return ErrTxTooLarge{mem.config.MaxTxBytes, txSize}}if mem.preCheck != nil {if err := mem.preCheck(tx); err != nil {return ErrPreCheck{err}}}// NOTE: writing to the WAL and calling proxy must be done before adding tx// to the cache. otherwise, if either of them fails, next time CheckTx is// called with tx, ErrTxInCache will be returned without tx being checked at// all even once.if mem.wal != nil {// TODO: Notify administrators when WAL fails_, err := mem.wal.Write(append([]byte(tx), newline...))if err != nil {return fmt.Errorf("wal.Write: %w", err)}}// NOTE: proxyAppConn may error if tx buffer is fullif err := mem.proxyAppConn.Error(); err != nil {return err}// 先加入交易池cache 如果cache中存在此交易則返回falseif !mem.cache.Push(tx) {// Record a new sender for a tx we've already seen.// Note it's possible a tx is still in the cache but no longer in the mempool// (eg. after committing a block, txs are removed from mempool but not cache),// so we only record the sender for txs still in the mempool.if e, ok := mem.txsMap.Load(TxKey(tx)); ok {memTx := e.(*clist.CElement).Value.(*mempoolTx)_, loaded := memTx.senders.LoadOrStore(txInfo.SenderID, true)// TODO: consider punishing peer for dups,// its non-trivial since invalid txs can become valid,// but they can spam the same tx with little cost to them atm.if loaded {return ErrTxInCache}}mem.logger.Debug("tx exists already in cache", "tx_hash", tx.Hash())return nil}ctx := context.Background()if txInfo.Context != nil {ctx = txInfo.Context}// 此時把交易傳給proxyAppConn,并得到APP檢查的結果reqRes, err := mem.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})if err != nil {// 如果檢查不通過,就從交易池緩存中移除這筆交易mem.cache.Remove(tx)return err}reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))return nil }Reactor
mempool reactor主要功能是在peer之間廣播包含交易的mempool。
跳過Reactor結構體和NewReactor(),我們直接來看OnStart()
OnStart
func (r *Reactor) OnStart() error {if !r.config.Broadcast {r.Logger.Info("tx broadcasting is disabled")}// 處理Mempool通道的消息,最后會調用mempool中的CheckTx方法來判斷是否要加入到內存池go r.processMempoolCh()// 處理每個節點的更新go r.processPeerUpdates()return nil }下圖為從客戶端提交的交易添加到交易池的過程:
最后
至此,Tendermint mempool源碼就分析完了。如果有錯誤之處,希望路過的大佬能夠指點指點。最后推薦一位大佬的公眾號,歡迎關注哦:區塊鏈技術棧
另外這個地址上還有很多區塊鏈學習資料:https://github.com/mindcarver/blockchain_guide
參考文章:https://gitee.com/wupeaking/tendermint_code_analysis/blob/master/Mempool%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90.md
總結
以上是生活随笔為你收集整理的Tendermint mempool分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ipxe u盘启动linux内核,iPX
- 下一篇: Clist循环链表的实现