使用go-ethereum监听USDT交易事件

  • CryptoBox
  • 发布于 2025-03-21 15:41
  • 阅读 888

在区块链应用开发中,实时监听某种代币的交易是一个常见需求。本文将详细介绍如何使用go-ethereum(也称为geth)客户端库来监听以太坊网络上的USDT(Tether)代币交易。USDT简介USDT是一种稳定币,在以太坊网络上以ERC-20代币的形式存在。它的智能合约实现了标准的ERC-20

在区块链应用开发中,实时监听某种代币的交易是一个常见需求。本文将详细介绍如何使用go-ethereum(也称为geth)客户端库来监听以太坊网络上的USDT(Tether)代币交易。

USDT简介

USDT是一种稳定币,在以太坊网络上以ERC-20代币的形式存在。它的智能合约实现了标准的ERC-20接口,包括Transfer事件,该事件在每次代币转账时都会被触发。

以太坊主网上USDT合约地址:0xdAC17F958D2ee523a2206206994597C13D831ec7

准备工作

在开始之前,需要准备以下内容:

  1. Go语言开发环境(Go 1.16+)
  2. 以太坊节点的访问点(可以是本地节点或远程服务如Infura)
  3. go-ethereum库

步骤一:设置项目

首先创建一个新的Go项目并安装必要的依赖:

bash
Copy
mkdir usdt-monitor
cd usdt-monitor
go mod init github.com/your-username/usdt-monitor
go get github.com/ethereum/go-ethereum

步骤二:定义USDT合约的ABI

为了正确解析USDT合约的事件,我们需要定义其ABI(Application Binary Interface)。ERC-20代币的Transfer事件定义如下:

go
Copy
package main

import (
    "context"
    "fmt"
    "log"
    "math/big"
    "strings"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/accounts/abi"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
)

// USDT合约ABI中的Transfer事件部分
const usdtABI = `[
    {
        "anonymous": false,
        "inputs": [
            {
                "indexed": true,
                "name": "from",
                "type": "address"
            },
            {
                "indexed": true,
                "name": "to",
                "type": "address"
            },
            {
                "indexed": false,
                "name": "value",
                "type": "uint256"
            }
        ],
        "name": "Transfer",
        "type": "event"
    }
]`

步骤三:连接以太坊节点

接下来,我们需要连接到以太坊节点:

go
Copy
func main() {
    // 连接到以太坊节点 (使用WebSocket)
    client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
    if err != nil {
        log.Fatalf("Failed to connect to the Ethereum client: %v", err)
    }

    // 打印连接信息
    fmt.Println("Connected to Ethereum WebSocket node")

    // 后续代码
}

步骤四:创建事件过滤器

我们需要创建一个过滤器来监听USDT合约地址上的Transfer事件:

go
Copy
func main() {
    // ... 前面的连接代码

    // USDT合约地址
    usdtContractAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7")

    // 解析ABI
    parsedABI, err := abi.JSON(strings.NewReader(usdtABI))
    if err != nil {
        log.Fatalf("Failed to parse ABI: %v", err)
    }

    // 获取Transfer事件的ID
    transferEventSignature := []byte("Transfer(address,address,uint256)")
    transferEventID := crypto.Keccak256Hash(transferEventSignature)

    // 创建过滤查询
    query := ethereum.FilterQuery{
        Addresses: []common.Address{usdtContractAddress},
        Topics: [][]common.Hash{{transferEventID}},
    }

    // 后续代码
}

步骤五:监听事件日志

现在我们可以使用SubscribeFilterLogs方法来订阅USDT的Transfer事件:

go
Copy
func main() {
    // ... 前面的代码

    // 创建日志通道
    logs := make(chan types.Log)

    // 订阅事件
    sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
    if err != nil {
        log.Fatalf("Failed to subscribe to logs: %v", err)
    }

    fmt.Println("Monitoring USDT transfers...")

    // 处理事件
    for {
        select {
        case err := <-sub.Err():
            log.Fatalf("Subscription error: %v", err)
        case vLog := <-logs:
            // 解析Transfer事件
            event, err := parseTransferEvent(vLog, parsedABI)
            if err != nil {
                log.Printf("Error parsing log: %v", err)
                continue
            }

            // 打印事件信息
            fmt.Printf("Transfer: %s -> %s, Amount: %s USDT\n",
                event.From.Hex(),
                event.To.Hex(),
                formatUSDT(event.Value))
        }
    }
}

