NewPendingTransactions始末

  • nono
  • 更新于 2022-08-16 02:23
  • 阅读 2387

NewPendingTransactions始末

Geth 提供了realtime event接口,通过websocket 链接节点之后,发送 {"id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}可以得到实时的pendingTransactions的hash.

今天梳理下,对应部分的代码来讲讲这部分的工作原理.相应的代码 主要涉及三个个文件 eth/filters/api.go,eth/filters/filter_system.go,eth/backend.go

入口从func (s *Ethereum) APIs() []rpc.API 开始.再早些的代码逻辑不在本次的讨论范围内.

func (s *Ethereum) APIs() []rpc.API {
    apis := ethapi.GetAPIs(s.APIBackend, s.blockchain)

    // Append any APIs exposed explicitly by the consensus engine
    apis = append(apis, s.engine.APIs(s.BlockChain())...)

    // Append all the local APIs and return
    return append(apis, []rpc.API{
                ...
         {
            Namespace: "eth",
            Version:   "1.0",
            Service:   filters.NewPublicFilterAPI(s.APIBackend, false, 5*time.Minute, s.config.RangeLimit),
            Public:    true,
        }, 
                ...}}}

NewPublicFilterAPI(s) 新建了一个PublicEthereumAPI 协议的对应,并注册到APIs中关于Geth的api协议.

// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration, rangeLimit bool) *PublicFilterAPI {
    api := &PublicFilterAPI{
        backend:    backend,
        events:     NewEventSystem(backend, lightMode),
        filters:    make(map[rpc.ID]*filter),
        timeout:    timeout,
        rangeLimit: rangeLimit,
    }
    go api.timeoutLoop(timeout)

    return api
}

api.timeoutLoop 这里的这个goruntime 就是控制事件订阅超时关闭的工作,逻辑不在本次讨论范围内. 在创建PublicFilterApi的是成员events 由 NewEventSystem 创建.该实例管理了监听事件的启动和过滤

//filter_system.go  
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
    m := &EventSystem{
        backend:       backend,
        lightMode:     lightMode,
        install:       make(chan *subscription),
        uninstall:     make(chan *subscription),
        txsCh:         make(chan core.NewTxsEvent, txChanSize),
        logsCh:        make(chan []*types.Log, logsChanSize),
        rmLogsCh:      make(chan core.RemovedLogsEvent, rmLogsChanSize),
        pendingLogsCh: make(chan []*types.Log, logsChanSize),
        chainCh:       make(chan core.ChainEvent, chainEvChanSize),
    }

    // Subscribe events
    m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
    m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
    m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
    m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
    m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)

    // Make sure none of the subscriptions are empty
    if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
        log.Crit("Subscribe for event system failed")
    }

    go m.eventLoop()
    return m
}

EventSystem.install 监听新增的订阅事件ws连入的时候,通过install <- 来创建一个新的订阅 EventSystem.uninstall销毁阅订阅事件的管道 EventSystem.txsCh交易发生变换时写入的管道 其他的与newPendingTransactions没有太大关系,这里就不提及了 m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)这行代码,新建一个NewTxsEvent订阅,pool中交易发生变化时,会写入到txsCh. go m.eventLoop()主要的生命周期goruntime,负责处理收到的信息,并分发给对应的filters

//filter_system.go 
// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
    // Ensure all subscriptions get cleaned up
    defer func() {
        es.txsSub.Unsubscribe()
                ...
    }()

    index := make(filterIndex)
    for i := UnknownSubscription; i &lt; FullPendingTransactionsSubscription; i++ {
        index[i] = make(map[rpc.ID]*subscription)
    }

    for {
        select {
        case ev := &lt;-es.txsCh://新交易handler
            es.handleTxsEvent(index, ev)
        ...
        case f := &lt;-es.install: //新建订阅
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                index[LogsSubscription][f.id] = f
                index[PendingLogsSubscription][f.id] = f
            } else {
                index[f.typ][f.id] = f
            }
            close(f.installed)

        case f := &lt;-es.uninstall: //取消订阅
            if f.typ == MinedAndPendingLogsSubscription {
                // the type are logs and pending logs subscriptions
                delete(index[LogsSubscription], f.id)
                delete(index[PendingLogsSubscription], f.id)
            } else {
                delete(index[f.typ], f.id)
            }
            close(f.err)

        // System stopped
        case &lt;-es.txsSub.Err():
            return
        case &lt;-es.logsSub.Err():
            return
        case &lt;-es.rmLogsSub.Err():
            return
        case &lt;-es.chainSub.Err():
            return
        }
    }
}

