Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。
好久不看go语言的代码了,最近有空换换脑子,看看Sector的状态管理,纯go语言实现。看看大型项目的代码设计,对代码以及项目的设计开发有莫大的好处。Lotus的go语言部分的最新的代码,采用模块化设计。Lotus的代码也是一步步演进的,第一版的代码也是所有的逻辑耦合在一起,后面才逐步的模块化。
Lotus Miner将用户需要存储的数据“打包”成一个个Sector,并对Sector进行处理。本文就讲讲Sector的状态管理。
Sector状态管理相关的模块如下:
Sector的状态管理基于状态机。Lotus实现了通用的状态机(statemachine),在go-statemachine包中。在StateMachine之上,进一步抽象出了StateGroup,管理多个状态机。StateMachine只是实现状态的转变,具体状态是通过go-statestore进行存储。在StateMachine之上,定义状态转变的规则以及状态对应的处理函数,这些就是在具体的业务。SectorState就是Lotus管理Sector的具体业务。在这些底层模块的基础上,Lotus的相关代码调用就比较简单。先从状态管理的底层模块,StateMachine讲起:
StateMachine定义在go-statemachine包中,machine.go:
type StateMachine struct {
planner Planner
eventsIn chan Event
name interface{}
st *statestore.StoredState
stateType reflect.Type
stageDone chan struct{}
closing chan struct{}
closed chan struct{}
busy int32
}
其中,planner是抽象出来的状态机的状态转化函数。Planner接收Event,结合当前的状态user,确定下一步的处理。
type Planner func(events []Event, user interface{}) (interface{}, uint64, error)
st是存储的状态。stateType是存储状态的类型。
StateMachine的核心是run函数,分成三部分:接收Event,状态处理,下一步的调用。其中状态处理是关键:
err := fsm.mutateUser(func(user interface{}) (err error) {
nextStep, processed, err = fsm.planner(pendingEvents, user)
ustate = user
if xerrors.Is(err, ErrTerminated) {
terminated = true
return nil
}
return err
})
mutateUser就是查看当前存储的状态(StoredState),并执行planner函数,并将planner处理后的状态存储。planner函数返回下一步的处理,已经处理的Event 的个数,有可能的出错。run函数会启动另外一个go routine执行nextStep。
mutateUser具体的实现是利用go的反射(reflect),实现抽象和模块化。相关的逻辑感兴趣的小伙伴可以自己查看。
往往业务需要很多StateMachine。举个例子,Lotus的Miner存储多个Sector,每个Sector都由一个StateMachine维护独立的状态。StateGroup就是实现多个“同样”的StateMachine,定义在group.go中。
type StateGroup struct {
sts *statestore.StateStore
hnd StateHandler
stateType reflect.Type
closing chan struct{}
initNotifier sync.Once
lk sync.Mutex
sms map[datastore.Key]*StateMachine
}
其中,sms就是StateMachine的状态数组。StateHandler是StateMachine的状态处理函数接口:
type StateHandler interface {
Plan(events []Event, user interface{}) (interface{}, uint64, error)
}
也就是说,在StateMachine状态机上,需要实现StateHandler接口(Plan函数,实现状态的转换)。
StoredState是抽象的Key-Value的存储。相对比较简单,Get/Put接口,非常容易理解。
storage-fsm实现了和Sector状态相关的业务逻辑。也就是状态的定义,状态的转换函数都是在这个包里实现。整个Sector的信息定义在storage-fsm/types.go中:
type SectorInfo struct {
State SectorState
SectorNumber abi.SectorNumber
Nonce uint64
SectorType abi.RegisteredProof
// Packing
Pieces []Piece
// PreCommit1
TicketValue abi.SealRandomness
TicketEpoch abi.ChainEpoch
PreCommit1Out storage.PreCommit1Out
// PreCommit2
CommD *cid.Cid
CommR *cid.Cid
Proof []byte
PreCommitMessage *cid.Cid
// WaitSeed
SeedValue abi.InteractiveSealRandomness
SeedEpoch abi.ChainEpoch
// Committing
CommitMessage *cid.Cid
InvalidProofs uint64
// Faults
FaultReportMsg *cid.Cid
// Debug
LastErr string
Log []Log
}
SectorInfo包括了Sector状态,Precommit1/2的数据,Committing的数据等等。其中,SectorState描述了Sector的具体状态。Sector的所有的状态定义在sector_state.go文件中:
const (
UndefinedSectorState SectorState = ""
// happy path
Empty SectorState = "Empty"
Packing SectorState = "Packing" // sector not in sealStore, and not on chain
PreCommit1 SectorState = "PreCommit1" // do PreCommit1
PreCommit2 SectorState = "PreCommit2" // do PreCommit1
PreCommitting SectorState = "PreCommitting" // on chain pre-commit
WaitSeed SectorState = "WaitSeed" // waiting for seed
Committing SectorState = "Committing"
CommitWait SectorState = "CommitWait" // waiting for message to land on chain
FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving"
// error modes
FailedUnrecoverable SectorState = "FailedUnrecoverable"
SealFailed SectorState = "SealFailed"
PreCommitFailed SectorState = "PreCommitFailed"
ComputeProofFailed SectorState = "ComputeProofFailed"
CommitFailed SectorState = "CommitFailed"
PackingFailed SectorState = "PackingFailed"
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
FaultedFinal SectorState = "FaultedFinal" // fault declared on chain
)
熟悉SDR算法的小伙伴,会发现很多熟悉的字眼:PreCommit1,PreCommit2,Commiting等等。结合状态处理函数,就能很清楚各个状态的含义以及需要处理的内容。
fsm.go中Sealing结构的Plan函数是Sector状态的处理函数:
func (m *Sealing) Plan(events []statemachine.Event, user interface{}) (interface{}, uint64, error) {
next, err := m.plan(events, user.(*SectorInfo))
if err != nil || next == nil {
return nil, uint64(len(events)), err
}
return func(ctx statemachine.Context, si SectorInfo) error {
err := next(ctx, si)
if err != nil {
log.Errorf("unhandled sector error (%d): %+v", si.SectorNumber, err)
return nil
}
return nil
}, uint64(len(events)), nil // TODO: This processed event count is not very correct
}
Plan函数的实现也比较简单,调用plan处理当前的状态,并返回接下来需要处理的函数。状态的处理又可以分成两部分来看:1/ 状态转换的定义 2/ 状态的处理。
针对Sector的状态转换的定义,在fsmPlanners中:
var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *SectorInfo) error{
UndefinedSectorState: planOne(on(SectorStart{}, Packing)),
Packing: planOne(on(SectorPacked{}, PreCommit1)),
PreCommit1: planOne(
on(SectorPreCommit1{}, PreCommit2),
on(SectorSealPreCommitFailed{}, SealFailed),
on(SectorPackingFailed{}, PackingFailed),
),
PreCommit2: planOne(
on(SectorPreCommit2{}, PreCommitting),
on(SectorSealPreCommitFailed{}, SealFailed),
on(SectorPackingFailed{}, PackingFailed),
),
...
比如说,在PreCommit1的状态,接收到SectorPreCommit1事件,将进入PreCommit2状态。所有Sector的状态转换如下图:
在处理好状态好,就进入相应状态的处理程序。以PreCommit1状态为例,相应的处理函数是handlePreCommit1。
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
tok, epoch, err := m.api.ChainHead(ctx.Context())
...
pc1o, err := m.sealer.SealPreCommit1(ctx.Context(), m.minerSector(sector.SectorNumber), ticketValue, sector.pieceInfos())
if err != nil {
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
}
...
}
很清楚看出,在PreCommit1的处理函数中,会通过SealPrecommit1调用rust-fil-proofs实现Precommit1的计算。最后总结一下,各个状态的含义:
Empty - 空状态
Packing - 打包状态,多个Piece填充到一个Sector中
PreCommit1 - PreCommit1计算
PreCommit2 - PreCommit2计算
PreCommitting - 提交Precommit2的结果到链上
WaitSeed - 等待随机种子(给定10个区块的时间,让随机数种子不可提前预测)
Committing - 计算Commit1/Commit2,并将证明提交到链上
CommitWait - 等待链上确认 FinalizeSector - Sector状态确定
Sector的状态管理基于状态机。通用状态机的实现是通过go-statemachine实现。状态的存储通过go-statestore实现。在这些模块的基础上,storage-fsm实现了Sector的状态定义以及状态处理函数。
我的公众号星想法有很多原创高质量文章,欢迎大家扫码关注。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!