使用Go实现扫块充值功能

  • 曲弯
  • 发布于 1天前
  • 阅读 60

摘要：使用Go实现扫块充值功能。下面是一个完整的、可用于生产环境的Go语言实现方案,用于实现交易所的扫块充值功能。本方案基于以太坊网络,但架构设计支持扩展到其他EVM兼容链。目录预览:一、项目结构与依赖;1.1 项目目录结构(block-scanner/ ├── cmd/ │ └── scanner/ │ ……)

<!--StartFragment-->

使用Go实现扫块充值功能

下面是一个完整的、可用于生产环境的Go语言实现方案,用于实现交易所的扫块充值功能。本方案基于以太坊网络,但架构设计支持扩展到其他EVM兼容链。

一、项目结构与依赖

1.1 项目目录结构

block-scanner/
├── cmd/
│   └── scanner/
│       └── main.go          # 程序入口
├── internal/
│   ├── config/
│   │   └── config.go        # 配置管理
│   ├── scanner/
│   │   ├── scanner.go       # 扫描器主逻辑
│   │   ├── processor.go     # 交易处理器
│   │   └── storage.go       # 存储接口
│   ├── model/
│   │   ├── transaction.go   # 交易模型
│   │   └── address.go       # 地址模型
│   └── ethclient/
│       └── client.go        # 以太坊客户端封装
├── pkg/
│   └── database/
│       ├── redis.go         # Redis客户端
│       └── postgres.go      # PostgreSQL客户端
├── go.mod
└── go.sum

1.2 主要依赖

// go.mod
module block-scanner

go 1.21

require (
    github.com/ethereum/go-ethereum v1.13.5
    github.com/redis/go-redis/v9 v9.4.0
    github.com/jackc/pgx/v5 v5.5.0
    github.com/spf13/viper v1.17.0
    go.uber.org/zap v1.26.0
    github.com/robfig/cron/v3 v3.0.1
)

二、核心实现代码

2.1 配置管理 (internal/config/config.go)

package config

import (
    "fmt"
    "time"

    "github.com/spf13/viper"
)

// Config is the root application configuration filled in by LoadConfig.
// The mapstructure tags give the key names expected in the configuration file.
type Config struct {
    // Ethereum holds the node endpoints and chain identifier.
    Ethereum struct {
        RPCURL       string `mapstructure:"rpc_url"`
        WebSocketURL string `mapstructure:"ws_url"`
        ChainID      int64  `mapstructure:"chain_id"`
    } `mapstructure:"ethereum"`

    // Scanner controls where scanning starts, how it is paced and how
    // block fetches are retried.
    Scanner struct {
        StartBlock         uint64        `mapstructure:"start_block"`
        ConfirmationBlocks uint64        `mapstructure:"confirmation_blocks"`
        ScanInterval       time.Duration `mapstructure:"scan_interval"`
        BatchSize          uint64        `mapstructure:"batch_size"`
        MaxRetries         int           `mapstructure:"max_retries"`
        RetryDelay         time.Duration `mapstructure:"retry_delay"`
    } `mapstructure:"scanner"`

    // Database holds the connection strings for PostgreSQL and Redis.
    Database struct {
        PostgresDSN string `mapstructure:"postgres_dsn"`
        RedisURL    string `mapstructure:"redis_url"`
    } `mapstructure:"database"`

    // Server configures the HTTP listener port and the shutdown timeout.
    Server struct {
        Port            int           `mapstructure:"port"`
        ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"`
    } `mapstructure:"server"`
}

// LoadConfig reads the YAML file at configPath through the package-level
// viper instance, applies defaults for the scanner and server sections, and
// decodes the result into a Config.
//
// It returns an error when the file cannot be read or decoded, or when a
// value would break the scanner at runtime: a zero batch size never advances
// the catch-up loop, and time.NewTicker panics on a non-positive interval.
func LoadConfig(configPath string) (*Config, error) {
    viper.SetConfigFile(configPath)
    viper.SetConfigType("yaml")

    // Defaults used when the file omits a key.
    viper.SetDefault("scanner.scan_interval", "3s")
    viper.SetDefault("scanner.batch_size", 100)
    viper.SetDefault("scanner.max_retries", 3)
    viper.SetDefault("scanner.retry_delay", "1s")
    viper.SetDefault("scanner.confirmation_blocks", 12)
    viper.SetDefault("server.port", 8080)
    viper.SetDefault("server.shutdown_timeout", "30s")

    if err := viper.ReadInConfig(); err != nil {
        return nil, fmt.Errorf("读取配置文件失败: %w", err)
    }

    var config Config
    if err := viper.Unmarshal(&config); err != nil {
        return nil, fmt.Errorf("解析配置文件失败: %w", err)
    }

    // Reject explicitly configured values that would hang or panic later.
    if config.Scanner.BatchSize == 0 {
        return nil, fmt.Errorf("配置无效: scanner.batch_size 必须大于 0")
    }
    if config.Scanner.ScanInterval <= 0 {
        return nil, fmt.Errorf("配置无效: scanner.scan_interval 必须大于 0")
    }

    return &config, nil
}

2.2 数据模型 (internal/model/transaction.go)

package model

import (
    "time"
    "math/big"
)

// TransactionType names the kind of asset a transaction or deposit moves.
type TransactionType string

// Asset kinds known to the model.
const (
    TxTypeETH    TransactionType = "ETH"
    TxTypeERC20  TransactionType = "ERC20"
    TxTypeERC721 TransactionType = "ERC721"
)

// TransactionStatus is the lifecycle state stored on transactions and deposits.
type TransactionStatus string

// Lifecycle states. The exact transition rules are not defined in this file;
// presumably "pending" precedes "confirmed" — TODO confirm against the storage layer.
const (
    TxStatusPending   TransactionStatus = "pending"
    TxStatusConfirmed TransactionStatus = "confirmed"
    TxStatusFailed    TransactionStatus = "failed"
    TxStatusDropped   TransactionStatus = "dropped"
)

