如何使用流和过滤器创建一个Uniswap机器人

  • QuickNode
  • 发布于 2025-01-30 13:45
  • 阅读 17

本文详细介绍了如何利用QuickNode的Streams和Filters工具构建一个交易机器人,专注于监听和解析Uniswap V3上的ETH与USDC交易。文章提供了全面的步骤,包括需要的工具、配置本地Webhook、创建Stream和Filters,以及执行交易的代码实例,非常适合想要深入理解区块链数据流和自动化交易的开发者。

概述

构建一个交易机器人涉及多个组件,比如获取区块链数据、实现合适的数据解析,然后创建你的策略。幸运的是,你可以使用 QuickNode 的 Streams and Filters 工具来解决常见的数据检索和清理任务,最小化你在构建和管理准确高效的数据管道所需的时间。在本指南中,我们将向你展示如何使用 Streams 和 Streams Filters 来构建自己的交易机器人,以监听 Uniswap V3 上的 ETH => USDC 交换。

让我们开始吧!

你将需要的

依赖项 版本
node ^18.18
dotenv ^16.4.5
ethers ^6.13.1
express ^4.19.2

你将要做的

  • 回顾一下 QuickNode 的 Streams 和 Streams Filters 的工作原理
  • 准备一个本地 Webhook 来监听和解析传入的 Streams 数据
  • 在 QuickNode 上创建一个 Stream + Streams Filter,以拉取实时区块链数据并解析有关 USDC/ETH 对上的 Uniswap V3 交换

Streams

Streams 是一种区块链数据解决方案,可用于检索多个链的实时和历史区块数据,如 Ethereum、Optimism、Base 和 更多。通过 Streams,你可以将数据发送到多个目标,如 Webhooks、S3 Buckets、PostgreSQL、Snowflake 和 Functions。Streams 允许你选择特定的数据模式(例如,区块、交易、收据、日志、痕迹)以满足不同的数据需求,并帮助你管理 RPC 基础设施。有关更多信息,请访问 Streams - 文档 页面。

Streams & Filters

为了使 Streams 更加强大,你可以使用 Streams Filters 在把区块链数据发送给你之前先进行解析。目前,Streams Filters 支持 JavaScript(ECMAScript)代码,可以通过两种方式设置,在 QuickNode 控制面板内或通过 Streams REST API

实现 Streams Filters 有几个优点,例如:

  • 成本效益:仅对过滤的并发送到目标的数据产生费用,这样可以最小化传输不需要数据的成本
  • 有效负载定制:在将 Stream 发送到目标之前自定义有效负载

现在我们对 Streams 和 Filters 有了更好的理解,让我们开始编码吧。

项目前提:创建一个以太坊节点端点

虽然我们在驱动 Streams 时不需要访问节点端点,但在发送 Uniswap V3 交换交易时需要它。虽然我们可以运行自己的节点,但在 QuickNode,我们使启动区块链节点变得快捷容易。你可以在 这里注册一个帐户。创建 Ethereum Sepolia 端点后,获取 HTTP URL。它应该看起来像这样:

Quicknode 主网端点截图

项目前提:从 QuickNode 多链水龙头获取 ETH

为了在链上交换代币,你需要 ETH 来支付手续费。由于我们使用的是 Sepolia 测试网,我们可以从 多链 QuickNode 水龙头 获取一些测试 ETH。

导航到 多链 QuickNode 水龙头 并连接你的钱包(例如,MetaMask、Coinbase 钱包)或粘贴你的钱包地址以获取测试 ETH。请注意,在以太坊主网使用 EVM 水龙头需要满足 0.001 ETH 的主网余额要求。你还可以通过推特或使用你的 QuickNode 帐户登录以获得奖励!

多链水龙头截图

准备 Webhook

