Filecoin - Sector状态管理逻辑

  • Star Li
  • 更新于 2020-06-16 11:54
  • 阅读 4254

Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。

好久不看go语言的代码了,最近有空换换脑子,看看Sector的状态管理,纯go语言实现。看看大型项目的代码设计,对代码以及项目的设计开发有莫大的好处。Lotus的go语言部分的最新的代码,采用模块化设计。Lotus的代码也是一步步演进的,第一版的代码也是所有的逻辑耦合在一起,后面才逐步的模块化。

Lotus Miner将用户需要存储的数据“打包”成一个个Sector,并对Sector进行处理。本文就讲讲Sector的状态管理。

01 模块框架

Sector状态管理相关的模块如下: image.png

Sector的状态管理基于状态机。Lotus实现了通用的状态机(statemachine),在go-statemachine包中。在StateMachine之上,进一步抽象出了StateGroup,管理多个状态机。StateMachine只是实现状态的转变,具体状态是通过go-statestore进行存储。在StateMachine之上,定义状态转变的规则以及状态对应的处理函数,这些就是在具体的业务。SectorState就是Lotus管理Sector的具体业务。在这些底层模块的基础上,Lotus的相关代码调用就比较简单。先从状态管理的底层模块,StateMachine讲起:

02 StateMachine

StateMachine定义在go-statemachine包中,machine.go:

 type StateMachine struct {
 planner  Planner
 eventsIn chan Event

 name      interface{}
 st        *statestore.StoredState
 stateType reflect.Type

 stageDone chan struct{}
 closing   chan struct{}
 closed    chan struct{}

 busy int32
 }

其中,planner是抽象出来的状态机的状态转化函数。Planner接收Event,结合当前的状态user,确定下一步的处理。

type Planner func(events []Event, user interface{}) (interface{}, uint64, error)

st是存储的状态。stateType是存储状态的类型。

StateMachine的核心是run函数,分成三部分:接收Event,状态处理,下一步的调用。其中状态处理是关键:

err := fsm.mutateUser(func(user interface{}) (err error) {
 nextStep, processed, err = fsm.planner(pendingEvents, user)
 ustate = user
 if xerrors.Is(err, ErrTerminated) {
 terminated = true
 return nil
 }
 return err
})

mutateUser就是查看当前存储的状态(StoredState),并执行planner函数,并将planner处理后的状态存储。planner函数返回下一步的处理,已经处理的Event 的个数,有可能的出错。run函数会启动另外一个go routine执行nextStep。

mutateUser具体的实现是利用go的反射(reflect),实现抽象和模块化。相关的逻辑感兴趣的小伙伴可以自己查看。

03 StateGroup

往往业务需要很多StateMachine。举个例子,Lotus的Miner存储多个Sector,每个Sector都由一个StateMachine维护独立的状态。StateGroup就是实现多个“同样”的StateMachine,定义在group.go中。

 type StateGroup struct {
 sts       *statestore.StateStore
 hnd       StateHandler
 stateType reflect.Type

 closing      chan struct{}
 initNotifier sync.Once

 lk  sync.Mutex
 sms map[datastore.Key]*StateMachine
 }

其中,sms就是StateMachine的状态数组。StateHandler是StateMachine的状态处理函数接口:

 type StateHandler interface {
 Plan(events []Event, user interface{}) (interface{}, uint64, error)
 }

也就是说,在StateMachine状态机上,需要实现StateHandler接口(Plan函数,实现状态的转换)。

04 StoredState

StoredState是抽象的Key-Value的存储。相对比较简单,Get/Put接口,非常容易理解。

05 SectorState

storage-fsm实现了和Sector状态相关的业务逻辑。也就是状态的定义,状态的转换函数都是在这个包里实现。整个Sector的信息定义在storage-fsm/types.go中:

 type SectorInfo struct {
 State        SectorState
 SectorNumber abi.SectorNumber
 Nonce        uint64
 SectorType abi.RegisteredProof
 // Packing
 Pieces []Piece
 // PreCommit1
 TicketValue   abi.SealRandomness
 TicketEpoch   abi.ChainEpoch
 PreCommit1Out storage.PreCommit1Out
 // PreCommit2
 CommD *cid.Cid
 CommR *cid.Cid
 Proof []byte
 PreCommitMessage *cid.Cid
 // WaitSeed
 SeedValue abi.InteractiveSealRandomness
 SeedEpoch abi.ChainEpoch
 // Committing
 CommitMessage *cid.Cid
 InvalidProofs uint64
 // Faults
 FaultReportMsg *cid.Cid
 // Debug
 LastErr string
 Log []Log
 }

