钱包业务层 - 6. 实现交易所回滚业务

在交易所钱包中,回滚是必须要考虑的一件事情。因为去中心化网络的不能由中心化节点控制。所以在多个矿工同时挖出新区块的时候,会形成临时分叉。但是出现分叉又必然回归到主链中,因为无论怎么分叉,总会有一条链是最长的。区块链中,只会承认长的那条链,而短的那条链中的交易,会被抛弃或重组。

什么是回滚业务

在交易所钱包中,回滚是必须要考虑的一件事情。因为去中心化网络的不能由中心化节点控制。所以在多个矿工同时挖出新区块的时候,会形成临时分叉。 但是出现分叉又必然回归到主链中,因为无论怎么分叉,总会有一条链是最长的。区块链中,只会承认长的那条链,而短的那条链中的交易,会被抛弃或重组。

我们可以想象一下,如果我们钱包中不处理回滚业务。此时如果我们用户进行充值,我们钱包系统中,扫链发现了这笔交易,充值完成。但是,此时发生了链上的重组, 链上资金又回到了用户的地址,但是我们交易所又给他分配了相应的资金,这样则会造成我们交易所资金的亏损。

完整项目 github 地址(如果对您有用,请给个小 star ⭐️):

  1. exchange-wallet-service:钱包业务层服务: <https://github.com/Shawn-Shaw-x/exchange-wallet-service>
  2. signature-machine:离线签名机:  <https://github.com/Shawn-Shaw-x/signature-machine>
  3. chains-union-rpc:多链统一 rpc:  <https://github.com/Shawn-Shaw-x/chains-union-rpc>

详解回滚

回滚分为两种类型:

  • 长分叉回滚: 这种情况指得是,当存在节点群进行网络操控,进行断网的情况下出块,并且出了相对较长的链。当这个节点群重新联网时,区块链网络会对比两条链的长度, 并选择较长的一条链作为主链,短的那条链则被抛弃,其中交易会被丢弃,资金没有实际转出去。

  • 短分叉重组: 短分叉重组的情况指得是,当我们区块链网络进行了短暂的分叉,通常只有一到两个块的分叉,很快又可以进行恢复。 此时,被抛弃的分叉中仍然有效的交易会被重组打包进新的区块中,交易并不一定会实际丢失。此时表现为:交易的 id 一致,但是其区块的 hash 仍然是不一致的。

无论是回滚还是重组,我们在宏观的层面上,都可以看做回滚进行处理就行了。 例如:

  1. 我们有一笔充值 100 的交易,资金从外部地址转移到交易所用户地址中。

  2. 交易所检测到了交易,用户地址余额记录增加 100

  3. 在没到确认位之前发生回滚,链上资金回到外部地址,用户地址余额记录减少 100

  4. 但如果是重组交易重新打包呢?其实流程仍然是一致的,回滚业务照常处理即可, 用户地址余额记录减少 100

  5. 因为是重组交易被重新打包了,那我们进行回滚业务完了后(出错的块会被删除),正常扫块,仍可以发现这笔充值交易,那么,用户的资金成功转出去了, 交易所的用户地址成功收到了这笔资金,用户地址又重新加上了 100,这样就相当于 +100-100+100。数据仍然是正确的。

可能有细心的朋友关注到了,上面我提到了一个确认位的概念,那么,什么是确认位呢,在交易所中有什么作用呢?

答:

确认位是交易所为了降低链回滚等不确定因素影响的一种安全机制,常用于充值交易中。

  1. 用户充值 100 ,交易所发现后交易后,记录上账,但不能进行提现。防止在短时间内链回滚造成资金的损失。

  2. 假如我们设置了确认位为 10 个块后确认,那么在 10 个块内发生回滚,则交易所会回收用户在交易所的资金,保证交易所不出现亏损。

  3. 假如在过了 10 个块后,链才发生回滚,那么交易所只能自认倒霉了(资损),自己承担链回滚造成的损失(链上资金回到外部地址、交易所资金也分配给用户了)

回滚业务的流程

上面讲了这么多,但其实还都是些概念性的东西,没进入到交易所内回滚业务是怎么实现的。下面,我将以一张流程图,来给你分析明白, 交易所内的回滚流程是怎么进行处理的。

image.png

