NewPendingTransactions始末
Geth 提供了realtime event接口,通过websocket 链接节点之后,发送
{"id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
可以得到实时的pendingTransactions的hash.
eth/filters/api.go
,eth/filters/filter_system.go
,eth/backend.go
入口从func (s *Ethereum) APIs() []rpc.API
开始.再早些的代码逻辑不在本次的讨论范围内.
func (s *Ethereum) APIs() []rpc.API {
apis := ethapi.GetAPIs(s.APIBackend, s.blockchain)
// Append any APIs exposed explicitly by the consensus engine
apis = append(apis, s.engine.APIs(s.BlockChain())...)
// Append all the local APIs and return
return append(apis, []rpc.API{
...
{
Namespace: "eth",
Version: "1.0",
Service: filters.NewPublicFilterAPI(s.APIBackend, false, 5*time.Minute, s.config.RangeLimit),
Public: true,
},
...}}}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool, timeout time.Duration, rangeLimit bool) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
events: NewEventSystem(backend, lightMode),
filters: make(map[rpc.ID]*filter),
timeout: timeout,
rangeLimit: rangeLimit,
}
go api.timeoutLoop(timeout)
return api
}
api.timeoutLoop
这里的这个goruntime 就是控制事件订阅超时关闭的工作,逻辑不在本次讨论范围内.
在创建PublicFilterApi的是成员events 由 NewEventSystem 创建.该实例管理了监听事件的启动和过滤
//filter_system.go
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
}
// Subscribe events
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
log.Crit("Subscribe for event system failed")
}
go m.eventLoop()
return m
}
EventSystem.install
监听新增的订阅事件ws连入的时候,通过install <- 来创建一个新的订阅
EventSystem.uninstall
销毁阅订阅事件的管道
EventSystem.txsCh
交易发生变换时写入的管道
其他的与newPendingTransactions
没有太大关系,这里就不提及了
m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
这行代码,新建一个NewTxsEvent订阅,pool中交易发生变化时,会写入到txsCh.
go m.eventLoop()
主要的生命周期goruntime,负责处理收到的信息,并分发给对应的filters
//filter_system.go
// eventLoop (un)installs filters and processes mux events.
func (es *EventSystem) eventLoop() {
// Ensure all subscriptions get cleaned up
defer func() {
es.txsSub.Unsubscribe()
...
}()
index := make(filterIndex)
for i := UnknownSubscription; i < FullPendingTransactionsSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
for {
select {
case ev := <-es.txsCh://新交易handler
es.handleTxsEvent(index, ev)
...
case f := <-es.install: //新建订阅
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
index[LogsSubscription][f.id] = f
index[PendingLogsSubscription][f.id] = f
} else {
index[f.typ][f.id] = f
}
close(f.installed)
case f := <-es.uninstall: //取消订阅
if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
delete(index[LogsSubscription], f.id)
delete(index[PendingLogsSubscription], f.id)
} else {
delete(index[f.typ], f.id)
}
close(f.err)
// System stopped
case <-es.txsSub.Err():
return
case <-es.logsSub.Err():
return
case <-es.rmLogsSub.Err():
return
case <-es.chainSub.Err():
return
}
}
}
index
的数据类型是 type filterIndex map[Type]map[rpc.ID]*subscription
可以理解为一个二维数组[x,y]x表示的是订阅的类型,y表示rpc session的唯一标识.虽然他是个map,但是完全可以用二维数组的思维理解.下面是所有的Type,即订阅的类型.newPendingTransactions 对应的订阅事件类型是PendingTransactionsSubscription
const (
// UnknownSubscription indicates an unknown subscription type
UnknownSubscription Type = iota
// LogsSubscription queries for new or removed (chain reorg) logs
LogsSubscription
// PendingLogsSubscription queries for logs in pending blocks
PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
es.handleTxsEvent(index, ev)
为处理txsch 事件的处理函数,txsch实际上是一个[]*types.Transaction
来看下对应的代码
//filter_system.go
func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
hashes := make([]common.Hash, 0, len(ev.Txs))
for _, tx := range ev.Txs {
hashes = append(hashes, tx.Hash())
}
for _, f := range filters[PendingTransactionsSubscription] {
f.hashes <- hashes
}
}
直接从ev中copy Hashes之后全部写入到 index[PendingTransactionsSubscription]
的subscription.hashes
.即当前所有存活的类型为PendingTransactionsSubscription
订阅者.事件分发的部分到上面这里就结束了.下面来看看用户是如何创建一个新订阅的
//eth/filters/api.go
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
gopool.Submit(func() {
txHashes := make(chan []common.Hash, 128)
pendingTxSub := api.events.SubscribePendingTxs(txHashes)
for {
select {
case hashes := <-txHashes:
// To keep the original behaviour, send a single tx hash in one notification.
// TODO(rjl493456442) Send a batch of tx hashes in one notification
for _, h := range hashes {
notifier.Notify(rpcSub.ID, h)
}
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
return
case <-notifier.Closed():
pendingTxSub.Unsubscribe()
return
}
}
})
return rpcSub, nil
}
当向rpc发送这个订阅命令的时候
{"id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
最后 调用的就是上面的NewPendingTransactions的方法.主要的核心逻辑看这里
txHashes := make(chan []common.Hash, 128)
创建了一个读写的128容量的common.Hash 类型的管道,
pendingTxSub := api.events.SubscribePendingTxs(txHashes)
创建一个新的PendingTxs订阅,该方法会将,创建好的subscription通过下面的subscribe方法发送给install,进行注册,即我们上文提到的index
// subscribe installs the subscription in the event broadcast loop.
func (es *EventSystem) subscribe(sub *subscription) *Subscription {
es.install <- sub
<-sub.installed
return &Subscription{ID: sub.id, f: sub, es: es}
}
在for循环中等待读取 txHashes的消息,即subscription.hashes
然后通过
notifier.Notify(rpcSub.ID, h)
返回给对应RPC.ID的ws session.
到这里基本上就结束了,另外还有退出订阅一些接口,就不讲了.我看过几个版本的Geth,该版本是bsc 最新的release版本,之前也有看过Heco的geth版本但是也是老早之前的事情,不同版本的Geth在这方面的实现上有差异,但是基本流程差不多.看这个目的是为了FullPendingTransactionsSubscription
监听交易的时候不用再去查一次,交易的详细数据.效果如图
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!