pkg/objstore包主要是提供一个统一的接口,而实现则是由pkg/kvstore包来完成。venus-worker主动向venus-sector-manager的接口进行连接。问题VSM主要功能是什么?vsm如何与worker进行通信,通信协议是什么?worker会和vsm以外
pkg/objstore包主要是提供一个统一的接口,而实现则是由pkg/kvstore包来完成。venus-worker主动向venus-sector-manager的接口进行连接。vsm/core包定义了对外的接口格式。vsm/modules/poster包主要负责定时产生windowPost proof,然后通过venus.messager服务把proof msg发送给node。venus-worker服务通过api调用vsm/modules/sealer包的SubmitPreCommit()方法,进行preCommit消息的提交。而vsm/modeuls/sealer包会调用vsm/modules/impl/commitmgr包里实现具体如何向node节点进行提交信息。vsm/miner包主要负责通过gateway和venus-miner进行建立长链接。venus-miner会通过长链接主动向vsm/miner包发送ComputeProof命令,让其计算winningPost。vsm/modules/miner会通过vsm/modules/impl/prover包来实现具体的计算任务。vsm/moduels/impl/prover包会直接通过ffi直接调用rust库的函数进行计算。也可以通过vsm/moduels/impl/prover/ext包来调用vsm/pkg/extproc包来使用外部执行器来进行proof产生。src/sealing/worker/task/planner/sealer.rs文件的Planner类方法exec()会执行状态机状态转换,大部分sealer.rs文件的方法都会调用common.rs文件中的方法,common.rs文件中的函数会task的具体执行的processor定义在vc-processors/src/builtin/processors.rs文件中,这个文件会直接调用官方定义的fil-proofs包来实现具体的过程。整个worker主要设计思路是:根据config文件设置的sealingthread的数量,开启多少个sealingthread线程,线程里面会根据sealingthread的配置参数build出一个task,然后调用exec方法执行task,执行中最主要的调用方法是task.handle()方法,这个方法会调用planner.exec()方法。planner是一个trait,sealer,snapup,unseal,rebuild四种类型的planner分别是实现了planner trait的plan(),exec()两个方法。调用planner.exec()方法后,方法内部会根据当前的state状态,调用不同的方法进行处理,就是一个状态机。代码:
match state {
State::Empty => inner.empty(),
State::Allocated => inner.add_piece(),
State::PieceAdded => inner.build_tree_d(),
State::TreeDBuilt => inner.snap_encode(),
.....
.....
}
很多方法都会共用common.rs文件中的方法。但这个common.rs文件,也不是最后的执行者,common.rs的很多方法都会调用processor文件的内容,processor其实就是每次阶段执行的具体方法。当然咯,后面最主要调用的是fil-proofs包的内容才能实现。
watchdog::start_module()方法会生成一个线程,在线程内部会调用module的run()方法。SealingThread实现了Module trait,sealingthread module 的run方法会有一个loop循环,从而sealingthread线程会一直存在。完成一个封装任务以后,会开启新一轮的封装任务。
每一轮封装任务的开始状态都是empty状态,handle_empty()方法会进行状态切换,这个方法里面会调用damocles-manager的AllocateSectorrpc接口,获取下一个要封装的sector信息(主要是sectornumber),然后开始新一轮的封装任务。
damocles-manager的AllocateSector接口是会把sectornumber直接增加,然后返回给worker的,收到sector信息后,如果worker停止封装的话,manager也不会知道,从而worker再次调用AllocateSectorrpc接口的话,sectornumer还是会直接增加的(manager可以批量一次申请好几sector封装任务的,所以不一定是加1,也可以加N,N自己定义),从而导致sectornumber的浪费,因为上一个sectornumber因为worker的异常,而没有真正的封装。
pub fn start_module(&mut self, m: impl 'static + Module) {
.........
let hdl = thread::spawn(move || { //生成一个线程
........
let _guard = span.enter();
info!("start");
let res = m.run(ctx); // SealingThread module运行run方法
info!("stop");
let _ = res_tx.send(res);
});
self.modules.push((id, should_wait, hdl, res_rx));
}
fn run(&mut self, ctx: Ctx) -> Result<()> {
.......
'SEAL_LOOP: loop { // 不停的循环
......
if let Err(failure) = self.seal_one(&ctx, resume_state.take()) { // 开启一个sector的封装
......
.......
}
........
}
fn seal_one(&mut self, ctx: &Ctx, state: Option<State>) -> Result<(), Failure> {
let task = Task::build(ctx, &self.ctrl_ctx, &mut self.config, &mut self.store)?;
task.exec(state) //开始执行任务
}
// task/mod.rs
pub fn exec(mut self, state: Option<State>) -> Result<(), Failure> {
......
loop { // 循环完成一个个状态切换
......
.....
let handle_res = self.handle(event.take()); // handle方法处理每个状态
......
.....
if let Err(rerr) = self.report_state( // 每完成一个状态切换,就调用mananger的rpc,汇报一下自己的state给mananger。
SectorStateChange {
prev: prev.as_str().to_owned(),
next: self.sector.state.as_str().to_owned(),
event: format!("{:?}", event),
},
fail,
) {
error!("report state failed: {:?}", rerr);
};
}
fn handle(&mut self, event: Option<Event>) -> Result<Option<Event>, Failure> {
,,,,,,
let planner = get_planner(self.sector.plan.as_deref()).perm()?; // 共有sealer,snapup,rebuild,unseal四个planner
,,,,,,
,,,,,,
planner.exec(self) // planner执行具体的封装流程
}
// task/planner/sealer.rs
fn exec(&self, task: &mut Task<'_>) -> Result<Option<Event>, Failure> {
let state = task.sector.state;
let inner = Sealer { task };
//根据不同的状态,调用不同的状态处理方法;不同的planner的状态流程是不一样的。
match state {
State::Empty => inner.handle_empty(),
State::Allocated => inner.handle_allocated(),
State::DealsAcquired => inner.handle_deals_acquired(),
State::PieceAdded => inner.handle_piece_added(),
State::TreeDBuilt => inner.handle_tree_d_built(),
State::TicketAssigned => inner.handle_ticket_assigned(),
State::PC1Done => inner.handle_pc1_done(),
State::PC2Done => inner.handle_pc2_done(),
State::PCSubmitted => inner.handle_pc_submitted(),
State::PCLanded => inner.handle_pc_landed(),
State::Persisted => inner.handle_persisted(),
State::PersistanceSubmitted => inner.handle_persistance_submitted(),
State::SeedAssigned => inner.handle_seed_assigned(),
State::C1Done => inner.handle_c1_done(),
State::C2Done => inner.handle_c2_done(),
State::ProofSubmitted => inner.handle_proof_submitted(),
State::Finished => return Ok(None),
State::Aborted => {
return Err(TaskAborted.into());
}
other => return Err(anyhow!("unexpected state {:?} in sealer planner", other).abort()),
}
.map(From::from)
}
从以上我们可以总结得出几个概念:
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!