为了创建一个交易机器人,使用 Webhook 目标可能比使用 S3 和 PostgreSQL 更合适。我们将使用 Express.js 服务器来检索 Stream 数据、分析它,并决定是否购买。逻辑步骤如下:

  • Streams 将监听来自以太坊 Sepolia 上每个新生成的区块的 带收据的区块 数据
  • Streams Filters 代码将解析交易收据数据,过滤出通过 ETH/USDC 池的 ETH 购买。具体步骤为:
  1. 过滤 exactInputSingle 交换方法,该方法的函数签名为 0x04e45aaf
  2. 检查要交换的代币;在本例中,是 tokenOut 字段。我们添加一个条件,确保这是 WETH 地址。我们还通过检查收据数据对象的日志中是否存在 Swap 主题事件的哈希来验证交换是否发生。
  3. 如果交易的规模超出某个阈值(例如,$100),则将数据添加到数组中,然后发送到 Webhook。
  4. 在 Express.js 服务器上接收 Stream 通知,验证确实超过阈值,并生成一个 Uniswap 交换交易,以通过 ETH/USDC 池购买 ETH。

注意:触发交换的 USDC 阈值是可配置的。你可以在过滤代码中调整 THRESHOLD_IN_USDC 的值,以根据你的交易策略设置更小或更大的金额。

danger

此交易机器人不适用于生产,仅用于演示目的,以展示 Streams + Filters 的一些用例。

现在进入代码部分。

项目设置

导航到你希望项目文件存放的位置,然后运行以下命令:

mkdir streams-filters-uniswap-bot
cd streams-filters-uniswap-bot
npm init es6 -y
npm i dotenv ethers express
echo > index.mjs

接下来,你需要使用私钥和端点 URL 配置 .env 文件,使用以下格式:

RPC_URL=https://ethereum-sepolia.quiknode.pro/AUTHTOKEN/
PRIVATE_KEY=YOUR_PRIVATE_KEY

记得保存文件。

你还需要通过参考以下地址下载 ABIs 来配置项目:

或者,你可以查看这个 仓库,将 abis 文件夹移动到你的项目文件夹中。记得保存文件。

随着我们的项目设置了依赖项、.env 配置和 ABIs,让我们设置脚本。

准备 JavaScript 脚本

打开 index.mjs 文件,并复制粘贴以下代码:

import express from 'express';
import { ethers } from 'ethers';
import QUOTER_ABI from './abis/quoter.json' assert { type: 'json' };
import SWAP_ROUTER_ABI from './abis/swaprouter.json' assert { type: 'json' };
import TOKEN_IN_ABI from './abis/weth.json' assert { type: 'json' };
import dotenv from 'dotenv';

dotenv.config();

const app = express();
app.use(express.json({ limit: '2mb' }));
const port = 8000;

// 部署地址
const SWAP_ROUTER_ADDRESS = ethers.getAddress('0x3bFA4769FB09eefC5a80d6E87c3B9C650f7Ae48E');
const QUOTER_ADDRESS = ethers.getAddress('0xEd1f6473345F45b75F8179591dd5bA1888cf2FB3');
const WETH_ADDRESS = ethers.getAddress('0xfFf9976782d46CC05630D1f6eBAb18b2324d6B14');
const USDC_ADDRESS = ethers.getAddress('0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238');

// 提供者、合约和签名实例
const provider = new ethers.JsonRpcProvider(process.env.RPC_URL);
const quoterContract = new ethers.Contract(QUOTER_ADDRESS, QUOTER_ABI, provider);
const signer = new ethers.Wallet(process.env.PRIVATE_KEY, provider);
let hasSwappedToday = false;

// 代币配置
const WETH = {
    chainId: 11155111,
    address: WETH_ADDRESS,
    decimals: 18,
    symbol: 'WETH',
    name: 'Wrapped Ether'
};

const USDC = {
    chainId: 11155111,
    address: USDC_ADDRESS,
    decimals: 6,
    symbol: 'USDC',
    name: 'USD//C'
};

app.use(express.json());

async function quoteAndLogSwap(quoterContract, fee, signer, amountIn) {
    try {
      const quotedAmountOut = await quoterContract.quoteExactInputSingle.staticCall({
        tokenIn: WETH_ADDRESS,
        tokenOut: USDC_ADDRESS,
        fee: fee,
        amountIn: amountIn,
        sqrtPriceLimitX96: 0,
      });

      const amountOut = quotedAmountOut[0];

      console.log(`-------------------------------`);
      console.log(`Token Swap will result in: ${ethers.formatUnits(amountOut, 6)} USDC for ${ethers.formatEther(amountIn)} WETH`);
      return amountOut;
    } catch (error) {
      console.error('在 quoteAndLogSwap 中发生错误:', error);
      throw error;
    }
  }

