本文介绍了如何使用 QuickNode 的 Streams 和 Functions 创建一个实时的代币转移索引器,监控 ERC20、ERC721 和 ERC1155 代币的转移事件,并将处理后的数据发送到 webhook 进行进一步使用。文章提供了详细的步骤、代码示例以及对实际应用的总结,适合开发者参考。
Streams 对所有拥有 QuickNode 计划的用户开放。对于具有独特需求的团队,我们提供定制数据集、专属支持和自定义集成。有关更多信息,请 联系 我们的团队。
在本指南中,我们将引导你使用 QuickNode 的 Streams 和 Functions 创建一个实时代币转移索引器。该系统将监控 Base 区块链上的 ERC20、ERC721 和 ERC1155 代币转移,处理数据,并将 enriched 信息发送到 webhook 以供进一步使用。
首先,导航到 QuickNode Streams 页面 在仪表板上并点击“创建 Stream”。
接下来,使用以下设置创建一个 Stream:
选择选项以在流之前修改有效负载。
接下来,复制并粘贴以下代码以过滤 ERC20、ERC721 和 ERC1155 代币转移的流数据,提取关键数据,并生成发送到你的 function 进行处理的有效负载。
function stripPadding(logTopic) {
return logTopic ? '0x' + logTopic.slice(-40).toLowerCase() : ''
}
function parseSingleData(data) {
if (!data || data === '0x') return { tokenId: 0, value: 0 }
const idHex = data.slice(2, 66).replace(/^0+/, '') || '0'
const valueHex = data.slice(66).replace(/^0+/, '') || '0'
const id = idHex === '0' ? 0 : BigInt('0x' + idHex)
const value = valueHex === '0' ? 0 : BigInt('0x' + valueHex)
return { tokenId: id, value: value }
}
function parseBatchData(data) {
if (!data || data.length < 130) return { ids: [], values: [] }
const idsArrayOffset = parseInt(data.slice(2, 66), 16) * 2 + 2
const valuesArrayOffset = parseInt(data.slice(66, 130), 16) * 2 + 2
const tokenCount = (valuesArrayOffset - idsArrayOffset) / 64
const ids = Array.from({ length: tokenCount }, (_, i) => {
const idHex =
data
.slice(idsArrayOffset + i * 64, idsArrayOffset + (i + 1) * 64)
.replace(/^0+/, '') || '0'
return idHex === '0' ? 0 : BigInt('0x' + idHex)
})
const values = Array.from({ length: tokenCount }, (_, i) => {
const valueHex =
data
.slice(valuesArrayOffset + i * 64, valuesArrayOffset + (i + 1) * 64)
.replace(/^0+/, '') || '0'
return valueHex === '0' ? 0 : BigInt('0x' + valueHex)
})
return { ids, values }
}
function main(stream) {
try {
if (!stream || !stream.data) {
return null
}
const streamData = Array.isArray(stream.data)
? stream.data
: [stream.data]
const erc20Transfers = []
const erc721Transfers = []
const erc1155Transfers = []
streamData.forEach(stream => {
if (!stream || !stream.block || !stream.receipts) {
return
}
const blockTimestamp = stream.block.timestamp
? parseInt(stream.block.timestamp, 16) * 1000
: Date.now()
stream.receipts.forEach(receipt => {
if (!receipt || !receipt.logs) return
receipt.logs.forEach(log => {
if (!log || !log.topics || log.topics.length === 0) return
if (
log.topics[0] ===
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
) {
if (log.topics.length === 3 && log.data && log.data !== '0x') {
const valueHex = log.data.slice(2).replace(/^0+/, '')
const value = valueHex ? BigInt('0x' + valueHex).toString() : '0'
erc20Transfers.push({
type: 'ERC20',
sender: stripPadding(log.topics[1]),
receiver: stripPadding(log.topics[2]),
value: value,
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
} else if (
log.topics.length === 4 &&
(!log.data || log.data === '0x')
) {
const tokenId = BigInt(log.topics[3]).toString()
erc721Transfers.push({
type: 'ERC721',
sender: stripPadding(log.topics[1]),
receiver: stripPadding(log.topics[2]),
tokenId: tokenId,
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
}
} else if (
log.topics[0] ===
'0xc3d58168c5ae7397731d063d5bbf3d657854427343f4c083240f7aacaa2d0f62'
) {
const { tokenId, value } = parseSingleData(log.data)
erc1155Transfers.push({
type: 'ERC1155_Single',
operator: stripPadding(log.topics[1]),
sender: stripPadding(log.topics[2]),
receiver: stripPadding(log.topics[3]),
tokenId: tokenId.toString(),
value: value.toString(),
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
} else if (
log.topics[0] ===
'0x4a39dc06d4c0dbc64b70af90fd698a233a518aa5d07e595d983b8c0526c8f7fb'
) {
const { ids, values } = parseBatchData(log.data)
ids.forEach((id, index) => {
erc1155Transfers.push({
type: 'ERC1155_Batch',
operator: stripPadding(log.topics[1]),
from: stripPadding(log.topics[2]),
to: stripPadding(log.topics[3]),
tokenId: id.toString(),
value: values[index].toString(),
contract: log.address,
txHash: log.transactionHash,
txIndex: log.transactionIndex,
blockTimestamp: blockTimestamp,
})
})
}
})
})
})
if (
!erc20Transfers.length &&
!erc721Transfers.length &&
!erc1155Transfers.length
) {
return null
}
return {
erc20: erc20Transfers,
erc721: erc721Transfers,
erc1155: erc1155Transfers,
}
} catch (e) {
console.error('主函数中出现错误:', e)
return { error: e.message }
}
}
点击“运行测试”按钮以测试你的过滤器对单个数据块的效果。测试完成后,你将看到 Stream 生成的数据有效负载示例。
点击“下一步”按钮,然后选择“Functions”作为你的 Stream 目标。接下来,在 Function 下拉列表中,选择“创建一个新 Function”选项。
在 Functions 设置页面的“选择命名空间”下,选择“创建一个新命名空间”,并输入名称(例如,TokenIndexing),然后点击“创建命名空间”。接下来,给你的 Function 命名(例如,BaseTokenIndexer),然后单击“下一步”(你可以将所有其他设置保留为默认设置)。在下一步中,粘贴以下代码以 实现 Function,该 Function 通过 RPC 检索代币元数据。确保使用你自己的真实值更新 BASE_QUICKNODE_URL
、WEBHOOK_URL
和 QUICKNODE_IPFS_GATEWAY
的值。
const { Web3 } = require('web3')
const axios = require('axios')
// 最小 ABI
const ERC20_ABI = [\
{\
constant: true,\
inputs: [],\
name: 'name',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
{\
constant: true,\
inputs: [],\
name: 'symbol',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
{\
constant: true,\
inputs: [],\
name: 'decimals',\
outputs: [{ name: '', type: 'uint8' }],\
type: 'function',\
},\
]
const ERC721_ABI = [\
{\
constant: true,\
inputs: [],\
name: 'name',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
{\
constant: true,\
inputs: [],\
name: 'symbol',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
{\
constant: true,\
inputs: [{ name: '_tokenId', type: 'uint256' }],\
name: 'tokenURI',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
]
const ERC1155_ABI = [\
{\
constant: true,\
inputs: [],\
name: 'name',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
{\
constant: true,\
inputs: [],\
name: 'symbol',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
{\
constant: true,\
inputs: [{ name: '_id', type: 'uint256' }],\
name: 'uri',\
outputs: [{ name: '', type: 'string' }],\
type: 'function',\
},\
{\
inputs: [{ internalType: 'uint256', name: 'id', type: 'uint256' }],\
name: 'uri',\
outputs: [{ internalType: 'string', name: '', type: 'string' }],\
stateMutability: 'view',\
type: 'function',\
},\
]
// 基础 QuickNode URL
const BASE_QUICKNODE_URL =
'https://my-real-endpoint.base-mainnet.quiknode.pro/token/'
// Webhook URL
const WEBHOOK_URL = 'https://webhook.site/real-webhook-url'
// IPFS 网关 URL
const QUICKNODE_IPFS_GATEWAY = 'https://real-ipfs-gateway/'
// 创建 Web3 实例
let web3
async function setupWeb3() {
web3 = new Web3(BASE_QUICKNODE_URL)
}
const ERC20_CACHE = new Map()
async function getERC20Info(contractAddress) {
if (ERC20_CACHE.has(contractAddress)) {
return ERC20_CACHE.get(contractAddress)
}
const contract = new web3.eth.Contract(ERC20_ABI, contractAddress)
let retries = 3
while (retries > 0) {
try {
const [name, symbol, decimals] = await Promise.all([\
contract.methods.name().call(),\
contract.methods.symbol().call(),\
contract.methods.decimals().call(),\
])
const info = { name, symbol, decimals: parseInt(decimals) }
ERC20_CACHE.set(contractAddress, info)
return info
} catch (error) {
console.error(
`获取 ${contractAddress} 的 ERC20 信息时出错(重试 ${4 - retries}/3):`,
error
)
retries--
if (retries === 0) {
console.error(
`在 3 次尝试后未能获取 ${contractAddress} 的 ERC20 信息`
)
return { name: '未知', symbol: '未知', decimals: 18 }
}
await new Promise(resolve => setTimeout(resolve, 1000)) // 在重试之前等待 1 秒
}
}
}
async function fetchTokenMetadata(url, retries = 3) {
if (!url || url === '未知') {
console.log('没有提供有效的元数据 URL')
return {}
}
// 处理 IPFS URI
if (url.startsWith('ipfs://')) {
url = `${QUICKNODE_IPFS_GATEWAY}ipfs/${url.slice(7)}`
}
for (let i = 0; i < retries; i++) {
try {
console.log(`尝试从以下 URL 获取元数据: ${url}`)
const response = await axios.get(url, { timeout: 5000 })
let metadata = response.data
// 如果元数据是字符串,则可能是另一个 URL
if (
typeof metadata === 'string' &&
(metadata.startsWith('http') || metadata.startsWith('ipfs://'))
) {
return await fetchTokenMetadata(metadata, retries - 1)
}
console.log('成功获取元数据:', metadata)
return metadata
} catch (error) {
console.error(
`获取元数据时出错(尝试 ${i + 1}/${retries}):`,
error.message
)
if (i === retries - 1) {
return {}
}
await new Promise(resolve => setTimeout(resolve, 1000)) // 等待 1 秒后重试
}
}
return {}
}
async function getNFTInfo(contractAddress, tokenId, type) {
const ABI = type === 'ERC721' ? ERC721_ABI : ERC1155_ABI
const contract = new web3.eth.Contract(ABI, contractAddress)
try {
console.log(
`获取合约:${contractAddress},tokenId: ${tokenId}的 NFT 信息`
)
let [name, symbol, tokenURI] = await Promise.all([\
contract.methods\
.name()\
.call()\
.catch(() => '未知'),\
contract.methods\
.symbol()\
.call()\
.catch(() => '未知'),\
(type === 'ERC721'\
? contract.methods.tokenURI(tokenId).call()\
: contract.methods.uri(tokenId).call()\
).catch(() => '未知'),\
])
console.log(`原始 tokenURI: ${tokenURI}`)
// 处理 ERC1155 URI 及 {id} 占位符
if (type === 'ERC1155' && tokenURI.includes('{id}')) {
const hexId = web3.utils.padLeft(web3.utils.toHex(tokenId), 64).slice(2)
tokenURI = tokenURI.replace('{id}', hexId)
console.log(`调整后的 ERC1155 tokenURI: ${tokenURI}`)
}
const metadata = await fetchTokenMetadata(tokenURI)
console.log(`获取的元数据:`, metadata)
return {
name,
symbol,
tokenURI,
metadata,
image: metadata.image || '没有可用的图像',
description: metadata.description || '没有可用的描述',
}
} catch (error) {
console.error(
`获取合约 ${contractAddress} 的 token ${tokenId} 的 NFT 信息时出错:`,
error
)
return {
name: '未知',
symbol: '未知',
tokenURI: '未知',
metadata: {},
image: '没有可用的图像',
description: '没有可用的描述',
}
}
}
function formatTokenValue(value, decimals) {
const divisor = BigInt(10) ** BigInt(decimals)
const bigIntValue = BigInt(value)
const integerPart = bigIntValue / divisor
const fractionalPart = bigIntValue % divisor
// 用前导零填充小数部分
const fractionalStr = fractionalPart.toString().padStart(decimals, '0')
// 去掉尾部零
const trimmedFractionalStr = fractionalStr.replace(/0+$/, '')
return trimmedFractionalStr
? `${integerPart}.${trimmedFractionalStr}`
: `${integerPart}`
}
async function processERC20Transfers(transfers) {
const results = []
for (const transfer of transfers) {
const tokenInfo = await getERC20Info(transfer.contract)
results.push({
...transfer,
...tokenInfo,
valueFormatted: formatTokenValue(transfer.value, tokenInfo.decimals),
})
await new Promise(resolve => setTimeout(resolve, 100)) // 在请求之间添加 100ms 的延迟
}
return results
}
async function processNFTTransfers(transfers, type) {
return Promise.all(
transfers.map(async transfer => {
console.log(
`处理合约: ${transfer.contract}, tokenId: ${transfer.tokenId} 的 ${type} 转移`
)
const nftInfo = await getNFTInfo(
transfer.contract,
transfer.tokenId,
type
)
console.log(`处理的 ${type} 转移:`, nftInfo)
return {
...transfer,
...nftInfo,
metadata: nftInfo.metadata,
image: nftInfo.image,
description: nftInfo.description,
}
})
)
}
async function sendToWebhook(data) {
try {
await axios.post(WEBHOOK_URL, data)
console.log('数据成功发送到 webhook')
} catch (error) {
console.error('发送数据到 webhook 时出错:', error)
}
}
async function main(params) {
try {
if (!params || !params.data) {
throw new Error('输入无效: params.data 缺失')
}
const { erc20 = [], erc721 = [], erc1155 = [] } = params.data
await setupWeb3()
const enrichedData = {
erc20: await processERC20Transfers(erc20),
erc721: await processNFTTransfers(erc721, 'ERC721'),
erc1155: await processNFTTransfers(erc1155, 'ERC1155'),
}
await sendToWebhook(enrichedData)
return {
status: '数据已处理并发送到 webhook',
transferCounts: {
erc20: erc20.length,
erc721: erc721.length,
erc1155: erc1155.length,
},
}
} catch (error) {
console.error('主函数中出现错误:', error)
return {
status: '处理数据时出错',
error: error.message,
}
}
}
exports.main = main
点击“保存并关闭”以退出 Function 向导。接下来,返回到 Streams,并点击“创建 Stream”以返回 Stream 向导。
在 Streams 向导中,在 Stream 目标设置中选择我们刚刚创建的 Function。然后,点击“部署”以启动你的 Stream。接下来,检查你的 webhook 以查看处理的数据。
接下来,让我们花点时间回顾一下我们的 Stream、Function、QuickNode RPC 和 IPFS 网关将如何协同工作,以捕获和索引代币转移。
本指南演示了如何使用 QuickNode 的 Streams 和 Functions 构建一个强大的实时代币转移索引器。此设置为各种区块链应用程序提供了基础,包括投资组合跟踪器、交易机器人和分析平台。 通过利用 QuickNode 的基础架构,你可以轻松扩展此解决方案,以处理跨多个区块链的大量数据,从而使你能够构建健壮且响应迅速的区块链应用程序。
让我们知道 如果你有任何反馈或对新主题的请求。我们很乐意听取你的意见。
- 原文链接: quicknode.com/guides/qui...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!