SubscribeFilterLogs方法解析

为什么使用SubscribeFilterLogs方法?

SubscribeFilterLogs是go-ethereum库中提供的一个强大方法,但必须使用WebSocket连接(wss\://)而不是HTTP/HTTPS。它允许我们:

  1. 实时监听:以推送的方式接收新的事件日志,无需轮询
  2. 资源效率:相比轮询方式,它更加高效,减少了不必要的网络请求
  3. 低延迟:能够在事件发生后立即接收通知,最小化延迟
  4. 长连接支持:通过WebSocket维持长连接,避免频繁建立和断开连接的开销

其他可用的方法

除了SubscribeFilterLogs外,go-ethereum还提供了其他几种获取事件日志的方法:

  1. FilterLogs

    go
    Copy
    logs, err := client.FilterLogs(ctx, query)

    这个方法用于查询特定区块范围内的历史事件日志。

  2. GetLogs

    go
    Copy
    logs, err := client.GetLogs(ctx, query)

    这是一个较低级别的API,直接调用以太坊JSON-RPC的eth_getLogs方法。

  3. 轮询方式:通过定期调用FilterLogs,检查新的区块并获取新的事件:

    go
    Copy
    ticker := time.NewTicker(15 * time.Second)
    defer ticker.Stop()
    
    var lastBlock uint64
    
    for range ticker.C {
       header, err := client.HeaderByNumber(context.Background(), nil)
       if err != nil {
           log.Printf("Error getting latest block: %v", err)
           continue
       }
    
       currentBlock := header.Number.Uint64()
       if currentBlock <= lastBlock {
           continue
       }
    
       query.FromBlock = big.NewInt(int64(lastBlock + 1))
       query.ToBlock = big.NewInt(int64(currentBlock))
    
       logs, err := client.FilterLogs(context.Background(), query)
       if err != nil {
           log.Printf("Error filtering logs: %v", err)
           continue
       }
    
       for _, vLog := range logs {
           // 处理日志...
       }
    
       lastBlock = currentBlock
    }

各种方法的优缺点比较

方法  优点  缺点SubscribeFilterLogs   • 实时推送事件<br>• 资源利用率高<br>• 低延迟<br>• 代码简洁<br>• 自动处理重新连接   • 必须使用WebSocket连接(wss://)<br>• 长连接可能会断开<br>• 对于历史数据不方便<br>• 可能错过事件(网络不稳定时)
FilterLogs  • 可查询任意区块范围<br>• 更可靠地获取所有事件<br>• 适合批量处理<br>• 不依赖长连接    • 不是实时的<br>• 需要自行跟踪处理的区块<br>• 对历史长区间查询可能超时
轮询方式    • 实现简单<br>• 不依赖WebSocket<br>• 容易控制请求频率<br>• 可靠性高   • 资源利用率低<br>• 延迟高<br>• 额外的区块追踪逻辑<br>• 频繁的API调用

重要注意事项

SubscribeFilterLogs只能与WebSocket连接一起使用,不能与HTTP/HTTPS连接一起使用。 如果您尝试在HTTP连接上使用SubscribeFilterLogs,将会收到类似以下的错误:

Copy
eth filter subscription not supported over http

最佳实践建议

  1. 结合使用不同方法

    • 使用FilterLogs获取历史数据和启动时的回填
    • 使用SubscribeFilterLogs监听新事件
    • 在订阅断开时使用轮询作为备份机制
  2. 处理断连和重连

    go
    Copy
    for {
       select {
       case err := <-sub.Err():
           log.Printf("Subscription error: %v, reconnecting...", err)
           // 重新连接逻辑
           sub, err = client.SubscribeFilterLogs(context.Background(), query, logs)
           if err != nil {
               log.Printf("Failed to resubscribe: %v", err)
               // 执行备份轮询或重试策略
           }
       case vLog := <-logs:
           // 处理日志...
       }
    }
  3. 针对生产环境优化

    • 实现指数退避重连机制
    • 记录最后处理的区块高度,以便从断点继续
    • 考虑链重组的影响,不立即处理最近的事件(等待确认)
Copy

## 步骤六:解析事件数据

接下来,我们需要定义解析事件的结构和函数:

```go
// Transfer事件结构
type TransferEvent struct {
    From  common.Address
    To    common.Address
    Value *big.Int
}

// 解析Transfer事件
func parseTransferEvent(log types.Log, contractABI abi.ABI) (*TransferEvent, error) {
    event := TransferEvent{}

    // 确保这是一个Transfer事件
    if len(log.Topics) < 3 {
        return nil, fmt.Errorf("not enough topics for Transfer event")
    }

    // 从日志主题中提取from和to地址
    event.From = common.HexToAddress(log.Topics[1].Hex())
    event.To = common.HexToAddress(log.Topics[2].Hex())

    // 解析value参数
    err := contractABI.UnpackIntoInterface(&event, "Transfer", log.Data)
    if err != nil {
        return nil, err
    }

    return &event, nil
}

// 格式化USDT金额(USDT有6位小数)
func formatUSDT(value *big.Int) string {
    divisor := big.NewInt(1000000) // 10^6 for USDT

    quotient := new(big.Int).Div(value, divisor)
    remainder := new(big.Int).Mod(value, divisor)

    return fmt.Sprintf("%s.%06s", quotient.String(), fmt.Sprintf("%06s", remainder.String()))
}

步骤七:处理历史交易(可选)

如果你还想查询历史上的USDT交易,可以使用FilterLogs方法:

go
Copy
func getHistoricalTransfers(client *ethclient.Client, contractAddress common.Address, fromBlock, toBlock *big.Int) {
    // 创建过滤查询
    query := ethereum.FilterQuery{
        FromBlock: fromBlock,
        ToBlock:   toBlock,
        Addresses: []common.Address{contractAddress},
        Topics:    [][]common.Hash{{transferEventID}},
    }

    // 获取日志
    logs, err := client.FilterLogs(context.Background(), query)
    if err != nil {
        log.Fatalf("Failed to filter logs: %v", err)
    }

    fmt.Printf("Found %d historical transfers\n", len(logs))

    // 处理每个日志
    for _, vLog := range logs {
        event, err := parseTransferEvent(vLog, parsedABI)
        if err != nil {
            log.Printf("Error parsing log: %v", err)
            continue
        }

        fmt.Printf("Block %d: %s -> %s, Amount: %s USDT\n",
            vLog.BlockNumber,
            event.From.Hex(),
            event.To.Hex(),
            formatUSDT(event.Value))
    }
}

完整代码示例

以下是一个完整的示例程序,它监听USDT的实时转账事件:

go
Copy
package main

import (
    "context"
    "crypto/ecdsa"
    "fmt"
    "log"
    "math/big"
    "strings"

    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/accounts/abi"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/ethclient"
)

// USDT合约ABI中的Transfer事件部分
const usdtABI = `[
    {
        "anonymous": false,
        "inputs": [
            {
                "indexed": true,
                "name": "from",
                "type": "address"
            },
            {
                "indexed": true,
                "name": "to",
                "type": "address"
            },
            {
                "indexed": false,
                "name": "value",
                "type": "uint256"
            }
        ],
        "name": "Transfer",
        "type": "event"
    }
]`

// Transfer事件结构
type TransferEvent struct {
    From  common.Address
    To    common.Address
    Value *big.Int
}

func main() {
    // 连接到以太坊节点 (使用WebSocket)
    client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
    if err != nil {
        log.Fatalf("Failed to connect to the Ethereum client: %v", err)
    }
    defer client.Close()

    fmt.Println("Connected to Ethereum WebSocket node")

    // USDT合约地址
    usdtContractAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7")

    // 解析ABI
    parsedABI, err := abi.JSON(strings.NewReader(usdtABI))
    if err != nil {
        log.Fatalf("Failed to parse ABI: %v", err)
    }

    // 获取Transfer事件的ID
    transferEventSignature := []byte("Transfer(address,address,uint256)")
    transferEventID := crypto.Keccak256Hash(transferEventSignature)

    // 创建过滤查询
    query := ethereum.FilterQuery{
        Addresses: []common.Address{usdtContractAddress},
        Topics:    [][]common.Hash{{transferEventID}},
    }

    // 创建日志通道
    logs := make(chan types.Log)

    // 订阅事件
    sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
    if err != nil {
        log.Fatalf("Failed to subscribe to logs: %v", err)
    }

    fmt.Println("Monitoring USDT transfers...")

    // 处理事件
    for {
        select {
        case err := <-sub.Err():
            log.Fatalf("Subscription error: %v", err)
        case vLog := <-logs:
            // 解析Transfer事件
            event, err := parseTransferEvent(vLog, parsedABI)
            if err != nil {
                log.Printf("Error parsing log: %v", err)
                continue
            }

            // 打印事件信息
            fmt.Printf("Transfer: %s -> %s, Amount: %s USDT\n",
                event.From.Hex(),
                event.To.Hex(),
                formatUSDT(event.Value))
        }
    }
}

// 解析Transfer事件
func parseTransferEvent(log types.Log, contractABI abi.ABI) (*TransferEvent, error) {
    event := TransferEvent{}

    // 确保这是一个Transfer事件
    if len(log.Topics) < 3 {
        return nil, fmt.Errorf("not enough topics for Transfer event")
    }

    // 从日志主题中提取from和to地址
    event.From = common.HexToAddress(log.Topics[1].Hex())
    event.To = common.HexToAddress(log.Topics[2].Hex())

    // 解析value参数
    if len(log.Data) > 0 {
        // 解析非索引参数(value)
        err := contractABI.UnpackIntoInterface(&event.Value, "Transfer", log.Data)
        if err != nil {
            return nil, err
        }
    }

    return &event, nil
}

// 格式化USDT金额(USDT有6位小数)
func formatUSDT(value *big.Int) string {
    divisor := big.NewInt(1000000) // 10^6 for USDT

    quotient := new(big.Int).Div(value, divisor)
    remainder := new(big.Int).Mod(value, divisor)

    return fmt.Sprintf("%s.%06s", quotient.String(), fmt.Sprintf("%06s", remainder.String()))
}

优化和注意事项

  1. 错误处理:在生产环境中,应该添加更完善的错误处理机制,包括重试逻辑。
  2. 存储事件:对于实际应用,你可能需要将监听到的事件存储到数据库中。
  3. 过滤特定地址:如果只关心特定地址的USDT交易,可以在query.Topics中添加额外的条件。
  4. 性能优化:对于高吞吐量的应用,可以考虑使用批处理或并行处理来提高性能。
  5. WebSocket连接:使用WebSocket而不是HTTP连接可以减少延迟并提高实时性。
go
Copy
// 使用WebSocket连接 (这是使用SubscribeFilterLogs的必要条件)
client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
  1. 处理重组:区块链可能发生重组,需要处理可能被回滚的事件。

结论

通过使用go-ethereum库,我们可以轻松地监听以太坊网络上的USDT交易事件。这种方法可以应用于多种场景,如构建交易监控系统、钱包通知服务或交易分析工具。

完整的实现还可以包括更多功能,如过滤特定金额的交易、监控多个代币、处理链重组等。根据你的具体需求,可以进一步扩展和优化上述代码。

CPBOX.io - 您的区块链开发伙伴

如果您正在寻找更加完善的区块链数据监控和分析解决方案,不妨试试 CPBOX.io。作为专业的区块链技术服务提供商,CPBOX.io 提供了一套全面的工具和服务,帮助开发者和企业轻松实现区块链数据的监控、分析和利用。

CPBOX.io 优势:

  • 全链数据支持: 不仅限于以太坊,还支持BSC、Polygon、Tron等多条主流公链
  • 监听服务: 灵活监听想要的交易数据,通知到Email、Telegram等各类渠道
  • 丰富的API接口: 简单易用的接口,轻松集成到您的应用中
  • 定制化解决方案: 针对不同业务场景提供个性化的区块链数据解决方案
  • 高扩展性: 可以同时支持监听几十上百万地址的交易
  • 强大的技术支持: 专业的技术团队为您提供全方位支持

无论您是进行区块链钱包开发、DeFi应用构建,还是需要进行链上数据分析,CPBOX.io都能为您提供可靠的技术支持和服务。

访问 www.cpbox.io 了解更多,开启您的区块链数据之旅!

参考资料

  1. go-ethereum GitHub 仓库
  2. ERC-20 代币标准
  3. USDT 合约地址
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
CryptoBox
CryptoBox
0x9099...2eE6
https://www.cpbox.io 是集web3批量工具, 一键发Token, 市值管理为一身的专业web3工具