async function checkAndApproveToken(tokenAddress, spenderAddress, amount, signer) {
    const tokenContract = new ethers.Contract(tokenAddress, TOKEN_IN_ABI, signer);
    const currentAllowance = await tokenContract.allowance(signer.address, spenderAddress);

    if (currentAllowance < amount) {
        console.log('授予额度不足。正在批准代币...');
        console.log(`-------------------------------`);
        const approvalTx = await tokenContract.approve(spenderAddress, amount);
        await approvalTx.wait();
        console.log('批准交易已确认');
        console.log(`-------------------------------`);
    } else {
        console.log('已经存在足够的授予额度');
        console.log(`-------------------------------`);
    }
}

async function swapEthToUsdc(ethAmount) {
    const swapRouter = new ethers.Contract(SWAP_ROUTER_ADDRESS, SWAP_ROUTER_ABI, signer);

    // 将 ETH 金额转换为 Wei
    const amountIn = ethers.parseEther(ethAmount.toString());

    // 检查 ETH 余额
    const balance = await provider.getBalance(signer.address);
    console.log(`-------------------------------`);
    console.log(`ETH 余额: ${ethers.formatEther(balance)} ETH`);
    console.log(`-------------------------------`);

    if (balance < amountIn) {
        console.error('ETH 余额不足');
        return;
    }

    try {
        console.log(`正在交换 ${ethAmount} ETH 获取 USDC...`);

        // 获取报价
        let quotedAmountOut;
        try {
            quotedAmountOut = await quoteAndLogSwap(quoterContract, 3000, signer, amountIn);
            console.log(`-------------------------------`);
            console.log(`报价金额: ${ethers.formatUnits(quotedAmountOut, 6)} USDC`);
            console.log(`-------------------------------`);
        } catch (quoteError) {
            console.error('获取报价时出错:', quoteError);
            return;
        }

        // 检查并根据需要批准 WETH
        await checkAndApproveToken(WETH_ADDRESS, SWAP_ROUTER_ADDRESS, amountIn, signer);

        // 计算最小输出金额并考虑 5% 滑点
        const minAmountOut = quotedAmountOut * 95n / 100n;

        // 准备交换的参数
        const params = {
            tokenIn: WETH_ADDRESS,
            tokenOut: USDC_ADDRESS,
            fee: 3000, // 0.3%费用等级
            recipient: await signer.getAddress(),
            deadline: Math.floor(Date.now() / 1000) + 60 * 20, // 20分钟后
            amountIn: amountIn,
            amountOutMinimum: minAmountOut,
            sqrtPriceLimitX96: 0
        };

        // 估算Gas
        const gasEstimate = await swapRouter.exactInputSingle.estimateGas(params, { value: amountIn });
        console.log('预计Gas:', gasEstimate.toString());
        console.log(`-------------------------------`);

        // 获取当前的Gas价格
        const feeData = await provider.getFeeData();
        const gasPrice = feeData.gasPrice;
        console.log('当前Gas价格:', ethers.formatUnits(gasPrice, 'gwei'), 'gwei');
        console.log(`-------------------------------`);
        // 执行交换
        console.log('发送交易...');
        const tx = await swapRouter.exactInputSingle(params, {
            nonce: await signer.getNonce(),
            value: amountIn,
            gasLimit: gasEstimate * 12n / 10n, // 在Gas估算上加 20% 的缓冲
            gasPrice: gasPrice
        });

        console.log(`交易已发送。哈希: ${tx.hash}`);
        console.log(`-------------------------------`);
        console.log('等待交易确认...');
        console.log(`-------------------------------`);

        const receipt = await tx.wait();
        console.log(`交易在区块 ${receipt.blockNumber} 中确认`);
        console.log(`-------------------------------`);

        if (receipt.status === 1) {
            console.log('交易成功');
            // 查找 USDC 的转移事件
            const usdcTransferEvent = receipt.logs.find(log =>
                log.address.toLowerCase() === USDC_ADDRESS.toLowerCase() &&
                log.topics[0] === ethers.id("Transfer(address,address,uint256)")
            );

            if (usdcTransferEvent) {
                console.log(`-------------------------------`);
                const amountOut = ethers.dataSlice(usdcTransferEvent.data, 0);
                console.log(`已收到 ${ethers.formatUnits(amountOut, 6)} USDC`);
                console.log(`-------------------------------`);
                return ethers.formatUnits(amountOut, 6);
            } else {
                console.log('在日志中未找到 USDC 转移事件');
                return '0';
            }
        } else {
            console.log('交易失败');
            return '0';
        }

    } catch (error) {
        console.error('交换过程中的错误:', error);
        if (error.transaction) {
            console.error('交易详细信息:', error.transaction);
        }
        if (error.receipt) {
            console.error('交易收据:', error.receipt);
        }
        throw error;
    }
}