// BlockchainTransaction is the persisted record of an on-chain transfer.
// Field names in the json and db tags define the serialized/column names.
type BlockchainTransaction struct {
    ID          string `json:"id" db:"id"`
    TxHash      string `json:"tx_hash" db:"tx_hash"`
    BlockNumber uint64 `json:"block_number" db:"block_number"`
    BlockHash   string `json:"block_hash" db:"block_hash"`
    FromAddress string `json:"from_address" db:"from_address"`
    ToAddress   string `json:"to_address" db:"to_address"`

    // Value and GasPrice are arbitrary-precision integers.
    Value    *big.Int `json:"value" db:"value"`
    GasPrice *big.Int `json:"gas_price" db:"gas_price"`

    GasUsed          uint64            `json:"gas_used" db:"gas_used"`
    Nonce            uint64            `json:"nonce" db:"nonce"`
    TransactionIndex uint              `json:"transaction_index" db:"transaction_index"`
    Status           TransactionStatus `json:"status" db:"status"`
    Type             TransactionType   `json:"type" db:"type"`

    // Token fields are optional and omitted from JSON when nil.
    TokenAddress  *string `json:"token_address,omitempty" db:"token_address"`
    TokenSymbol   *string `json:"token_symbol,omitempty" db:"token_symbol"`
    TokenDecimals *int    `json:"token_decimals,omitempty" db:"token_decimals"`

    Confirmations uint64    `json:"confirmations" db:"confirmations"`
    CreatedAt     time.Time `json:"created_at" db:"created_at"`
    UpdatedAt     time.Time `json:"updated_at" db:"updated_at"`
}

// Deposit is the persisted record of a user deposit derived from a
// BlockchainTransaction (linked through TransactionID).
type Deposit struct {
    ID            string            `json:"id" db:"id"`
    UserID        string            `json:"user_id" db:"user_id"`
    TransactionID string            `json:"transaction_id" db:"transaction_id"`
    Address       string            `json:"address" db:"address"`
    Amount        *big.Int          `json:"amount" db:"amount"`
    AssetType     TransactionType   `json:"asset_type" db:"asset_type"`
    Status        TransactionStatus `json:"status" db:"status"`
    Confirmations uint64            `json:"confirmations" db:"confirmations"`

    // ConfirmedAt is optional and omitted from JSON when nil.
    ConfirmedAt *time.Time `json:"confirmed_at,omitempty" db:"confirmed_at"`
    CreatedAt   time.Time  `json:"created_at" db:"created_at"`
    UpdatedAt   time.Time  `json:"updated_at" db:"updated_at"`
}

// UserAddress maps a deposit address to the user that owns it.
// The transaction processor indexes these records by lower-cased Address.
type UserAddress struct {
    ID        string    `json:"id" db:"id"`
    UserID    string    `json:"user_id" db:"user_id"`
    Address   string    `json:"address" db:"address"`
    ChainType string    `json:"chain_type" db:"chain_type"`
    IsActive  bool      `json:"is_active" db:"is_active"`
    CreatedAt time.Time `json:"created_at" db:"created_at"`
}

2.3 以太坊客户端封装 (internal/ethclient/client.go)

package ethclient

import (
    "context"
    "fmt"
    "math/big"
    "time"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
    "go.uber.org/zap"
)

// Client wraps the go-ethereum RPC client and an optional WebSocket client.
type Client struct {
    rpcClient *ethclient.Client // always set by NewClient
    wsClient  *ethclient.Client // nil when no WebSocket URL was given or dialing it failed
    logger    *zap.Logger
    rpcURL    string
    wsURL     string
    chainID   *big.Int // fetched from the RPC node by NewClient
}

// NewClient dials the RPC endpoint, reads the chain ID from it and, when
// wsURL is non-empty, also tries to dial the WebSocket endpoint.
//
// All dialing shares a 10-second timeout. An RPC dial or chain-ID failure is
// returned as an error; a WebSocket dial failure is only logged and leaves
// the client in polling-only mode (SubscribeNewHead will then return an error).
func NewClient(rpcURL, wsURL string, logger *zap.Logger) (*Client, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    rpcClient, err := ethclient.DialContext(ctx, rpcURL)
    if err != nil {
        return nil, fmt.Errorf("连接RPC节点失败: %w", err)
    }

    chainID, err := rpcClient.ChainID(ctx)
    if err != nil {
        // Do not leak the connection that was just opened.
        rpcClient.Close()
        return nil, fmt.Errorf("获取链ID失败: %w", err)
    }

    var wsClient *ethclient.Client
    if wsURL != "" {
        ws, wsErr := ethclient.DialContext(ctx, wsURL)
        if wsErr != nil {
            logger.Warn("WebSocket连接失败,将使用轮询模式", zap.Error(wsErr))
        } else {
            wsClient = ws
        }
    }

    return &Client{
        rpcClient: rpcClient,
        wsClient:  wsClient,
        logger:    logger,
        rpcURL:    rpcURL,
        wsURL:     wsURL,
        chainID:   chainID,
    }, nil
}

// GetLatestBlockNumber returns the most recent block number reported by the RPC node.
func (c *Client) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
    height, err := c.rpcClient.BlockNumber(ctx)
    return height, err
}

// GetBlockByNumber fetches the full block with the given number over RPC.
func (c *Client) GetBlockByNumber(ctx context.Context, blockNumber uint64) (*types.Block, error) {
    // SetUint64 keeps the whole uint64 range; converting through int64 would
    // turn values above MaxInt64 negative, which the RPC layer treats as a
    // block tag rather than a block number.
    number := new(big.Int).SetUint64(blockNumber)
    return c.rpcClient.BlockByNumber(ctx, number)
}

// GetTransactionReceipt fetches the receipt of the transaction with hash txHash over RPC.
func (c *Client) GetTransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
    receipt, err := c.rpcClient.TransactionReceipt(ctx, txHash)
    return receipt, err
}

// GetTransactionByHash looks up a transaction by hash over RPC. The boolean
// result is the pending flag returned by the underlying client.
func (c *Client) GetTransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) {
    tx, isPending, err := c.rpcClient.TransactionByHash(ctx, txHash)
    return tx, isPending, err
}

// SubscribeNewHead subscribes to new block headers over the WebSocket
// connection. It returns the channel headers are delivered on together with
// the subscription handle, or an error when no WebSocket client is available
// or the subscription request fails.
func (c *Client) SubscribeNewHead(ctx context.Context) (chan *types.Header, ethereum.Subscription, error) {
    ws := c.wsClient
    if ws == nil {
        return nil, nil, fmt.Errorf("WebSocket客户端未初始化")
    }

    headCh := make(chan *types.Header)
    subscription, err := ws.SubscribeNewHead(ctx, headCh)
    if err == nil {
        return headCh, subscription, nil
    }
    return nil, nil, err
}

