如何使用 QuickNode Streams 和 Functions 构建实时代币转移索引器

  • QuickNode
  • 发布于 2025-01-21 16:46
  • 阅读 33

本文介绍了如何使用 QuickNode 的 Streams 和 Functions 创建一个实时的代币转移索引器,监控 ERC20、ERC721 和 ERC1155 代币的转移事件,并将处理后的数据发送到 webhook 进行进一步使用。文章提供了详细的步骤、代码示例以及对实际应用的总结,适合开发者参考。

Streams 对所有拥有 QuickNode 计划的用户开放。对于具有独特需求的团队,我们提供定制数据集、专属支持和自定义集成。有关更多信息,请 联系 我们的团队。

概述

在本指南中,我们将引导你使用 QuickNode 的 Streams 和 Functions 创建一个实时代币转移索引器。该系统将监控 Base 区块链上的 ERC20、ERC721 和 ERC1155 代币转移,处理数据,并将 enriched 信息发送到 webhook 以供进一步使用。

你将完成的工作

  • 在 QuickNode 上设置一个 Stream,以过滤来自区块链的 ERC20、ERC721 和 ERC1155 代币转移事件。
  • 实现一个 Function,处理和丰富转移数据,并将处理的数据发送到 webhook。

你将需要

  • 一个 QuickNode 帐户。
  • 对 JavaScript 及以太坊交易和事件的基本了解。查看我们的指南 以太坊交易和事件 以了解更多。
  • 在 Base Mainnet 上的 QuickNode RPC 端点。
  • 一个 QuickNode IPFS 网关
  • 一个 webhook URL 以接收 enriched 的代币转移数据。你可以在 webhook.site 获取一个免费 webhook URL。

构建代币转移索引器

在 QuickNode 上创建 Stream

首先,导航到 QuickNode Streams 页面 在仪表板上并点击“创建 Stream”。

接下来,使用以下设置创建一个 Stream:

  • :Base
  • 网络:Mainnet
  • 数据集:带收据的区块
  • Stream 起始时间:最新区块(你可以根据需求进行更改)
  • Stream 有效负载:在流之前修改 Stream
  • 重组处理:保持不变

选择选项以在流之前修改有效负载。

接下来,复制并粘贴以下代码以过滤 ERC20、ERC721 和 ERC1155 代币转移的流数据,提取关键数据,并生成发送到你的 function 进行处理的有效负载。

function stripPadding(logTopic) {
    return logTopic ? '0x' + logTopic.slice(-40).toLowerCase() : ''
}

function parseSingleData(data) {
    if (!data || data === '0x') return { tokenId: 0, value: 0 }
    const idHex = data.slice(2, 66).replace(/^0+/, '') || '0'
    const valueHex = data.slice(66).replace(/^0+/, '') || '0'
    const id = idHex === '0' ? 0 : BigInt('0x' + idHex)
    const value = valueHex === '0' ? 0 : BigInt('0x' + valueHex)
    return { tokenId: id, value: value }
}

function parseBatchData(data) {
    if (!data || data.length < 130) return { ids: [], values: [] }
    const idsArrayOffset = parseInt(data.slice(2, 66), 16) * 2 + 2
    const valuesArrayOffset = parseInt(data.slice(66, 130), 16) * 2 + 2
    const tokenCount = (valuesArrayOffset - idsArrayOffset) / 64

    const ids = Array.from({ length: tokenCount }, (_, i) => {
        const idHex =
            data
                .slice(idsArrayOffset + i * 64, idsArrayOffset + (i + 1) * 64)
                .replace(/^0+/, '') || '0'
        return idHex === '0' ? 0 : BigInt('0x' + idHex)
    })

    const values = Array.from({ length: tokenCount }, (_, i) => {
        const valueHex =
            data
                .slice(valuesArrayOffset + i * 64, valuesArrayOffset + (i + 1) * 64)
                .replace(/^0+/, '') || '0'
        return valueHex === '0' ? 0 : BigInt('0x' + valueHex)
    })

    return { ids, values }
}

