钱包业务层 - 7. 实现交易所钱包通知业务

通知业务指得是,交易所钱包将交易的状态变更情况通知上层调用者,告知他们某笔交易的状态如何。

通知业务是什么

通知业务指得是,交易所钱包将交易的状态变更情况通知上层调用者,告知他们某笔交易的状态如何。

完整项目 github 地址(如果对您有用,请给个小 star ⭐️):

  1. exchange-wallet-service:钱包业务层服务: <https://github.com/Shawn-Shaw-x/exchange-wallet-service>
  2. signature-machine:离线签名机:  <https://github.com/Shawn-Shaw-x/signature-machine>
  3. chains-union-rpc:多链统一 rpc:  <https://github.com/Shawn-Shaw-x/chains-union-rpc>

在交易所的通知业务中,钱包的通知实现相对比较简单。因为我们所有的交易请求都是有项目方(或者交易所业务层)请求调用的,为了保证我们整个交易所系统的安全性,所以我们钱包业务中,不会去主动构造交易。 所以说,我们在进行发起链上交易交互的时候,我们钱包业务中,所需要做的就是:给上层调用者发送通知,告诉他们这笔交易的状态是怎样的,方便他们进行相关业务操作。 例如:

  • 充值:
    1. 钱包发现这笔充值交易,通知业务交易已上账。
  1. 钱包发现某笔交易过了确认位,通知业务某笔交易已成功。
  • 回滚:
    1. 钱包发现某笔交易被回滚,通知业务交易已被回滚。

通知业务的流程图

image.png

交易所钱包系统中,我们的交易发现器会将交易从链上发现,并且变更交易的状态到数据库中。

  1. 通过查询数据库获取需要发送通知但还未通知的交易。例如充值上账、充值确认、提现已广播、交易回滚等。

  2. 构建相应的通知内容,通过项目方(业务层)配置的 http 接口进行发送出去

  3. 如果发送成功,则变更交易的通知状态为已成功通知

通知业务的实现

来上代码:


/*启动通知任务*/
func (nf *Notifier) Start() error {
    log.Info("start notifier worker...")
    nf.tasks.Go(func() error {
        for {
            select {
            case &lt;-nf.ticker.C:
                var txn []Transaction
                /*每个项目方去查询相应业务表,发出通知*/
                for _, businessId := range nf.businessIds {
                    log.Info("start notifier business", "business", businessId, "txn", txn)

                    /*查出应通知的充值交易*/
                    needNotifyDeposits, err := nf.db.Deposits.QueryNotifyDeposits(businessId)
                    if err != nil {
                        log.Error("Query notify deposits fail", "err", err)
                    }
                    /*查出应通知的提现*/
                    needNotifyWithdraws, err := nf.db.Withdraws.QueryNotifyWithdraws(businessId)
                    if err != nil {
                        log.Error("Query notify withdraw fail", "err", err)
                    }
                    /*查出应通知的内部交易*/
                    needNotifyInternals, err := nf.db.Internals.QueryNotifyInternal(businessId)
                    if err != nil {
                        log.Error("Query notify internal fail", "err", err)
                    }

                    /*构建通知请求体*/
                    notifyRequest, err := nf.BuildNotifyTransaction(needNotifyDeposits, needNotifyWithdraws, needNotifyInternals)
                    if err != nil {
                        log.Error("Build notify transaction fail", "err", err)
                    }
                    if notifyRequest.Txn == nil || len(notifyRequest.Txn) == 0 {
                        log.Warn("no notify transaction to notify, wait for notify")
                        continue
                    }

                    /*发送通知*/
                    notify, err := nf.notifier[businessId].BusinessNotify(notifyRequest)
                    if err != nil {
                        log.Error("notify business platform fail", "err", err)
                    }
                    log.Info("After notify", "business", businessId, "notifyStatus", notify, "deposits", needNotifyDeposits, "err", err)
                    err = nf.AfterNotify(businessId, notify, needNotifyDeposits, needNotifyWithdraws, needNotifyInternals)
                    if err != nil {
                        log.Error("change notified status fail", "err", err)
                    }

                }
            case &lt;-nf.resourceCtx.Done():
                log.Info("notifier worker shutting down")
                return nil
            }
        }
    })
    return nil
}

