pkg/objstore包主要是提供一个统一的接口,而实现则是由pkg/kvstore包来完成。venus-worker主动向venus-sector-manager的接口进行连接。问题VSM主要功能是什么?vsm如何与worker进行通信,通信协议是什么?worker会和vsm以外
pkg/objstore
包主要是提供一个统一的接口,而实现则是由pkg/kvstore
包来完成。venus-worker
主动向venus-sector-manager
的接口进行连接。vsm/core
包定义了对外的接口格式。vsm/modules/poster
包主要负责定时产生windowPost proof,然后通过venus.messager服务把proof msg发送给node。venus-worker
服务通过api调用vsm/modules/sealer
包的SubmitPreCommit()
方法,进行preCommit消息的提交。而vsm/modeuls/sealer
包会调用vsm/modules/impl/commitmgr
包里实现具体如何向node节点进行提交信息。vsm/miner
包主要负责通过gateway
和venus-miner
进行建立长链接。venus-miner
会通过长链接主动向vsm/miner
包发送ComputeProof
命令,让其计算winningPost。vsm/modules/miner
会通过vsm/modules/impl/prover
包来实现具体的计算任务。vsm/moduels/impl/prover
包会直接通过ffi直接调用rust库的函数进行计算。也可以通过vsm/moduels/impl/prover/ext
包来调用vsm/pkg/extproc
包来使用外部执行器来进行proof产生。src/sealing/worker/task/planner/sealer.rs
文件的Planner
类方法exec()
会执行状态机状态转换,大部分sealer.rs
文件的方法都会调用common.rs
文件中的方法,common.rs
文件中的函数会task
的具体执行的processor
定义在vc-processors/src/builtin/processors.rs
文件中,这个文件会直接调用官方定义的fil-proofs
包来实现具体的过程。整个worker主要设计思路是:根据config文件设置的sealingthread
的数量,开启多少个sealingthread线程,线程里面会根据sealingthread
的配置参数build出一个task,然后调用exec
方法执行task,执行中最主要的调用方法是task.handle()
方法,这个方法会调用planner.exec()
方法。planner是一个trait,sealer,snapup,unseal,rebuild
四种类型的planner分别是实现了planner trait的plan(),exec()
两个方法。调用planner.exec()方法后,方法内部会根据当前的state状态,调用不同的方法进行处理,就是一个状态机。代码:
match state {
State::Empty => inner.empty(),
State::Allocated => inner.add_piece(),
State::PieceAdded => inner.build_tree_d(),
State::TreeDBuilt => inner.snap_encode(),
.....
.....
}
很多方法都会共用common.rs
文件中的方法。但这个common.rs
文件,也不是最后的执行者,common.rs的很多方法都会调用processor文件的内容,processor其实就是每次阶段执行的具体方法。当然咯,后面最主要调用的是fil-proofs
包的内容才能实现。
watchdog::start_module()
方法会生成一个线程,在线程内部会调用module的run()方法。SealingThread实现了Module trait,sealingthread module 的run方法会有一个loop循环,从而sealingthread线程会一直存在。完成一个封装任务以后,会开启新一轮的封装任务。
每一轮封装任务的开始状态都是empty状态,handle_empty()
方法会进行状态切换,这个方法里面会调用damocles-manager的AllocateSector
rpc接口,获取下一个要封装的sector信息(主要是sectornumber),然后开始新一轮的封装任务。
damocles-manager
的AllocateSector
接口是会把sectornumber直接增加,然后返回给worker的,收到sector信息后,如果worker停止封装的话,manager也不会知道,从而worker再次调用AllocateSector
rpc接口的话,sectornumer还是会直接增加的(manager可以批量一次申请好几sector封装任务的,所以不一定是加1,也可以加N,N自己定义),从而导致sectornumber的浪费,因为上一个sectornumber因为worker的异常,而没有真正的封装。
pub fn start_module(&mut self, m: impl 'static + Module) {
.........
let hdl = thread::spawn(move || { //生成一个线程
........
let _guard = span.enter();
info!("start");
let res = m.run(ctx); // SealingThread module运行run方法
info!("stop");
let _ = res_tx.send(res);
});
self.modules.push((id, should_wait, hdl, res_rx));
}
fn run(&mut self, ctx: Ctx) -> Result<()> {
.......
'SEAL_LOOP: loop { // 不停的循环
......
if let Err(failure) = self.seal_one(&ctx, resume_state.take()) { // 开启一个sector的封装
......
.......
}
........
}
fn seal_one(&mut self, ctx: &Ctx, state: Option<State>) -> Result<(), Failure> {
let task = Task::build(ctx, &self.ctrl_ctx, &mut self.config, &mut self.store)?;
task.exec(state) //开始执行任务
}
// task/mod.rs
pub fn exec(mut self, state: Option<State>) -> Result<(), Failure> {
......
loop { // 循环完成一个个状态切换
......
.....
let handle_res = self.handle(event.take()); // handle方法处理每个状态
......
.....
if let Err(rerr) = self.report_state( // 每完成一个状态切换,就调用mananger的rpc,汇报一下自己的state给mananger。
SectorStateChange {
prev: prev.as_str().to_owned(),
next: self.sector.state.as_str().to_owned(),
event: format!("{:?}", event),
},
fail,
) {
error!("report state failed: {:?}", rerr);
};
}
fn handle(&mut self, event: Option<Event>) -> Result<Option<Event>, Failure> {
,,,,,,
let planner = get_planner(self.sector.plan.as_deref()).perm()?; // 共有sealer,snapup,rebuild,unseal四个planner
,,,,,,
,,,,,,
planner.exec(self) // planner执行具体的封装流程
}
// task/planner/sealer.rs
fn exec(&self, task: &mut Task<'_>) -> Result<Option<Event>, Failure> {
let state = task.sector.state;
let inner = Sealer { task };
//根据不同的状态,调用不同的状态处理方法;不同的planner的状态流程是不一样的。
match state {
State::Empty => inner.handle_empty(),
State::Allocated => inner.handle_allocated(),
State::DealsAcquired => inner.handle_deals_acquired(),
State::PieceAdded => inner.handle_piece_added(),
State::TreeDBuilt => inner.handle_tree_d_built(),
State::TicketAssigned => inner.handle_ticket_assigned(),
State::PC1Done => inner.handle_pc1_done(),
State::PC2Done => inner.handle_pc2_done(),
State::PCSubmitted => inner.handle_pc_submitted(),
State::PCLanded => inner.handle_pc_landed(),
State::Persisted => inner.handle_persisted(),
State::PersistanceSubmitted => inner.handle_persistance_submitted(),
State::SeedAssigned => inner.handle_seed_assigned(),
State::C1Done => inner.handle_c1_done(),
State::C2Done => inner.handle_c2_done(),
State::ProofSubmitted => inner.handle_proof_submitted(),
State::Finished => return Ok(None),
State::Aborted => {
return Err(TaskAborted.into());
}
other => return Err(anyhow!("unexpected state {:?} in sealer planner", other).abort()),
}
.map(From::from)
}
从以上我们可以总结得出几个概念:
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!