在区块链应用开发中,实时监听某种代币的交易是一个常见需求。本文将详细介绍如何使用go-ethereum(也称为geth)客户端库来监听以太坊网络上的USDT(Tether)代币交易。USDT简介USDT是一种稳定币,在以太坊网络上以ERC-20代币的形式存在。它的智能合约实现了标准的ERC-20
在区块链应用开发中,实时监听某种代币的交易是一个常见需求。本文将详细介绍如何使用go-ethereum(也称为geth)客户端库来监听以太坊网络上的USDT(Tether)代币交易。
USDT是一种稳定币,在以太坊网络上以ERC-20代币的形式存在。它的智能合约实现了标准的ERC-20接口,包括Transfer事件,该事件在每次代币转账时都会被触发。
以太坊主网上USDT合约地址:0xdAC17F958D2ee523a2206206994597C13D831ec7
在开始之前,需要准备以下内容:
首先创建一个新的Go项目并安装必要的依赖:
bash
Copy
mkdir usdt-monitor
cd usdt-monitor
go mod init github.com/your-username/usdt-monitor
go get github.com/ethereum/go-ethereum为了正确解析USDT合约的事件,我们需要定义其ABI(Application Binary Interface)。ERC-20代币的Transfer事件定义如下:
go
Copy
package main
import (
    "context"
    "fmt"
    "log"
    "math/big"
    "strings"
    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/accounts/abi"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethclient"
)
// USDT合约ABI中的Transfer事件部分
const usdtABI = `[
    {
        "anonymous": false,
        "inputs": [
            {
                "indexed": true,
                "name": "from",
                "type": "address"
            },
            {
                "indexed": true,
                "name": "to",
                "type": "address"
            },
            {
                "indexed": false,
                "name": "value",
                "type": "uint256"
            }
        ],
        "name": "Transfer",
        "type": "event"
    }
]`接下来,我们需要连接到以太坊节点:
go
Copy
func main() {
    // 连接到以太坊节点 (使用WebSocket)
    client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
    if err != nil {
        log.Fatalf("Failed to connect to the Ethereum client: %v", err)
    }
    // 打印连接信息
    fmt.Println("Connected to Ethereum WebSocket node")
    // 后续代码
}我们需要创建一个过滤器来监听USDT合约地址上的Transfer事件:
go
Copy
func main() {
    // ... 前面的连接代码
    // USDT合约地址
    usdtContractAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7")
    // 解析ABI
    parsedABI, err := abi.JSON(strings.NewReader(usdtABI))
    if err != nil {
        log.Fatalf("Failed to parse ABI: %v", err)
    }
    // 获取Transfer事件的ID
    transferEventSignature := []byte("Transfer(address,address,uint256)")
    transferEventID := crypto.Keccak256Hash(transferEventSignature)
    // 创建过滤查询
    query := ethereum.FilterQuery{
        Addresses: []common.Address{usdtContractAddress},
        Topics: [][]common.Hash{{transferEventID}},
    }
    // 后续代码
}现在我们可以使用SubscribeFilterLogs方法来订阅USDT的Transfer事件:
go
Copy
func main() {
    // ... 前面的代码
    // 创建日志通道
    logs := make(chan types.Log)
    // 订阅事件
    sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
    if err != nil {
        log.Fatalf("Failed to subscribe to logs: %v", err)
    }
    fmt.Println("Monitoring USDT transfers...")
    // 处理事件
    for {
        select {
        case err := <-sub.Err():
            log.Fatalf("Subscription error: %v", err)
        case vLog := <-logs:
            // 解析Transfer事件
            event, err := parseTransferEvent(vLog, parsedABI)
            if err != nil {
                log.Printf("Error parsing log: %v", err)
                continue
            }
            // 打印事件信息
            fmt.Printf("Transfer: %s -> %s, Amount: %s USDT\n",
                event.From.Hex(),
                event.To.Hex(),
                formatUSDT(event.Value))
        }
    }
}SubscribeFilterLogs方法解析SubscribeFilterLogs方法?SubscribeFilterLogs是go-ethereum库中提供的一个强大方法,但必须使用WebSocket连接(wss\://)而不是HTTP/HTTPS。它允许我们:
除了SubscribeFilterLogs外,go-ethereum还提供了其他几种获取事件日志的方法:
FilterLogs:
go
Copy
logs, err := client.FilterLogs(ctx, query)这个方法用于查询特定区块范围内的历史事件日志。
GetLogs:
go
Copy
logs, err := client.GetLogs(ctx, query)这是一个较低级别的API,直接调用以太坊JSON-RPC的eth_getLogs方法。
轮询方式:通过定期调用FilterLogs,检查新的区块并获取新的事件:
go
Copy
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
var lastBlock uint64
for range ticker.C {
   header, err := client.HeaderByNumber(context.Background(), nil)
   if err != nil {
       log.Printf("Error getting latest block: %v", err)
       continue
   }
   currentBlock := header.Number.Uint64()
   if currentBlock <= lastBlock {
       continue
   }
   query.FromBlock = big.NewInt(int64(lastBlock + 1))
   query.ToBlock = big.NewInt(int64(currentBlock))
   logs, err := client.FilterLogs(context.Background(), query)
   if err != nil {
       log.Printf("Error filtering logs: %v", err)
       continue
   }
   for _, vLog := range logs {
       // 处理日志...
   }
   lastBlock = currentBlock
}方法  优点  缺点SubscribeFilterLogs   • 实时推送事件<br>• 资源利用率高<br>• 低延迟<br>• 代码简洁<br>• 自动处理重新连接   • 必须使用WebSocket连接(wss://)<br>• 长连接可能会断开<br>• 对于历史数据不方便<br>• 可能错过事件(网络不稳定时)
FilterLogs  • 可查询任意区块范围<br>• 更可靠地获取所有事件<br>• 适合批量处理<br>• 不依赖长连接    • 不是实时的<br>• 需要自行跟踪处理的区块<br>• 对历史长区间查询可能超时
轮询方式    • 实现简单<br>• 不依赖WebSocket<br>• 容易控制请求频率<br>• 可靠性高   • 资源利用率低<br>• 延迟高<br>• 额外的区块追踪逻辑<br>• 频繁的API调用SubscribeFilterLogs只能与WebSocket连接一起使用,不能与HTTP/HTTPS连接一起使用。 如果您尝试在HTTP连接上使用SubscribeFilterLogs,将会收到类似以下的错误:
Copy
eth filter subscription not supported over http结合使用不同方法:
FilterLogs获取历史数据和启动时的回填SubscribeFilterLogs监听新事件处理断连和重连:
go
Copy
for {
   select {
   case err := <-sub.Err():
       log.Printf("Subscription error: %v, reconnecting...", err)
       // 重新连接逻辑
       sub, err = client.SubscribeFilterLogs(context.Background(), query, logs)
       if err != nil {
           log.Printf("Failed to resubscribe: %v", err)
           // 执行备份轮询或重试策略
       }
   case vLog := <-logs:
       // 处理日志...
   }
}针对生产环境优化:
Copy
## 步骤六:解析事件数据
接下来,我们需要定义解析事件的结构和函数:
```go
// Transfer事件结构
type TransferEvent struct {
    From  common.Address
    To    common.Address
    Value *big.Int
}
// 解析Transfer事件
func parseTransferEvent(log types.Log, contractABI abi.ABI) (*TransferEvent, error) {
    event := TransferEvent{}
    // 确保这是一个Transfer事件
    if len(log.Topics) < 3 {
        return nil, fmt.Errorf("not enough topics for Transfer event")
    }
    // 从日志主题中提取from和to地址
    event.From = common.HexToAddress(log.Topics[1].Hex())
    event.To = common.HexToAddress(log.Topics[2].Hex())
    // 解析value参数
    err := contractABI.UnpackIntoInterface(&event, "Transfer", log.Data)
    if err != nil {
        return nil, err
    }
    return &event, nil
}
// 格式化USDT金额(USDT有6位小数)
func formatUSDT(value *big.Int) string {
    divisor := big.NewInt(1000000) // 10^6 for USDT
    quotient := new(big.Int).Div(value, divisor)
    remainder := new(big.Int).Mod(value, divisor)
    return fmt.Sprintf("%s.%06s", quotient.String(), fmt.Sprintf("%06s", remainder.String()))
}如果你还想查询历史上的USDT交易,可以使用FilterLogs方法:
go
Copy
func getHistoricalTransfers(client *ethclient.Client, contractAddress common.Address, fromBlock, toBlock *big.Int) {
    // 创建过滤查询
    query := ethereum.FilterQuery{
        FromBlock: fromBlock,
        ToBlock:   toBlock,
        Addresses: []common.Address{contractAddress},
        Topics:    [][]common.Hash{{transferEventID}},
    }
    // 获取日志
    logs, err := client.FilterLogs(context.Background(), query)
    if err != nil {
        log.Fatalf("Failed to filter logs: %v", err)
    }
    fmt.Printf("Found %d historical transfers\n", len(logs))
    // 处理每个日志
    for _, vLog := range logs {
        event, err := parseTransferEvent(vLog, parsedABI)
        if err != nil {
            log.Printf("Error parsing log: %v", err)
            continue
        }
        fmt.Printf("Block %d: %s -> %s, Amount: %s USDT\n",
            vLog.BlockNumber,
            event.From.Hex(),
            event.To.Hex(),
            formatUSDT(event.Value))
    }
}以下是一个完整的示例程序,它监听USDT的实时转账事件:
go
Copy
package main
import (
    "context"
    "crypto/ecdsa"
    "fmt"
    "log"
    "math/big"
    "strings"
    "github.com/ethereum/go-ethereum"
    "github.com/ethereum/go-ethereum/accounts/abi"
    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/crypto"
    "github.com/ethereum/go-ethereum/ethclient"
)
// USDT合约ABI中的Transfer事件部分
const usdtABI = `[
    {
        "anonymous": false,
        "inputs": [
            {
                "indexed": true,
                "name": "from",
                "type": "address"
            },
            {
                "indexed": true,
                "name": "to",
                "type": "address"
            },
            {
                "indexed": false,
                "name": "value",
                "type": "uint256"
            }
        ],
        "name": "Transfer",
        "type": "event"
    }
]`
// Transfer事件结构
type TransferEvent struct {
    From  common.Address
    To    common.Address
    Value *big.Int
}
func main() {
    // 连接到以太坊节点 (使用WebSocket)
    client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")
    if err != nil {
        log.Fatalf("Failed to connect to the Ethereum client: %v", err)
    }
    defer client.Close()
    fmt.Println("Connected to Ethereum WebSocket node")
    // USDT合约地址
    usdtContractAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7")
    // 解析ABI
    parsedABI, err := abi.JSON(strings.NewReader(usdtABI))
    if err != nil {
        log.Fatalf("Failed to parse ABI: %v", err)
    }
    // 获取Transfer事件的ID
    transferEventSignature := []byte("Transfer(address,address,uint256)")
    transferEventID := crypto.Keccak256Hash(transferEventSignature)
    // 创建过滤查询
    query := ethereum.FilterQuery{
        Addresses: []common.Address{usdtContractAddress},
        Topics:    [][]common.Hash{{transferEventID}},
    }
    // 创建日志通道
    logs := make(chan types.Log)
    // 订阅事件
    sub, err := client.SubscribeFilterLogs(context.Background(), query, logs)
    if err != nil {
        log.Fatalf("Failed to subscribe to logs: %v", err)
    }
    fmt.Println("Monitoring USDT transfers...")
    // 处理事件
    for {
        select {
        case err := <-sub.Err():
            log.Fatalf("Subscription error: %v", err)
        case vLog := <-logs:
            // 解析Transfer事件
            event, err := parseTransferEvent(vLog, parsedABI)
            if err != nil {
                log.Printf("Error parsing log: %v", err)
                continue
            }
            // 打印事件信息
            fmt.Printf("Transfer: %s -> %s, Amount: %s USDT\n",
                event.From.Hex(),
                event.To.Hex(),
                formatUSDT(event.Value))
        }
    }
}
// 解析Transfer事件
func parseTransferEvent(log types.Log, contractABI abi.ABI) (*TransferEvent, error) {
    event := TransferEvent{}
    // 确保这是一个Transfer事件
    if len(log.Topics) < 3 {
        return nil, fmt.Errorf("not enough topics for Transfer event")
    }
    // 从日志主题中提取from和to地址
    event.From = common.HexToAddress(log.Topics[1].Hex())
    event.To = common.HexToAddress(log.Topics[2].Hex())
    // 解析value参数
    if len(log.Data) > 0 {
        // 解析非索引参数(value)
        err := contractABI.UnpackIntoInterface(&event.Value, "Transfer", log.Data)
        if err != nil {
            return nil, err
        }
    }
    return &event, nil
}
// 格式化USDT金额(USDT有6位小数)
func formatUSDT(value *big.Int) string {
    divisor := big.NewInt(1000000) // 10^6 for USDT
    quotient := new(big.Int).Div(value, divisor)
    remainder := new(big.Int).Mod(value, divisor)
    return fmt.Sprintf("%s.%06s", quotient.String(), fmt.Sprintf("%06s", remainder.String()))
}query.Topics中添加额外的条件。go
Copy
// 使用WebSocket连接 (这是使用SubscribeFilterLogs的必要条件)
client, err := ethclient.Dial("wss://mainnet.infura.io/ws/v3/YOUR_INFURA_PROJECT_ID")通过使用go-ethereum库,我们可以轻松地监听以太坊网络上的USDT交易事件。这种方法可以应用于多种场景,如构建交易监控系统、钱包通知服务或交易分析工具。
完整的实现还可以包括更多功能,如过滤特定金额的交易、监控多个代币、处理链重组等。根据你的具体需求,可以进一步扩展和优化上述代码。
如果您正在寻找更加完善的区块链数据监控和分析解决方案,不妨试试 CPBOX.io。作为专业的区块链技术服务提供商,CPBOX.io 提供了一套全面的工具和服务,帮助开发者和企业轻松实现区块链数据的监控、分析和利用。
无论您是进行区块链钱包开发、DeFi应用构建,还是需要进行链上数据分析,CPBOX.io都能为您提供可靠的技术支持和服务。
访问 www.cpbox.io 了解更多,开启您的区块链数据之旅!
 
                如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!