/*通知之前:更新通知前状态*/
func (nf *Notifier) AfterNotify(businessId string, notifySuccess bool, deposits []*database.Deposits, withdraws []*database.Withdraws, internals []*database.Internals) error {
    if !notifySuccess {
        log.Warn("notify business platform fail", "business", businessId)
        return fmt.Errorf("notify business platform fail, businessId: %v", businessId)
    }
    depositsNotifyStatus := constant.TxStatusNotified
    withdrawNotifyStatus := constant.TxStatusNotified
    internalNotifyStatus := constant.TxStatusNotified

    // 过滤状态为 0 的交易
    var updateStutusDepositTxn []*database.Deposits
    for _, deposit := range deposits {
        if deposit.Status != constant.TxStatusCreateUnsigned {
            updateStutusDepositTxn = append(updateStutusDepositTxn, deposit)
        }
    }
    /*更新通知前状态(待通知)*/
    retryStrategy := &retry.ExponentialStrategy{Min: 1000, Max: 20_000, MaxJitter: 250}
    if _, err := retry.Do[interface{}](nf.resourceCtx, 10, retryStrategy, func() (interface{}, error) {
        if err := nf.db.Transaction(func(tx *database.DB) error {
            if len(deposits) > 0 {
                if err := tx.Deposits.UpdateDepositsStatusByTxHash(businessId, depositsNotifyStatus, updateStutusDepositTxn); err != nil {
                    return err
                }
            }
            if len(withdraws) > 0 {
                if err := tx.Withdraws.UpdateWithdrawStatusByTxHash(businessId, withdrawNotifyStatus, withdraws); err != nil {
                    return err
                }
            }

            if len(internals) > 0 {
                if err := tx.Internals.UpdateInternalStatusByTxHash(businessId, internalNotifyStatus, internals); err != nil {
                    return err
                }
            }
            return nil
        }); err != nil {
            log.Error("unable to persist batch", "err", err)
            return nil, err
        }
        return nil, nil
    }); err != nil {
        return err
    }
    return nil
}

通知业务测试

  1. 写个程序用于模拟项目方(钱包层)接收通知
    
    type NotifyRequest struct {
    Txn []httpclient.Transaction `json:"txn"`
    }

func main() { http.HandleFunc("/exchange-wallet/notify", func(w http.ResponseWriter, r *http.Request) { log.Println("📩 Received a request")

    body, err := io.ReadAll(r.Body)
    if err != nil {
        http.Error(w, "failed to read body", http.StatusInternalServerError)
        return
    }
    defer r.Body.Close()

    var req NotifyRequest
    if err := json.Unmarshal(body, &req); err != nil {
        http.Error(w, "invalid JSON", http.StatusBadRequest)
        log.Println("❌ Invalid JSON:", err)
        return
    }

    // 打印格式化的 JSON
    fmt.Println("🧾 Parsed JSON request:")
    pretty, _ := json.MarshalIndent(req, "", "  ")
    fmt.Println(string(pretty))

    w.Header().Set("Content-Type", "application/json")
    w.Write([]byte(`{"success":true}`))
})

addr := "127.0.0.1:9997/exchange-wallet/notify"
log.Println("🚀 Mock Notify Server listening on", addr)
if err := http.ListenAndServe("127.0.0.1:9997", nil); err != nil {
    log.Fatal("❌ Server failed:", err)
}

}


2. **启动这个模拟程序**

![image.png](https://img.learnblockchain.cn/attachments/2025/05/iUy82kst68278ae371eea.png)

3. **充值一笔试试,等待 10 个确认位**

![image.png](https://img.learnblockchain.cn/attachments/2025/05/GcYKPwGq68278af88ea9b.png)

至此,钱包的所有业务解析完毕!!!

`下篇文章来给大家总结下交易所钱包项目吧(画个巨大的图来整合所有业务模块)`

后续,钱包有一些 bug 以及优化什么的,等我慢慢 `fix` 吧 hhh
如果有愿意参加这个开源项目去参与贡献的,欢迎联系我(或者直接 `github` 上 `issue` 交流也可以的)
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
shawn_shaw
shawn_shaw
web3潜水员、技术爱好者、web3钱包开发工程师。欢迎闲聊唠嗑、精进技术、交流工作机会。vx:cola_ocean