index的数据类型是 type filterIndex map[Type]map[rpc.ID]*subscription可以理解为一个二维数组[x,y]x表示的是订阅的类型,y表示rpc session的唯一标识.虽然他是个map,但是完全可以用二维数组的思维理解.下面是所有的Type,即订阅的类型.newPendingTransactions 对应的订阅事件类型是PendingTransactionsSubscription


const (
    // UnknownSubscription indicates an unknown subscription type
    UnknownSubscription Type = iota
    // LogsSubscription queries for new or removed (chain reorg) logs
    LogsSubscription
    // PendingLogsSubscription queries for logs in pending blocks
    PendingLogsSubscription
    // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
    MinedAndPendingLogsSubscription
    // PendingTransactionsSubscription queries tx hashes for pending
    // transactions entering the pending state
    PendingTransactionsSubscription
    // BlocksSubscription queries hashes for blocks that are imported
    BlocksSubscription
    // LastSubscription keeps track of the last index
    LastIndexSubscription
)

es.handleTxsEvent(index, ev)为处理txsch 事件的处理函数,txsch实际上是一个[]*types.Transaction来看下对应的代码

//filter_system.go
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
    hashes := make([]common.Hash, 0, len(ev.Txs))
    for _, tx := range ev.Txs {
        hashes = append(hashes, tx.Hash())
    }
    for _, f := range filters[PendingTransactionsSubscription] {
        f.hashes &lt;- hashes
    }
}

直接从ev中copy Hashes之后全部写入到 index[PendingTransactionsSubscription]subscription.hashes.即当前所有存活的类型为PendingTransactionsSubscription订阅者.事件分发的部分到上面这里就结束了.下面来看看用户是如何创建一个新订阅的

//eth/filters/api.go
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
    notifier, supported := rpc.NotifierFromContext(ctx)
    if !supported {
        return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
    }

    rpcSub := notifier.CreateSubscription()

    gopool.Submit(func() {
        txHashes := make(chan []common.Hash, 128)
        pendingTxSub := api.events.SubscribePendingTxs(txHashes)

        for {
            select {
            case hashes := &lt;-txHashes:
                // To keep the original behaviour, send a single tx hash in one notification.
                // TODO(rjl493456442) Send a batch of tx hashes in one notification
                for _, h := range hashes {
                    notifier.Notify(rpcSub.ID, h)
                }
            case &lt;-rpcSub.Err():
                pendingTxSub.Unsubscribe()
                return
            case &lt;-notifier.Closed():
                pendingTxSub.Unsubscribe()
                return
            }
        }
    })

    return rpcSub, nil
}
当向rpc发送这个订阅命令的时候

{"id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}最后 调用的就是上面的NewPendingTransactions的方法.主要的核心逻辑看这里 txHashes := make(chan []common.Hash, 128)创建了一个读写的128容量的common.Hash 类型的管道, pendingTxSub := api.events.SubscribePendingTxs(txHashes)创建一个新的PendingTxs订阅,该方法会将,创建好的subscription通过下面的subscribe方法发送给install,进行注册,即我们上文提到的index

// subscribe installs the subscription in the event broadcast loop.
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
    es.install &lt;- sub
    &lt;-sub.installed
    return &Subscription{ID: sub.id, f: sub, es: es}
}

在for循环中等待读取 txHashes的消息,即subscription.hashes然后通过 notifier.Notify(rpcSub.ID, h)返回给对应RPC.ID的ws session.

到这里基本上就结束了,另外还有退出订阅一些接口,就不讲了.我看过几个版本的Geth,该版本是bsc 最新的release版本,之前也有看过Heco的geth版本但是也是老早之前的事情,不同版本的Geth在这方面的实现上有差异,但是基本流程差不多.看这个目的是为了FullPendingTransactionsSubscription监听交易的时候不用再去查一次,交易的详细数据.效果如图

image.png

点赞 3
收藏 1
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

4 条评论

请先 登录 后评论
nono
nono
0xcC85...BC32
江湖只有他的大名,没有他的介绍。