// Close closes the RPC connection and then the WebSocket connection, skipping
// whichever is nil.
func (c *Client) Close() {
    for _, conn := range []*ethclient.Client{c.rpcClient, c.wsClient} {
        if conn != nil {
            conn.Close()
        }
    }
}

2.4 存储接口 (internal/scanner/storage.go)

package scanner

import (
    "context"
    "math/big"

    "block-scanner/internal/model"
)

// Storage is the persistence interface used by the scanner and the
// transaction processor. Method semantics beyond their signatures are defined
// by the implementation, which is not shown in this file.
type Storage interface {
    // Address management: list every deposit address, or look one up.
    GetUserAddresses(ctx context.Context) ([]model.UserAddress, error)
    GetUserByAddress(ctx context.Context, address string) (*model.UserAddress, error)

    // Transaction management: store, fetch and update chain transactions.
    SaveTransaction(ctx context.Context, tx *model.BlockchainTransaction) error
    GetTransactionByHash(ctx context.Context, txHash string) (*model.BlockchainTransaction, error)
    UpdateTransactionStatus(ctx context.Context, txHash string, status model.TransactionStatus, confirmations uint64) error

    // Deposit management: create deposits, update them, list pending ones.
    CreateDeposit(ctx context.Context, deposit *model.Deposit) error
    UpdateDepositStatus(ctx context.Context, depositID string, status model.TransactionStatus, confirmations uint64) error
    GetPendingDeposits(ctx context.Context) ([]model.Deposit, error)

    // Scan progress: the scanner saves the last processed block number here
    // and reads it back on startup.
    GetLastScannedBlock(ctx context.Context) (uint64, error)
    SaveLastScannedBlock(ctx context.Context, blockNumber uint64) error

    // Batch variants used by the processor after scanning a block.
    BatchSaveTransactions(ctx context.Context, txs []*model.BlockchainTransaction) error
    BatchCreateDeposits(ctx context.Context, deposits []*model.Deposit) error
}

// RedisStorage is a placeholder for a Redis-backed Storage implementation;
// no fields or methods are defined for it here.
type RedisStorage struct {
    // Redis client instance goes here.
}

// PostgresStorage is a placeholder for a PostgreSQL-backed Storage
// implementation; no fields or methods are defined for it here.
type PostgresStorage struct {
    // PostgreSQL client instance goes here.
}

2.5 交易处理器 (internal/scanner/processor.go)

package scanner

import (
    "context"
    "crypto/rand"
    "fmt"
    "math/big"
    "strings"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "go.uber.org/zap"

    "block-scanner/internal/ethclient"
    "block-scanner/internal/model"
)

// erc20TransferEventSig is the ERC20 Transfer event signature hash; logs are
// treated as ERC20 transfers when their first topic equals this value.
var erc20TransferEventSig = common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")

// defaultConfirmationBlocks matches the "scanner.confirmation_blocks" default
// applied by the configuration loader.
const defaultConfirmationBlocks uint64 = 12

// TransactionProcessor turns block contents into transaction and deposit
// records and tracks their confirmation counts.
type TransactionProcessor struct {
    storage Storage
    logger  *zap.Logger
    client  *ethclient.Client

    // confirmationBlocks is the confirmation count at which
    // UpdateConfirmations triggers the balance update for a deposit.
    confirmationBlocks uint64
}

// NewTransactionProcessor builds a processor that reads and writes through
// storage and queries the chain through client. The confirmation threshold
// starts at defaultConfirmationBlocks; use SetConfirmationBlocks to change it.
func NewTransactionProcessor(storage Storage, client *ethclient.Client, logger *zap.Logger) *TransactionProcessor {
    return &TransactionProcessor{
        storage:            storage,
        logger:             logger,
        client:             client,
        confirmationBlocks: defaultConfirmationBlocks,
    }
}

// SetConfirmationBlocks overrides the confirmation threshold. It is not
// synchronized, so call it before the processor is used concurrently.
func (p *TransactionProcessor) SetConfirmationBlocks(n uint64) {
    p.confirmationBlocks = n
}