function main(stream) {
    try {
        if (!stream || !stream.data) {
            return null
        }

        const streamData = Array.isArray(stream.data)
            ? stream.data
            : [stream.data]
        const erc20Transfers = []
        const erc721Transfers = []
        const erc1155Transfers = []

        streamData.forEach(stream => {
            if (!stream || !stream.block || !stream.receipts) {
                return
            }

            const blockTimestamp = stream.block.timestamp
                ? parseInt(stream.block.timestamp, 16) * 1000
                : Date.now()

            stream.receipts.forEach(receipt => {
                if (!receipt || !receipt.logs) return

                receipt.logs.forEach(log => {
                    if (!log || !log.topics || log.topics.length === 0) return

                    if (
                        log.topics[0] ===
                        '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
                    ) {
                        if (log.topics.length === 3 && log.data && log.data !== '0x') {
                            const valueHex = log.data.slice(2).replace(/^0+/, '')
                            const value = valueHex ? BigInt('0x' + valueHex).toString() : '0'
                            erc20Transfers.push({
                                type: 'ERC20',
                                sender: stripPadding(log.topics[1]),
                                receiver: stripPadding(log.topics[2]),
                                value: value,
                                contract: log.address,
                                txHash: log.transactionHash,
                                txIndex: log.transactionIndex,
                                blockTimestamp: blockTimestamp,
                            })
                        } else if (
                            log.topics.length === 4 &&
                            (!log.data || log.data === '0x')
                        ) {
                            const tokenId = BigInt(log.topics[3]).toString()
                            erc721Transfers.push({
                                type: 'ERC721',
                                sender: stripPadding(log.topics[1]),
                                receiver: stripPadding(log.topics[2]),
                                tokenId: tokenId,
                                contract: log.address,
                                txHash: log.transactionHash,
                                txIndex: log.transactionIndex,
                                blockTimestamp: blockTimestamp,
                            })
                        }
                    } else if (
                        log.topics[0] ===
                        '0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62'
                    ) {
                        const { tokenId, value } = parseSingleData(log.data)
                        erc1155Transfers.push({
                            type: 'ERC1155_Single',
                            operator: stripPadding(log.topics[1]),
                            sender: stripPadding(log.topics[2]),
                            receiver: stripPadding(log.topics[3]),
                            tokenId: tokenId.toString(),
                            value: value.toString(),
                            contract: log.address,
                            txHash: log.transactionHash,
                            txIndex: log.transactionIndex,
                            blockTimestamp: blockTimestamp,
                        })
                    } else if (
                        log.topics[0] ===
                        '0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb'
                    ) {
                        const { ids, values } = parseBatchData(log.data)
                        ids.forEach((id, index) => {
                            erc1155Transfers.push({
                                type: 'ERC1155_Batch',
                                operator: stripPadding(log.topics[1]),
                                from: stripPadding(log.topics[2]),
                                to: stripPadding(log.topics[3]),
                                tokenId: id.toString(),
                                value: values[index].toString(),
                                contract: log.address,
                                txHash: log.transactionHash,
                                txIndex: log.transactionIndex,
                                blockTimestamp: blockTimestamp,
                            })
                        })
                    }
                })
            })
        })

        if (
            !erc20Transfers.length &&
            !erc721Transfers.length &&
            !erc1155Transfers.length
        ) {
            return null
        }

        return {
            erc20: erc20Transfers,
            erc721: erc721Transfers,
            erc1155: erc1155Transfers,
        }
    } catch (e) {
        console.error('主函数中出现错误:', e)
        return { error: e.message }
    }
}

测试 Stream 过滤器

点击“运行测试”按钮以测试你的过滤器对单个数据块的效果。测试完成后,你将看到 Stream 生成的数据有效负载示例。

点击“下一步”按钮,然后选择“Functions”作为你的 Stream 目标。接下来,在 Function 下拉列表中,选择“创建一个新 Function”选项。

实现 Function