SectorInfo包括了Sector状态,Precommit1/2的数据,Committing的数据等等。其中,SectorState描述了Sector的具体状态。Sector的所有的状态定义在sector_state.go文件中:

 const (
 UndefinedSectorState SectorState = ""

 // happy path
 Empty          SectorState = "Empty"
 Packing        SectorState = "Packing"       // sector not in sealStore, and not on chain
 PreCommit1     SectorState = "PreCommit1"    // do PreCommit1
 PreCommit2     SectorState = "PreCommit2"    // do PreCommit1
 PreCommitting  SectorState = "PreCommitting" // on chain pre-commit
 WaitSeed       SectorState = "WaitSeed"      // waiting for seed
 Committing     SectorState = "Committing"
 CommitWait     SectorState = "CommitWait" // waiting for message to land on chain
 FinalizeSector SectorState = "FinalizeSector"
 Proving        SectorState = "Proving"
 // error modes
 FailedUnrecoverable SectorState = "FailedUnrecoverable"
 SealFailed          SectorState = "SealFailed"
 PreCommitFailed     SectorState = "PreCommitFailed"
 ComputeProofFailed  SectorState = "ComputeProofFailed"
 CommitFailed        SectorState = "CommitFailed"
 PackingFailed       SectorState = "PackingFailed"
 Faulty              SectorState = "Faulty"        // sector is corrupted or gone for some reason
 FaultReported       SectorState = "FaultReported" // sector has been declared as a fault on chain
 FaultedFinal        SectorState = "FaultedFinal"  // fault declared on chain
 )

熟悉SDR算法的小伙伴,会发现很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。结合状态处理函数,就能很清楚各个状态的含义以及需要处理的内容。

fsm.go中Sealing结构的Plan函数是Sector状态的处理函数:

 func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
 next, err := m.plan(events, user.(*SectorInfo))
 if err != nil || next == nil {
 return nil, uint64(len(events)), err
 }

 return func(ctx statemachine.Context, si SectorInfo) error {
 err := next(ctx, si)
 if err != nil {
 log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
 return nil
 }

 return nil
 }, uint64(len(events)), nil // TODO: This processed event count is not very correct
 }

Plan函数的实现也比较简单,调用plan处理当前的状态,并返回接下来需要处理的函数。状态的处理又可以分成两部分来看:1/ 状态转换的定义 2/ 状态的处理。

针对Sector的状态转换的定义,在fsmPlanners中:

 var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
 UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
 Packing:              planOne(on(SectorPacked{}, PreCommit1)),
 PreCommit1: planOne(
 on(SectorPreCommit1{}, PreCommit2),
 on(SectorSealPreCommitFailed{}, SealFailed),
 on(SectorPackingFailed{}, PackingFailed),
 ),
 PreCommit2: planOne(
 on(SectorPreCommit2{}, PreCommitting),
 on(SectorSealPreCommitFailed{}, SealFailed),
 on(SectorPackingFailed{}, PackingFailed),
 ),
...

比如说,在PreCommit1的状态,接收到SectorPreCommit1事件,将进入PreCommit2状态。所有Sector的状态转换如下图:

在处理好状态好,就进入相应状态的处理程序。以PreCommit1状态为例,相应的处理函数是handlePreCommit1。

func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
 tok, epoch, err := m.api.ChainHead(ctx.Context())
 ...
 pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
 if err != nil {
 return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
 }
 ...
 }

很清楚看出,在PreCommit1的处理函数中,会通过SealPrecommit1调用rust-fil-proofs实现Precommit1的计算。最后总结一下,各个状态的含义:

Empty - 空状态

Packing - 打包状态,多个Piece填充到一个Sector中

PreCommit1 - PreCommit1计算

PreCommit2 - PreCommit2计算

PreCommitting - 提交Precommit2的结果到链上

WaitSeed - 等待随机种子(给定10个区块的时间,让随机数种子不可提前预测)

Committing - 计算Commit1/Commit2,并将证明提交到链上

CommitWait - 等待链上确认 FinalizeSector - Sector状态确定

总结:

Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。


我的公众号星想法有很多原创高质量文章,欢迎大家扫码关注。

公众号-星想法

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Star Li
Star Li
Trapdoor Tech创始人,前猎豹移动技术总监,香港中文大学访问学者。