// 处理区块中的所有交易
func (p *TransactionProcessor) ProcessBlock(ctx context.Context, block *types.Block) error {
    p.logger.Info("开始处理区块",
        zap.Uint64("block_number", block.NumberU64()),
        zap.Int("transaction_count", len(block.Transactions())),
    )

    var deposits []*model.Deposit
    var transactions []*model.BlockchainTransaction

    // 获取所有用户地址用于快速匹配
    userAddresses, err := p.storage.GetUserAddresses(ctx)
    if err != nil {
        return fmt.Errorf("获取用户地址失败: %w", err)
    }

    // 构建地址映射表
    addressMap := make(map[string]model.UserAddress)
    for _, addr := range userAddresses {
        addressMap[strings.ToLower(addr.Address)] = addr
    }

    // 处理普通ETH转账
    for _, tx := range block.Transactions() {
        // 获取交易发送方
        from, err := types.Sender(types.LatestSignerForChainID(p.client.ChainID), tx)
        if err != nil {
            p.logger.Warn("获取交易发送方失败", zap.Error(err))
            continue
        }

        to := tx.To()
        if to == nil {
            // 合约创建交易,跳过
            continue
        }

        // 检查是否为充值地址
        toAddressLower := strings.ToLower(to.Hex())
        if userAddr, exists := addressMap[toAddressLower]; exists {
            // 获取交易收据
            receipt, err := p.client.GetTransactionReceipt(ctx, tx.Hash())
            if err != nil {
                p.logger.Warn("获取交易收据失败", zap.Error(err))
                continue
            }

            // 检查交易状态
            if receipt.Status != types.ReceiptStatusSuccessful {
                p.logger.Info("交易失败,跳过", zap.String("tx_hash", tx.Hash().Hex()))
                continue
            }

            // 保存交易记录
            blockchainTx := &model.BlockchainTransaction{
                ID:               generateID(),
                TxHash:           tx.Hash().Hex(),
                BlockNumber:      block.NumberU64(),
                BlockHash:        block.Hash().Hex(),
                FromAddress:      from.Hex(),
                ToAddress:        to.Hex(),
                Value:            tx.Value(),
                GasPrice:         tx.GasPrice(),
                GasUsed:          receipt.GasUsed,
                Nonce:            tx.Nonce(),
                TransactionIndex: receipt.TransactionIndex,
                Status:           model.TxStatusConfirmed,
                Type:             model.TxTypeETH,
                Confirmations:    1, // 初始确认数
                CreatedAt:        time.Now(),
                UpdatedAt:        time.Now(),
            }
            transactions = append(transactions, blockchainTx)

            // 创建充值记录
            deposit := &model.Deposit{
                ID:            generateID(),
                UserID:        userAddr.UserID,
                TransactionID: blockchainTx.ID,
                Address:       to.Hex(),
                Amount:        tx.Value(),
                AssetType:     model.TxTypeETH,
                Status:        model.TxStatusConfirmed,
                Confirmations: 1,
                CreatedAt:     time.Now(),
                UpdatedAt:     time.Now(),
            }
            deposits = append(deposits, deposit)

            p.logger.Info("发现ETH充值",
                zap.String("user_id", userAddr.UserID),
                zap.String("tx_hash", tx.Hash().Hex()),
                zap.String("amount", tx.Value().String()),
            )
        }
    }

    // 处理合约日志(ERC20转账)
    for _, receipt := range block.Receipts() {
        for _, log := range receipt.Logs {
            // 检查是否为ERC20 Transfer事件
            if len(log.Topics) == 3 && log.Topics[0] == erc20TransferEventSig {
                // 解析事件参数
                from := common.BytesToAddress(log.Topics[1].Bytes())
                to := common.BytesToAddress(log.Topics[2].Bytes())
                value := new(big.Int).SetBytes(log.Data)

                // 检查是否为充值地址
                toAddressLower := strings.ToLower(to.Hex())
                if userAddr, exists := addressMap[toAddressLower]; exists {
                    // 获取交易详情
                    tx, _, err := p.client.GetTransactionByHash(ctx, receipt.TxHash)
                    if err != nil {
                        p.logger.Warn("获取交易详情失败", zap.Error(err))
                        continue
                    }

                    // 保存交易记录
                    blockchainTx := &model.BlockchainTransaction{
                        ID:               generateID(),
                        TxHash:           receipt.TxHash.Hex(),
                        BlockNumber:      block.NumberU64(),
                        BlockHash:        block.Hash().Hex(),
                        FromAddress:      from.Hex(),
                        ToAddress:        to.Hex(),
                        Value:            value,
                        GasPrice:         tx.GasPrice(),
                        GasUsed:          receipt.GasUsed,
                        Nonce:            tx.Nonce(),
                        TransactionIndex: receipt.TransactionIndex,
                        Status:           model.TxStatusConfirmed,
                        Type:             model.TxTypeERC20,
                        TokenAddress:     &log.Address.Hex(),
                        Confirmations:    1,
                        CreatedAt:        time.Now(),
                        UpdatedAt:        time.Now(),
                    }
                    transactions = append(transactions, blockchainTx)

                    // 创建充值记录
                    deposit := &model.Deposit{
                        ID:            generateID(),
                        UserID:        userAddr.UserID,
                        TransactionID: blockchainTx.ID,
                        Address:       to.Hex(),
                        Amount:        value,
                        AssetType:     model.TxTypeERC20,
                        Status:        model.TxStatusConfirmed,
                        Confirmations: 1,
                        CreatedAt:     time.Now(),
                        UpdatedAt:     time.Now(),
                    }
                    deposits = append(deposits, deposit)

                    p.logger.Info("发现ERC20充值",
                        zap.String("user_id", userAddr.UserID),
                        zap.String("tx_hash", receipt.TxHash.Hex()),
                        zap.String("token_address", log.Address.Hex()),
                        zap.String("amount", value.String()),
                    )
                }
            }
        }
    }

    // 批量保存数据
    if len(transactions) > 0 {
        if err := p.storage.BatchSaveTransactions(ctx, transactions); err != nil {
            return fmt.Errorf("批量保存交易失败: %w", err)
        }
    }

    if len(deposits) > 0 {
        if err := p.storage.BatchCreateDeposits(ctx, deposits); err != nil {
            return fmt.Errorf("批量创建充值记录失败: %w", err)
        }
    }

    p.logger.Info("区块处理完成",
        zap.Uint64("block_number", block.NumberU64()),
        zap.Int("transactions_saved", len(transactions)),
        zap.Int("deposits_created", len(deposits)),
    )

    return nil
}

// UpdateConfirmations recomputes the confirmation count of every pending
// deposit against latestBlockNumber and stores it on both the transaction
// and the deposit.
//
// When the count reaches the processor's confirmation threshold, both records
// are stored as confirmed and the balance update is triggered once the writes
// succeed. Promoting the status keeps a deposit from being returned as
// pending, and therefore credited, on every later run.
//
// Failures on a single deposit are logged and skipped. Only a failure to
// load the pending list is returned.
//
// NOTE(review): confirmations are derived from block numbers alone. The
// stored block hash is not re-checked against the canonical chain, so a
// reorged-out transaction would still be confirmed here.
func (p *TransactionProcessor) UpdateConfirmations(ctx context.Context, latestBlockNumber uint64) error {
    pendingDeposits, err := p.storage.GetPendingDeposits(ctx)
    if err != nil {
        return fmt.Errorf("获取待确认充值记录失败: %w", err)
    }

    for i := range pendingDeposits {
        deposit := &pendingDeposits[i]

        // NOTE(review): the lookup key is Deposit.TransactionID although the
        // storage method is keyed by hash. Confirm the implementation accepts it.
        tx, err := p.storage.GetTransactionByHash(ctx, deposit.TransactionID)
        if err != nil {
            p.logger.Warn("获取交易详情失败", zap.Error(err))
            continue
        }
        if tx == nil {
            p.logger.Warn("获取交易详情失败", zap.String("transaction_id", deposit.TransactionID))
            continue
        }

        // The transaction's block is ahead of the reported head; retry later.
        if tx.BlockNumber > latestBlockNumber {
            continue
        }

        // The containing block itself counts as the first confirmation.
        confirmations := latestBlockNumber - tx.BlockNumber + 1
        reached := confirmations >= p.confirmationBlocks

        txStatus, depositStatus := tx.Status, deposit.Status
        if reached {
            txStatus, depositStatus = model.TxStatusConfirmed, model.TxStatusConfirmed
        }

        if err := p.storage.UpdateTransactionStatus(ctx, tx.TxHash, txStatus, confirmations); err != nil {
            p.logger.Warn("更新交易确认数失败", zap.Error(err))
            continue
        }

        if err := p.storage.UpdateDepositStatus(ctx, deposit.ID, depositStatus, confirmations); err != nil {
            p.logger.Warn("更新充值记录确认数失败", zap.Error(err))
            continue
        }

        if reached {
            deposit.Status = depositStatus
            deposit.Confirmations = confirmations
            p.triggerBalanceUpdate(ctx, deposit)
        }
    }

    return nil
}