app.post('/swap', async (req, res) => {
    try {
        // 检查是否已经执行了交换
        if (hasSwappedToday) {
            console.log('已经执行了交换');
            return res.status(200).json({
                success: false,
                message: '交换次数达到上限'
            });
        }

        const { result, message } = req.body;

        // 检查请求体是否无效
        if (!result && !message) {
            console.log('请求体无效');
            return res.status(200).json({
                success: false,
                message: '请求体无效'
            });
        }

        // 处理“条件满足”情况
        if (result && Array.isArray(result) && result.length > 0) {
            const transaction = result[0]; // 假设我们对第一个匹配的交易感兴趣
            console.log(`在区块 ${transaction.block} 检测到交易,金额: ${transaction.amount0USDC} USDC,哈希: ${transaction.transactionHash}`);

            const THRESHOLD_IN_USDC = 0.50;
            const validSwap = result.find(item => Math.abs(parseFloat(item.amount0USDC)) >= THRESHOLD_IN_USDC);

            if (validSwap) {
                const swappedAmount = await swapEthToUsdc(0.01); // 自定义购买值

                if (swappedAmount !== '0') {
                    hasSwappedToday = true; // 在成功交换后将标志设置为 true
                    console.log('交换成功。将不再执行更多交换。');
                }

                return res.status(200).json({
                    success: true,
                    message: '交换成功',
                    swappedAmount: swappedAmount
                });
            } else {
                console.log('没有超过阈值的交换金额');
                return res.status(200).json({
                    success: false,
                    message: '没有超过阈值的交换金额'
                });
            }
        }

        // 处理“未满足”情况
        if (message === "没有符合条件的交易") {
            const blockNumber = req.body.block;
            console.log(`在区块 ${blockNumber} 中没有符合条件的交易`);
            return res.status(200).json({
                success: false,
                message: '没有符合条件的交易',
                block: blockNumber
            });
        }

    } catch (error) {
        console.error('执行交换时发生错误:', error);
        return res.status(200).json({
            success: false,
            error: '交换执行失败',
            details: error.message,
            transaction: error.transaction,
            receipt: error.receipt
        });
    }
});

app.listen(port, () => {
    console.log(`-------------------------------`);
    console.log(`服务器正在运行在端口 ${port}`);
    console.log(`-------------------------------`);
});

以下是上述 Express.js 服务器代码的主要功能概述:

我们定义了一个 /swap POST 端点,该端点:

  • 接收来自 Streams 的过滤交换数据
  • 检查过滤的数据是否符合某些标准(例如,最小 USDC 金额)。
  • 如果发现有效的交换,它通过控制台询问用户确认。
  • 如果得到确认,它将执行 0.01 WETH 交换为 USDC 的操作。

交换过程涉及到:

  • 批准代币支出。
  • 获取池信息。
  • 获取交换报价。
  • 准备交换参数。
  • 执行交换交易(发送交易)

注意:该机器人配置为仅执行一次交换。成功后不会再进行任何交易,直到服务器重启。可以修改为按时间间隔(例如,每天)执行。

代码中包括了错误处理和整个过程的日志记录。接下来,我们将本地运行 Express.js 服务器,然后设置它以通过 ngrok 远程工作。

启动 Express 服务器

要启动本地 Express 服务器,请运行以下命令:

node index.mjs

你将看到输出类似于:服务器正在运行在端口 8000