由上面图中可以看到,我们回滚任务仍然是一个定时任务。他主要的任务是监听我们生产者的状态。

  1. 如果交易同步器(生产者)在扫描区块链,发现链上的区块和数据库中的区块 hash 不一致(parentHash 不等于上一个块的 hash) 则认为链上发生了回滚的情况。这时候,我们的同步器会无法正常生产数据,并标记为回滚状态。

  2. 在回滚任务的这个定时任务中,如果发现了生产者是回滚状态,则其执行回滚任务。

  3. 回滚任务首先要执行发现回滚块的任务,其操作流程是:以同步器标记的需要回滚的块往回找,回溯到其 parentHash 与前一个块 hash 相等位置。 此时,则获取到了需要回滚的块的范围(例如块 100 到块 90 需要被回滚)

  4. 在获得了回滚范围之后,我们所需要处理的任务只剩下处理数据库了。我们只需要将数据块中的 90 - 100 这个回滚范围内的块备份一下, 放到一个回滚表中,然后把原表删除掉(等同步器去重新扫这范围内的块即可)。 然后我们还需要处理充值、提现、归集、热转冷、冷转热交易以及交易流水表、余额表的数据库状态即可。 对充值、提现、归集、热转冷、冷转热、流水表标记为“已回滚”的状态,处理余额表(逆向加减余额)

讲到这里,有同学可能要问了我们回滚不需要重新发起交易的吗,为什么只需要更新数据库即可?

实际上,我们并不是不重新发起交易,而是把这个发起交易的权限给到了项目方(或者交易所钱包业务层), 我们只负责通知,然后让业务层去重新发起调用发起交易。(例如提现,我们通知他回滚了让他重新发起提现即可。我们不做偷偷发起交易这种事情。。。)

回滚业务的实现

说多了无益,下面我们直接来 show 代码:


/*启动*/
func (fb *Fallback) Start() error {
    log.Info("start fallback.........")
    fb.tasks.Go(func() error {
        for {
            select {
            case &lt;-fb.ticker.C:
                if fb.BaseSynchronizer.isFallback {
                    log.Info("fallback task", "synchronizer fallback handle", fb.BaseSynchronizer.fallbackBlockHeader.Number)
                    if err := fb.onFallback(fb.BaseSynchronizer.fallbackBlockHeader); err != nil {
                        log.Error("failed to notify fallback", "err", err)
                    }
                    dbLatestBlockHeader, err := fb.database.Blocks.LatestBlocks()
                    if err != nil {
                        log.Error("query latest block fail", "err", err)
                    }
                    /*传入新的 dbLatestBlockHeader,重新启动扫块*/
                    fb.BaseSynchronizer.blockBatch = rpcclient.NewBatchBlock(fb.rpcClient, dbLatestBlockHeader, big.NewInt(int64(fb.confirmations)))
                    /*处理完回滚,取消回滚状态*/
                    fb.BaseSynchronizer.isFallback = false
                    fb.BaseSynchronizer.fallbackBlockHeader = nil
                }
            case &lt;-fb.resourceCtx.Done():
                log.Info("stop fallback.........")
                return nil
            }
        }
    })
    return nil
}

/*回滚区块表、充值、提现、内部、流水、余额表处理*/
func (fb *Fallback) onFallback(fallbackBlockHeader *rpcclient.BlockHeader) error {
    reorgBlockHeaders, chainBlocks, entryBlockHeader, err := fb.findFallbackEntry(fallbackBlockHeader)
    if err != nil {
        log.Error("failed to find fallback entry", "err", err)
        return err
    }

    businessList, err := fb.database.Business.QueryBusinessList()
    if err != nil {
        log.Error("failed to query business list", "err", err)
        return err
    }

    var fallbackBalances []*database.TokenBalance
    for _, business := range businessList {
        log.Info("handle business", "businessUid", business.BusinessUid)
        /*范围内的交易记录*/
        transactionList, err := fb.database.Transactions.QueryFallBackTransactions(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number)
        if err != nil {
            log.Error("failed to query fallback transactions", "err", err)
            return err
        }
        for _, transaction := range transactionList {
            fallbackBalances = append(fallbackBalances, &database.TokenBalance{
                FromAddress:  transaction.FromAddress,
                ToAddress:    transaction.ToAddress,
                TokenAddress: transaction.TokenAddress,
                Balance:      transaction.Amount,
                TxType:       transaction.TxType,
            })
        }
    }

    retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
    if _, err := retry.Do[interface{}](fb.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
        if err := fb.database.Transaction(func(tx *database.DB) error {
            if len(reorgBlockHeaders) > 0 {
                /*被回滚的区块备份*/
                if err := tx.ReorgBlocks.StoreReorgBlocks(reorgBlockHeaders); err != nil {
                    log.Error("failed to store reorg blocks", "err", err)
                    return err
                }
                log.Info("store reorg block success", "totalTx", len(reorgBlockHeaders))
            }

            if len(chainBlocks) > 0 {
                if err := tx.Blocks.DeleteBlocksByNumber(chainBlocks); err != nil {
                    return err
                }
                log.Info("delete block success", "totalTx", len(chainBlocks))
            }
            /*存在回滚块,标记其中交易(根据交易通知业务层去让其做逆向交易)*/
            if fallbackBlockHeader.Number.Cmp(entryBlockHeader.Number) > 0 {
                for _, business := range businessList {
                    /*充值回滚*/
                    if err := tx.Deposits.HandleFallBackDeposits(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
                        log.Error("failed to handle fallback deposits", "err", err)
                        return err
                    }
                    /*提现回滚*/
                    if err := tx.Withdraws.HandleFallBackWithdraw(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
                        log.Error("failed to handle fallback withdraws", "err", err)
                        return err
                    }

                    /*内部交易回滚*/
                    if err := tx.Internals.HandleFallBackInternals(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
                        log.Error("failed to handle fallback internals", "err", err)
                        return err
                    }
                    /*流水表回滚*/
                    if err := tx.Transactions.HandleFallBackTransactions(business.BusinessUid, entryBlockHeader.Number, fallbackBlockHeader.Number); err != nil {
                        log.Error("failed to handle fallback transactions", "err", err)
                        return err
                    }
                    /*余额回滚*/
                    if err := tx.Balances.UpdateFallBackBalance(business.BusinessUid, fallbackBalances); err != nil {
                        log.Error("failed to update fallback balance", "err", err)
                        return err
                    }
                }
            }
            return nil
        }); err != nil {
            log.Error("unable to persist fallback batch", "err", err)
            return nil, err
        }
        return nil, nil
    }); err != nil {
        return err
    }

    return nil
}