// triggerBalanceUpdate is the hook for crediting a confirmed deposit. It
// currently only logs the deposit; a real implementation would call an
// internal API or publish to a message queue.
func (p *TransactionProcessor) triggerBalanceUpdate(ctx context.Context, deposit *model.Deposit) {
    fields := []zap.Field{
        zap.String("deposit_id", deposit.ID),
        zap.String("user_id", deposit.UserID),
        zap.String("amount", deposit.Amount.String()),
    }
    p.logger.Info("触发余额更新", fields...)
}

// generateID returns a random identifier formatted as "0x" followed by 64
// lower-case hex digits, the same shape the previous hash-based IDs had.
//
// The 32 bytes come from crypto/rand, so uniqueness does not depend on clock
// resolution. It panics if the system random source fails, since no usable ID
// can be produced in that case.
func generateID() string {
    var buf [32]byte
    if _, err := rand.Read(buf[:]); err != nil {
        panic(fmt.Errorf("generateID: reading random bytes: %w", err))
    }
    return fmt.Sprintf("0x%x", buf[:])
}

2.6 扫描器主逻辑 (internal/scanner/scanner.go)

package scanner

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/core/types"
    "github.com/robfig/cron/v3"
    "go.uber.org/zap"

    "block-scanner/internal/config"
    "block-scanner/internal/ethclient"
)

// Scanner drives block scanning: it owns the chain client, the storage
// handle and the transaction processor, and runs the background loops
// started by Start.
type Scanner struct {
    config    *config.Config
    client    *ethclient.Client
    storage   Storage
    processor *TransactionProcessor
    logger    *zap.Logger

    // Control state. mu is taken by Start, Shutdown and GetStatus;
    // shutdownChan is closed by Shutdown to stop the loops; wg counts the
    // goroutines launched by Start.
    mu              sync.RWMutex
    isRunning       bool
    lastBlockNumber uint64
    shutdownChan    chan struct{}
    wg              sync.WaitGroup
}

// NewScanner connects to the Ethereum node described by cfg and assembles a
// Scanner around it, including a TransactionProcessor that shares the same
// storage, client and logger. It returns an error when the client cannot be
// created.
func NewScanner(cfg *config.Config, storage Storage, logger *zap.Logger) (*Scanner, error) {
    ethCfg := cfg.Ethereum
    client, err := ethclient.NewClient(ethCfg.RPCURL, ethCfg.WebSocketURL, logger)
    if err != nil {
        return nil, fmt.Errorf("创建以太坊客户端失败: %w", err)
    }

    s := &Scanner{
        config:       cfg,
        client:       client,
        storage:      storage,
        processor:    NewTransactionProcessor(storage, client, logger),
        logger:       logger,
        shutdownChan: make(chan struct{}),
    }
    return s, nil
}

// Start marks the scanner as running and launches its three background
// goroutines: the scan loop, the confirmation update loop and the metrics
// loop. It returns an error if the scanner is already running; it does not
// wait for the goroutines.
func (s *Scanner) Start(ctx context.Context) error {
    s.mu.Lock()
    alreadyRunning := s.isRunning
    if !alreadyRunning {
        s.isRunning = true
    }
    s.mu.Unlock()

    if alreadyRunning {
        return fmt.Errorf("扫描器已在运行")
    }

    s.logger.Info("启动区块扫描器")

    // Each worker calls s.wg.Done when it returns.
    workers := []func(context.Context){
        s.scanLoop,
        s.confirmationUpdateLoop,
        s.metricsLoop,
    }
    for _, worker := range workers {
        s.wg.Add(1)
        go worker(ctx)
    }

    return nil
}

// scanLoop picks the first block to scan and runs the scanner in
// event-driven or polling mode until it stops.
//
// Scanning resumes at the block after the persisted last-scanned block, so
// that block is not processed a second time after a restart. The configured
// start_block is used when the progress cannot be read or lies before it.
//
// Event-driven mode is chosen when a WebSocket URL is configured.
// eventDrivenScan falls back to polling by itself when the subscription
// cannot be established.
func (s *Scanner) scanLoop(ctx context.Context) {
    defer s.wg.Done()

    startBlock := s.config.Scanner.StartBlock

    lastScannedBlock, err := s.storage.GetLastScannedBlock(ctx)
    switch {
    case err != nil:
        s.logger.Error("获取上次扫描区块失败", zap.Error(err))
    case lastScannedBlock >= startBlock:
        startBlock = lastScannedBlock + 1
    }

    if s.config.Ethereum.WebSocketURL != "" {
        s.logger.Info("使用WebSocket事件驱动模式")
        s.eventDrivenScan(ctx, startBlock)
        return
    }

    s.logger.Info("使用轮询模式")
    s.pollingScan(ctx, startBlock)
}

// 事件驱动扫描(WebSocket)
func (s *Scanner) eventDrivenScan(ctx context.Context, startBlock uint64) {
    // 先补扫历史区块
    if err := s.catchUpHistoricalBlocks(ctx, startBlock); err != nil {
        s.logger.Error("补扫历史区块失败", zap.Error(err))
    }

    // 订阅新区块
    headers, sub, err := s.client.SubscribeNewHead(ctx)
    if err != nil {
        s.logger.Error("订阅新区块失败,切换到轮询模式", zap.Error(err))
        s.pollingScan(ctx, s.lastBlockNumber)
        return
    }
    defer sub.Unsubscribe()

    for {
        select {
        case &lt;-s.shutdownChan:
            s.logger.Info("收到关闭信号,停止扫描")
            return
        case &lt;-ctx.Done():
            s.logger.Info("上下文取消,停止扫描")
            return
        case err := &lt;-sub.Err():
            s.logger.Error("订阅错误", zap.Error(err))
            // 切换到轮询模式
            s.pollingScan(ctx, s.lastBlockNumber)
            return
        case header := &lt;-headers:
            s.processNewBlock(ctx, header.Number.Uint64())
        }
    }
}

