内存池mempool模块解读第三篇,这部分我主要研究mempool中的节点间Tx同步. 关键代码都位于shared_mempool.rs
中.
内存池mempool模块解读第三篇,这部分我主要研究mempool中的节点间Tx同步. 关键代码都位于shared_mempool.rs
中.
先从mempool的启动过程说起,这里可以把前面两部分内容串联起来.
启动代码位于runtime.rs
中,
/// Handle for Mempool Runtime
pub struct MempoolRuntime {
/// gRPC server to serve request from AC and Consensus
pub grpc_server: ServerHandle, //这个是对外提供grpc服务接口
/// separate shared mempool runtime
pub shared_mempool: Runtime, //这是因为内部使用了tokio的异步编程框架.
}
impl MempoolRuntime {
/// setup Mempool runtime
pub fn bootstrap(
config: &NodeConfig,
network_sender: MempoolNetworkSender,
network_events: MempoolNetworkEvents,
) -> Self {
//访问是加锁的
//这个mempool就是前两部分我们重点讨论的内部缓冲池管理.
let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
// setup grpc server
let env = Arc::new(
EnvBuilder::new()
.name_prefix("grpc-mempool-")
.cq_count(unsafe { max(grpcio_sys::gpr_cpu_num_cores() as usize / 2, 2) })
.build(),
);
let handle = MempoolService {
core_mempool: Arc::clone(&mempool),
};
//对外提供grpc服务的接口,是对自动生成的proto中描述的serice的实现
let service = mempool_grpc::create_mempool(handle);
let grpc_server = ::grpcio::ServerBuilder::new(env)
.register_service(service)
.bind(
config.mempool.address.clone(),
config.mempool.mempool_service_port,
) //监听的端口和ip
.build()
.expect("[mempool] unable to create grpc server");
// mempool要访问DB,注意也是通过grpc接口进行访问,没有直接访问DB
let storage_client: Arc<dyn StorageRead> = Arc::new(StorageReadServiceClient::new(
Arc::new(EnvBuilder::new().name_prefix("grpc-mem-sto-").build()),
"localhost",
config.storage.port,
));
//验证Tx合法性的工具,
let vm_validator = Arc::new(VMValidator::new(&config, Arc::clone(&storage_client)));
//这是我们这篇文章的核心,如何与其他节点之间进行通信全都发生在SharedMempool
//这里实际上就返回了一个Runtime,这时候tokio的调度器已经启动完成
let shared_mempool = start_shared_mempool(
config,
mempool,
network_sender,
network_events,
storage_client,
vm_validator,
vec![],
None,
);
Self {
grpc_server: ServerHandle::setup(grpc_server),
shared_mempool,
}
}
}
/// bootstrap of SharedMempool
/// creates separate Tokio Runtime that runs following routines:
/// - outbound_sync_task (task that periodically broadcasts transactions to peers)
/// - inbound_network_task (task that handles inbound mempool messages and network events)
/// - gc_task (task that performs GC of all expired transactions by SystemTTL)
pub(crate) fn start_shared_mempool<V>(
config: &NodeConfig,
mempool: Arc<Mutex<CoreMempool>>,
network_sender: MempoolNetworkSender, //向外他推送新发现的Tx的Channel
network_events: MempoolNetworkEvents, //接受来自其他节点的Mempool事件的Channel
storage_read_client: Arc<dyn StorageRead>,
validator: Arc<V>,
subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,//这个是通知其他模块mempool发生了什么他们感兴趣的事
timer: Option<IntervalStream>,
) -> Runtime
where
V: TransactionValidation + 'static,
{
//因为tokio的宏if_runtime,所以无法识别
let runtime: tokio::runtime::Runtime = Builder::new()
.name_prefix("shared-mem-")
.build()
.expect("[shared mempool] failed to create runtime");
//获取tokio的Executor,这样后续就可以启动task了.
let executor: tokio::runtime::TaskExecutor = runtime.executor();
let peer_info = Arc::new(Mutex::new(PeerInfo::new()));
let smp = SharedMempool {
mempool: mempool.clone(),
config: config.mempool.clone(),
network_sender,
storage_read_client,
validator,
peer_info,
subscribers,
};
let interval =
timer.unwrap_or_else(|| default_timer(config.mempool.shared_mempool_tick_interval_ms));
//在线程池中执行? actor模型?
executor.spawn(
outbound_sync_task(smp.clone(), interval)
.boxed()
.unit_error()
.compat(),
);
//在线程池中执行?
executor.spawn(
inbound_network_task(smp, network_events)
.boxed()
.unit_error()
.compat(),
);
//在线程池中执行?
executor.spawn(
gc_task(mempool, config.mempool.system_transaction_gc_interval_ms)
.boxed()
.unit_error()
.compat(),
);
runtime
}
在start_shared_mempool
中看到有三个关键地方,分别是:
network_sender
是向外推送Tx的通道network_events
接受其他节点Tx以及状态变化等信息的通道subscribers
通知其他模块mempool发生了什么他们感兴趣的事这三个都是future这个crate中的channel,这里的channel和golang中的chan是基本上等价的.简化起见,直接看成通信通道就ok了.
主要有三种消息:
/// This task handles inbound network events.
/// This task handles inbound network events.
async fn inbound_network_task(smp: SharedMempool, network_events: MempoolNetworkEvents)
where
V: TransactionValidation,
{
let peer_info = smp.peer_info.clone();
let subscribers = smp.subscribers.clone();
let max_inbound_syncs = smp.config.shared_mempool_max_concurrent_inbound_syncs;
// Handle the NewPeer/LostPeer events immediatedly, since they are not async
// and we don't want to buffer them or let them get reordered. The inbound
// direct-send messages are placed in a bounded FuturesUnordered queue and
// allowed to execute concurrently. The .buffer_unordered() also correctly
// handles back-pressure, so if mempool is slow the back-pressure will
// propagate down to network.
let f_inbound_network_task = network_events
//filter & map,有必要filter么?
.filter_map(move |network_event| {
trace!("SharedMempoolEvent::NetworkEvent::{:?}", network_event);
match network_event {
Ok(network_event) => match network_event {
Event::NewPeer(peer_id) => {
OP_COUNTERS.inc("smp.event.new_peer");
new_peer(&peer_info, peer_id); //记录新发现了节点,用于后续推送Tx给他
notify_subscribers(
SharedMempoolNotification::PeerStateChange,
&subscribers,
); //同时以PeerStateChange告诉相关订阅方
future::ready(None) //会被过滤掉,这样就不会包含在下面的for_each_concurrent中
}
Event::LostPeer(peer_id) => {
//节点下线,就不要继续推送Tx了
OP_COUNTERS.inc("smp.event.lost_peer");
lost_peer(&peer_info, peer_id);
notify_subscribers(
SharedMempoolNotification::PeerStateChange,
&subscribers,
); //同时以PeerStateChange告诉相关订阅方
future::ready(None)
}
// Pass through messages to next combinator
// 收到了来自其他节点的Tx,这个是后续`for_each_concurrent`
Event::Message((peer_id, msg)) => future::ready(Some((peer_id, msg))),
_ => {
//RpcRequest消息不应该传递到这里
security_log(SecurityEvent::InvalidNetworkEventMP)
.error("UnexpectedNetworkEvent")
.data(&network_event)
.log();
unreachable!("Unexpected network event")
}
},
Err(e) => {
security_log(SecurityEvent::InvalidNetworkEventMP)
.error(&e)
.log();
future::ready(None)
}
}
})
// Run max_inbound_syncs number of `process_incoming_transactions` concurrently
.for_each_concurrent(
//处理收到其他节点推送过来的Tx,具体机制有赖于底层network模块
max_inbound_syncs, /* limit */
move |(peer_id, mut msg)| {
//todo 这块逻辑很复杂,还是要研究一下
OP_COUNTERS.inc("smp.event.message");
let transactions: Vec<_> = msg
.take_transactions()
.into_iter() //这里实际上是一个简单的将grpc Message简单转换成内部的SignedTransaction的过程
.filter_map(|txn| match SignedTransaction::from_proto(txn) {
Ok(t) => Some(t),
Err(e) => {
security_log(SecurityEvent::InvalidTransactionMP)
.error(&e)
.data(&msg)
.log();
None
}
})
.collect();
OP_COUNTERS.inc_by(
&format!("smp.transactions.received.{:?}", peer_id),
transactions.len(),
);
//验证Tx有效性,然后添加到自己的缓冲池中,添加过程调用的是`add_txn`,
// 和处理来自AC的Tx是一样的逻辑
process_incoming_transactions(smp.clone(), peer_id, transactions)
},
);
// drive the inbound futures to completion
f_inbound_network_task.await; //永远不结束
crit!("SharedMempool inbound_network_task terminated");
}
上面这段代码很长,有接近100行,但是如果仔细分析的话基本上就是一句代码,不过这一条语句很复杂,占用了从16行到84行基本上70行. 我不知道该说这是rust语言的表达能力强还是该诟病rust阅读体验极糟.
还有一个需要说明就是这里async和await搭配使用. 因为函数的声明中使用了async关键字,因此实际上函数的返回值会是一个Future.
还有就是第94行的f_inbound_network_task.await
并不是一个死循环,你可以把他想象成一个goroutine,当network_events
这个channel读不出来数据的时候他会放弃CPU占用. 实际上这也是tokio这个框架在做的事.
这个函数值得一说到就是他添加Tx到缓冲池中的方式是TimelineState::NonQualified
,这意味着这种Tx不会再被广播给其他节点.
好处当然是极大的降低了数据传输量. 这种方式在以太坊中肯定是不会采用的,因为这很不利于Tx的快速广播.
当然Libra采用这种方式有他的道理,他是联盟链,节点数量有限,他采用的假设应该是:
有N个节点的联盟链,这个N个节点彼此之间两两互连,总共有N(N−1)/2个连接.
因为Network模块没有研读,所以只是猜测.
/// This task handles [`SyncEvent`], which is periodically emitted for us to
/// broadcast ready to go transactions to peers.
async fn outbound_sync_task<V>(smp: SharedMempool<V>, mut interval: IntervalStream)
where
V: TransactionValidation,
{
let peer_info = smp.peer_info;
let mempool = smp.mempool;
let mut network_sender = smp.network_sender;
let batch_size = smp.config.shared_mempool_batch_size;
let subscribers = smp.subscribers;
//定时死循环,这个执行到await的
/*
当代码执⾏到await! (read_from_network())⾥⾯的时候,发现异步操作还没有完成,
它会直接退出当前这个函数,把CPU让给其他任务执⾏。当这个数据从⽹络上 传输完成了,调度器会再次调⽤这个函数,
它会从上次中断的地⽅恢复执⾏。所以⽤async/await的语法写代码,异步代码的逻辑在源码组织上跟同步代码的逻辑差别
并不⼤。这⾥⾯状态保存和恢复这些琐碎的事情,都由 编译器帮我们完成了。
*/
while let Some(sync_event) = interval.next().await {
trace!("SyncEvent: {:?}", sync_event);
match sync_event {
Ok(_) => {
sync_with_peers(&peer_info, &mempool, &mut network_sender, batch_size).await;
notify_subscribers(SharedMempoolNotification::Sync, &subscribers);
}
Err(e) => {
error!("Error in outbound_sync_task timer interval: {:?}", e);
break;
}
}
}
crit!("SharedMempool outbound_sync_task terminated");
}
相比之下sync_with_peers
要复杂一些,但是其功能非常简单,就是针对每个节点推送所有来自自身AC模块的Tx.
稍微复杂的一点就是为了避免重复,使用了timeline_id
这个技术.
前面文章也介绍过,就是一个非常简单的针对每一个Tx都有一个编号,并且这个编号是单增的.这样在向节点A推送的时候只需要记住上次推送到了第35个,那么下次就从第36个开始即可.
/// sync routine
/// used to periodically broadcast ready to go transactions to peers
async fn sync_with_peers<'a>(
peer_info: &'a Mutex<PeerInfo>, //这个peer_info一个所有其他节点的Map
mempool: &'a Mutex<CoreMempool>, //自己的内存池
network_sender: &'a mut MempoolNetworkSender, //广播消息的通道
batch_size: usize,
) {
// Clone the underlying peer_info map and use this to sync and collect
// state updates. We do this instead of holding the lock for the whole
// function since that would hold the lock across await points which is bad.
let peer_info_copy = peer_info
.lock()
.expect("[shared mempool] failed to acquire peer_info lock")
.deref()
.clone();
let mut state_updates = vec![];
for (peer_id, peer_state) in peer_info_copy.into_iter() {
if peer_state.is_alive {
let timeline_id = peer_state.timeline_id; //timeline_id是给来自自身AC模块的Tx一个唯一的单增编号,避免重复推送
//读取本地的Tx,这些mempool之间的timeline_id都是一样的?
let (transactions, new_timeline_id) = mempool
.lock()
.expect("[shared mempool] failed to acquire mempool lock")
.read_timeline(timeline_id, batch_size);
if !transactions.is_empty() {
OP_COUNTERS.inc_by("smp.sync_with_peers", transactions.len());
let mut msg = MempoolSyncMsg::new();
msg.set_peer_id(peer_id.into());
msg.set_transactions(
transactions
.into_iter()
.map(IntoProto::into_proto)
.collect(),
);
debug!(
"MempoolNetworkSender.send_to peer {} msg {:?}",
peer_id, msg
);
// Since this is a direct-send, this will only error if the network
// module has unexpectedly crashed or shutdown.
network_sender //向指定的`peer_id`推送`transactions`数组
.send_to(peer_id, msg)
.await
.expect("[shared mempool] failed to direct-send mempool sync message");
}
state_updates.push((peer_id, new_timeline_id));
}
}
// Lock the shared peer_info and apply state updates.
let mut peer_info = peer_info
.lock()
.expect("[shared mempool] failed to acquire peer_info lock");
for (peer_id, new_timeline_id) in state_updates {
peer_info
.entry(peer_id) //更新相应节点的timeline_id,不要重复推送了
.and_modify(|t| t.timeline_id = new_timeline_id);
}
}
/// GC all expired transactions by SystemTTL
async fn gc_task(mempool: Arc<Mutex<CoreMempool>>, gc_interval_ms: u64) {
let mut interval = Interval::new_interval(Duration::from_millis(gc_interval_ms)).compat();
while let Some(res) = interval.next().await {
match res {
Ok(_) => {
mempool
.lock()
.expect("[shared mempool] failed to acquire mempool lock")
.gc_by_system_ttl();
}
Err(e) => {
error!("Error in gc_task timer interval: {:?}", e);
break;
}
}
}
crit!("SharedMempool gc_task terminated");
}
从代码来看就非常简单,就是定期调用gc_by_system_ttl
,这个函数我们前面介绍过,就是避免Tx在缓冲池中呆太久,占用空间,从而导致可以打包的交易进不到缓冲池中.
在以太坊中如果是直接收到的Tx会保存在transactions.rlp
这个文件中,就算是发生拥堵也不会丢失.
不知道Libra这种设计,如果发生了拥堵,交易丢失了如何解决.
我的一个猜想可能是:
validator
,他会去validator
上查询自己账户的seq_number
.简化起见,我直接从测试代码中看libra是如何测试多个mempool之间的同步的.相关代码位于mempool/src/core_mempool/unit_tests/shared_mempool_test.rs
#[derive(Default)]
struct SharedMempoolNetwork {
mempools: HashMap<PeerId, Arc<Mutex<CoreMempool>>>,
network_reqs_rxs: HashMap<PeerId, channel::Receiver<NetworkRequest>>,
network_notifs_txs: HashMap<PeerId, channel::Sender<NetworkNotification>>,
runtimes: HashMap<PeerId, Runtime>,
subscribers: HashMap<PeerId, UnboundedReceiver<SharedMempoolNotification>>,
timers: HashMap<PeerId, UnboundedSender<SyncEvent>>,
}
impl SharedMempoolNetwork {
fn bootstrap_with_config(peers: Vec<PeerId>, mut config: NodeConfig) -> Self {
let mut smp = Self::default();
config.mempool.shared_mempool_batch_size = 1;
for peer in peers {
let mempool = Arc::new(Mutex::new(CoreMempool::new(&config)));
//消息是一条通道
let (network_reqs_tx, network_reqs_rx) = channel::new_test(8);
//通知是另一条通道
let (network_notifs_tx, network_notifs_rx) = channel::new_test(8);
let network_sender = MempoolNetworkSender::new(network_reqs_tx);
let network_events = MempoolNetworkEvents::new(network_notifs_rx);
//unbounded是创建没有缓冲区大小限制的channel
let (sender, subscriber) = unbounded();
let (timer_sender, timer_receiver) = unbounded();
let runtime = start_shared_mempool(
&config,
Arc::clone(&mempool),
network_sender, //network_reqs_tx 是我向外发送,其他人接收
network_events, /* network_notifs_rx
* 是我接受来自别人的event,其他人通过network_notifs_tx发给我 */
Arc::new(MockStorageReadClient),
Arc::new(MockVMValidator),
vec![sender], //向外发布订阅
Some(
timer_receiver
.compat()
.map_err(|_| format_err!("test"))
.boxed(),
),
);
smp.mempools.insert(peer, mempool);
smp.network_reqs_rxs.insert(peer, network_reqs_rx);
smp.network_notifs_txs.insert(peer, network_notifs_tx);
smp.subscribers.insert(peer, subscriber);
smp.timers.insert(peer, timer_sender);
smp.runtimes.insert(peer, runtime);
}
smp
}
根据上面的代码,如果我想给peerA发送消息,只需直接向其对应的network_notifs_rxsender
消息即可.
同样如果我想接收peerA向外广播了哪些Tx,只需从network_reqs_rx
接收即可.
时尚测试代码正式这么做的.
这里测试代码没有覆盖到广播Tx这种情形.
这个情形在deliver_message
中被覆盖到.
//send_event,都是发送的NewPeer或者LostPeer事件,是peer向外发送说自己发现了NewPeer(xx)
fn send_event(&mut self, peer: &PeerId, notif: NetworkNotification) {
let network_notifs_tx = self.network_notifs_txs.get_mut(peer).unwrap();
/*
block_on与async,await的区别
1. block_on在普通函数中使用,await只能在标有async的函数中使用
2. block_on会真实的堵塞所在线程,而await只会堵塞所在task,actor可以调度其他任务来运行
*/
block_on(network_notifs_tx.send(notif)).unwrap();
//通过peer关联的network_notifs_tx发出去,peer就会收到,然后产生动作,
// 就是向订阅方提供peerStateChange通知
self.wait_for_event(peer, SharedMempoolNotification::PeerStateChange);
}
/// deliveres next message from given node to it's peer
/// 这个函数实际上是触发peer向外推送自己缓冲池中的Tx,然后通过`network_reqs_rx`接受推送
/// 验证推送内容是否符合预期
fn deliver_message(&mut self, peer: &PeerId) -> (SignedTransaction, PeerId) {
// emulate timer tick,`struct SharedMempool<V>`会向外推送自己的Tx
self.timers
.get(peer)
.unwrap()
.unbounded_send(SyncEvent)
.unwrap();
// await next message from node
let network_reqs_rx = self.network_reqs_rxs.get_mut(peer).unwrap();
let network_req = block_on(network_reqs_rx.next()).unwrap();
match network_req {
NetworkRequest::SendMessage(peer_id, msg) => {
let mut sync_msg: MempoolSyncMsg =
::protobuf::parse_from_bytes(msg.mdata.as_ref()).unwrap();
let transaction: SignedTransaction =
SignedTransaction::from_proto(sync_msg.take_transactions().pop().unwrap())
.unwrap();
// send it to peer,手工转发给相应的接收方
let receiver_network_notif_tx: &mut channel::Sender<NetworkNotification> =
self.network_notifs_txs.get_mut(&peer_id).unwrap();
block_on(
receiver_network_notif_tx.send(NetworkNotification::RecvMessage(*peer, msg)),
)
.unwrap(); //测试代码可以简化,直接等待发送完毕.
// 我的理解是在async函数中可以使用await,普通函数中等待只能block_on,
// 而这个会真的阻塞线程.
// await message delivery
self.wait_for_event(&peer_id, SharedMempoolNotification::NewTransactions); //peer_id这个接收方收到Tx后会向订阅方发布`NewTransactions`事件
// verify transaction was inserted into Mempool
let mempool = self.mempools.get(&peer).unwrap();
let block = mempool.lock().unwrap().get_block(100, HashSet::new());
//确保这个广播出去的Tx会被打包,也就是不会广播自己认为无效的Tx,自己认为seq不连续的tx
assert!(block.iter().any(|t| t == &transaction)); //SignedTransaction实现了eq
(transaction, peer_id)
}
_ => panic!("peer {:?} didn't broadcast transaction", peer),
}
}
}
#[test]
fn test_basic_flow() {
let (peer_a, peer_b) = (PeerId::random(), PeerId::random());
//建立起每个节点的通信Channel
let mut smp = SharedMempoolNetwork::bootstrap(vec![peer_a, peer_b]);
// peer_a 主动添加了三笔连续交易
smp.add_txns(
&peer_a,
vec![
TestTransaction::new(1, 0, 1),
TestTransaction::new(1, 1, 1),
TestTransaction::new(1, 2, 1),
],
);
// A discovers new peer B
smp.send_event(&peer_a, NetworkNotification::NewPeer(peer_b));
for seq in 0..3 {
// A attempts to send message,因为指定的
let transaction = smp.deliver_message(&peer_a).0;
assert_eq!(transaction.sequence_number(), seq);
}
}
从整体来说Libra的mempool和以太坊相比,简单了很多,我觉得有两个原因:
从整体功能来说,这个模块和其他公链的TxPool模块又是非常相似的,基本上就是收集管理Tx,为共识模块提供可以打包的Tx.
同时我们也看到rust进步神速,目前的tokio异步框架已经非常完善,再加上async,await关键字的加持,编写异步程序已经比较简单直接了.
本文作者为深入浅出共建者:白振轩,原文地址:libra的mempool模块解读-3
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!