在区块链应用开发中,实时监听某种代币的交易是一个常见需求。本文将详细介绍如何使用go-ethereum(也称为geth)客户端库来监听以太坊网络上的USDT(Tether)代币交易。USDT简介USDT是一种稳定币,在以太坊网络上以ERC-20代币的形式存在。它的智能合约实现了标准的ERC-20
在区块链应用开发中,实时监听某种代币的交易是一个常见需求。本文将详细介绍如何使用go-ethereum(也称为geth)客户端库来监听以太坊网络上的USDT(Tether)代币交易。
USDT是一种稳定币,在以太坊网络上以ERC-20代币的形式存在。它的智能合约实现了标准的ERC-20接口,包括Transfer
事件,该事件在每次代币转账时都会被触发。
以太坊主网上USDT合约地址:0xdAC17F958D2ee523a2206206994597C13D831ec7
在开始之前,需要准备以下内容:
首先创建一个新的Go项目并安装必要的依赖:
bash
Copy
mkdir usdt-monitor
cd usdt-monitor
go mod init github.com/your-username/usdt-monitor
go get github.com/ethereum/go-ethereum
为了正确解析USDT合约的事件,我们需要定义其ABI(Application Binary Interface)。ERC-20代币的Transfer
事件定义如下:
go
Copy
package main
import (
"context"
"fmt"
"log"
"math/big"
"strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
)
// USDT合约ABI中的Transfer事件部分
const usdtABI = `[
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"name": "from",
"type": "address"
},
{
"indexed": true,
"name": "to",
"type": "address"
},
{
"indexed": false,
"name": "value",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
}
]`
接下来,我们需要连接到以太坊节点:
go
Copy
func main() {
// 连接到以太坊节点 (使用WebSocket)
client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
if err != nil {
log.Fatalf("Failed to connect to the Ethereum client: %v", err)
}
// 打印连接信息
fmt.Println("Connected to Ethereum WebSocket node")
// 后续代码
}
我们需要创建一个过滤器来监听USDT合约地址上的Transfer事件:
go
Copy
func main() {
// ... 前面的连接代码
// USDT合约地址
usdtContractAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7")
// 解析ABI
parsedABI, err := abi.JSON(strings.NewReader(usdtABI))
if err != nil {
log.Fatalf("Failed to parse ABI: %v", err)
}
// 获取Transfer事件的ID
transferEventSignature := []byte("Transfer(address,address,uint256)")
transferEventID := crypto.Keccak256Hash(transferEventSignature)
// 创建过滤查询
query := ethereum.FilterQuery{
Addresses: []common.Address{usdtContractAddress},
Topics: [][]common.Hash{{transferEventID}},
}
// 后续代码
}
现在我们可以使用SubscribeFilterLogs
方法来订阅USDT的Transfer事件:
go
Copy
func main() {
// ... 前面的代码
// 创建日志通道
logs := make(chan types.Log)
// 订阅事件
sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
if err != nil {
log.Fatalf("Failed to subscribe to logs: %v", err)
}
fmt.Println("Monitoring USDT transfers...")
// 处理事件
for {
select {
case err := <-sub.Err():
log.Fatalf("Subscription error: %v", err)
case vLog := <-logs:
// 解析Transfer事件
event, err := parseTransferEvent(vLog, parsedABI)
if err != nil {
log.Printf("Error parsing log: %v", err)
continue
}
// 打印事件信息
fmt.Printf("Transfer: %s -> %s, Amount: %s USDT\n",
event.From.Hex(),
event.To.Hex(),
formatUSDT(event.Value))
}
}
}
SubscribeFilterLogs
方法解析SubscribeFilterLogs
方法?SubscribeFilterLogs
是go-ethereum库中提供的一个强大方法,但必须使用WebSocket连接(wss\://)而不是HTTP/HTTPS。它允许我们:
除了SubscribeFilterLogs
外,go-ethereum还提供了其他几种获取事件日志的方法:
FilterLogs
:
go
Copy
logs, err := client.FilterLogs(ctx, query)
这个方法用于查询特定区块范围内的历史事件日志。
GetLogs
:
go
Copy
logs, err := client.GetLogs(ctx, query)
这是一个较低级别的API,直接调用以太坊JSON-RPC的eth_getLogs
方法。
轮询方式:通过定期调用FilterLogs
,检查新的区块并获取新的事件:
go
Copy
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
var lastBlock uint64
for range ticker.C {
header, err := client.HeaderByNumber(context.Background(), nil)
if err != nil {
log.Printf("Error getting latest block: %v", err)
continue
}
currentBlock := header.Number.Uint64()
if currentBlock <= lastBlock {
continue
}
query.FromBlock = big.NewInt(int64(lastBlock + 1))
query.ToBlock = big.NewInt(int64(currentBlock))
logs, err := client.FilterLogs(context.Background(), query)
if err != nil {
log.Printf("Error filtering logs: %v", err)
continue
}
for _, vLog := range logs {
// 处理日志...
}
lastBlock = currentBlock
}
方法 优点 缺点SubscribeFilterLogs • 实时推送事件<br>• 资源利用率高<br>• 低延迟<br>• 代码简洁<br>• 自动处理重新连接 • 必须使用WebSocket连接(wss://)<br>• 长连接可能会断开<br>• 对于历史数据不方便<br>• 可能错过事件(网络不稳定时)
FilterLogs • 可查询任意区块范围<br>• 更可靠地获取所有事件<br>• 适合批量处理<br>• 不依赖长连接 • 不是实时的<br>• 需要自行跟踪处理的区块<br>• 对历史长区间查询可能超时
轮询方式 • 实现简单<br>• 不依赖WebSocket<br>• 容易控制请求频率<br>• 可靠性高 • 资源利用率低<br>• 延迟高<br>• 额外的区块追踪逻辑<br>• 频繁的API调用
SubscribeFilterLogs
只能与WebSocket连接一起使用,不能与HTTP/HTTPS连接一起使用。 如果您尝试在HTTP连接上使用SubscribeFilterLogs
,将会收到类似以下的错误:
Copy
eth filter subscription not supported over http
结合使用不同方法:
FilterLogs
获取历史数据和启动时的回填SubscribeFilterLogs
监听新事件处理断连和重连:
go
Copy
for {
select {
case err := <-sub.Err():
log.Printf("Subscription error: %v, reconnecting...", err)
// 重新连接逻辑
sub, err = client.SubscribeFilterLogs(context.Background(), query, logs)
if err != nil {
log.Printf("Failed to resubscribe: %v", err)
// 执行备份轮询或重试策略
}
case vLog := <-logs:
// 处理日志...
}
}
针对生产环境优化:
Copy
## 步骤六:解析事件数据
接下来,我们需要定义解析事件的结构和函数:
```go
// Transfer事件结构
type TransferEvent struct {
From common.Address
To common.Address
Value *big.Int
}
// 解析Transfer事件
func parseTransferEvent(log types.Log, contractABI abi.ABI) (*TransferEvent, error) {
event := TransferEvent{}
// 确保这是一个Transfer事件
if len(log.Topics) < 3 {
return nil, fmt.Errorf("not enough topics for Transfer event")
}
// 从日志主题中提取from和to地址
event.From = common.HexToAddress(log.Topics[1].Hex())
event.To = common.HexToAddress(log.Topics[2].Hex())
// 解析value参数
err := contractABI.UnpackIntoInterface(&event, "Transfer", log.Data)
if err != nil {
return nil, err
}
return &event, nil
}
// 格式化USDT金额(USDT有6位小数)
func formatUSDT(value *big.Int) string {
divisor := big.NewInt(1000000) // 10^6 for USDT
quotient := new(big.Int).Div(value, divisor)
remainder := new(big.Int).Mod(value, divisor)
return fmt.Sprintf("%s.%06s", quotient.String(), fmt.Sprintf("%06s", remainder.String()))
}
如果你还想查询历史上的USDT交易,可以使用FilterLogs
方法:
go
Copy
func getHistoricalTransfers(client *ethclient.Client, contractAddress common.Address, fromBlock, toBlock *big.Int) {
// 创建过滤查询
query := ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: toBlock,
Addresses: []common.Address{contractAddress},
Topics: [][]common.Hash{{transferEventID}},
}
// 获取日志
logs, err := client.FilterLogs(context.Background(), query)
if err != nil {
log.Fatalf("Failed to filter logs: %v", err)
}
fmt.Printf("Found %d historical transfers\n", len(logs))
// 处理每个日志
for _, vLog := range logs {
event, err := parseTransferEvent(vLog, parsedABI)
if err != nil {
log.Printf("Error parsing log: %v", err)
continue
}
fmt.Printf("Block %d: %s -> %s, Amount: %s USDT\n",
vLog.BlockNumber,
event.From.Hex(),
event.To.Hex(),
formatUSDT(event.Value))
}
}
以下是一个完整的示例程序,它监听USDT的实时转账事件:
go
Copy
package main
import (
"context"
"crypto/ecdsa"
"fmt"
"log"
"math/big"
"strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
)
// USDT合约ABI中的Transfer事件部分
const usdtABI = `[
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"name": "from",
"type": "address"
},
{
"indexed": true,
"name": "to",
"type": "address"
},
{
"indexed": false,
"name": "value",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
}
]`
// Transfer事件结构
type TransferEvent struct {
From common.Address
To common.Address
Value *big.Int
}
func main() {
// 连接到以太坊节点 (使用WebSocket)
client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
if err != nil {
log.Fatalf("Failed to connect to the Ethereum client: %v", err)
}
defer client.Close()
fmt.Println("Connected to Ethereum WebSocket node")
// USDT合约地址
usdtContractAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7")
// 解析ABI
parsedABI, err := abi.JSON(strings.NewReader(usdtABI))
if err != nil {
log.Fatalf("Failed to parse ABI: %v", err)
}
// 获取Transfer事件的ID
transferEventSignature := []byte("Transfer(address,address,uint256)")
transferEventID := crypto.Keccak256Hash(transferEventSignature)
// 创建过滤查询
query := ethereum.FilterQuery{
Addresses: []common.Address{usdtContractAddress},
Topics: [][]common.Hash{{transferEventID}},
}
// 创建日志通道
logs := make(chan types.Log)
// 订阅事件
sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
if err != nil {
log.Fatalf("Failed to subscribe to logs: %v", err)
}
fmt.Println("Monitoring USDT transfers...")
// 处理事件
for {
select {
case err := <-sub.Err():
log.Fatalf("Subscription error: %v", err)
case vLog := <-logs:
// 解析Transfer事件
event, err := parseTransferEvent(vLog, parsedABI)
if err != nil {
log.Printf("Error parsing log: %v", err)
continue
}
// 打印事件信息
fmt.Printf("Transfer: %s -> %s, Amount: %s USDT\n",
event.From.Hex(),
event.To.Hex(),
formatUSDT(event.Value))
}
}
}
// 解析Transfer事件
func parseTransferEvent(log types.Log, contractABI abi.ABI) (*TransferEvent, error) {
event := TransferEvent{}
// 确保这是一个Transfer事件
if len(log.Topics) < 3 {
return nil, fmt.Errorf("not enough topics for Transfer event")
}
// 从日志主题中提取from和to地址
event.From = common.HexToAddress(log.Topics[1].Hex())
event.To = common.HexToAddress(log.Topics[2].Hex())
// 解析value参数
if len(log.Data) > 0 {
// 解析非索引参数(value)
err := contractABI.UnpackIntoInterface(&event.Value, "Transfer", log.Data)
if err != nil {
return nil, err
}
}
return &event, nil
}
// 格式化USDT金额(USDT有6位小数)
func formatUSDT(value *big.Int) string {
divisor := big.NewInt(1000000) // 10^6 for USDT
quotient := new(big.Int).Div(value, divisor)
remainder := new(big.Int).Mod(value, divisor)
return fmt.Sprintf("%s.%06s", quotient.String(), fmt.Sprintf("%06s", remainder.String()))
}
query.Topics
中添加额外的条件。go
Copy
// 使用WebSocket连接 (这是使用SubscribeFilterLogs的必要条件)
client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
通过使用go-ethereum库,我们可以轻松地监听以太坊网络上的USDT交易事件。这种方法可以应用于多种场景,如构建交易监控系统、钱包通知服务或交易分析工具。
完整的实现还可以包括更多功能,如过滤特定金额的交易、监控多个代币、处理链重组等。根据你的具体需求,可以进一步扩展和优化上述代码。
如果您正在寻找更加完善的区块链数据监控和分析解决方案,不妨试试 CPBOX.io。作为专业的区块链技术服务提供商,CPBOX.io 提供了一套全面的工具和服务,帮助开发者和企业轻松实现区块链数据的监控、分析和利用。
无论您是进行区块链钱包开发、DeFi应用构建,还是需要进行链上数据分析,CPBOX.io都能为您提供可靠的技术支持和服务。
访问 www.cpbox.io 了解更多,开启您的区块链数据之旅!
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!