在之前的开发步骤中,我们实现了交易的同步器,负责将区块链上的区块扫描下来,并解析交易筛选出与我们交易所内所有项目方有关的地址,放到一个同步管道中。(属于生产者的角色)在这步的开发中,我们将实现一个消费者角色,也就是交易的发现器。
在之前的开发步骤中,我们实现了交易的同步器,负责将区块链上的区块扫描下来,并解析交易筛选出 与我们交易所内所有项目方有关的地址,放到一个同步管道中。(属于生产者的角色) 在这步的开发中,我们将实现一个消费者角色,也就是交易的发现器。 在这个发现器中,我们将实现充值、提现、归集、转冷、转热交易的链上发现处理, 并且完成充值确认位的处理,交易流水的入库处理。
完整项目 github
地址(如果对您有用,请给个小 star
):
exchange-wallet-service
:钱包业务层服务: <https://github.com/Shawn-Shaw-x/exchange-wallet-service>signature-machine
:离线签名机: <https://github.com/Shawn-Shaw-x/signature-machine>chains-union-rpc
:多链统一 rpc
: <https://github.com/Shawn-Shaw-x/chains-union-rpc>在交易所钱包业务中,充值业务无异于重中之重。充值业务指得是:用户在外部使用其他地址,将链上资产发送到我们交易所的用户地址里面去(用户地址是分配给交易所用户的地址,但是私钥是用户不可见的,实际控制权在交易所)
在我们的钱包业务中,交易所的充值业务可以直接在交易的发现器里面处理,因为充值业务与其他业务不同,充值业务是我们持续扫链获取到这个交易,在交易发现器里面进行第一次入库处理即可。而提现、归集、热转冷、冷转热交易都是先有项目方进行发起,库中已有了交易记录,在交易发现器中,只需要更新其交易状态即可。
我们可以具体看一下,在交易发现器这个消费者中,各种业务都做了什么:
充值交易:
在充值交易中,由于充值是用户在交易所外部主动发起的,故交易所在扫链同步器扫描到这笔交易并在交易发现器中处理这笔之前,都是不知道有这笔充值业务的。
故我们在交易发现器中,通过同步器标记的交易类型为 ’充值交易‘,我们获得这笔交易后,可以在充值 deposit
表中新增一条这笔充值交易。然后设定充值交易状态为 “充值中”,表示这笔交易已发现,但还未确认。
另外,在充值的用户地址中,有这 balance
和 lockBalance
两个字段,用于锁定用户充值的金额,直到这笔交易被完全确认。例如:
用户充值了 100 eth
lockBalance = oldBalance + 100
当充值交易到达确认位后,balance = balance + 100
;lockBalance = LockBalance - 100
;
在计算用户可用余额的时候,只需要计算 balance
即可
total = balance + lockBalance
;对于充值业务,我们还需要处理的是充值的确认位。即我们还得在这里查询一下我们数据库中历史的充值交易,检查其区块高度和我们正在处理的交易的区块高度的差距。如果:当前区块高度 - 库中充值记录的区块高度 > 确认位
。则我我们可以认为这笔交易已经过了确认位,可以更新这笔充值交易的状态为 “充值完成”
提现交易:
对于提现业务,在交易发现器中,我们只需查出数据库中的这条提现记录,并更新提现交易已完成即可。因为,用户的提现是用户主动向我们交易所中发起提现而来的,这条提现记录已经存在于我们的数据库 “withdraw
” 表中。所以在我们的交易发现器中,处理这笔提现记录只需要修改提现状态为“完成”即可。
与充值交易类似,提现交易的热钱包地址的余额也有区分 balance
和 lockBalance
。(因为提现的发现到我们交易发现器发现链上这笔交易这个过程中存在时间差,所以在这个时间差之间需要锁定这笔交易的资金)
用户发起提现 100 eth
balance = oldBalance -100
;lockBalance = oldLockBalance + 100
交易发现后,lockBalance = LockBalance - 100
归集交易、热转冷、冷转热交易:
对于归集、热转冷、冷转热交易,在这里,我们统一称之为 内部交易,存在数据库的 “internal
” 表中。内部交易和提现业务类型,是由项目方主动向我们钱包层业务发起的,所以这条归集交易的记录也是提前存在我们数据库中。我们交易的发现器在发现这笔内部交易时,也是只需更新我们的内部交易记录的状态为 “完成” 即可。
地址余额处理:
在我们的钱包业务系统中,我们通过 “balance
” 余额表维护了我们交易所所有地址的余额信息,例如我们在充值中提到的 balance
和 lockBalance
,实际上是维护在这一个表中。
在交易发现器中,我们确认了充值、提现、归集、热转冷、冷转热交易,都涉及到余额的变更,所以我们除了更新交易记录外,还需要同步维护地址的余额情况。例如:充值到用户地址中,用户的余额需要加上本次金额。提现中,交易所的热钱包地址需要减去当次交易的金额。
交易流水表: 在交易发现器中,除了充值、提现、归集、热转冷、冷转热的相关表要处理外。我们在此处还有一个总的交易流水表要入库处理。交易流水表实际上是以上所有交易的汇总,用户后续的对账等业务的处理。
对于交易发现器而言,相当于一个消费者的角色,主要负责消费交易同步器生产的交易记录信息。主要是负责消费一个核心管道
businessChannel
这个管道里面的交易记录。
交易发现器实际上,是使用一个协程进行异步处理的,主要负责充值、提现、归集、内部交易的发现处理入库,并且负责保存区块表、余额表的维护。
在充值业务中,因为充值是用户在交易所外部发起的,所以我们一整套的充值业务都是在交易发现器中处理(只需发现处理即可)。(不像提现、归集等在钱包业务层内部发起、在发现器中发现处理)
充值业务的流程可以解释如下:
用户在外部将资金转入交易所的用户地址
交易所的钱包层会一个启动扫链任务(交易同步器),分析区块的高度和 hash
,如果当前数据库的最高区块大于链上最新区块或者出现链不连续(当前区块的 parentHash
不等于上一个区块的 hash
),则说明出现回滚,扫块任务空转,走回滚的逻辑。
扫描到一批区块后,或者这批次区块的交易列表,然后解析筛选出和我们交易所用户地址有关的交易列表。(分析方法为:from
地址为外部地址,to
地址为我们交易所的用户地址,则为一笔充值交易)
区块同步器(生产者)处理完以上内容后,将交易通过管道传输,发送给交易发现器(消费者)去进行处理充值交易。处理方法为:1. 处理管道中的充值交易入库,更新余额表。2. 查出旧的充值交易,超过确认位的充值交易进行确认。
无论是新的充值交易还是旧的充值交易,都需要进行向用户(交易所业务层)去发送通知,告知用户充值已经发现、完成。
协程异步启动交易发现器
/*协程异步处理任务*/
f.tasks.Go(func() error {
log.Info("handle deposit task start")
for batch := range f.BaseSynchronizer.businessChannels {
log.Info("deposit business channel", "batch length", len(batch))
log.Error("=================", "batch length", len(batch))
/* 实现所有交易处理*/
if err := f.handleBatch(batch); err != nil {
log.Info("failed to handle batch, stopping L2 Synchronizer:", "err", err)
return fmt.Errorf("failed to handle batch, stopping L2 Synchronizer: %w", err)
}
}
return nil
})
消费 businessChannel 中的交易 businessChannel 中一个map存放的是所有项目方的这批次的交易列表。将其按项目方取出来, 然后分别对每一笔交易进行入库处理,需要处理的任务如下:
/*
处理所有推送过来交易(一批次,所有有关项目方的都在这个 map 中)
充值:库中原来没有,入库、更新余额。库中的充值更新确认位
提现:库中原来有记录(项目方提交的),更新状态为已发现
归集:库中原来有记录(项目方提交的),更新状态为已发现
热转冷、冷转热:库中原来有记录(项目方提交的),更新状态为已发现
交易流水:入库 transaction 表
*/
func (f *Finder) handleBatch(batch map[string]*BatchTransactions) error {
/*查出项目方列表*/
businessList, err := f.BaseSynchronizer.database.Business.QueryBusinessList()
if err != nil {
log.Error("failed to query business list", "err", err)
return err
}
if businessList == nil || len(businessList) <= 0 {
err := fmt.Errorf("failed to query business list")
return err
}
/*项目方分别处理*/
for _, business := range businessList {
_, exists := batch[business.BusinessUid]
if !exists {
/*不存在则跳过*/
continue
}
/*存库用*/
var (
/*流水表*/
transactionFlowList []*database.Transactions
/*充值表*/
depositList []*database.Deposits
/*提现表*/
withdrawList []*database.Withdraws
/*内部表*/
internals []*database.Internals
/*余额表*/
balances []*database.TokenBalance
)
log.Info("handle business flow", "businessId", business.BusinessUid, "chainLatestBlock", batch[business.BusinessUid].BlockHeight, "txn", len(batch[business.BusinessUid].Transactions))
for _, tx := range batch[business.BusinessUid].Transactions {
/*每笔交易分别处理*/
log.Info("Request transaction from chain account", "txHash", tx.Hash, "fromAddress", tx.FromAddress)
txItem, err := f.BaseSynchronizer.rpcClient.GetTransactionByHash(tx.Hash)
if err != nil {
log.Info("failed to get transaction by hash", "hash", tx.Hash, "err", err)
return err
}
if txItem == nil {
err := fmt.Errorf("GetTransactionByHash txItem is nil: TxHash = %s", tx.Hash)
return err
}
amountBigInt, _ := new(big.Int).SetString(txItem.Value, 10)
log.Info("transaction amount", "amountBigInt", amountBigInt, "FromAddress", tx.FromAddress, "TokenAddress", tx.TokenAddress, "TokenAddress", tx.ToAddress)
/*代币余额,ETH 主币余额*/
balances = append(
balances,
&database.TokenBalance{
FromAddress: common.HexToAddress(tx.FromAddress),
ToAddress: common.HexToAddress(txItem.To),
TokenAddress: common.HexToAddress(txItem.ContractAddress),
Balance: amountBigInt,
TxType: tx.TxType,
},
)
log.Info("get transaction success", "txHash", txItem.Hash)
transactionFlow, err := f.BuildTransaction(tx, txItem)
if err != nil {
log.Info("handle transaction fail", "err", err)
return err
}
/*放入交易流水列表,等待入库*/
transactionFlowList = append(transactionFlowList, transactionFlow)
switch tx.TxType {
/*充值*/
case constant.TxTypeDeposit:
depositItem, _ := f.HandleDeposit(tx, txItem)
depositList = append(depositList, depositItem)
break
/*提现*/
case constant.TxTypeWithdraw:
withdrawItem, _ := f.HandleWithdraw(tx, txItem)
withdrawList = append(withdrawList, withdrawItem)
break
/*内部(归集、转冷、转热)*/
case constant.TxTypeCollection, constant.TxTypeCold2Hot, constant.TxTypeHot2Cold:
internelItem, _ := f.HandleInternalTx(tx, txItem)
internals = append(internals, internelItem)
break
default:
break
}
}
/*数据库重试策略*/
retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
/*重试*/
if _, err := retry.Do[interface{}](f.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
/*事务*/
if err := f.BaseSynchronizer.database.Gorm.Transaction(func(tx *gorm.DB) error {
/* 1. 充值业务处理*/
if len(depositList) > 0 {
log.Info("Store deposit transaction success", "totalTx", len(depositList))
if err := f.BaseSynchronizer.database.Deposits.StoreDeposits(business.BusinessUid, depositList); err != nil {
return err
}
}
/* 2. 充值确认位处理*/
if err := f.BaseSynchronizer.database.Deposits.UpdateDepositsConfirms(business.BusinessUid, batch[business.BusinessUid].BlockHeight, uint64(f.confirms)); err != nil {
log.Info("Handle confims fail", "totalTx", "err", err)
return err
}
/* 3. 余额处理*/
if len(balances) > 0 {
log.Info("Handle balances transaction success", "totalTx", len(balances))
if err := f.BaseSynchronizer.database.Balances.UpdateOrCreate(business.BusinessUid, balances); err != nil {
return err
}
}
/* 4. 提现状态处理*/
if len(withdrawList) > 0 {
if err := f.BaseSynchronizer.database.Withdraws.UpdateWithdrawStatusByTxHash(business.BusinessUid, constant.TxStatusWalletDone, withdrawList); err != nil {
return err
}
}
/* 5. 内部交易状态处理*/
if len(internals) > 0 {
if err := f.BaseSynchronizer.database.Internals.UpdateInternalStatusByTxHash(business.BusinessUid, constant.TxStatusWalletDone, internals); err != nil {
return err
}
}
/* 6. 交易流水表入库*/
if len(transactionFlowList) > 0 {
if err := f.BaseSynchronizer.database.Transactions.StoreTransactions(business.BusinessUid, transactionFlowList, uint64(len(transactionFlowList))); err != nil {
return err
}
}
return nil
}); err != nil {
log.Error("unable to persist batch", "err", err)
return nil, err
}
return nil, nil
}); err != nil {
return err
}
}
return nil
}
## 交易发现器(消费者)、充值业务测试
1. **启动之前余额**

2. **转入资金**

3. **运行 ./exchange-wallet-service work**

4. **启动之后余额(等待确认位之后(10 个块))**

如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!