接下来,我们将设置 Webhook API,通过 Express.js 和 ngrok 构建。

设置 ngrok

为了使我们的 Express 服务器在远程服务器上可用,我们将使用 ngrok。首先,你需要确保已安装 ngrok 并且你的 ngrok 帐户以你的授权Token进行身份验证。

你可以使用以下 ngrok 命令进行身份验证:

ngrok authtoken [your_authtoken_here]

身份验证后,在同一项目目录中启动远程服务器:

ngrok http 8000

你将看到类似于以下输出:

ngrok 图像

所显示的 URL 将 API 调用转发到你的本地主机。现在我们可以继续下一步,即在 QuickNode 上创建我们的 Stream 和 Filters。

设置 Streams + Filters

导航至 QuickNode Streams 页面并点击 创建 Stream 按钮。

Stream 设置 页面上,将 Stream 对齐为以下配置:

  • :Ethereum
  • 网络:Sepolia
  • 数据集:带收据的区块
  • Stream 开启:最新区块
  • Stream 结束:在此输入结束块号 / 如果希望 Stream 无限存在可以保持不变
  • Stream 有效载荷:在流式传输之前修改 Stream

然后,利用以下代码来过滤 Stream。该代码过滤出包含来自 USDC/ETH 对的 Uniswap V3 交换的交易收据。稍后在我们的 Express.js 脚本中,我们将进一步解析此数据以获取确切的交换金额并根据此做出决策。

function main(stream) {
    try {
        const data = stream.data
        var block = data[0].block;
        var receipts = data[0].receipts;
        var transactions = data[0].block.transactions;

        const USDC_ADDRESS = '0x1c7D4B196Cb0C7B01d743Fbc6116a902379C7238'.toLowerCase();
        const FUNCTION_SIGNATURE = '0x04e45aaf';
        const THRESHOLD_IN_USDC = 0.50; // 在此处设置阈值,例如 0.50 USDC

        // 过滤交易
        var filteredList = transactions.filter(tx => {
            if (tx.input && tx.input.startsWith(FUNCTION_SIGNATURE)) {
                let tokenOutAddress = '0x' + tx.input.slice(98, 138).toLowerCase();
                return tokenOutAddress === USDC_ADDRESS;
            }
            return false;
        });

        // 匹配过滤过的交易和收据,检查日志,并提取数据
        var result = filteredList.map(tx => {
            var matchedReceipt = receipts.find(receipt => receipt.transactionHash === tx.hash);
            if (matchedReceipt) {
                var relevantLog = matchedReceipt.logs.find(log =>
                    log.topics &&
                    log.topics[0] === "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
                );

                if (relevantLog) {
                    // 从 logData 解码 amount0 以获取 USDC
                    let amount0Wei = decodeAmount0(relevantLog.data);
                    let amount0USDC = weiToUSDC(amount0Wei);

                    // 仅当金额超过阈值时才包含在结果中
                    if (Math.abs(amount0USDC) >= THRESHOLD_IN_USDC) {
                        return {
                            block: Number(block.number),
                            transactionHash: tx.hash,
                            amount0Wei: amount0Wei,
                            amount0USDC: amount0USDC
                        };
                    }
                }
            }
            return null;
        }).filter(Boolean); // 移除任何 null 条目

        // 检查结果数组是否为空
        if (result.length === 0) {
            return {
                block: Number(block.number),
                message: "没有符合条件的交易"
            };
        }

        return {
            result
        };
    } catch (e) {
        return {
            error: e.message
        };
    }
}

function decodeAmount0(logData) {
    // 如果存在则去掉 '0x' 前缀
    logData = logData.startsWith('0x') ? logData.slice(2) : logData;
    // amount0 是第一个参数,所以我们从索引 0 开始
    let amount0Hex = logData.slice(0, 64);
    // 将十六进制转换为 BigInt 再转为字符串
    let amount0 = BigInt('0x' + amount0Hex).toString();
    // 如果数字为负(第一个位为 1),我们需要转换它
    if (amount0Hex[0] >= '8') {
        amount0 = (BigInt(2) ** BigInt(256) - BigInt(amount0)).toString();
        amount0 = '-' + amount0;
    }
    return amount0;
}