/*找到回滚初始块并返回*/
func (fb *Fallback) findFallbackEntry(fallbackBlockHeader *rpcclient.BlockHeader) ([]database.ReorgBlocks, []database.Blocks, *rpcclient.BlockHeader, error) {
    var reorgBlockHeaders []database.ReorgBlocks
    var chainBlocks []database.Blocks

    lastBlockHeader := fallbackBlockHeader

    /*寻找到回滚的分叉点*/
    for {
        /*往回查找*/
        lastBlockNumber := new(big.Int).Sub(lastBlockHeader.Number, bigint.One)
        log.Info("start get block header info...", "last block number", lastBlockNumber)

        /*链上这个块*/
        chainBlockHeader, err := fb.rpcClient.GetBlockHeader(lastBlockNumber)
        if err != nil {
            log.Warn("failed to get block header info from chain", "err", err)
            return nil, nil, nil, fmt.Errorf("failed to get block header info from chain: %w", err)
        }
        /*数据库中*/
        dbBlockHeader, err := fb.database.Blocks.QueryBlocksByNumber(lastBlockNumber)
        if err != nil {
            log.Warn("failed to get block header info from database", "err", err)
            return nil, nil, nil, fmt.Errorf("failed to get block header info from database: %w", err)
        }
        log.Info("query blocks from database success", "last block number", lastBlockNumber)
        /*需要删除的*/
        chainBlocks = append(chainBlocks, database.Blocks{
            Hash:       dbBlockHeader.Hash,
            ParentHash: dbBlockHeader.ParentHash,
            Number:     dbBlockHeader.Number,
            Timestamp:  dbBlockHeader.Timestamp,
        })
        /*需要备份的*/
        reorgBlockHeaders = append(reorgBlockHeaders, database.ReorgBlocks{
            Hash:       dbBlockHeader.Hash,
            ParentHash: dbBlockHeader.ParentHash,
            Number:     dbBlockHeader.Number,
            Timestamp:  dbBlockHeader.Timestamp,
        })
        log.Info("lastBlockHeader chainBlockHeader", "lastBlockParentHash", lastBlockHeader.ParentHash, "lastBlockNumber", lastBlockHeader.Number, "chainBlockHash", chainBlockHeader.Hash, "chainBlockHeaderNumber", chainBlockHeader.Number)

        /*已找到分叉点,正常终止*/
        if lastBlockHeader.ParentHash == chainBlockHeader.Hash {
            lastBlockHeader = chainBlockHeader
            return reorgBlockHeaders, chainBlocks, chainBlockHeader, nil
        }
        /*往前移动*/
        lastBlockHeader = chainBlockHeader
    }
}

回滚测试

  1. 修改数据库,模拟区块发生回滚,区块 hash 不正确

image.png

  1. 在不连续的区块上,伪造一笔交易,测试回滚

image.png

  1. 回滚之前的 transaction 流水表、balance 余额表

image.png

image.png

  1. 回滚之后的 blocks 表、reorgBlocks 表、流水表、余额表

image.png

image.png

image.png

image.png

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
shawn_shaw
shawn_shaw
web3潜水员、技术爱好者、web3钱包开发工程师。欢迎闲聊唠嗑、精进技术、交流工作机会。vx:cola_ocean