构建一个无服务器Twitter机器人以跟踪流动性池与QuickNode

  • QuickNode
  • 发布于 2025-01-30 21:21
  • 阅读 21

本文介绍了如何利用QuickNode的Streams和Functions产品设置自动化的Twitter(X)机器人,以实时追踪Uniswap V2和V3的新流动性池。通过该教程,读者将学习如何创建流、配置过滤器、开发无服务器功能以及向Twitter发布更新,适用于各种区块链网络。教程详细、结构清晰,并附有代码示例。

概述

处理 DeFi 上的实时数据可能具有挑战性且耗时。然而,QuickNode 的 StreamsFunctions 产品提供了一种强大的解决方案来自动化和简化这一过程。在本指南中,我们将逐步指导你设置一个 Twitter (X) 机器人,该机器人使用 QuickNode 的 StreamsFunctions 跟踪 新的流动性池 在 Uniswap V2 和 V3 上。这个机器人将捕获事件数据,过滤以识别新的流动性池,并通过 JavaScript/TypeScript 库发布更新到 Twitter (X)。

通过利用 QuickNode 的 无服务器基础设施,无需任何服务器端代码或维护数据库。这使得解决方案更加高效且具有成本效益。

尽管我们在本教程中使用 Base Mainnet,但可以将相同的方法应用于 QuickNode 支持的 其他区块链网络。只需确保修改过滤功能以匹配所需链的特定事件和合约地址。

你将要做的事情

  • 配置 QuickNode 的 Streams 以使用其 过滤 功能检测 Uniswap V2/V3 上的新流动性池。
  • 使用 QuickNode 的 Functions 设置一个无服务器函数,后者在其 Node.js 运行时中本地支持 Twitter 库,以处理数据并直接发送更新到 Twitter (X)。
  • 创建一个自动化的实时 Twitter 机器人,以分享 Base 主网的新流动性池的见解。

你将需要的东西

Streams 和 Functions

在区块链开发中,处理实时数据和自动化响应可能是复杂且资源密集型的。QuickNode 的 Streams 和 Functions 产品旨在简化此过程,赋能开发者专注于构建优秀的应用而不必担心管理服务器或处理无关数据。

Streams

Streams 是一个强大的区块链数据流和 ETL(提取、转换、加载)服务,使实时和历史区块链数据易于访问。Streams 提供了一种高效的方式来接收实时区块链数据或填补历史数据,而不依赖于持续轮询或复杂基础设施。

Streams 的主要功能

  • 实时数据流:即时接收实时区块链数据,无需持续轮询。
  • 历史数据回填:可根据预先的成本和完成估算检索历史区块链数据。
  • 可定制过滤器:应用基于 JavaScript 的规则来定制你接收的数据,跟踪事件、解码数据等。
  • 成本效益:按数据量计费,Streams 保持成本可预测。
  • 数据一致性:无缝处理链重组,确保没有数据丢失。
  • 压缩:通过优化数据传输最小化带宽使用。
  • 与 Functions 集成:通过无服务器处理增强 Streams,解锁更强大的工作流。

Functions

Functions 提供一个无服务器平台,让你能够即时过滤、转换和增强区块链数据,而无需维护服务器或基础设施。它允许你处理实时数据、集成外部 API,并创建会自动扩展的无服务器 API。

Functions 的主要功能

  • 无服务器灵活性:在 JavaScript/TypeScript 中编写、部署并运行代码,而不需担心基础设施。
  • 外部库支持:直接在你的函数中使用流行库如 ethers.jstwitter-api-v2
  • API 准备:每个 Function 自动作为 API 暴露,使集成无缝。
  • 键值存储:高效存储和检索数据,适合管理大规模数据点清单,例如钱包地址,缓存代币名称和符号。

为了让你更容易熟悉 Functions,我们创建了一个 Functions 库,其中包含常用函数的集合。你可以在 这里 找到该库。

本指南中我们如何使用 Streams 和 Functions

使用 Streams 处理实时数据

我们设置一个 Stream 来监测 Uniswap V2 和 V3 上的新流动性池。利用其过滤能力,只将相关事件发送到目标,以减少不必要的数据处理。

例如,下面是发送到 Function 的数据样本,而不是一堆数据:

[\
    {\
        "poolAddress": "0x0000000000000000000000000000000000000001",\
        "token0": "0x4200000000000000000000000000000000000006",\
        "token1": "0xce15615dec6758825849e8d093c695e86ec0f8e0",\
        "txHash": "0xc90fce8ede27f1840e2f50b7ce68b3a74a80b20136167e49359a50d3f4a7f09a",\
        "type": "Uniswap V3"\
    }\
]

使用 Functions 进行处理和自动化

