本文详细介绍了如何利用QuickNode的Streams和Filters工具构建一个交易机器人,专注于监听和解析Uniswap V3上的ETH与USDC交易。文章提供了全面的步骤,包括需要的工具、配置本地Webhook、创建Stream和Filters,以及执行交易的代码实例,非常适合想要深入理解区块链数据流和自动化交易的开发者。
构建一个交易机器人涉及多个组件,比如获取区块链数据、实现合适的数据解析,然后创建你的策略。幸运的是,你可以使用 QuickNode 的 Streams and Filters 工具来解决常见的数据检索和清理任务,最小化你在构建和管理准确高效的数据管道所需的时间。在本指南中,我们将向你展示如何使用 Streams 和 Streams Filters 来构建自己的交易机器人,以监听 Uniswap V3 上的 ETH => USDC 交换。
让我们开始吧!
依赖项 | 版本 |
---|---|
node | ^18.18 |
dotenv | ^16.4.5 |
ethers | ^6.13.1 |
express | ^4.19.2 |
Streams 是一种区块链数据解决方案,可用于检索多个链的实时和历史区块数据,如 Ethereum、Optimism、Base 和 更多。通过 Streams,你可以将数据发送到多个目标,如 Webhooks、S3 Buckets、PostgreSQL、Snowflake 和 Functions。Streams 允许你选择特定的数据模式(例如,区块、交易、收据、日志、痕迹)以满足不同的数据需求,并帮助你管理 RPC 基础设施。有关更多信息,请访问 Streams - 文档 页面。
为了使 Streams 更加强大,你可以使用 Streams Filters 在把区块链数据发送给你之前先进行解析。目前,Streams Filters 支持 JavaScript(ECMAScript)代码,可以通过两种方式设置,在 QuickNode 控制面板内或通过 Streams REST API。
实现 Streams Filters 有几个优点,例如:
现在我们对 Streams 和 Filters 有了更好的理解,让我们开始编码吧。
虽然我们在驱动 Streams 时不需要访问节点端点,但在发送 Uniswap V3 交换交易时需要它。虽然我们可以运行自己的节点,但在 QuickNode,我们使启动区块链节点变得快捷容易。你可以在 这里注册一个帐户。创建 Ethereum Sepolia 端点后,获取 HTTP URL。它应该看起来像这样:
为了在链上交换代币,你需要 ETH 来支付手续费。由于我们使用的是 Sepolia 测试网,我们可以从 多链 QuickNode 水龙头 获取一些测试 ETH。
导航到 多链 QuickNode 水龙头 并连接你的钱包(例如,MetaMask、Coinbase 钱包)或粘贴你的钱包地址以获取测试 ETH。请注意,在以太坊主网使用 EVM 水龙头需要满足 0.001 ETH 的主网余额要求。你还可以通过推特或使用你的 QuickNode 帐户登录以获得奖励!
为了创建一个交易机器人,使用 Webhook 目标可能比使用 S3 和 PostgreSQL 更合适。我们将使用 Express.js 服务器来检索 Stream 数据、分析它,并决定是否购买。逻辑步骤如下:
exactInputSingle
交换方法,该方法的函数签名为 0x04e45aaf
。 tokenOut
字段。我们添加一个条件,确保这是 WETH 地址
。我们还通过检查收据数据对象的日志中是否存在 Swap
主题事件的哈希来验证交换是否发生。注意:触发交换的 USDC 阈值是可配置的。你可以在过滤代码中调整 THRESHOLD_IN_USDC
的值,以根据你的交易策略设置更小或更大的金额。
danger
此交易机器人不适用于生产,仅用于演示目的,以展示 Streams + Filters 的一些用例。
现在进入代码部分。
导航到你希望项目文件存放的位置,然后运行以下命令:
mkdir streams-filters-uniswap-bot
cd streams-filters-uniswap-bot
npm init es6 -y
npm i dotenv ethers express
echo > index.mjs
接下来,你需要使用私钥和端点 URL 配置 .env 文件,使用以下格式:
RPC_URL=https://ethereum-sepolia.quiknode.pro/AUTHTOKEN/
PRIVATE_KEY=YOUR_PRIVATE_KEY
记得保存文件。
你还需要通过参考以下地址下载 ABIs 来配置项目:
或者,你可以查看这个 仓库,将 abis
文件夹移动到你的项目文件夹中。记得保存文件。
随着我们的项目设置了依赖项、.env 配置和 ABIs,让我们设置脚本。
打开 index.mjs 文件,并复制粘贴以下代码:
import express from 'express';
import { ethers } from 'ethers';
import QUOTER_ABI from './abis/quoter.json' assert { type: 'json' };
import SWAP_ROUTER_ABI from './abis/swaprouter.json' assert { type: 'json' };
import TOKEN_IN_ABI from './abis/weth.json' assert { type: 'json' };
import dotenv from 'dotenv';
dotenv.config();
const app = express();
app.use(express.json({ limit: '2mb' }));
const port = 8000;
// 部署地址
const SWAP_ROUTER_ADDRESS = ethers.getAddress('0x3bFA4769FB09eefC5a80d6E87c3B9C650f7Ae48E');
const QUOTER_ADDRESS = ethers.getAddress('0xEd1f6473345F45b75F8179591dd5bA1888cf2FB3');
const WETH_ADDRESS = ethers.getAddress('0xfFf9976782d46CC05630D1f6eBAb18b2324d6B14');
const USDC_ADDRESS = ethers.getAddress('0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238');
// 提供者、合约和签名实例
const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);
const quoterContract = new ethers.Contract(QUOTER_ADDRESS, QUOTER_ABI, provider);
const signer = new ethers.Wallet(process.env.PRIVATE_KEY, provider);
let hasSwappedToday = false;
// 代币配置
const WETH = {
chainId: 11155111,
address: WETH_ADDRESS,
decimals: 18,
symbol: 'WETH',
name: 'Wrapped Ether'
};
const USDC = {
chainId: 11155111,
address: USDC_ADDRESS,
decimals: 6,
symbol: 'USDC',
name: 'USD//C'
};
app.use(express.json());
async function quoteAndLogSwap(quoterContract, fee, signer, amountIn) {
try {
const quotedAmountOut = await quoterContract.quoteExactInputSingle.staticCall({
tokenIn: WETH_ADDRESS,
tokenOut: USDC_ADDRESS,
fee: fee,
amountIn: amountIn,
sqrtPriceLimitX96: 0,
});
const amountOut = quotedAmountOut[0];
console.log(`-------------------------------`);
console.log(`Token Swap will result in: ${ethers.formatUnits(amountOut, 6)} USDC for ${ethers.formatEther(amountIn)} WETH`);
return amountOut;
} catch (error) {
console.error('在 quoteAndLogSwap 中发生错误:', error);
throw error;
}
}
async function checkAndApproveToken(tokenAddress, spenderAddress, amount, signer) {
const tokenContract = new ethers.Contract(tokenAddress, TOKEN_IN_ABI, signer);
const currentAllowance = await tokenContract.allowance(signer.address, spenderAddress);
if (currentAllowance < amount) {
console.log('授予额度不足。正在批准代币...');
console.log(`-------------------------------`);
const approvalTx = await tokenContract.approve(spenderAddress, amount);
await approvalTx.wait();
console.log('批准交易已确认');
console.log(`-------------------------------`);
} else {
console.log('已经存在足够的授予额度');
console.log(`-------------------------------`);
}
}
async function swapEthToUsdc(ethAmount) {
const swapRouter = new ethers.Contract(SWAP_ROUTER_ADDRESS, SWAP_ROUTER_ABI, signer);
// 将 ETH 金额转换为 Wei
const amountIn = ethers.parseEther(ethAmount.toString());
// 检查 ETH 余额
const balance = await provider.getBalance(signer.address);
console.log(`-------------------------------`);
console.log(`ETH 余额: ${ethers.formatEther(balance)} ETH`);
console.log(`-------------------------------`);
if (balance < amountIn) {
console.error('ETH 余额不足');
return;
}
try {
console.log(`正在交换 ${ethAmount} ETH 获取 USDC...`);
// 获取报价
let quotedAmountOut;
try {
quotedAmountOut = await quoteAndLogSwap(quoterContract, 3000, signer, amountIn);
console.log(`-------------------------------`);
console.log(`报价金额: ${ethers.formatUnits(quotedAmountOut, 6)} USDC`);
console.log(`-------------------------------`);
} catch (quoteError) {
console.error('获取报价时出错:', quoteError);
return;
}
// 检查并根据需要批准 WETH
await checkAndApproveToken(WETH_ADDRESS, SWAP_ROUTER_ADDRESS, amountIn, signer);
// 计算最小输出金额并考虑 5% 滑点
const minAmountOut = quotedAmountOut * 95n / 100n;
// 准备交换的参数
const params = {
tokenIn: WETH_ADDRESS,
tokenOut: USDC_ADDRESS,
fee: 3000, // 0.3%费用等级
recipient: await signer.getAddress(),
deadline: Math.floor(Date.now() / 1000) + 60 * 20, // 20分钟后
amountIn: amountIn,
amountOutMinimum: minAmountOut,
sqrtPriceLimitX96: 0
};
// 估算Gas
const gasEstimate = await swapRouter.exactInputSingle.estimateGas(params, { value: amountIn });
console.log('预计Gas:', gasEstimate.toString());
console.log(`-------------------------------`);
// 获取当前的Gas价格
const feeData = await provider.getFeeData();
const gasPrice = feeData.gasPrice;
console.log('当前Gas价格:', ethers.formatUnits(gasPrice, 'gwei'), 'gwei');
console.log(`-------------------------------`);
// 执行交换
console.log('发送交易...');
const tx = await swapRouter.exactInputSingle(params, {
nonce: await signer.getNonce(),
value: amountIn,
gasLimit: gasEstimate * 12n / 10n, // 在Gas估算上加 20% 的缓冲
gasPrice: gasPrice
});
console.log(`交易已发送。哈希: ${tx.hash}`);
console.log(`-------------------------------`);
console.log('等待交易确认...');
console.log(`-------------------------------`);
const receipt = await tx.wait();
console.log(`交易在区块 ${receipt.blockNumber} 中确认`);
console.log(`-------------------------------`);
if (receipt.status === 1) {
console.log('交易成功');
// 查找 USDC 的转移事件
const usdcTransferEvent = receipt.logs.find(log =>
log.address.toLowerCase() === USDC_ADDRESS.toLowerCase() &&
log.topics[0] === ethers.id("Transfer(address,address,uint256)")
);
if (usdcTransferEvent) {
console.log(`-------------------------------`);
const amountOut = ethers.dataSlice(usdcTransferEvent.data, 0);
console.log(`已收到 ${ethers.formatUnits(amountOut, 6)} USDC`);
console.log(`-------------------------------`);
return ethers.formatUnits(amountOut, 6);
} else {
console.log('在日志中未找到 USDC 转移事件');
return '0';
}
} else {
console.log('交易失败');
return '0';
}
} catch (error) {
console.error('交换过程中的错误:', error);
if (error.transaction) {
console.error('交易详细信息:', error.transaction);
}
if (error.receipt) {
console.error('交易收据:', error.receipt);
}
throw error;
}
}
app.post('/swap', async (req, res) => {
try {
// 检查是否已经执行了交换
if (hasSwappedToday) {
console.log('已经执行了交换');
return res.status(200).json({
success: false,
message: '交换次数达到上限'
});
}
const { result, message } = req.body;
// 检查请求体是否无效
if (!result && !message) {
console.log('请求体无效');
return res.status(200).json({
success: false,
message: '请求体无效'
});
}
// 处理“条件满足”情况
if (result && Array.isArray(result) && result.length > 0) {
const transaction = result[0]; // 假设我们对第一个匹配的交易感兴趣
console.log(`在区块 ${transaction.block} 检测到交易,金额: ${transaction.amount0USDC} USDC,哈希: ${transaction.transactionHash}`);
const THRESHOLD_IN_USDC = 0.50;
const validSwap = result.find(item => Math.abs(parseFloat(item.amount0USDC)) >= THRESHOLD_IN_USDC);
if (validSwap) {
const swappedAmount = await swapEthToUsdc(0.01); // 自定义购买值
if (swappedAmount !== '0') {
hasSwappedToday = true; // 在成功交换后将标志设置为 true
console.log('交换成功。将不再执行更多交换。');
}
return res.status(200).json({
success: true,
message: '交换成功',
swappedAmount: swappedAmount
});
} else {
console.log('没有超过阈值的交换金额');
return res.status(200).json({
success: false,
message: '没有超过阈值的交换金额'
});
}
}
// 处理“未满足”情况
if (message === "没有符合条件的交易") {
const blockNumber = req.body.block;
console.log(`在区块 ${blockNumber} 中没有符合条件的交易`);
return res.status(200).json({
success: false,
message: '没有符合条件的交易',
block: blockNumber
});
}
} catch (error) {
console.error('执行交换时发生错误:', error);
return res.status(200).json({
success: false,
error: '交换执行失败',
details: error.message,
transaction: error.transaction,
receipt: error.receipt
});
}
});
app.listen(port, () => {
console.log(`-------------------------------`);
console.log(`服务器正在运行在端口 ${port}`);
console.log(`-------------------------------`);
});
以下是上述 Express.js 服务器代码的主要功能概述:
我们定义了一个 /swap
POST 端点,该端点:
交换过程涉及到:
注意:该机器人配置为仅执行一次交换。成功后不会再进行任何交易,直到服务器重启。可以修改为按时间间隔(例如,每天)执行。
代码中包括了错误处理和整个过程的日志记录。接下来,我们将本地运行 Express.js 服务器,然后设置它以通过 ngrok 远程工作。
要启动本地 Express 服务器,请运行以下命令:
node index.mjs
你将看到输出类似于:服务器正在运行在端口 8000
。
接下来,我们将设置 Webhook API,通过 Express.js 和 ngrok 构建。
为了使我们的 Express 服务器在远程服务器上可用,我们将使用 ngrok。首先,你需要确保已安装 ngrok 并且你的 ngrok 帐户以你的授权Token进行身份验证。
你可以使用以下 ngrok 命令进行身份验证:
ngrok authtoken [your_authtoken_here]
身份验证后,在同一项目目录中启动远程服务器:
ngrok http 8000
你将看到类似于以下输出:
所显示的 URL 将 API 调用转发到你的本地主机。现在我们可以继续下一步,即在 QuickNode 上创建我们的 Stream 和 Filters。
导航至 QuickNode Streams 页面并点击 创建 Stream 按钮。
在 Stream 设置 页面上,将 Stream 对齐为以下配置:
然后,利用以下代码来过滤 Stream。该代码过滤出包含来自 USDC/ETH 对的 Uniswap V3 交换的交易收据。稍后在我们的 Express.js 脚本中,我们将进一步解析此数据以获取确切的交换金额并根据此做出决策。
function main(stream) {
try {
const data = stream.data
var block = data[0].block;
var receipts = data[0].receipts;
var transactions = data[0].block.transactions;
const USDC_ADDRESS = '0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238'.toLowerCase();
const FUNCTION_SIGNATURE = '0x04e45aaf';
const THRESHOLD_IN_USDC = 0.50; // 在此处设置阈值,例如 0.50 USDC
// 过滤交易
var filteredList = transactions.filter(tx => {
if (tx.input && tx.input.startsWith(FUNCTION_SIGNATURE)) {
let tokenOutAddress = '0x' + tx.input.slice(98, 138).toLowerCase();
return tokenOutAddress === USDC_ADDRESS;
}
return false;
});
// 匹配过滤过的交易和收据,检查日志,并提取数据
var result = filteredList.map(tx => {
var matchedReceipt = receipts.find(receipt => receipt.transactionHash === tx.hash);
if (matchedReceipt) {
var relevantLog = matchedReceipt.logs.find(log =>
log.topics &&
log.topics[0] === "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
);
if (relevantLog) {
// 从 logData 解码 amount0 以获取 USDC
let amount0Wei = decodeAmount0(relevantLog.data);
let amount0USDC = weiToUSDC(amount0Wei);
// 仅当金额超过阈值时才包含在结果中
if (Math.abs(amount0USDC) >= THRESHOLD_IN_USDC) {
return {
block: Number(block.number),
transactionHash: tx.hash,
amount0Wei: amount0Wei,
amount0USDC: amount0USDC
};
}
}
}
return null;
}).filter(Boolean); // 移除任何 null 条目
// 检查结果数组是否为空
if (result.length === 0) {
return {
block: Number(block.number),
message: "没有符合条件的交易"
};
}
return {
result
};
} catch (e) {
return {
error: e.message
};
}
}
function decodeAmount0(logData) {
// 如果存在则去掉 '0x' 前缀
logData = logData.startsWith('0x') ? logData.slice(2) : logData;
// amount0 是第一个参数,所以我们从索引 0 开始
let amount0Hex = logData.slice(0, 64);
// 将十六进制转换为 BigInt 再转为字符串
let amount0 = BigInt('0x' + amount0Hex).toString();
// 如果数字为负(第一个位为 1),我们需要转换它
if (amount0Hex[0] >= '8') {
amount0 = (BigInt(2) ** BigInt(256) - BigInt(amount0)).toString();
amount0 = '-' + amount0;
}
return amount0;
}
function weiToUSDC(weiAmount) {
// 将 wei 转换为 USDC(6 位小数)
return Math.abs(parseFloat(weiAmount) / 1e6);
}
你可以点击 运行测试 按钮以确保你的过滤源代码语法正确。你也可以在符合阈值的历史区块上进行测试,以确保你的 Filter 逻辑有效。
测试结果在这两种情况下应类似如下所示:
条件满足
{
"result": [\
{\
"block": 6226105,\
"amount0USDC": 0.960396,\
"amount0Wei": "-960396",\
"transactionHash": "0x743a9dd612a7b9e6a4e0493c1b4655bbcde1e27eb5056072df0c8a7dbce0f809"\
}\
]
}
未满足
{
"block": 6227555,
"message": "没有符合条件的交易"
}
对于其他 Stream 设置,我们不需要设置任何其他内容,不过,如果你想实时检测重组,可以启用 最新区块延迟 或 在重组时重流 功能以处理重组。有关重组的更多信息,请查看 这里。完成页面后,点击 下一步,你需要配置你的 Stream 目标。
然后,在 Stream 目标 页面上,设置目标为 Webhook 并配置为以下细节。如果某个字段未提及,可以保持不变。
ngrok http 8000
命令中获得的 ngrok URL。重要 要在 ngrok URL 的末尾添加 /swap
(例如, https://f9e3-26222-fb91-501-4202-e523-f123-cb69-cf6b.ngrok-free.app/swap)Content-Type: application/json
填写完整信息后,点击 测试目标 按钮以验证设置。这将发送来自 Streams 的真实数据样本(原始或经过过滤,具体取决于你的配置),以及你定义的任何自定义头。
重要:如果你暂停你的 Stream,稍后重新激活,请确保将“Stream 开始”设置为“最新区块”。否则,Stream 将处理从暂停时到当前区块之间的所有区块,这可能会导致意外行为。
最后,点击 创建一个 Stream 按钮。一旦 Stream 启动,监视你运行 ngrok 的终端窗口和本地 Express API。交易可能需要一段时间才能发生,因此你可以自己创建一个来进行测试。
以下是当满足过滤条件事件时的交易日志应如下所示:
-------------------------------
服务器正在运行在端口 8000
-------------------------------
在区块 6227658 中没有符合条件的交易
在区块 6227659 中没有符合条件的交易
在区块 6227660 中没有符合条件的交易
在区块 6227661 中没有符合条件的交易
在区块 6227662 中没有符合条件的交易
在区块 6227663 中没有符合条件的交易
在区块 6227664 中没有符合条件的交易
在区块 6227665 中没有符合条件的交易
在区块 6227666 中检测到交易,金额: 1.377949 USDC,哈希: 0x9c5999952976b4a252560eadc935f3cf8599963e180c28a3feec3507a76bb892
-------------------------------
ETH 余额: 1.043311552476760578 ETH
-------------------------------
正在交换 0.01 ETH 获取 USDC...
-------------------------------
Token Swap will result in: 0.915917 USDC for 0.01 WETH
-------------------------------
报价金额: 0.915917 USDC
-------------------------------
已经存在足够的授予额度
-------------------------------
预计Gas: 141702
-------------------------------
当前Gas价格: 10.575560558 gwei
-------------------------------
发送交易...
交易已发送。哈希: 0x7096ebfce2f3a46111f072bd3b2956590ca4e7dd5bacd80c385b913a4b7c24bd
-------------------------------
等待交易确认...
-------------------------------
交易在区块 6227667 中确认
-------------------------------
交易成功
-------------------------------
已收到 0.915917 USDC
-------------------------------
交换成功。将不再执行更多交换。
已经执行了交换
已经执行了交换
...
恭喜你!你刚刚使用 Streams 和 Streams Filters 创建了一个交易机器人。要了解更多信息,请查看以下资源。
查看以下资源列表,以增强你对 Streams、Streams Filters 和 Uniswap 的理解。
订阅我们的 新闻通讯,获取有关以太坊的更多文章和指南。如果你有任何反馈,请随时通过 Twitter 联系我们。你总是可以在我们的 Discord 社区服务器上与我们交谈,遇见一些你见过的最酷的开发者 :)
让我们知道 如果你有任何反馈或新主题的请求。我们很乐意听取你的意见。
- 原文链接: quicknode.com/guides/qui...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!