在 Functions 设置页面的“选择命名空间”下,选择“创建一个新命名空间”,并输入名称(例如,TokenIndexing),然后点击“创建命名空间”。接下来,给你的 Function 命名(例如,BaseTokenIndexer),然后单击“下一步”(你可以将所有其他设置保留为默认设置)。在下一步中,粘贴以下代码以 实现 Function,该 Function 通过 RPC 检索代币元数据。确保使用你自己的真实值更新 BASE_QUICKNODE_URLWEBHOOK_URLQUICKNODE_IPFS_GATEWAY 的值。

const { Web3 } = require('web3')
const axios = require('axios')

// 最小 ABI
const ERC20_ABI = [\
    {\
        constant: true,\
        inputs: [],\
        name: 'name',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
    {\
        constant: true,\
        inputs: [],\
        name: 'symbol',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
    {\
        constant: true,\
        inputs: [],\
        name: 'decimals',\
        outputs: [{ name: '', type: 'uint8' }],\
        type: 'function',\
    },\
]

const ERC721_ABI = [\
    {\
        constant: true,\
        inputs: [],\
        name: 'name',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
    {\
        constant: true,\
        inputs: [],\
        name: 'symbol',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
    {\
        constant: true,\
        inputs: [{ name: '_tokenId', type: 'uint256' }],\
        name: 'tokenURI',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
]

const ERC1155_ABI = [\
    {\
        constant: true,\
        inputs: [],\
        name: 'name',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
    {\
        constant: true,\
        inputs: [],\
        name: 'symbol',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
    {\
        constant: true,\
        inputs: [{ name: '_id', type: 'uint256' }],\
        name: 'uri',\
        outputs: [{ name: '', type: 'string' }],\
        type: 'function',\
    },\
    {\
        inputs: [{ internalType: 'uint256', name: 'id', type: 'uint256' }],\
        name: 'uri',\
        outputs: [{ internalType: 'string', name: '', type: 'string' }],\
        stateMutability: 'view',\
        type: 'function',\
    },\
]

// 基础 QuickNode URL
const BASE_QUICKNODE_URL =
    'https://my-real-endpoint.base-mainnet.quiknode.pro/token/'

// Webhook URL
const WEBHOOK_URL = 'https://webhook.site/real-webhook-url'

// IPFS 网关 URL
const QUICKNODE_IPFS_GATEWAY = 'https://real-ipfs-gateway/'

// 创建 Web3 实例
let web3

async function setupWeb3() {
    web3 = new Web3(BASE_QUICKNODE_URL)
}

const ERC20_CACHE = new Map()

async function getERC20Info(contractAddress) {
    if (ERC20_CACHE.has(contractAddress)) {
        return ERC20_CACHE.get(contractAddress)
    }

    const contract = new web3.eth.Contract(ERC20_ABI, contractAddress)
    let retries = 3
    while (retries > 0) {
        try {
            const [name, symbol, decimals] = await Promise.all([\
                contract.methods.name().call(),\
                contract.methods.symbol().call(),\
                contract.methods.decimals().call(),\
            ])
            const info = { name, symbol, decimals: parseInt(decimals) }
            ERC20_CACHE.set(contractAddress, info)
            return info
        } catch (error) {
            console.error(
                `获取 ${contractAddress} 的 ERC20 信息时出错(重试 ${4 - retries}/3):`,
                error
            )
            retries--
            if (retries === 0) {
                console.error(
                    `在 3 次尝试后未能获取 ${contractAddress} 的 ERC20 信息`
                )
                return { name: '未知', symbol: '未知', decimals: 18 }
            }
            await new Promise(resolve => setTimeout(resolve, 1000)) // 在重试之前等待 1 秒
        }
    }
}

async function fetchTokenMetadata(url, retries = 3) {
    if (!url || url === '未知') {
        console.log('没有提供有效的元数据 URL')
        return {}
    }

    // 处理 IPFS URI
    if (url.startsWith('ipfs://')) {
        url = `${QUICKNODE_IPFS_GATEWAY}ipfs/${url.slice(7)}`
    }

    for (let i = 0; i < retries; i++) {
        try {
            console.log(`尝试从以下 URL 获取元数据: ${url}`)
            const response = await axios.get(url, { timeout: 5000 })
            let metadata = response.data

            // 如果元数据是字符串,则可能是另一个 URL
            if (
                typeof metadata === 'string' &&
                (metadata.startsWith('http') || metadata.startsWith('ipfs://'))
            ) {
                return await fetchTokenMetadata(metadata, retries - 1)
            }

            console.log('成功获取元数据:', metadata)
            return metadata
        } catch (error) {
            console.error(
                `获取元数据时出错(尝试 ${i + 1}/${retries}):`,
                error.message
            )
            if (i === retries - 1) {
                return {}
            }
            await new Promise(resolve => setTimeout(resolve, 1000)) // 等待 1 秒后重试
        }
    }
    return {}
}

async function getNFTInfo(contractAddress, tokenId, type) {
    const ABI = type === 'ERC721' ? ERC721_ABI : ERC1155_ABI
    const contract = new web3.eth.Contract(ABI, contractAddress)
    try {
        console.log(
            `获取合约:${contractAddress},tokenId: ${tokenId}的 NFT 信息`
        )
        let [name, symbol, tokenURI] = await Promise.all([\
            contract.methods\
                .name()\
                .call()\
                .catch(() => '未知'),\
            contract.methods\
                .symbol()\
                .call()\
                .catch(() => '未知'),\
            (type === 'ERC721'\
                ? contract.methods.tokenURI(tokenId).call()\
                : contract.methods.uri(tokenId).call()\
            ).catch(() => '未知'),\
        ])

        console.log(`原始 tokenURI: ${tokenURI}`)

        // 处理 ERC1155 URI 及 {id} 占位符
        if (type === 'ERC1155' && tokenURI.includes('{id}')) {
            const hexId = web3.utils.padLeft(web3.utils.toHex(tokenId), 64).slice(2)
            tokenURI = tokenURI.replace('{id}', hexId)
            console.log(`调整后的 ERC1155 tokenURI: ${tokenURI}`)
        }

        const metadata = await fetchTokenMetadata(tokenURI)
        console.log(`获取的元数据:`, metadata)

        return {
            name,
            symbol,
            tokenURI,
            metadata,
            image: metadata.image || '没有可用的图像',
            description: metadata.description || '没有可用的描述',
        }
    } catch (error) {
        console.error(
            `获取合约 ${contractAddress} 的 token ${tokenId} 的 NFT 信息时出错:`,
            error
        )
        return {
            name: '未知',
            symbol: '未知',
            tokenURI: '未知',
            metadata: {},
            image: '没有可用的图像',
            description: '没有可用的描述',
        }
    }
}

function formatTokenValue(value, decimals) {
    const divisor = BigInt(10) ** BigInt(decimals)
    const bigIntValue = BigInt(value)
    const integerPart = bigIntValue / divisor
    const fractionalPart = bigIntValue % divisor

    // 用前导零填充小数部分
    const fractionalStr = fractionalPart.toString().padStart(decimals, '0')

    // 去掉尾部零
    const trimmedFractionalStr = fractionalStr.replace(/0+$/, '')

    return trimmedFractionalStr
        ? `${integerPart}.${trimmedFractionalStr}`
        : `${integerPart}`
}

async function processERC20Transfers(transfers) {
    const results = []
    for (const transfer of transfers) {
        const tokenInfo = await getERC20Info(transfer.contract)
        results.push({
            ...transfer,
            ...tokenInfo,
            valueFormatted: formatTokenValue(transfer.value, tokenInfo.decimals),
        })
        await new Promise(resolve => setTimeout(resolve, 100)) // 在请求之间添加 100ms 的延迟
    }
    return results
}

async function processNFTTransfers(transfers, type) {
    return Promise.all(
        transfers.map(async transfer => {
            console.log(
                `处理合约: ${transfer.contract}, tokenId: ${transfer.tokenId} 的 ${type} 转移`
            )
            const nftInfo = await getNFTInfo(
                transfer.contract,
                transfer.tokenId,
                type
            )
            console.log(`处理的 ${type} 转移:`, nftInfo)
            return {
                ...transfer,
                ...nftInfo,
                metadata: nftInfo.metadata,
                image: nftInfo.image,
                description: nftInfo.description,
            }
        })
    )
}

async function sendToWebhook(data) {
    try {
        await axios.post(WEBHOOK_URL, data)
        console.log('数据成功发送到 webhook')
    } catch (error) {
        console.error('发送数据到 webhook 时出错:', error)
    }
}

async function main(params) {
    try {
        if (!params || !params.data) {
            throw new Error('输入无效: params.data 缺失')
        }

        const { erc20 = [], erc721 = [], erc1155 = [] } = params.data
        await setupWeb3()

        const enrichedData = {
            erc20: await processERC20Transfers(erc20),
            erc721: await processNFTTransfers(erc721, 'ERC721'),
            erc1155: await processNFTTransfers(erc1155, 'ERC1155'),
        }

        await sendToWebhook(enrichedData)

        return {
            status: '数据已处理并发送到 webhook',
            transferCounts: {
                erc20: erc20.length,
                erc721: erc721.length,
                erc1155: erc1155.length,
            },
        }
    } catch (error) {
        console.error('主函数中出现错误:', error)
        return {
            status: '处理数据时出错',
            error: error.message,
        }
    }
}

exports.main = main

点击“保存并关闭”以退出 Function 向导。接下来,返回到 Streams,并点击“创建 Stream”以返回 Stream 向导。

将 Stream 连接到 Function

在 Streams 向导中,在 Stream 目标设置中选择我们刚刚创建的 Function。然后,点击“部署”以启动你的 Stream。接下来,检查你的 webhook 以查看处理的数据。

接下来,让我们花点时间回顾一下我们的 Stream、Function、QuickNode RPC 和 IPFS 网关将如何协同工作,以捕获和索引代币转移。

工作原理

  • Stream 过滤器: Stream 捕获所有 ERC20、ERC721 和 ERC1155 代币的转移事件。
  • 数据处理: Function 接收过滤后的数据并进行丰富:
    • 对于 ERC20 代币: 它获取代币名称、符号和小数位数。
    • 对于 NFT(ERC721 和 ERC1155):它检索代币元数据,包括名称、符号和代币 URI。
  • 数据丰富性: Function 使用 QuickNode RPC 端点进行额外的区块链调用以获取代币信息。
  • IPFS 集成: 对于具有基于 IPFS 的元数据的 NFT,Function 使用 QuickNode IPFS 网关来检索元数据。
  • Webhook 发送: enrich 数据被发送到指定的 webhook URL 以供进一步使用或存储。

关键特性

  • 实时处理: 捕获并处理发生在区块链上的代币转移。
  • 多代币支持: 在一个系统中处理 ERC20、ERC721 和 ERC1155 转移。
  • 数据丰富性: 自动获取并附加相关的代币信息到转移数据。
  • 错误处理: 实现重试和后备机制,实现强大的数据获取。
  • 可扩展性: 利用 QuickNode 的基础架构处理大量转移事件。

自定义选项

  • 修改 Stream 过滤器,专注于特定合约或事件类型。
  • 调整 Function 代码,以添加更多自定义处理逻辑或集成其他服务。
  • 在 function 中实现额外的数据存储解决方案以外的 webhook(例如,数据库集成)。

更多资源

结论

本指南演示了如何使用 QuickNode 的 Streams 和 Functions 构建一个强大的实时代币转移索引器。此设置为各种区块链应用程序提供了基础,包括投资组合跟踪器、交易机器人和分析平台。 通过利用 QuickNode 的基础架构,你可以轻松扩展此解决方案,以处理跨多个区块链的大量数据,从而使你能够构建健壮且响应迅速的区块链应用程序。

我们 ❤️ 反馈!

让我们知道 如果你有任何反馈或对新主题的请求。我们很乐意听取你的意见。

  • 原文链接: quicknode.com/guides/qui...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。