使用Go实现扫块充值功能下面是一个完整的、可用于生产环境的Go语言实现方案,用于实现交易所的扫块充值功能。本方案基于以太坊网络,但架构设计支持扩展到其他EVM兼容链。一、项目结构与依赖1.1项目目录结构block-scanner/├──cmd/│└──scanner/│
<!--StartFragment-->
下面是一个完整的、可用于生产环境的Go语言实现方案,用于实现交易所的扫块充值功能。本方案基于以太坊网络,但架构设计支持扩展到其他EVM兼容链。
block-scanner/
├── cmd/
│ └── scanner/
│ └── main.go # 程序入口
├── internal/
│ ├── config/
│ │ └── config.go # 配置管理
│ ├── scanner/
│ │ ├── scanner.go # 扫描器主逻辑
│ │ ├── processor.go # 交易处理器
│ │ └── storage.go # 存储接口
│ ├── model/
│ │ ├── transaction.go # 交易模型
│ │ └── address.go # 地址模型
│ └── ethclient/
│ └── client.go # 以太坊客户端封装
├── pkg/
│ └── database/
│ ├── redis.go # Redis客户端
│ └── postgres.go # PostgreSQL客户端
├── go.mod
└── go.sum
// go.mod
module block-scanner
go 1.21
require (
github.com/ethereum/go-ethereum v1.13.5
github.com/redis/go-redis/v9 v9.4.0
github.com/jackc/pgx/v5 v5.5.0
github.com/spf13/viper v1.17.0
go.uber.org/zap v1.26.0
github.com/robfig/cron/v3 v3.0.1
)
package config
import (
"fmt"
"time"
"github.com/spf13/viper"
)
type Config struct {
// 节点配置
Ethereum struct {
RPCURL string `mapstructure:"rpc_url"`
WebSocketURL string `mapstructure:"ws_url"`
ChainID int64 `mapstructure:"chain_id"`
} `mapstructure:"ethereum"`
// 扫描配置
Scanner struct {
StartBlock uint64 `mapstructure:"start_block"`
ConfirmationBlocks uint64 `mapstructure:"confirmation_blocks"`
ScanInterval time.Duration `mapstructure:"scan_interval"`
BatchSize uint64 `mapstructure:"batch_size"`
MaxRetries int `mapstructure:"max_retries"`
RetryDelay time.Duration `mapstructure:"retry_delay"`
} `mapstructure:"scanner"`
// 数据库配置
Database struct {
PostgresDSN string `mapstructure:"postgres_dsn"`
RedisURL string `mapstructure:"redis_url"`
} `mapstructure:"database"`
// 服务配置
Server struct {
Port int `mapstructure:"port"`
ShutdownTimeout time.Duration `mapstructure:"shutdown_timeout"`
} `mapstructure:"server"`
}
func LoadConfig(configPath string) (*Config, error) {
viper.SetConfigFile(configPath)
viper.SetConfigType("yaml")
// 设置默认值
viper.SetDefault("scanner.scan_interval", "3s")
viper.SetDefault("scanner.batch_size", 100)
viper.SetDefault("scanner.max_retries", 3)
viper.SetDefault("scanner.retry_delay", "1s")
viper.SetDefault("scanner.confirmation_blocks", 12)
viper.SetDefault("server.port", 8080)
viper.SetDefault("server.shutdown_timeout", "30s")
if err := viper.ReadInConfig(); err != nil {
return nil, fmt.Errorf("读取配置文件失败: %w", err)
}
var config Config
if err := viper.Unmarshal(&config); err != nil {
return nil, fmt.Errorf("解析配置文件失败: %w", err)
}
return &config, nil
}
package model
import (
"time"
"math/big"
)
// 交易类型
type TransactionType string
const (
TxTypeETH TransactionType = "ETH"
TxTypeERC20 TransactionType = "ERC20"
TxTypeERC721 TransactionType = "ERC721"
)
// 交易状态
type TransactionStatus string
const (
TxStatusPending TransactionStatus = "pending"
TxStatusConfirmed TransactionStatus = "confirmed"
TxStatusFailed TransactionStatus = "failed"
TxStatusDropped TransactionStatus = "dropped"
)
// 链上交易
type BlockchainTransaction struct {
ID string `json:"id" db:"id"`
TxHash string `json:"tx_hash" db:"tx_hash"`
BlockNumber uint64 `json:"block_number" db:"block_number"`
BlockHash string `json:"block_hash" db:"block_hash"`
FromAddress string `json:"from_address" db:"from_address"`
ToAddress string `json:"to_address" db:"to_address"`
Value *big.Int `json:"value" db:"value"`
GasPrice *big.Int `json:"gas_price" db:"gas_price"`
GasUsed uint64 `json:"gas_used" db:"gas_used"`
Nonce uint64 `json:"nonce" db:"nonce"`
TransactionIndex uint `json:"transaction_index" db:"transaction_index"`
Status TransactionStatus `json:"status" db:"status"`
Type TransactionType `json:"type" db:"type"`
TokenAddress *string `json:"token_address,omitempty" db:"token_address"`
TokenSymbol *string `json:"token_symbol,omitempty" db:"token_symbol"`
TokenDecimals *int `json:"token_decimals,omitempty" db:"token_decimals"`
Confirmations uint64 `json:"confirmations" db:"confirmations"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}
// 充值记录
type Deposit struct {
ID string `json:"id" db:"id"`
UserID string `json:"user_id" db:"user_id"`
TransactionID string `json:"transaction_id" db:"transaction_id"`
Address string `json:"address" db:"address"`
Amount *big.Int `json:"amount" db:"amount"`
AssetType TransactionType `json:"asset_type" db:"asset_type"`
Status TransactionStatus `json:"status" db:"status"`
Confirmations uint64 `json:"confirmations" db:"confirmations"`
ConfirmedAt *time.Time `json:"confirmed_at,omitempty" db:"confirmed_at"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
}
// 用户地址映射
type UserAddress struct {
ID string `json:"id" db:"id"`
UserID string `json:"user_id" db:"user_id"`
Address string `json:"address" db:"address"`
ChainType string `json:"chain_type" db:"chain_type"`
IsActive bool `json:"is_active" db:"is_active"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
}
package ethclient
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"go.uber.org/zap"
)
type Client struct {
rpcClient *ethclient.Client
wsClient *ethclient.Client
logger *zap.Logger
rpcURL string
wsURL string
chainID *big.Int
}
func NewClient(rpcURL, wsURL string, logger *zap.Logger) (*Client, error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 创建RPC客户端
rpcClient, err := ethclient.DialContext(ctx, rpcURL)
if err != nil {
return nil, fmt.Errorf("连接RPC节点失败: %w", err)
}
// 获取链ID
chainID, err := rpcClient.ChainID(ctx)
if err != nil {
return nil, fmt.Errorf("获取链ID失败: %w", err)
}
var wsClient *ethclient.Client
if wsURL != "" {
wsClient, err = ethclient.DialContext(ctx, wsURL)
if err != nil {
logger.Warn("WebSocket连接失败,将使用轮询模式", zap.Error(err))
}
}
return &Client{
rpcClient: rpcClient,
wsClient: wsClient,
logger: logger,
rpcURL: rpcURL,
wsURL: wsURL,
chainID: chainID,
}, nil
}
// 获取最新区块号
func (c *Client) GetLatestBlockNumber(ctx context.Context) (uint64, error) {
return c.rpcClient.BlockNumber(ctx)
}
// 获取区块详情
func (c *Client) GetBlockByNumber(ctx context.Context, blockNumber uint64) (*types.Block, error) {
return c.rpcClient.BlockByNumber(ctx, big.NewInt(int64(blockNumber)))
}
// 获取交易详情
func (c *Client) GetTransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
return c.rpcClient.TransactionReceipt(ctx, txHash)
}
// 获取交易详情(包含发送方)
func (c *Client) GetTransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) {
return c.rpcClient.TransactionByHash(ctx, txHash)
}
// 订阅新区块头(WebSocket)
func (c *Client) SubscribeNewHead(ctx context.Context) (chan *types.Header, ethereum.Subscription, error) {
if c.wsClient == nil {
return nil, nil, fmt.Errorf("WebSocket客户端未初始化")
}
headers := make(chan *types.Header)
sub, err := c.wsClient.SubscribeNewHead(ctx, headers)
if err != nil {
return nil, nil, err
}
return headers, sub, nil
}
// 关闭连接
func (c *Client) Close() {
if c.rpcClient != nil {
c.rpcClient.Close()
}
if c.wsClient != nil {
c.wsClient.Close()
}
}
package scanner
import (
"context"
"math/big"
"block-scanner/internal/model"
)
// Storage 定义数据存储接口
type Storage interface {
// 地址管理
GetUserAddresses(ctx context.Context) ([]model.UserAddress, error)
GetUserByAddress(ctx context.Context, address string) (*model.UserAddress, error)
// 交易管理
SaveTransaction(ctx context.Context, tx *model.BlockchainTransaction) error
GetTransactionByHash(ctx context.Context, txHash string) (*model.BlockchainTransaction, error)
UpdateTransactionStatus(ctx context.Context, txHash string, status model.TransactionStatus, confirmations uint64) error
// 充值管理
CreateDeposit(ctx context.Context, deposit *model.Deposit) error
UpdateDepositStatus(ctx context.Context, depositID string, status model.TransactionStatus, confirmations uint64) error
GetPendingDeposits(ctx context.Context) ([]model.Deposit, error)
// 扫描状态管理
GetLastScannedBlock(ctx context.Context) (uint64, error)
SaveLastScannedBlock(ctx context.Context, blockNumber uint64) error
// 批量操作
BatchSaveTransactions(ctx context.Context, txs []*model.BlockchainTransaction) error
BatchCreateDeposits(ctx context.Context, deposits []*model.Deposit) error
}
// RedisStorage Redis实现
type RedisStorage struct {
// Redis客户端实例
}
// PostgresStorage PostgreSQL实现
type PostgresStorage struct {
// PostgreSQL客户端实例
}
package scanner
import (
"context"
"fmt"
"math/big"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"go.uber.org/zap"
"block-scanner/internal/model"
)
// ERC20 Transfer 事件签名
var erc20TransferEventSig = common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")
type TransactionProcessor struct {
storage Storage
logger *zap.Logger
client *ethclient.Client
}
func NewTransactionProcessor(storage Storage, client *ethclient.Client, logger *zap.Logger) *TransactionProcessor {
return &TransactionProcessor{
storage: storage,
logger: logger,
client: client,
}
}
// 处理区块中的所有交易
func (p *TransactionProcessor) ProcessBlock(ctx context.Context, block *types.Block) error {
p.logger.Info("开始处理区块",
zap.Uint64("block_number", block.NumberU64()),
zap.Int("transaction_count", len(block.Transactions())),
)
var deposits []*model.Deposit
var transactions []*model.BlockchainTransaction
// 获取所有用户地址用于快速匹配
userAddresses, err := p.storage.GetUserAddresses(ctx)
if err != nil {
return fmt.Errorf("获取用户地址失败: %w", err)
}
// 构建地址映射表
addressMap := make(map[string]model.UserAddress)
for _, addr := range userAddresses {
addressMap[strings.ToLower(addr.Address)] = addr
}
// 处理普通ETH转账
for _, tx := range block.Transactions() {
// 获取交易发送方
from, err := types.Sender(types.LatestSignerForChainID(p.client.ChainID), tx)
if err != nil {
p.logger.Warn("获取交易发送方失败", zap.Error(err))
continue
}
to := tx.To()
if to == nil {
// 合约创建交易,跳过
continue
}
// 检查是否为充值地址
toAddressLower := strings.ToLower(to.Hex())
if userAddr, exists := addressMap[toAddressLower]; exists {
// 获取交易收据
receipt, err := p.client.GetTransactionReceipt(ctx, tx.Hash())
if err != nil {
p.logger.Warn("获取交易收据失败", zap.Error(err))
continue
}
// 检查交易状态
if receipt.Status != types.ReceiptStatusSuccessful {
p.logger.Info("交易失败,跳过", zap.String("tx_hash", tx.Hash().Hex()))
continue
}
// 保存交易记录
blockchainTx := &model.BlockchainTransaction{
ID: generateID(),
TxHash: tx.Hash().Hex(),
BlockNumber: block.NumberU64(),
BlockHash: block.Hash().Hex(),
FromAddress: from.Hex(),
ToAddress: to.Hex(),
Value: tx.Value(),
GasPrice: tx.GasPrice(),
GasUsed: receipt.GasUsed,
Nonce: tx.Nonce(),
TransactionIndex: receipt.TransactionIndex,
Status: model.TxStatusConfirmed,
Type: model.TxTypeETH,
Confirmations: 1, // 初始确认数
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
transactions = append(transactions, blockchainTx)
// 创建充值记录
deposit := &model.Deposit{
ID: generateID(),
UserID: userAddr.UserID,
TransactionID: blockchainTx.ID,
Address: to.Hex(),
Amount: tx.Value(),
AssetType: model.TxTypeETH,
Status: model.TxStatusConfirmed,
Confirmations: 1,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
deposits = append(deposits, deposit)
p.logger.Info("发现ETH充值",
zap.String("user_id", userAddr.UserID),
zap.String("tx_hash", tx.Hash().Hex()),
zap.String("amount", tx.Value().String()),
)
}
}
// 处理合约日志(ERC20转账)
for _, receipt := range block.Receipts() {
for _, log := range receipt.Logs {
// 检查是否为ERC20 Transfer事件
if len(log.Topics) == 3 && log.Topics[0] == erc20TransferEventSig {
// 解析事件参数
from := common.BytesToAddress(log.Topics[1].Bytes())
to := common.BytesToAddress(log.Topics[2].Bytes())
value := new(big.Int).SetBytes(log.Data)
// 检查是否为充值地址
toAddressLower := strings.ToLower(to.Hex())
if userAddr, exists := addressMap[toAddressLower]; exists {
// 获取交易详情
tx, _, err := p.client.GetTransactionByHash(ctx, receipt.TxHash)
if err != nil {
p.logger.Warn("获取交易详情失败", zap.Error(err))
continue
}
// 保存交易记录
blockchainTx := &model.BlockchainTransaction{
ID: generateID(),
TxHash: receipt.TxHash.Hex(),
BlockNumber: block.NumberU64(),
BlockHash: block.Hash().Hex(),
FromAddress: from.Hex(),
ToAddress: to.Hex(),
Value: value,
GasPrice: tx.GasPrice(),
GasUsed: receipt.GasUsed,
Nonce: tx.Nonce(),
TransactionIndex: receipt.TransactionIndex,
Status: model.TxStatusConfirmed,
Type: model.TxTypeERC20,
TokenAddress: &log.Address.Hex(),
Confirmations: 1,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
transactions = append(transactions, blockchainTx)
// 创建充值记录
deposit := &model.Deposit{
ID: generateID(),
UserID: userAddr.UserID,
TransactionID: blockchainTx.ID,
Address: to.Hex(),
Amount: value,
AssetType: model.TxTypeERC20,
Status: model.TxStatusConfirmed,
Confirmations: 1,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
deposits = append(deposits, deposit)
p.logger.Info("发现ERC20充值",
zap.String("user_id", userAddr.UserID),
zap.String("tx_hash", receipt.TxHash.Hex()),
zap.String("token_address", log.Address.Hex()),
zap.String("amount", value.String()),
)
}
}
}
}
// 批量保存数据
if len(transactions) > 0 {
if err := p.storage.BatchSaveTransactions(ctx, transactions); err != nil {
return fmt.Errorf("批量保存交易失败: %w", err)
}
}
if len(deposits) > 0 {
if err := p.storage.BatchCreateDeposits(ctx, deposits); err != nil {
return fmt.Errorf("批量创建充值记录失败: %w", err)
}
}
p.logger.Info("区块处理完成",
zap.Uint64("block_number", block.NumberU64()),
zap.Int("transactions_saved", len(transactions)),
zap.Int("deposits_created", len(deposits)),
)
return nil
}
// 更新交易确认数
func (p *TransactionProcessor) UpdateConfirmations(ctx context.Context, latestBlockNumber uint64) error {
// 获取待确认的充值记录
pendingDeposits, err := p.storage.GetPendingDeposits(ctx)
if err != nil {
return fmt.Errorf("获取待确认充值记录失败: %w", err)
}
for _, deposit := range pendingDeposits {
// 获取交易详情
tx, err := p.storage.GetTransactionByHash(ctx, deposit.TransactionID)
if err != nil {
p.logger.Warn("获取交易详情失败", zap.Error(err))
continue
}
// 计算确认数
if tx.BlockNumber > latestBlockNumber {
continue
}
confirmations := latestBlockNumber - tx.BlockNumber + 1
// 更新交易确认数
if err := p.storage.UpdateTransactionStatus(ctx, tx.TxHash, tx.Status, confirmations); err != nil {
p.logger.Warn("更新交易确认数失败", zap.Error(err))
continue
}
// 更新充值记录确认数
if err := p.storage.UpdateDepositStatus(ctx, deposit.ID, deposit.Status, confirmations); err != nil {
p.logger.Warn("更新充值记录确认数失败", zap.Error(err))
continue
}
// 如果达到足够确认数,触发余额更新
if confirmations >= p.confirmationBlocks {
p.triggerBalanceUpdate(ctx, deposit)
}
}
return nil
}
func (p *TransactionProcessor) triggerBalanceUpdate(ctx context.Context, deposit *model.Deposit) {
// 这里应该调用内部API或消息队列来更新用户余额
p.logger.Info("触发余额更新",
zap.String("deposit_id", deposit.ID),
zap.String("user_id", deposit.UserID),
zap.String("amount", deposit.Amount.String()),
)
}
func generateID() string {
// 使用UUID或雪花算法生成唯一ID
return common.BytesToHash([]byte(time.Now().String())).Hex()
}
package scanner
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"block-scanner/internal/config"
"block-scanner/internal/ethclient"
)
type Scanner struct {
config *config.Config
client *ethclient.Client
storage Storage
processor *TransactionProcessor
logger *zap.Logger
// 控制变量
mu sync.RWMutex
isRunning bool
lastBlockNumber uint64
shutdownChan chan struct{}
wg sync.WaitGroup
}
func NewScanner(cfg *config.Config, storage Storage, logger *zap.Logger) (*Scanner, error) {
// 创建以太坊客户端
client, err := ethclient.NewClient(cfg.Ethereum.RPCURL, cfg.Ethereum.WebSocketURL, logger)
if err != nil {
return nil, fmt.Errorf("创建以太坊客户端失败: %w", err)
}
// 创建交易处理器
processor := NewTransactionProcessor(storage, client, logger)
return &Scanner{
config: cfg,
client: client,
storage: storage,
processor: processor,
logger: logger,
shutdownChan: make(chan struct{}),
}, nil
}
// 启动扫描器
func (s *Scanner) Start(ctx context.Context) error {
s.mu.Lock()
if s.isRunning {
s.mu.Unlock()
return fmt.Errorf("扫描器已在运行")
}
s.isRunning = true
s.mu.Unlock()
s.logger.Info("启动区块扫描器")
// 启动扫描循环
s.wg.Add(1)
go s.scanLoop(ctx)
// 启动确认数更新定时任务
s.wg.Add(1)
go s.confirmationUpdateLoop(ctx)
// 启动监控指标
s.wg.Add(1)
go s.metricsLoop(ctx)
return nil
}
// 扫描循环
func (s *Scanner) scanLoop(ctx context.Context) {
defer s.wg.Done()
// 获取上次扫描的区块
lastScannedBlock, err := s.storage.GetLastScannedBlock(ctx)
if err != nil {
s.logger.Error("获取上次扫描区块失败", zap.Error(err))
lastScannedBlock = s.config.Scanner.StartBlock
}
// 如果WebSocket可用,使用事件驱动模式
if s.client.WSClient != nil {
s.logger.Info("使用WebSocket事件驱动模式")
s.eventDrivenScan(ctx, lastScannedBlock)
} else {
s.logger.Info("使用轮询模式")
s.pollingScan(ctx, lastScannedBlock)
}
}
// 事件驱动扫描(WebSocket)
func (s *Scanner) eventDrivenScan(ctx context.Context, startBlock uint64) {
// 先补扫历史区块
if err := s.catchUpHistoricalBlocks(ctx, startBlock); err != nil {
s.logger.Error("补扫历史区块失败", zap.Error(err))
}
// 订阅新区块
headers, sub, err := s.client.SubscribeNewHead(ctx)
if err != nil {
s.logger.Error("订阅新区块失败,切换到轮询模式", zap.Error(err))
s.pollingScan(ctx, s.lastBlockNumber)
return
}
defer sub.Unsubscribe()
for {
select {
case <-s.shutdownChan:
s.logger.Info("收到关闭信号,停止扫描")
return
case <-ctx.Done():
s.logger.Info("上下文取消,停止扫描")
return
case err := <-sub.Err():
s.logger.Error("订阅错误", zap.Error(err))
// 切换到轮询模式
s.pollingScan(ctx, s.lastBlockNumber)
return
case header := <-headers:
s.processNewBlock(ctx, header.Number.Uint64())
}
}
}
// 轮询扫描
func (s *Scanner) pollingScan(ctx context.Context, startBlock uint64) {
ticker := time.NewTicker(s.config.Scanner.ScanInterval)
defer ticker.Stop()
// 先补扫历史区块
if err := s.catchUpHistoricalBlocks(ctx, startBlock); err != nil {
s.logger.Error("补扫历史区块失败", zap.Error(err))
}
for {
select {
case <-s.shutdownChan:
s.logger.Info("收到关闭信号,停止扫描")
return
case <-ctx.Done():
s.logger.Info("上下文取消,停止扫描")
return
case <-ticker.C:
// 获取最新区块号
latestBlock, err := s.client.GetLatestBlockNumber(ctx)
if err != nil {
s.logger.Error("获取最新区块号失败", zap.Error(err))
continue
}
// 如果已经是最新区块,等待下次轮询
if s.lastBlockNumber >= latestBlock {
continue
}
// 扫描新区块
nextBlock := s.lastBlockNumber + 1
for blockNum := nextBlock; blockNum <= latestBlock; blockNum++ {
if err := s.processBlockByNumber(ctx, blockNum); err != nil {
s.logger.Error("处理区块失败",
zap.Uint64("block_number", blockNum),
zap.Error(err),
)
break
}
s.lastBlockNumber = blockNum
}
// 保存扫描进度
if err := s.storage.SaveLastScannedBlock(ctx, s.lastBlockNumber); err != nil {
s.logger.Error("保存扫描进度失败", zap.Error(err))
}
}
}
}
// 补扫历史区块
func (s *Scanner) catchUpHistoricalBlocks(ctx context.Context, startBlock uint64) error {
s.logger.Info("开始补扫历史区块",
zap.Uint64("start_block", startBlock),
zap.Uint64("batch_size", s.config.Scanner.BatchSize),
)
// 获取最新区块号
latestBlock, err := s.client.GetLatestBlockNumber(ctx)
if err != nil {
return fmt.Errorf("获取最新区块号失败: %w", err)
}
// 如果起始区块已经超过最新区块,直接返回
if startBlock > latestBlock {
s.lastBlockNumber = latestBlock
return nil
}
// 批量处理历史区块
for fromBlock := startBlock; fromBlock <= latestBlock; fromBlock += s.config.Scanner.BatchSize {
select {
case <-s.shutdownChan:
return fmt.Errorf("扫描被中断")
case <-ctx.Done():
return ctx.Err()
default:
toBlock := fromBlock + s.config.Scanner.BatchSize - 1
if toBlock > latestBlock {
toBlock = latestBlock
}
s.logger.Info("批量处理历史区块",
zap.Uint64("from_block", fromBlock),
zap.Uint64("to_block", toBlock),
)
for blockNum := fromBlock; blockNum <= toBlock; blockNum++ {
if err := s.processBlockByNumber(ctx, blockNum); err != nil {
return fmt.Errorf("处理区块 %d 失败: %w", blockNum, err)
}
s.lastBlockNumber = blockNum
}
// 保存进度
if err := s.storage.SaveLastScannedBlock(ctx, s.lastBlockNumber); err != nil {
s.logger.Error("保存扫描进度失败", zap.Error(err))
}
// 避免请求过快
time.Sleep(100 * time.Millisecond)
}
}
s.logger.Info("历史区块补扫完成",
zap.Uint64("last_scanned_block", s.lastBlockNumber),
)
return nil
}
// 处理指定区块号的区块
func (s *Scanner) processBlockByNumber(ctx context.Context, blockNumber uint64) error {
// 重试机制
var lastErr error
for i := 0; i < s.config.Scanner.MaxRetries; i++ {
block, err := s.client.GetBlockByNumber(ctx, blockNumber)
if err != nil {
lastErr = err
s.logger.Warn("获取区块失败,准备重试",
zap.Uint64("block_number", blockNumber),
zap.Int("retry", i+1),
zap.Error(err),
)
time.Sleep(s.config.Scanner.RetryDelay)
continue
}
// 处理区块
if err := s.processor.ProcessBlock(ctx, block); err != nil {
return fmt.Errorf("处理区块失败: %w", err)
}
return nil
}
return fmt.Errorf("获取区块 %d 失败,重试 %d 次后放弃: %w",
blockNumber, s.config.Scanner.MaxRetries, lastErr)
}
// 处理新区块
func (s *Scanner) processNewBlock(ctx context.Context, blockNumber uint64) {
s.logger.Info("发现新区块", zap.Uint64("block_number", blockNumber))
if err := s.processBlockByNumber(ctx, blockNumber); err != nil {
s.logger.Error("处理新区块失败", zap.Error(err))
return
}
s.lastBlockNumber = blockNumber
// 保存扫描进度
if err := s.storage.SaveLastScannedBlock(ctx, blockNumber); err != nil {
s.logger.Error("保存扫描进度失败", zap.Error(err))
}
}
// 确认数更新循环
func (s *Scanner) confirmationUpdateLoop(ctx context.Context) {
defer s.wg.Done()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-s.shutdownChan:
return
case <-ctx.Done():
return
case <-ticker.C:
latestBlock, err := s.client.GetLatestBlockNumber(ctx)
if err != nil {
s.logger.Error("获取最新区块号失败", zap.Error(err))
continue
}
if err := s.processor.UpdateConfirmations(ctx, latestBlock); err != nil {
s.logger.Error("更新确认数失败", zap.Error(err))
}
}
}
}
// 监控指标循环
func (s *Scanner) metricsLoop(ctx context.Context) {
defer s.wg.Done()
c := cron.New()
// 每分钟收集一次指标
_, err := c.AddFunc("@every 1m", func() {
s.collectMetrics(ctx)
})
if err != nil {
s.logger.Error("添加定时任务失败", zap.Error(err))
return
}
c.Start()
<-s.shutdownChan
c.Stop()
}
// 收集监控指标
func (s *Scanner) collectMetrics(ctx context.Context) {
metrics := map[string]interface{}{
"last_scanned_block": s.lastBlockNumber,
"is_running": s.isRunning,
"timestamp": time.Now().Unix(),
}
// 这里可以将指标推送到Prometheus或监控系统
s.logger.Debug("收集监控指标", zap.Any("metrics", metrics))
}
// 优雅关闭
func (s *Scanner) Shutdown() {
s.mu.Lock()
if !s.isRunning {
s.mu.Unlock()
return
}
s.isRunning = false
s.mu.Unlock()
s.logger.Info("开始关闭扫描器...")
// 发送关闭信号
close(s.shutdownChan)
// 等待所有goroutine结束
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
// 设置超时
select {
case <-done:
s.logger.Info("扫描器已完全关闭")
case <-time.After(s.config.Server.ShutdownTimeout):
s.logger.Warn("关闭超时,强制退出")
}
// 关闭客户端连接
s.client.Close()
}
// 获取当前状态
func (s *Scanner) GetStatus() map[string]interface{} {
s.mu.RLock()
defer s.mu.RUnlock()
return map[string]interface{}{
"is_running": s.isRunning,
"last_block_number": s.lastBlockNumber,
"config": map[string]interface{}{
"scan_interval": s.config.Scanner.ScanInterval.String(),
"confirmation_blocks": s.config.Scanner.ConfirmationBlocks,
"batch_size": s.config.Scanner.BatchSize,
},
}
}
package main
import (
"context"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"go.uber.org/zap"
"block-scanner/internal/config"
"block-scanner/internal/scanner"
"block-scanner/pkg/database"
)
func main() {
// 初始化日志
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("初始化日志失败: %v", err)
}
defer logger.Sync()
// 加载配置
cfg, err := config.LoadConfig("config.yaml")
if err != nil {
logger.Fatal("加载配置失败", zap.Error(err))
}
// 初始化数据库
pgDB, err := database.NewPostgres(cfg.Database.PostgresDSN)
if err != nil {
logger.Fatal("连接PostgreSQL失败", zap.Error(err))
}
defer pgDB.Close()
redisClient, err := database.NewRedis(cfg.Database.RedisURL)
if err != nil {
logger.Fatal("连接Redis失败", zap.Error(err))
}
defer redisClient.Close()
// 创建存储实例
storage := scanner.NewPostgresStorage(pgDB, redisClient)
// 创建扫描器
scanner, err := scanner.NewScanner(cfg, storage, logger)
if err != nil {
logger.Fatal("创建扫描器失败", zap.Error(err))
}
// 创建HTTP服务器用于健康检查
mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
status := scanner.GetStatus()
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(status)
})
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
// 暴露Prometheus指标
})
server := &http.Server{
Addr: fmt.Sprintf(":%d", cfg.Server.Port),
Handler: mux,
}
// 启动HTTP服务器
go func() {
logger.Info("启动HTTP服务器", zap.Int("port", cfg.Server.Port))
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("HTTP服务器错误", zap.Error(err))
}
}()
// 创建上下文
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 启动扫描器
if err := scanner.Start(ctx); err != nil {
logger.Fatal("启动扫描器失败", zap.Error(err))
}
// 等待中断信号
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
logger.Info("扫描器已启动,等待中断信号...")
<-sigChan
logger.Info("收到中断信号,开始关闭...")
// 优雅关闭
scanner.Shutdown()
// 关闭HTTP服务器
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer shutdownCancel()
if err := server.Shutdown(shutdownCtx); err != nil {
logger.Error("HTTP服务器关闭失败", zap.Error(err))
}
logger.Info("程序已完全退出")
}
# 以太坊节点配置
ethereum:
rpc_url: "https://mainnet.infura.io/v3/YOUR_INFURA_KEY"
ws_url: "wss://mainnet.infura.io/ws/v3/YOUR_INFURA_KEY"
chain_id: 1
# 扫描器配置
scanner:
start_block: 19000000 # 开始扫描的区块高度
confirmation_blocks: 12 # 确认区块数
scan_interval: "3s" # 轮询间隔
batch_size: 100 # 批量处理大小
max_retries: 3 # 最大重试次数
retry_delay: "1s" # 重试延迟
# 数据库配置
database:
postgres_dsn: "postgres://user:password@localhost:5432/block_scanner?sslmode=disable"
redis_url: "redis://localhost:6379/0"
# 服务器配置
server:
port: 8080
shutdown_timeout: "30s"
# 1. 安装依赖
go mod download
# 2. 构建
go build -o block-scanner ./cmd/scanner
# 3. 运行
./block-scanner
# 4. 或者使用Docker
docker build -t block-scanner .
docker run -d --name scanner \
-v $(pwd)/config.yaml:/app/config.yaml \
block-scanner
-- PostgreSQL表结构
CREATE TABLE blockchain_transactions (
id VARCHAR(66) PRIMARY KEY,
tx_hash VARCHAR(66) UNIQUE NOT NULL,
block_number BIGINT NOT NULL,
block_hash VARCHAR(66) NOT NULL,
from_address VARCHAR(42) NOT NULL,
to_address VARCHAR(42) NOT NULL,
value NUMERIC(78, 0) NOT NULL,
gas_price NUMERIC(78, 0) NOT NULL,
gas_used BIGINT NOT NULL,
nonce BIGINT NOT NULL,
transaction_index INTEGER NOT NULL,
status VARCHAR(20) NOT NULL,
type VARCHAR(10) NOT NULL,
token_address VARCHAR(42),
token_symbol VARCHAR(20),
token_decimals INTEGER,
confirmations BIGINT NOT NULL DEFAULT 1,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE TABLE deposits (
id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(36) NOT NULL,
transaction_id VARCHAR(66) NOT NULL REFERENCES blockchain_transactions(id),
address VARCHAR(42) NOT NULL,
amount NUMERIC(78, 0) NOT NULL,
asset_type VARCHAR(10) NOT NULL,
status VARCHAR(20) NOT NULL,
confirmations BIGINT NOT NULL DEFAULT 1,
confirmed_at TIMESTAMP,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE TABLE user_addresses (
id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(36) NOT NULL,
address VARCHAR(42) UNIQUE NOT NULL,
chain_type VARCHAR(20) NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT true,
created_at TIMESTAMP NOT NULL
);
CREATE INDEX idx_tx_hash ON blockchain_transactions(tx_hash);
CREATE INDEX idx_block_number ON blockchain_transactions(block_number);
CREATE INDEX idx_to_address ON blockchain_transactions(to_address);
CREATE INDEX idx_deposit_user ON deposits(user_id);
CREATE INDEX idx_deposit_status ON deposits(status);
CREATE INDEX idx_user_address ON user_addresses(address);
// 需要监控的关键指标
type ScannerMetrics struct {
// 扫描状态
IsRunning bool `json:"is_running"`
LastScannedBlock uint64 `json:"last_scanned_block"`
BlocksBehind int64 `json:"blocks_behind"`
// 性能指标
BlocksPerSecond float64 `json:"blocks_per_second"`
TransactionsTotal uint64 `json:"transactions_total"`
DepositsTotal uint64 `json:"deposits_total"`
// 错误指标
RPCErrors uint64 `json:"rpc_errors"`
ProcessingErrors uint64 `json:"processing_errors"`
DBErrors uint64 `json:"db_errors"`
// 资源使用
MemoryUsageMB float64 `json:"memory_usage_mb"`
CPUUsagePercent float64 `json:"cpu_usage_percent"`
}
# Prometheus告警规则
groups:
- name: block_scanner
rules:
- alert: ScannerStopped
expr: block_scanner_is_running ==
<!--EndFragment-->
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!