在交易所钱包中,回滚是必须要考虑的一件事情。因为去中心化网络的不能由中心化节点控制。所以在多个矿工同时挖出新区块的时候,会形成临时分叉。但是出现分叉又必然回归到主链中,因为无论怎么分叉,总会有一条链是最长的。区块链中,只会承认长的那条链,而短的那条链中的交易,会被抛弃或重组。
在交易所钱包中,回滚是必须要考虑的一件事情。因为去中心化网络的不能由中心化节点控制。所以在多个矿工同时挖出新区块的时候,会形成临时分叉。 但是出现分叉又必然回归到主链中,因为无论怎么分叉,总会有一条链是最长的。区块链中,只会承认长的那条链,而短的那条链中的交易,会被抛弃或重组。
我们可以想象一下,如果我们钱包中不处理回滚业务。此时如果我们用户进行充值,我们钱包系统中,扫链发现了这笔交易,充值完成。但是,此时发生了链上的重组, 链上资金又回到了用户的地址,但是我们交易所又给他分配了相应的资金,这样则会造成我们交易所资金的亏损。
完整项目 github
地址(如果对您有用,请给个小 star
⭐️):
exchange-wallet-service
:钱包业务层服务: <https://github.com/Shawn-Shaw-x/exchange-wallet-service>signature-machine
:离线签名机: <https://github.com/Shawn-Shaw-x/signature-machine>chains-union-rpc
:多链统一 rpc
: <https://github.com/Shawn-Shaw-x/chains-union-rpc>长分叉回滚: 这种情况指得是,当存在节点群进行网络操控,进行断网的情况下出块,并且出了相对较长的链。当这个节点群重新联网时,区块链网络会对比两条链的长度, 并选择较长的一条链作为主链,短的那条链则被抛弃,其中交易会被丢弃,资金没有实际转出去。
短分叉重组:
短分叉重组的情况指得是,当我们区块链网络进行了短暂的分叉,通常只有一到两个块的分叉,很快又可以进行恢复。
此时,被抛弃的分叉中仍然有效的交易会被重组打包进新的区块中,交易并不一定会实际丢失。此时表现为:交易的 id
一致,但是其区块的 hash
仍然是不一致的。
无论是回滚还是重组,我们在宏观的层面上,都可以看做回滚进行处理就行了。 例如:
我们有一笔充值 100
的交易,资金从外部地址转移到交易所用户地址中。
交易所检测到了交易,用户地址余额记录增加 100
。
在没到确认位之前发生回滚,链上资金回到外部地址,用户地址余额记录减少 100
。
但如果是重组交易重新打包呢?其实流程仍然是一致的,回滚业务照常处理即可, 用户地址余额记录减少 100
。
因为是重组交易被重新打包了,那我们进行回滚业务完了后(出错的块会被删除),正常扫块,仍可以发现这笔充值交易,那么,用户的资金成功转出去了,
交易所的用户地址成功收到了这笔资金,用户地址又重新加上了 100
,这样就相当于 +100-100+100
。数据仍然是正确的。
可能有细心的朋友关注到了,上面我提到了一个确认位的概念,那么,什么是确认位呢,在交易所中有什么作用呢?
答:
确认位是交易所为了降低链回滚等不确定因素影响的一种安全机制,常用于充值交易中。
用户充值 100
,交易所发现后交易后,记录上账,但不能进行提现。防止在短时间内链回滚造成资金的损失。
假如我们设置了确认位为 10
个块后确认,那么在 10
个块内发生回滚,则交易所会回收用户在交易所的资金,保证交易所不出现亏损。
假如在过了 10
个块后,链才发生回滚,那么交易所只能自认倒霉了(资损),自己承担链回滚造成的损失(链上资金回到外部地址、交易所资金也分配给用户了)
上面讲了这么多,但其实还都是些概念性的东西,没进入到交易所内回滚业务是怎么实现的。下面,我将以一张流程图,来给你分析明白, 交易所内的回滚流程是怎么进行处理的。
由上面图中可以看到,我们回滚任务仍然是一个定时任务。他主要的任务是监听我们生产者的状态。
如果交易同步器(生产者)在扫描区块链,发现链上的区块和数据库中的区块 hash
不一致(parentHash
不等于上一个块的 hash
)
则认为链上发生了回滚的情况。这时候,我们的同步器会无法正常生产数据,并标记为回滚状态。
在回滚任务的这个定时任务中,如果发现了生产者是回滚状态,则其执行回滚任务。
回滚任务首先要执行发现回滚块的任务,其操作流程是:以同步器标记的需要回滚的块往回找,回溯到其 parentHash 与前一个块 hash 相等位置。
此时,则获取到了需要回滚的块的范围(例如块 100
到块 90
需要被回滚)
在获得了回滚范围之后,我们所需要处理的任务只剩下处理数据库了。我们只需要将数据块中的 90 - 100
这个回滚范围内的块备份一下,
放到一个回滚表中,然后把原表删除掉(等同步器去重新扫这范围内的块即可)。
然后我们还需要处理充值、提现、归集、热转冷、冷转热交易以及交易流水表、余额表的数据库状态即可。
对充值、提现、归集、热转冷、冷转热、流水表标记为“已回滚”的状态,处理余额表(逆向加减余额)
讲到这里,有同学可能要问了:我们回滚不需要重新发起交易的吗,为什么只需要更新数据库即可?
实际上,我们并不是不重新发起交易,而是把这个发起交易的权限给到了项目方(或者交易所钱包业务层), 我们只负责通知,然后让业务层去重新发起调用发起交易。(例如提现,我们通知他回滚了让他重新发起提现即可。我们不做偷偷发起交易这种事情。。。)
说多了无益,下面我们直接来 show
代码:
/*启动*/
func (fb *Fallback) Start() error {
log.Info("start fallback.........")
fb.tasks.Go(func() error {
for {
select {
case <-fb.ticker.C:
if fb.BaseSynchronizer.isFallback {
log.Info("fallback task", "synchronizer fallback handle", fb.BaseSynchronizer.fallbackBlockHeader.Number)
if err := fb.onFallback(fb.BaseSynchronizer.fallbackBlockHeader); err != nil {
log.Error("failed to notify fallback", "err", err)
}
dbLatestBlockHeader, err := fb.database.Blocks.LatestBlocks()
if err != nil {
log.Error("query latest block fail", "err", err)
}
/*传入新的 dbLatestBlockHeader,重新启动扫块*/
fb.BaseSynchronizer.blockBatch = rpcclient.NewBatchBlock(fb.rpcClient, dbLatestBlockHeader, big.NewInt(int64(fb.confirmations)))
/*处理完回滚,取消回滚状态*/
fb.BaseSynchronizer.isFallback = false
fb.BaseSynchronizer.fallbackBlockHeader = nil
}
case <-fb.resourceCtx.Done():
log.Info("stop fallback.........")
return nil
}
}
})
return nil
}
/*回滚区块表、充值、提现、内部、流水、余额表处理*/
func (fb *Fallback) onFallback(fallbackBlockHeader *rpcclient.BlockHeader) error {
reorgBlockHeaders, chainBlocks, entryBlockHeader, err := fb.findFallbackEntry(fallbackBlockHeader)
if err != nil {
log.Error("failed to find fallback entry", "err", err)
return err
}
businessList, err := fb.database.Business.QueryBusinessList()
if err != nil {
log.Error("failed to query business list", "err", err)
return err
}
var fallbackBalances []*database.TokenBalance
for _, business := range businessList {
log.Info("handle business", "businessUid", business.BusinessUid)
/*范围内的交易记录*/
transactionList, err := fb.database.Transactions.QueryFallBackTransactions(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number)
if err != nil {
log.Error("failed to query fallback transactions", "err", err)
return err
}
for _, transaction := range transactionList {
fallbackBalances = append(fallbackBalances, &database.TokenBalance{
FromAddress: transaction.FromAddress,
ToAddress: transaction.ToAddress,
TokenAddress: transaction.TokenAddress,
Balance: transaction.Amount,
TxType: transaction.TxType,
})
}
}
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
if _, err := retry.Do[interface{}](fb.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
if err := fb.database.Transaction(func(tx *database.DB) error {
if len(reorgBlockHeaders) > 0 {
/*被回滚的区块备份*/
if err := tx.ReorgBlocks.StoreReorgBlocks(reorgBlockHeaders); err != nil {
log.Error("failed to store reorg blocks", "err", err)
return err
}
log.Info("store reorg block success", "totalTx", len(reorgBlockHeaders))
}
if len(chainBlocks) > 0 {
if err := tx.Blocks.DeleteBlocksByNumber(chainBlocks); err != nil {
return err
}
log.Info("delete block success", "totalTx", len(chainBlocks))
}
/*存在回滚块,标记其中交易(根据交易通知业务层去让其做逆向交易)*/
if fallbackBlockHeader.Number.Cmp(entryBlockHeader.Number) > 0 {
for _, business := range businessList {
/*充值回滚*/
if err := tx.Deposits.HandleFallBackDeposits(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
log.Error("failed to handle fallback deposits", "err", err)
return err
}
/*提现回滚*/
if err := tx.Withdraws.HandleFallBackWithdraw(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
log.Error("failed to handle fallback withdraws", "err", err)
return err
}
/*内部交易回滚*/
if err := tx.Internals.HandleFallBackInternals(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
log.Error("failed to handle fallback internals", "err", err)
return err
}
/*流水表回滚*/
if err := tx.Transactions.HandleFallBackTransactions(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
log.Error("failed to handle fallback transactions", "err", err)
return err
}
/*余额回滚*/
if err := tx.Balances.UpdateFallBackBalance(business.BusinessUid, fallbackBalances); err != nil {
log.Error("failed to update fallback balance", "err", err)
return err
}
}
}
return nil
}); err != nil {
log.Error("unable to persist fallback batch", "err", err)
return nil, err
}
return nil, nil
}); err != nil {
return err
}
return nil
}
/*找到回滚初始块并返回*/
func (fb *Fallback) findFallbackEntry(fallbackBlockHeader *rpcclient.BlockHeader) ([]database.ReorgBlocks, []database.Blocks, *rpcclient.BlockHeader, error) {
var reorgBlockHeaders []database.ReorgBlocks
var chainBlocks []database.Blocks
lastBlockHeader := fallbackBlockHeader
/*寻找到回滚的分叉点*/
for {
/*往回查找*/
lastBlockNumber := new(big.Int).Sub(lastBlockHeader.Number, bigint.One)
log.Info("start get block header info...", "last block number", lastBlockNumber)
/*链上这个块*/
chainBlockHeader, err := fb.rpcClient.GetBlockHeader(lastBlockNumber)
if err != nil {
log.Warn("failed to get block header info from chain", "err", err)
return nil, nil, nil, fmt.Errorf("failed to get block header info from chain: %w", err)
}
/*数据库中*/
dbBlockHeader, err := fb.database.Blocks.QueryBlocksByNumber(lastBlockNumber)
if err != nil {
log.Warn("failed to get block header info from database", "err", err)
return nil, nil, nil, fmt.Errorf("failed to get block header info from database: %w", err)
}
log.Info("query blocks from database success", "last block number", lastBlockNumber)
/*需要删除的*/
chainBlocks = append(chainBlocks, database.Blocks{
Hash: dbBlockHeader.Hash,
ParentHash: dbBlockHeader.ParentHash,
Number: dbBlockHeader.Number,
Timestamp: dbBlockHeader.Timestamp,
})
/*需要备份的*/
reorgBlockHeaders = append(reorgBlockHeaders, database.ReorgBlocks{
Hash: dbBlockHeader.Hash,
ParentHash: dbBlockHeader.ParentHash,
Number: dbBlockHeader.Number,
Timestamp: dbBlockHeader.Timestamp,
})
log.Info("lastBlockHeader chainBlockHeader", "lastBlockParentHash", lastBlockHeader.ParentHash, "lastBlockNumber", lastBlockHeader.Number, "chainBlockHash", chainBlockHeader.Hash, "chainBlockHeaderNumber", chainBlockHeader.Number)
/*已找到分叉点,正常终止*/
if lastBlockHeader.ParentHash == chainBlockHeader.Hash {
lastBlockHeader = chainBlockHeader
return reorgBlockHeaders, chainBlocks, chainBlockHeader, nil
}
/*往前移动*/
lastBlockHeader = chainBlockHeader
}
}
hash
不正确transaction
流水表、balance
余额表blocks
表、reorgBlocks
表、流水表、余额表如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!