Stream 的过滤数据将被发送到一个 Function,我们在其中:

  • 解析并丰富数据,例如使用 ethers.js 检索代币名称和符号,该库在 Functions Node.js 运行时中原生包含。
  • 键值存储 中存储代币名称和符号,以避免重复区块链调用。
  • 将数据格式化为可操作的见解(例如,为 Twitter 制作一条消息)。
  • 通过 Twitter API 自动向 Twitter (X) 发布更新。

使用 Streams 跟踪 Uniswap V2 和 V3 上的新流动性池

在本节中,我们将创建一个 Stream 来监控 Uniswap V2 和 V3 上的 新流动性池,并设置一个过滤函数来处理这些事件。此函数将提取相关数据,包括代币地址、池地址和交易详细信息,为我们应用程序的进一步使用做准备。

步骤 1:创建一个 Stream

让我们开始创建一个 Stream 来监测 Uniswap V2 和 V3 上的新流动性池。

  • 登录你的 QuickNode 仪表板,并导航到 Streams 部分。
  • 点击 创建新 Stream
  • 选择 Base Mainnet 作为网络。
  • 选择数据集 Receipts 以捕获来自交易的详细事件日志。
  • Stream 开始Stream 结束 保持为默认值,最新区块不结束,因为我们从现在开始追踪并继续追踪。
  • 点击 在流之前修改负载 以过滤原始数据。
  • 添加自定义过滤器以缩小数据范围,只保留相关的流动性池创建事件:
    • 监视由 Uniswap V2 和 V3 工厂合约发出的事件。
    • 目标特定事件主题,例如 Uniswap V2 的 PairCreated 和 Uniswap V3 的 PoolCreated

这是你将使用的 JavaScript 过滤器:

点击展开

function main(stream) {
    // 定义过滤常量
    const uniswapV3PoolCreatedTopicHash =
        '0x783cca1c0412dd0d695e784568c96da2e9c22ff989357a2e8b1d9b2b4e6b7118' // Uniswap V3 PoolCreated 事件主题
    const uniswapV2PairCreatedTopicHash =
        '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9' // Uniswap V2 PairCreated 事件主题
    const uniswapV3FactoryAddress = '0x33128a8fC17869897dcE68Ed026d694621f6FDfD' // Uniswap V3 工厂地址
    const uniswapV2FactoryAddress = '0x8909Dc15e40173Ff4699343b6eB8132c65e18eC6' // Uniswap V2 工厂地址

    // 初始化池数组
    const pools = []
    const data = stream.data[0]

    // 循环处理流中的每个交易收据
    data.forEach(receipt => {
        // 过滤 Uniswap V3 PoolCreated 事件
        const newV3Pools = receipt.logs.filter(
            log =>
                log.address.toLowerCase() === uniswapV3FactoryAddress.toLowerCase() &&
                log.topics[0] === uniswapV3PoolCreatedTopicHash
        )

        // 过滤 Uniswap V2 PairCreated 事件
        const newV2Pools = receipt.logs.filter(
            log =>
                log.address.toLowerCase() === uniswapV2FactoryAddress.toLowerCase() &&
                log.topics[0] === uniswapV2PairCreatedTopicHash
        )

        // 处理 Uniswap V3 池创建事件
        newV3Pools.forEach(poolEvent => {
            const token0 = '0x' + poolEvent.topics[1].slice(26) // Token0 地址
            const token1 = '0x' + poolEvent.topics[2].slice(26) // Token1 地址
            const poolAddress = '0x' + poolEvent.data.slice(-40) // 新池地址

            // 将 Uniswap V3 池创建对象推送到数组
            pools.push({
                type: 'Uniswap V3',
                token0,
                token1,
                poolAddress,
                txHash: receipt.transactionHash,
                timestamp: receipt.timestamp, // 假设收据包含时间戳字段
            })
        })

        // 处理 Uniswap V2 交易对创建事件
        newV2Pools.forEach(pairEvent => {
            const token0 = '0x' + pairEvent.topics[1].slice(26) // Token0 地址
            const token1 = '0x' + pairEvent.topics[2].slice(26) // Token1 地址
            const poolAddress = '0x' + pairEvent.data.slice(26, 66) // 新交易对地址

            // 将 Uniswap V2 池创建对象推送到数组
            pools.push({
                type: 'Uniswap V2',
                token0,
                token1,
                poolAddress,
                txHash: receipt.transactionHash,
                timestamp: receipt.timestamp,
            })
        })
    })

    return pools.length > 0 ? pools : null
}
  • 区块号 字段中输入测试区块号(即 22537210),并点击 运行。你应该会在输出中看到 Uniswap V3 和 V2 池创建事件的列表,如下所示:
[\
    {\
        "poolAddress": "0x0000000000000000000000000000000000000001",\
        "token0": "0x4200000000000000000000000000000000000006",\
        "token1": "0xce15615dec6758825849e8d093c695e86ec0f8e0",\
        "txHash": "0xc90fce8ede27f1840e2f50b7ce68b3a74a80b20136167e49359a50d3f4a7f09a",\
        "type": "Uniswap V3"\
    }\
]
  • 点击 下一步 继续到下一步骤。