// 轮询扫描
func (s *Scanner) pollingScan(ctx context.Context, startBlock uint64) {
    ticker := time.NewTicker(s.config.Scanner.ScanInterval)
    defer ticker.Stop()

    // 先补扫历史区块
    if err := s.catchUpHistoricalBlocks(ctx, startBlock); err != nil {
        s.logger.Error("补扫历史区块失败", zap.Error(err))
    }

    for {
        select {
        case &lt;-s.shutdownChan:
            s.logger.Info("收到关闭信号,停止扫描")
            return
        case &lt;-ctx.Done():
            s.logger.Info("上下文取消,停止扫描")
            return
        case &lt;-ticker.C:
            // 获取最新区块号
            latestBlock, err := s.client.GetLatestBlockNumber(ctx)
            if err != nil {
                s.logger.Error("获取最新区块号失败", zap.Error(err))
                continue
            }

            // 如果已经是最新区块,等待下次轮询
            if s.lastBlockNumber >= latestBlock {
                continue
            }

            // 扫描新区块
            nextBlock := s.lastBlockNumber + 1
            for blockNum := nextBlock; blockNum &lt;= latestBlock; blockNum++ {
                if err := s.processBlockByNumber(ctx, blockNum); err != nil {
                    s.logger.Error("处理区块失败",
                        zap.Uint64("block_number", blockNum),
                        zap.Error(err),
                    )
                    break
                }
                s.lastBlockNumber = blockNum
            }

            // 保存扫描进度
            if err := s.storage.SaveLastScannedBlock(ctx, s.lastBlockNumber); err != nil {
                s.logger.Error("保存扫描进度失败", zap.Error(err))
            }
        }
    }
}

// 补扫历史区块
func (s *Scanner) catchUpHistoricalBlocks(ctx context.Context, startBlock uint64) error {
    s.logger.Info("开始补扫历史区块",
        zap.Uint64("start_block", startBlock),
        zap.Uint64("batch_size", s.config.Scanner.BatchSize),
    )

    // 获取最新区块号
    latestBlock, err := s.client.GetLatestBlockNumber(ctx)
    if err != nil {
        return fmt.Errorf("获取最新区块号失败: %w", err)
    }

    // 如果起始区块已经超过最新区块,直接返回
    if startBlock > latestBlock {
        s.lastBlockNumber = latestBlock
        return nil
    }

    // 批量处理历史区块
    for fromBlock := startBlock; fromBlock &lt;= latestBlock; fromBlock += s.config.Scanner.BatchSize {
        select {
        case &lt;-s.shutdownChan:
            return fmt.Errorf("扫描被中断")
        case &lt;-ctx.Done():
            return ctx.Err()
        default:
            toBlock := fromBlock + s.config.Scanner.BatchSize - 1
            if toBlock > latestBlock {
                toBlock = latestBlock
            }

            s.logger.Info("批量处理历史区块",
                zap.Uint64("from_block", fromBlock),
                zap.Uint64("to_block", toBlock),
            )

            for blockNum := fromBlock; blockNum &lt;= toBlock; blockNum++ {
                if err := s.processBlockByNumber(ctx, blockNum); err != nil {
                    return fmt.Errorf("处理区块 %d 失败: %w", blockNum, err)
                }
                s.lastBlockNumber = blockNum
            }

            // 保存进度
            if err := s.storage.SaveLastScannedBlock(ctx, s.lastBlockNumber); err != nil {
                s.logger.Error("保存扫描进度失败", zap.Error(err))
            }

            // 避免请求过快
            time.Sleep(100 * time.Millisecond)
        }
    }

    s.logger.Info("历史区块补扫完成",
        zap.Uint64("last_scanned_block", s.lastBlockNumber),
    )

    return nil
}

// 处理指定区块号的区块
func (s *Scanner) processBlockByNumber(ctx context.Context, blockNumber uint64) error {
    // 重试机制
    var lastErr error
    for i := 0; i &lt; s.config.Scanner.MaxRetries; i++ {
        block, err := s.client.GetBlockByNumber(ctx, blockNumber)
        if err != nil {
            lastErr = err
            s.logger.Warn("获取区块失败,准备重试",
                zap.Uint64("block_number", blockNumber),
                zap.Int("retry", i+1),
                zap.Error(err),
            )
            time.Sleep(s.config.Scanner.RetryDelay)
            continue
        }

        // 处理区块
        if err := s.processor.ProcessBlock(ctx, block); err != nil {
            return fmt.Errorf("处理区块失败: %w", err)
        }

        return nil
    }

    return fmt.Errorf("获取区块 %d 失败,重试 %d 次后放弃: %w",
        blockNumber, s.config.Scanner.MaxRetries, lastErr)
}

// processNewBlock processes one newly announced block. On success it records
// the block as the last processed one and saves the scan progress. On failure
// it logs the error and leaves the progress untouched.
func (s *Scanner) processNewBlock(ctx context.Context, blockNumber uint64) {
    s.logger.Info("发现新区块", zap.Uint64("block_number", blockNumber))

    if err := s.processBlockByNumber(ctx, blockNumber); err != nil {
        s.logger.Error("处理新区块失败", zap.Error(err))
        return
    }

    // GetStatus reads this field under mu from other goroutines.
    s.mu.Lock()
    s.lastBlockNumber = blockNumber
    s.mu.Unlock()

    if err := s.storage.SaveLastScannedBlock(ctx, blockNumber); err != nil {
        s.logger.Error("保存扫描进度失败", zap.Error(err))
    }
}

