本文介绍了如何使用QuickNode Streams REST API设置和管理区块链数据流。通过此指南,开发者可以创建自定义数据流、配置目标以及应用过滤器来处理区块链数据,简化数据处理过程。文章详细阐述了Streams的特性、API能力以及代码实现,适合对区块链数据处理有一定了解的开发者。
使用 API 可以轻松实现自动化、与其他系统的无缝集成,并能够更高效地进行扩展。通过使用 QuickNode Streams REST API,开发人员可以直接将区块链数据集成到他们的后端,无需通过用户界面手动处理数据。
在本指南中,我们将引导你如何使用 QuickNode REST API 设置和管理 Streams。你将学习如何创建 Streams、配置目标,并应用过滤器来处理区块链数据。
Streams 是 QuickNode 提供的一项强大功能,它能够将实时和历史区块链数据传送到 不同目标 中。它支持灵活的过滤、确保交付和实时处理,是需要历史或实时数据的应用程序的理想解决方案。
Streams API 允许你:
了解更多关于 Streams REST API 的能力 这里。
在下一部分中,我们将逐步讲解如何编写一个过滤函数以处理 Ethereum 区块数据。然后,我们将使用 API 创建一个新的 Stream 并应用过滤函数。
你需要一个 API 密钥才能使用 Streams API。如果你尚未注册并创建任何端点,可以 在这里 创建一个账户。
要创建你的密钥,请登录到你的 QuickNode 账户,点击左上角的头像图标,然后选择 API 密钥。这将带你到 API 密钥页面。
生成具有 STREAMS_REST
权限的 API 密钥,并将其保存,以便在进行请求时使用。
Streams 支持多个目标,包括 Webhook。要创建一个 Webhook 目标,你需要在你的 Web 服务器中创建一个 Webhook 端点。你可以使用任何 Web 服务器,例如 TypedWebhook 来轻松创建一个 Webhook 端点。
通常,当你访问在线 Webhook 创建工具(如 TypedWebhook)时,会看到一个 Webhook URL。复制该 URL 并将其保存,以便稍后使用。
在深入代码之前,让我们概述使用 API 设置 Stream 所涉及的步骤:
main
)。testFilterFunction
)setupQuickNodeStream
)首先,创建一个新的目录用于你的项目并使用 npm 初始化。这将创建一个 package.json 文件,以跟踪你项目的依赖项。
mkdir quicknode-streams-api
cd quicknode-streams-api
npm init -y
接下来,安装项目所需的依赖项。你可以使用以下命令安装所需的依赖项:
npm install axios
Axios 是一个流行的 HTTP 客户端库,用于在 JavaScript 中发起 HTTP 请求。它提供了一个简单直观的 API 来发送 HTTP 请求和处理响应。
初始化项目后,下一步是编写一个过滤函数。过滤函数用于从区块链中过滤掉不必要的数据。在这个例子中,我们将仅提取 block
数据集中的块号和块哈希。要了解更多关于 Streams 可以处理的数据集,查看我们的 Streams 文档。
在项目的根目录中创建一个名为 index.js
的新文件。这个文件将包含使用 API 创建 Stream 的所有代码,包括过滤逻辑。我们将逐步构建这个文件。
将以下代码复制到 index.js
文件中,并用你的实际 API 密钥和 Webhook URL 替换 QUICKNODE_API_KEY
和 WEBHOOK_URL
占位符。
index.js
const axios = require('axios')
const QUICKNODE_API_KEY = 'YOUR-QUICKNODE-API-KEY' // 👈 替换为你实际的 API 密钥
const WEBHOOK_URL = 'YOUR-WEBHOOK-URL' // 👈 替换为你的 Webhook URL
function main(stream) {
try {
const data = stream.data
// 数据是一个对象数组。如果有一批事件,数组将包含多个对象。
// 在测试中,数组中只有一个对象。
const numberDecimal = parseInt(data[0].number, 16)
const filteredData = {
hash: data[0].hash,
number: numberDecimal,
}
return filteredData
} catch (error) {
return { error: error.message, stack: error.stack }
}
}
接下来,我们测试使用 QuickNode API 的过滤函数。
在创建流之前,验证你的过滤逻辑与实际数据一起工作是很重要的。QuickNode 的 API 提供了一种方便的方法来测试过滤函数,允许你验证其行为是否与特定块号相符。
测试过滤函数所需的参数为:
network
: 网络名称,例如 ethereum-mainnet
。dataset
: 数据集名称,例如 block
。filter_function
: base64 编码的过滤函数。block
: 用于测试过滤函数的块号或哈希。详情请见 Streams - 过滤函数文档。
要验证过滤函数的结果,你需选择一个块号并获取相应的块哈希。下面的代码使用 17811625
作为 Ethereum Mainnet 的示例块号。一旦你选择了块号或哈希,可以使用以下代码测试过滤函数。
在 main()
函数后添加以下代码。
此函数将 base64 编码的过滤函数作为输入参数,向 QuickNode 的 API 发送测试请求,通过将输出与已知的块哈希和数字进行比较来验证过滤逻辑的行为。我们将在 setupQuickNodeStream
函数中调用此函数。
index.js 继续
async function testFilterFunction(base64FilterFunction) {
const hash =
'0xb72704063570e4b5a5f972f380fad5e43e1e8c9a1b0e36f204b9282c89adc677' // 👈 测试块的哈希
const number = '17811625' // 👈 测试块的数字
let data = JSON.stringify({
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
block: number,
})
try {
const response = await axios.post(
'https://api.quicknode.com/streams/rest/v1/streams/test_filter',
data,
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
'x-api-key': QUICKNODE_API_KEY,
},
}
)
if (
response.status === 201 &&
response.data.hash === hash &&
response.data.number === number
) {
console.log('过滤函数测试成功!')
return true
} else {
console.error('测试过滤函数时出错:', response.status)
return false
}
} catch (error) {
console.error('测试过滤函数时出错:', error.message)
throw error
}
}
现在我们已经创建了过滤函数,可以使用 QuickNode API 创建一个 Stream。我们将使用 setupQuickNodeStream
函数创建一个新的 Stream,并传入已验证的过滤函数。
在 testFilterFunction
函数后添加以下代码。
此代码首先将过滤函数转换为 base64 字符串,测试它,如果测试成功则继续创建流。流被配置为使用 Webhook 目标,包括对重试和超时的错误处理。
查看 创建 Stream 文档 以查看 Streams 的所有可用选项。
index.js 继续
async function setupQuickNodeStream(startSlot, endSlot) {
const filterFunctionString = main.toString()
const base64FilterFunction =
Buffer.from(filterFunctionString).toString('base64')
const testResult = await testFilterFunction(base64FilterFunction)
if (!testResult) {
console.error('过滤函数失败。Stream 未创建。')
return
}
console.log('过滤函数通过。继续创建流。')
const streamConfig = {
name: '使用 API 测试的 Streams',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
url: WEBHOOK_URL,
compression: 'none',
headers: {
'Content-Type': 'application/json',
},
max_retry: 3,
retry_interval_sec: 1,
},
status: 'active',
}
try {
const response = await axios.post(
'https://api.quicknode.com/streams/rest/v1/streams',
streamConfig,
{
headers: {
accept: 'application/json',
'Content-Type': 'application/json',
'x-api-key': QUICKNODE_API_KEY,
},
}
)
console.log('Stream 创建成功:', response.data)
return response.data.id
} catch (error) {
console.error('创建 Stream 时出错:', error.message)
throw error
}
}
QuickNode 支持多个目标,这意味着你可以将区块链数据导向不同的存储或处理服务。在本指南中,我们使用 Webhook 目标,但你可以为不同的需求配置 AWS S3、PostgreSQL 或 Snowflake。以下是每种配置的方法。
AWS S3 Bucket
const streamConfig = {
name: '使用 API 测试的 Streams',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
s3: {
endpoint: 'your-s3-endpoint',
bucket: 'your-bucket-name',
region: 'us-east-1',
access_key: 'YOUR_ACCESS_KEY',
secret_key: 'YOUR_SECRET_KEY',
file_compression_type: 'gzip',
file_type: 'json',
max_retry: 3,
retry_interval_sec: 1,
use_ssl: true,
},
},
status: 'active',
}
PostgreSQL
const streamConfig = {
name: '使用 API 测试的 Streams',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
postgresql: {
host: 'your-db-host',
port: 5432,
database: 'your-database',
table_name: 'your-table-name',
username: 'your-username',
password: 'your-password',
ssl_mode: 'require',
max_retry: 3,
retry_interval_sec: 1,
},
},
status: 'active',
}
Snowflake
const streamConfig = {
name: '使用 API 测试的 Streams',
network: 'ethereum-mainnet',
dataset: 'block',
filter_function: base64FilterFunction,
destination: {
snowflake: {
account: 'your-account',
warehouse: 'your-warehouse',
host: 'your-host',
database: 'your-database',
protocol: 'snowflake-protocol',
schema: 'your-schema',
table_name: 'your-table-name',
username: 'your-username',
max_retry: 3,
retry_interval_sec: 1,
},
},
status: 'active',
}
现在我们已经配置了我们的 Stream,让我们来运行它。我们将使用 setupQuickNodeStream
函数创建一个新的 Stream,并传入已验证的过滤函数。正如你所看到的,我们将开始和结束插槽作为参数传递。如果你想无限流,你可以传入 -1
作为结束插槽。
在你的 index.js
文件末尾添加以下代码。
index.js 继续
setupQuickNodeStream(1, 100000)
然后,使用以下命令运行脚本:
node index.js
然后,你应该在控制台中看到成功的消息和 Stream 的详细信息。
成功创建 Stream 后,你可以使用以下能力对其进行管理和交互:
这些操作使你能够动态管理你的数据 Streams,完全控制它们的生命周期。
在本指南中,我们讲解了如何使用 QuickNode API 设置 Stream,实施过滤函数,并管理现有 Streams。通过利用这些 API 能力,你可以实现数据管道的自动化,自定义数据流,并确保在最少的人工干预下保持一致性。使用 QuickNode Streams,处理区块链数据变得无缝,使你能够专注于构建你的应用程序,而我们则处理实时数据管理的复杂性。
让我们知道 如果你有任何反馈或新主题的请求。我们很希望听到你的意见。
- 原文链接: quicknode.com/guides/qui...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!