步骤 2:选择目标

设置 Stream 后,系统会提示你选择一个目标。

  • 从可用选项中选择 创建新 Function。这将使我们能够在无服务器 Function 中处理过滤后的数据。
  • 为你的 Function 命名(例如,“Uniswap 池处理器”)。
  • 选择 Node.js 20 作为运行时。
  • 继续设置处理数据的 Function 逻辑(在下一步中详细说明)。

步骤 3:创建一个 Function

这个 Function 自动化了处理 QuickNode Streams 检测到的新流动性池的推文发布过程。

  • 它首先检查 键值存储 中是否有缓存的代币元数据(名称和符号)。如果数据未缓存,则使用 ethers.js 库提取它,并将其存储在键值存储中以供将来使用。如果键值存储不可用,则 Function 每次都将直接调用区块链以提取代币元数据。
  • 一旦丰富了元数据,该 Function 将数据格式化为推文消息,高亮显示代币名称、符号和池地址等详细信息。
  • 使用 twitter-api-v2 库,它发布这些推文,返回池地址和交易哈希或遇到的任何错误。

这是你将使用的 JavaScript Function:

点击展开

const { TwitterApi } = require('twitter-api-v2')
const { ethers } = require('ethers')

// Twitter API 凭证
const client = new TwitterApi({
    appKey: 'YOUR_APP_KEY',
    appSecret: 'YOUR_APP_SECRET',
    accessToken: 'YOUR_ACCESS_TOKEN',
    accessSecret: 'YOUR_ACCESS_SECRET',
})

const twitterClient = client.readWrite

// 区块链提供者设置(例如,QuickNode RPC)
const provider = new ethers.JsonRpcProvider('YOUR_QUICKNODE_ENDPOINT')

// 获取代币元数据的 ERC-20 ABI
const erc20Abi = [\
    'function name() view returns (string)',\
    'function symbol() view returns (string)',\
]

/**
 * 从区块链或键值存储获取代币详细信息(名称和符号)。
 * @param {string} tokenAddress - ERC-20 代币的地址。
 * @returns {Promise<Object>} - 带有名称和符号的代币详细信息。
 */
async function getTokenDetails(tokenAddress) {
    try {
        // 检查键值存储中的缓存代币元数据
        const cachedData = await qnLib.qnGetSet(tokenAddress)
        if (cachedData) {
            console.log(`缓存命中代币: ${tokenAddress}`)
            return JSON.parse(cachedData) // 返回缓存数据
        }

        // 缓存未命中:从区块链获取代币详细信息
        const contract = new ethers.Contract(tokenAddress, erc20Abi, provider)
        const name = await contract.name()
        const symbol = await contract.symbol()

        // 缓存获取的数据
        await qnLib.qnAddSet(tokenAddress, JSON.stringify({ name, symbol }))
        console.log(`缓存代币数据: ${tokenAddress}`)

        return { name, symbol }
    } catch (error) {
        console.error(
            `获取代币 ${tokenAddress} 详细信息时出错:`,
            error.message
        )
        return { name: null, symbol: null }
    }
}

/**
 * 为新流动性池创建推文消息。
 * @param {Object} pool - 新流动性池的数据。
 * @returns {string} - 格式化的推文消息。
 */
function createTweet(pool) {
    return `
LP Tracker Bot, by QuickNode's Streams and Functions
🚀 Base 上新创建的 ${pool.type} 流动性池!

🔹 代币 0: ${pool.token0Symbol || pool.token0} (${
        pool.token0Name || '未知'
    })
🔹 代币 1: ${pool.token1Symbol || pool.token1} (${
        pool.token1Name || '未知'
    })

池地址: ${pool.poolAddress}
    `
}

/**
 * 为新流动性池发布推文。
 * @param {Array} pools - 池数据数组。
 * @returns {Promise<Array>} - 相关池信息的推文 ID 数组。
 */
async function postToTwitter(pools) {
    const results = []

    for (const pool of pools) {
        const message = createTweet(pool)
        try {
            const createdTweet = await twitterClient.v2.tweet(message)
            results.push({
                tweetId: createdTweet.id_str,
                poolAddress: pool.poolAddress,
                txHash: pool.txHash,
            })
        } catch (error) {
            results.push({
                error: `发布与池 ${pool.poolAddress} 相关的推文失败`,
                message: error.message,
            })
        }
    }

    return results
}

/**
 * 处理流数据并发布到 Twitter 的主函数。
 * @param {Object} params - 传递给函数的参数。
 * @returns {Promise<Object>} - 操作结果。
 */