// 确认数更新循环
func (s *Scanner) confirmationUpdateLoop(ctx context.Context) {
    defer s.wg.Done()

    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case &lt;-s.shutdownChan:
            return
        case &lt;-ctx.Done():
            return
        case &lt;-ticker.C:
            latestBlock, err := s.client.GetLatestBlockNumber(ctx)
            if err != nil {
                s.logger.Error("获取最新区块号失败", zap.Error(err))
                continue
            }

            if err := s.processor.UpdateConfirmations(ctx, latestBlock); err != nil {
                s.logger.Error("更新确认数失败", zap.Error(err))
            }
        }
    }
}

// metricsLoop runs a cron job that collects metrics once per minute and
// stops it on shutdown or context cancellation, so the goroutine also ends
// when only the context is cancelled.
func (s *Scanner) metricsLoop(ctx context.Context) {
    defer s.wg.Done()

    c := cron.New()

    _, err := c.AddFunc("@every 1m", func() {
        s.collectMetrics(ctx)
    })
    if err != nil {
        s.logger.Error("添加定时任务失败", zap.Error(err))
        return
    }

    c.Start()
    defer c.Stop()

    select {
    case <-s.shutdownChan:
    case <-ctx.Done():
    }
}

// collectMetrics snapshots the scanner state and logs it at debug level.
// This is the place to push the values to Prometheus or another monitoring
// system. ctx is currently unused.
func (s *Scanner) collectMetrics(ctx context.Context) {
    // Read under the lock: this runs on the cron goroutine while Start and
    // Shutdown update isRunning under mu.
    s.mu.RLock()
    lastScanned, running := s.lastBlockNumber, s.isRunning
    s.mu.RUnlock()

    metrics := map[string]interface{}{
        "last_scanned_block": lastScanned,
        "is_running":         running,
        "timestamp":          time.Now().Unix(),
    }

    s.logger.Debug("收集监控指标", zap.Any("metrics", metrics))
}

// 优雅关闭
func (s *Scanner) Shutdown() {
    s.mu.Lock()
    if !s.isRunning {
        s.mu.Unlock()
        return
    }
    s.isRunning = false
    s.mu.Unlock()

    s.logger.Info("开始关闭扫描器...")

    // 发送关闭信号
    close(s.shutdownChan)

    // 等待所有goroutine结束
    done := make(chan struct{})
    go func() {
        s.wg.Wait()
        close(done)
    }()

    // 设置超时
    select {
    case &lt;-done:
        s.logger.Info("扫描器已完全关闭")
    case &lt;-time.After(s.config.Server.ShutdownTimeout):
        s.logger.Warn("关闭超时,强制退出")
    }

    // 关闭客户端连接
    s.client.Close()
}

// GetStatus returns a snapshot of the scanner state taken under the read
// lock: the running flag, the last processed block number and the scanner
// settings most relevant to operators.
func (s *Scanner) GetStatus() map[string]interface{} {
    s.mu.RLock()
    defer s.mu.RUnlock()

    scanCfg := s.config.Scanner

    settings := make(map[string]interface{}, 3)
    settings["scan_interval"] = scanCfg.ScanInterval.String()
    settings["confirmation_blocks"] = scanCfg.ConfirmationBlocks
    settings["batch_size"] = scanCfg.BatchSize

    status := make(map[string]interface{}, 3)
    status["is_running"] = s.isRunning
    status["last_block_number"] = s.lastBlockNumber
    status["config"] = settings
    return status
}

2.7 主程序入口 (cmd/scanner/main.go)

package main

import (
    "context"
    "log"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go.uber.org/zap"

    "block-scanner/internal/config"
    "block-scanner/internal/scanner"
    "block-scanner/pkg/database"
)

// main wires the service together: logger, configuration, PostgreSQL and
// Redis connections, the storage layer, the scanner and an HTTP server that
// exposes /health and /metrics. It then blocks until SIGINT or SIGTERM arrives
// and shuts the scanner and the HTTP server down in that order.
func main() {
    // Initialise the logger; the standard log package is the fallback because
    // zap is not available yet if this step fails.
    logger, err := zap.NewProduction()
    if err != nil {
        log.Fatalf("初始化日志失败: %v", err)
    }
    defer logger.Sync()

    // Load configuration.
    cfg, err := config.LoadConfig("config.yaml")
    if err != nil {
        logger.Fatal("加载配置失败", zap.Error(err))
    }

    // Open the database connections.
    pgDB, err := database.NewPostgres(cfg.Database.PostgresDSN)
    if err != nil {
        logger.Fatal("连接PostgreSQL失败", zap.Error(err))
    }
    defer pgDB.Close()

    redisClient, err := database.NewRedis(cfg.Database.RedisURL)
    if err != nil {
        logger.Fatal("连接Redis失败", zap.Error(err))
    }
    defer redisClient.Close()

    // Build the storage layer on top of both connections.
    storage := scanner.NewPostgresStorage(pgDB, redisClient)

    // Create the scanner. The variable is named sc so that it does not shadow
    // the imported scanner package for the rest of the function.
    sc, err := scanner.NewScanner(cfg, storage, logger)
    if err != nil {
        logger.Fatal("创建扫描器失败", zap.Error(err))
    }

    // HTTP server used for health checks.
    mux := http.NewServeMux()
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "application/json")
        // An encoding or write failure is logged instead of being dropped.
        if err := json.NewEncoder(w).Encode(sc.GetStatus()); err != nil {
            logger.Error("写入健康检查响应失败", zap.Error(err))
        }
    })

    mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
        // Placeholder: expose Prometheus metrics here.
    })

    server := &http.Server{
        Addr:    fmt.Sprintf(":%d", cfg.Server.Port),
        Handler: mux,
        // Bound the time a client may take to send its request headers so
        // that stalled connections are not held open indefinitely.
        ReadHeaderTimeout: 10 * time.Second,
    }

    // Serve in the background; ErrServerClosed is the normal result of
    // Shutdown and is therefore not reported as an error.
    go func() {
        logger.Info("启动HTTP服务器", zap.Int("port", cfg.Server.Port))
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            logger.Error("HTTP服务器错误", zap.Error(err))
        }
    }()

    // Root context handed to the scanner.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start the scanner.
    if err := sc.Start(ctx); err != nil {
        logger.Fatal("启动扫描器失败", zap.Error(err))
    }

    // Wait for an interrupt or termination signal.
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    logger.Info("扫描器已启动,等待中断信号...")

    <-sigChan
    logger.Info("收到中断信号,开始关闭...")

    // Graceful scanner shutdown.
    sc.Shutdown()

    // Give in-flight HTTP requests up to five seconds to finish.
    shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer shutdownCancel()

    if err := server.Shutdown(shutdownCtx); err != nil {
        logger.Error("HTTP服务器关闭失败", zap.Error(err))
    }

    logger.Info("程序已完全退出")
}