function weiToUSDC(weiAmount) {
    // 将 wei 转换为 USDC(6 位小数)
    return Math.abs(parseFloat(weiAmount) / 1e6);
}

你可以点击 运行测试 按钮以确保你的过滤源代码语法正确。你也可以在符合阈值的历史区块上进行测试,以确保你的 Filter 逻辑有效。

测试结果在这两种情况下应类似如下所示:

条件满足

{
  "result": [\
    {\
      "block": 6226105,\
      "amount0USDC": 0.960396,\
      "amount0Wei": "-960396",\
      "transactionHash": "0x743a9dd612a7b9e6a4e0493c1b4655bbcde1e27eb5056072df0c8a7dbce0f809"\
    }\
  ]
}

未满足

{
  "block": 6227555,
  "message": "没有符合条件的交易"
}

对于其他 Stream 设置,我们不需要设置任何其他内容,不过,如果你想实时检测重组,可以启用 最新区块延迟在重组时重流 功能以处理重组。有关重组的更多信息,请查看 这里。完成页面后,点击 下一步,你需要配置你的 Stream 目标。

然后,在 Stream 目标 页面上,设置目标为 Webhook 并配置为以下细节。如果某个字段未提及,可以保持不变。

填写完整信息后,点击 测试目标 按钮以验证设置。这将发送来自 Streams 的真实数据样本(原始或经过过滤,具体取决于你的配置),以及你定义的任何自定义头。

重要:如果你暂停你的 Stream,稍后重新激活,请确保将“Stream 开始”设置为“最新区块”。否则,Stream 将处理从暂停时到当前区块之间的所有区块,这可能会导致意外行为。

最后,点击 创建一个 Stream 按钮。一旦 Stream 启动,监视你运行 ngrok 的终端窗口和本地 Express API。交易可能需要一段时间才能发生,因此你可以自己创建一个来进行测试。

以下是当满足过滤条件事件时的交易日志应如下所示:

-------------------------------
服务器正在运行在端口 8000
-------------------------------
在区块 6227658 中没有符合条件的交易
在区块 6227659 中没有符合条件的交易
在区块 6227660 中没有符合条件的交易
在区块 6227661 中没有符合条件的交易
在区块 6227662 中没有符合条件的交易
在区块 6227663 中没有符合条件的交易
在区块 6227664 中没有符合条件的交易
在区块 6227665 中没有符合条件的交易
在区块 6227666 中检测到交易,金额: 1.377949 USDC,哈希: 0x9c5999952976b4a252560eadc935f3cf8599963e180c28a3feec3507a76bb892
-------------------------------
ETH 余额: 1.043311552476760578 ETH
-------------------------------
正在交换 0.01 ETH 获取 USDC...
-------------------------------
Token Swap will result in: 0.915917 USDC for 0.01 WETH
-------------------------------
报价金额: 0.915917 USDC
-------------------------------
已经存在足够的授予额度
-------------------------------
预计Gas: 141702
-------------------------------
当前Gas价格: 10.575560558 gwei
-------------------------------
发送交易...
交易已发送。哈希: 0x7096ebfce2f3a46111f072bd3b2956590ca4e7dd5bacd80c385b913a4b7c24bd
-------------------------------
等待交易确认...
-------------------------------
交易在区块 6227667 中确认
-------------------------------
交易成功
-------------------------------
已收到 0.915917 USDC
-------------------------------
交换成功。将不再执行更多交换。
已经执行了交换
已经执行了交换
...

恭喜你!你刚刚使用 Streams 和 Streams Filters 创建了一个交易机器人。要了解更多信息,请查看以下资源。

其他想法与资源

查看以下资源列表,以增强你对 Streams、Streams Filters 和 Uniswap 的理解。

结语

订阅我们的 新闻通讯,获取有关以太坊的更多文章和指南。如果你有任何反馈,请随时通过 Twitter 联系我们。你总是可以在我们的 Discord 社区服务器上与我们交谈,遇见一些你见过的最酷的开发者 :)

我们 ❤️ 反馈!

让我们知道 如果你有任何反馈或新主题的请求。我们很乐意听取你的意见。

  • 原文链接: quicknode.com/guides/qui...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。