async function main(params) {
    console.log('正在处理流数据...')

    try {
        const { data } = params

        // 验证输入
        if (!Array.isArray(data) || data.length === 0) {
            return {
                message: '没有数据处理',
                result: '没有推文发送。',
            }
        }

        // 通过代币名称和符号丰富数据
        for (const pool of data) {
            const token0Details = await getTokenDetails(pool.token0)
            const token1Details = await getTokenDetails(pool.token1)

            pool.token0Name = token0Details.name
            pool.token0Symbol = token0Details.symbol
            pool.token1Name = token1Details.name
            pool.token1Symbol = token1Details.symbol
        }

        // 发布到 Twitter
        const tweetResults = await postToTwitter(data)

        return {
            message: 'Streams 数据已发送到 Twitter',
            processedPools: data.length,
            tweetResults,
        }
    } catch (error) {
        console.error('主函数出错:', error.message)
        return {
            error: '无法将数据发送到 Twitter',
            details: error.message,
        }
    }
}

// 导出主函数以供 QuickNode 使用
module.exports = { main }

步骤 4:设置 Twitter (X) API 密钥和 QuickNode 端点

在运行此 Function 之前,请确保你已配置必要的凭据和 URL:

Twitter API 密钥

  • 前往 Twitter 开发者门户,创建一个应用,并生成以下凭证:
    • API 密钥API 密钥秘密
    • 访问Token访问Token秘密
  • 将 Function 代码中的 YOUR_APP_KEYYOUR_APP_SECRETYOUR_ACCESS_TOKENYOUR_ACCESS_SECRET 替换为相应的凭证。
  • 在应用设置中,启用 用户身份验证设置 并将权限设置为 读写,因为这个 Function 将发布推文。
  • 如果被提示输入回调 URI 或重定向 URL,则可以使用 https://example.com。此设置仅与需要用户登录的应用相关,在此情况下可以安全忽略。

QuickNode 端点 URL

  • 如果你还没有, 注册 QuickNode。
  • QuickNode 仪表板 上为所需的区块链网络(例如,Base 主网)创建一个新端点。
  • 将 Function 代码中的 YOUR_QUICKNODE_ENDPOINT 替换为 QuickNode 端点 URL。

步骤 5:测试 Function

选择一个合适的测试区块号以测试 Function(即 22537210),并点击 测试 Function。如果 Function 正常工作,你应该会看到类似于以下的输出:

Function 测试

结果:
{
  "message": "Streams 数据已发送到 Twitter",
  "processedPools": 1,
  "tweetResults": [\
    {\
      "poolAddress": "0xa07aaa70793649db7a80f722491ff6d707b41dcb",\
      "txHash": "0xc90fce8ede27f1840e2f50b7ce68b3a74a80b20136167e49359a50d3f4a7f09a"\
    }\
  ]
}

然后,你可以检查 Twitter (X) 上发布的推文,以验证 Function 是否按预期工作。

推文发布

步骤 6:部署 Function

一旦你对 Function 的功能满意,可以通过点击 创建一个 Stream 将其部署到 QuickNode。这将创建一个新的 Stream 并将 Function 部署到其中。

接下来是什么?

现在你已成功设置一个功能来跟踪并发布有关新流动性池的推文,这里有一些提升你的机器人的想法:

  • 添加更多链 扩展你的应用以监控多个区块链(如 Ethereum、Polygon 或 Binance Smart Chain)的流动性池创建。为每条链创建一个新 Stream,并将 Function 部署到其中。

  • 丰富推文分析 通过添加初始流动性数量、交易费用或部署者地址的先前池初始化等分析信息来增强你的推文。

  • 集成更多目标 使用 Webhook 向 Discord 或 Telegram 等其他平台发送更新。

  • 自动化特定代币的警报 为特定代币或交易对设置条件警报。例如,仅在池中包含流行代币如 USDC 或 DAI 时通知。

无论如何,基于 QuickNode 的 Streams 和 Functions 扩展应用的功能并吸引更广泛的受众是一个很好的方法。探索 QuickNode 文档和资源以了解更多关于如何自定义你的应用并将其与其他平台集成的信息。

结论

通过遵循本指南,你已设置了一个自动化的 Twitter (X) 机器人,该机器人捕获了有关在 Base 上新的流动性池的实时数据,利用了 QuickNode 的 Streams 和 Functions。该机器人可以及时提醒 DeFi 活动,并可以根据需要进一步扩展以跟踪其他事件或指标。

如果你有疑问或卡住,可以在我们的 Discord 中提出。通过关注我们的 Twitter (@QuickNode)或我们的 Telegram 宣传频道 来跟上最新动态。

附加资源

我们 ❤️ 反馈!

告诉我们 如果你有任何反馈或对于新主题的请求。我们很乐意听到你的声音。

  • 原文链接: quicknode.com/guides/qui...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。