三、配置文件示例 (config.yaml)

# Ethereum node settings
ethereum:
  rpc_url: "https://mainnet.infura.io/v3/YOUR_INFURA_KEY"
  ws_url: "wss://mainnet.infura.io/ws/v3/YOUR_INFURA_KEY"
  chain_id: 1

# Scanner settings
scanner:
  start_block: 19000000  # block height at which scanning starts
  confirmation_blocks: 12 # number of confirmation blocks
  scan_interval: "3s"     # polling interval
  batch_size: 100         # batch size
  max_retries: 3          # maximum number of retries
  retry_delay: "1s"       # delay between retries

# Database settings
database:
  postgres_dsn: "postgres://user:password@localhost:5432/block_scanner?sslmode=disable"
  redis_url: "redis://localhost:6379/0"

# Server settings
server:
  port: 8080
  shutdown_timeout: "30s"

四、部署与运行

4.1 构建与运行

# 1. Download dependencies
go mod download

# 2. Build
go build -o block-scanner ./cmd/scanner

# 3. Run
./block-scanner

# 4. Or use Docker
docker build -t block-scanner .
# NOTE(review): no -p flag is passed below, so the server port from config.yaml
# is not published to the host — confirm whether /health must be reachable from
# outside the container.
docker run -d --name scanner \
  -v $(pwd)/config.yaml:/app/config.yaml \
  block-scanner

4.2 数据库初始化

-- PostgreSQL schema
-- blockchain_transactions: keyed by id; tx_hash is additionally UNIQUE and NOT NULL.
-- NUMERIC(78, 0) holds integers of up to 78 decimal digits, which is wide enough
-- for any unsigned 256-bit value.
-- token_address, token_symbol and token_decimals are the only nullable columns.
CREATE TABLE blockchain_transactions (
    id VARCHAR(66) PRIMARY KEY,
    tx_hash VARCHAR(66) UNIQUE NOT NULL,
    block_number BIGINT NOT NULL,
    block_hash VARCHAR(66) NOT NULL,
    from_address VARCHAR(42) NOT NULL,
    to_address VARCHAR(42) NOT NULL,
    value NUMERIC(78, 0) NOT NULL,
    gas_price NUMERIC(78, 0) NOT NULL,
    gas_used BIGINT NOT NULL,
    nonce BIGINT NOT NULL,
    transaction_index INTEGER NOT NULL,
    status VARCHAR(20) NOT NULL,
    type VARCHAR(10) NOT NULL,
    token_address VARCHAR(42),
    token_symbol VARCHAR(20),
    token_decimals INTEGER,
    confirmations BIGINT NOT NULL DEFAULT 1,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

-- deposits: keyed by id; transaction_id is a foreign key to
-- blockchain_transactions(id). confirmations defaults to 1 and confirmed_at is
-- the only nullable column.
CREATE TABLE deposits (
    id VARCHAR(36) PRIMARY KEY,
    user_id VARCHAR(36) NOT NULL,
    transaction_id VARCHAR(66) NOT NULL REFERENCES blockchain_transactions(id),
    address VARCHAR(42) NOT NULL,
    amount NUMERIC(78, 0) NOT NULL,
    asset_type VARCHAR(10) NOT NULL,
    status VARCHAR(20) NOT NULL,
    confirmations BIGINT NOT NULL DEFAULT 1,
    confirmed_at TIMESTAMP,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

-- user_addresses: keyed by id; address is UNIQUE across the whole table and
-- is_active defaults to true.
CREATE TABLE user_addresses (
    id VARCHAR(36) PRIMARY KEY,
    user_id VARCHAR(36) NOT NULL,
    address VARCHAR(42) UNIQUE NOT NULL,
    chain_type VARCHAR(20) NOT NULL,
    is_active BOOLEAN NOT NULL DEFAULT true,
    created_at TIMESTAMP NOT NULL
);

-- Secondary indexes for the lookup columns.
-- blockchain_transactions.tx_hash and user_addresses.address are declared UNIQUE,
-- and PostgreSQL creates an index automatically for every UNIQUE constraint, so
-- no separate CREATE INDEX is issued for them: a second index on the same column
-- would only add write and storage overhead.
CREATE INDEX idx_block_number ON blockchain_transactions(block_number);
CREATE INDEX idx_to_address ON blockchain_transactions(to_address);
CREATE INDEX idx_deposit_user ON deposits(user_id);
CREATE INDEX idx_deposit_status ON deposits(status);

五、监控与告警

5.1 关键监控指标

// ScannerMetrics lists the key indicators that should be monitored for the
// scanner. Every field is serialized under the snake_case name given in its
// json tag.
type ScannerMetrics struct {
    // Scan state.
    IsRunning        bool   `json:"is_running"`
    LastScannedBlock uint64 `json:"last_scanned_block"`
    // Signed, unlike the other counters; presumably the distance to the chain
    // head — confirm against the code that fills it in.
    BlocksBehind int64 `json:"blocks_behind"`

    // Performance indicators.
    BlocksPerSecond   float64 `json:"blocks_per_second"`
    TransactionsTotal uint64  `json:"transactions_total"`
    DepositsTotal     uint64  `json:"deposits_total"`

    // Error counters.
    RPCErrors        uint64 `json:"rpc_errors"`
    ProcessingErrors uint64 `json:"processing_errors"`
    DBErrors         uint64 `json:"db_errors"`

    // Resource usage.
    MemoryUsageMB   float64 `json:"memory_usage_mb"`
    CPUUsagePercent float64 `json:"cpu_usage_percent"`
}

5.2 告警规则示例

# Prometheus告警规则
groups:
  - name: block_scanner
    rules:
      - alert: ScannerStopped
        expr: block_scanner_is_running ==

<!--EndFragment-->

点赞 0
收藏 1
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
曲弯
曲弯
0xb51E...CADb
江湖只有他的大名,没有他的介绍。