如何使用REST API设置流媒体

  • QuickNode
  • 发布于 2025-01-30 22:38
  • 阅读 14

本文介绍了如何使用QuickNode Streams REST API设置和管理区块链数据流。通过此指南,开发者可以创建自定义数据流、配置目标以及应用过滤器来处理区块链数据,简化数据处理过程。文章详细阐述了Streams的特性、API能力以及代码实现,适合对区块链数据处理有一定了解的开发者。

概述

使用 API 可以轻松实现自动化、与其他系统的无缝集成,并能够更高效地进行扩展。通过使用 QuickNode Streams REST API,开发人员可以直接将区块链数据集成到他们的后端,无需通过用户界面手动处理数据。

在本指南中,我们将引导你如何使用 QuickNode REST API 设置和管理 Streams。你将学习如何创建 Streams、配置目标,并应用过滤器来处理区块链数据。

你将做什么

  • 创建一个具有可自定义配置的新 Stream,目标包括 Webhooks、AWS S3 和 PostgreSQL
  • 应用过滤器以处理传入的数据

你需要什么

  • 一个 QuickNode 账户
  • 一个代码编辑器(例如,VSCode)
  • 对 REST API 和 JSON 的熟悉
  • 已安装 Node.js

什么是 Streams?

Streams 是 QuickNode 提供的一项强大功能,它能够将实时和历史区块链数据传送到 不同目标 中。它支持灵活的过滤、确保交付和实时处理,是需要历史或实时数据的应用程序的理想解决方案。

Streams 的关键特性

  • 可自定义过滤: 不必拉取大量数据集并手动过滤,定义你自己的过滤逻辑,减少数据噪声和处理开销。
  • 多个目标: 仅需简单配置即可轻松流式传输数据到 Webhooks、AWS S3、PostgreSQL 和其他服务。
  • 历史数据回填: 使用预定义模板检索历史区块链数据,提供透明的定价和完成时间估计,以便于更好的计划。
  • 成本高效: 定价基于处理的数据量,确保即便对于高流量应用也能保持可预测的成本。
  • 数据一致性: 内置重组处理,你可以依赖一致性的数据交付,确保在网络重组期间没有区块或交易丢失。

API 能力

Streams API 允许你:

  • 创建和管理 Streams
  • 测试和应用过滤函数
  • 配置多种目标类型
  • 监测和追踪你的 Streams 性能

了解更多关于 Streams REST API 的能力 这里

在下一部分中,我们将逐步讲解如何编写一个过滤函数以处理 Ethereum 区块数据。然后,我们将使用 API 创建一个新的 Stream 并应用过滤函数。

先决条件

QuickNode 账户和 API 密钥

你需要一个 API 密钥才能使用 Streams API。如果你尚未注册并创建任何端点,可以 在这里 创建一个账户。

要创建你的密钥,请登录到你的 QuickNode 账户,点击左上角的头像图标,然后选择 API 密钥。这将带你到 API 密钥页面

生成具有 STREAMS_REST 权限的 API 密钥,并将其保存,以便在进行请求时使用。

API Key

Webhook 目标

Streams 支持多个目标,包括 Webhook。要创建一个 Webhook 目标,你需要在你的 Web 服务器中创建一个 Webhook 端点。你可以使用任何 Web 服务器,例如 TypedWebhook 来轻松创建一个 Webhook 端点。

通常,当你访问在线 Webhook 创建工具(如 TypedWebhook)时,会看到一个 Webhook URL。复制该 URL 并将其保存,以便稍后使用。

如何使用 API 设置 Stream

Stream 设置流程概览

在深入代码之前,让我们概述使用 API 设置 Stream 所涉及的步骤:

  1. 过滤函数: 编写并测试一个处理区块链数据的过滤函数 ( main)。
  2. 测试过滤函数: 使用 QuickNode API 验证过滤逻辑。( testFilterFunction)
  3. 创建 Stream: 验证通过后,使用 API 创建一个 Stream 并配置目标,例如 Webhooks 或 AWS S3。( setupQuickNodeStream)
  4. 运行和管理 Streams: 创建 Stream 之后,你可以使用 API 管理和与之交互。

步骤 1: 初始化你的项目

首先,创建一个新的目录用于你的项目并使用 npm 初始化。这将创建一个 package.json 文件,以跟踪你项目的依赖项。

mkdir quicknode-streams-api
cd quicknode-streams-api
npm init -y

接下来,安装项目所需的依赖项。你可以使用以下命令安装所需的依赖项:

npm install axios

Axios 是一个流行的 HTTP 客户端库,用于在 JavaScript 中发起 HTTP 请求。它提供了一个简单直观的 API 来发送 HTTP 请求和处理响应。

步骤 2: 编写过滤函数

初始化项目后,下一步是编写一个过滤函数。过滤函数用于从区块链中过滤掉不必要的数据。在这个例子中,我们将仅提取 block 数据集中的块号和块哈希。要了解更多关于 Streams 可以处理的数据集,查看我们的 Streams 文档

在项目的根目录中创建一个名为 index.js 的新文件。这个文件将包含使用 API 创建 Stream 的所有代码,包括过滤逻辑。我们将逐步构建这个文件。

将以下代码复制到 index.js 文件中,并用你的实际 API 密钥和 Webhook URL 替换 QUICKNODE_API_KEYWEBHOOK_URL 占位符。

index.js

const axios = require('axios')

const QUICKNODE_API_KEY = 'YOUR-QUICKNODE-API-KEY' // 👈 替换为你实际的 API 密钥
const WEBHOOK_URL = 'YOUR-WEBHOOK-URL' // 👈 替换为你的 Webhook URL

function main(stream) {
    try {
        const data = stream.data

        // 数据是一个对象数组。如果有一批事件,数组将包含多个对象。
        // 在测试中,数组中只有一个对象。
        const numberDecimal = parseInt(data[0].number, 16)
        const filteredData = {
            hash: data[0].hash,
            number: numberDecimal,
        }
        return filteredData
    } catch (error) {
        return { error: error.message, stack: error.stack }
    }
}

接下来,我们测试使用 QuickNode API 的过滤函数。

步骤 3: 测试过滤函数

在创建流之前,验证你的过滤逻辑与实际数据一起工作是很重要的。QuickNode 的 API 提供了一种方便的方法来测试过滤函数,允许你验证其行为是否与特定块号相符。

测试过滤函数所需的参数为:

  • network: 网络名称,例如 ethereum-mainnet
  • dataset: 数据集名称,例如 block
  • filter_function: base64 编码的过滤函数。
  • block: 用于测试过滤函数的块号或哈希。

详情请见 Streams - 过滤函数文档

要验证过滤函数的结果,你需选择一个块号并获取相应的块哈希。下面的代码使用 17811625 作为 Ethereum Mainnet 的示例块号。一旦你选择了块号或哈希,可以使用以下代码测试过滤函数。

main() 函数后添加以下代码。

此函数将 base64 编码的过滤函数作为输入参数,向 QuickNode 的 API 发送测试请求,通过将输出与已知的块哈希和数字进行比较来验证过滤逻辑的行为。我们将在 setupQuickNodeStream 函数中调用此函数。

index.js 继续

async function testFilterFunction(base64FilterFunction) {
    const hash =
        '0xb72704063570e4b5a5f972f380fad5e43e1e8c9a1b0e36f204b9282c89adc677' // 👈 测试块的哈希
    const number = '17811625' // 👈 测试块的数字

    let data = JSON.stringify({
        network: 'ethereum-mainnet',
        dataset: 'block',
        filter_function: base64FilterFunction,
        block: number,
    })

    try {
        const response = await axios.post(
            'https://api.quicknode.com/streams/rest/v1/streams/test_filter',
            data,
            {
                headers: {
                    accept: 'application/json',
                    'Content-Type': 'application/json',
                    'x-api-key': QUICKNODE_API_KEY,
                },
            }
        )

        if (
            response.status === 201 &&
            response.data.hash === hash &&
            response.data.number === number
        ) {
            console.log('过滤函数测试成功!')
            return true
        } else {
            console.error('测试过滤函数时出错:', response.status)
            return false
        }
    } catch (error) {
        console.error('测试过滤函数时出错:', error.message)
        throw error
    }
}

步骤 4: 创建 Stream

现在我们已经创建了过滤函数,可以使用 QuickNode API 创建一个 Stream。我们将使用 setupQuickNodeStream 函数创建一个新的 Stream,并传入已验证的过滤函数。

testFilterFunction 函数后添加以下代码。

此代码首先将过滤函数转换为 base64 字符串,测试它,如果测试成功则继续创建流。流被配置为使用 Webhook 目标,包括对重试和超时的错误处理。

查看 创建 Stream 文档 以查看 Streams 的所有可用选项。

index.js 继续

async function setupQuickNodeStream(startSlot, endSlot) {
    const filterFunctionString = main.toString()
    const base64FilterFunction =
        Buffer.from(filterFunctionString).toString('base64')

    const testResult = await testFilterFunction(base64FilterFunction)
    if (!testResult) {
        console.error('过滤函数失败。Stream 未创建。')
        return
    }

    console.log('过滤函数通过。继续创建流。')

    const streamConfig = {
        name: '使用 API 测试的 Streams',
        network: 'ethereum-mainnet',
        dataset: 'block',
        filter_function: base64FilterFunction,
        destination: {
            url: WEBHOOK_URL,
            compression: 'none',
            headers: {
                'Content-Type': 'application/json',
            },
            max_retry: 3,
            retry_interval_sec: 1,
        },
        status: 'active',
    }

    try {
        const response = await axios.post(
            'https://api.quicknode.com/streams/rest/v1/streams',
            streamConfig,
            {
                headers: {
                    accept: 'application/json',
                    'Content-Type': 'application/json',
                    'x-api-key': QUICKNODE_API_KEY,
                },
            }
        )

        console.log('Stream 创建成功:', response.data)
        return response.data.id
    } catch (error) {
        console.error('创建 Stream 时出错:', error.message)
        throw error
    }
}

步骤 5: 目标选项(可选)

QuickNode 支持多个目标,这意味着你可以将区块链数据导向不同的存储或处理服务。在本指南中,我们使用 Webhook 目标,但你可以为不同的需求配置 AWS S3、PostgreSQL 或 Snowflake。以下是每种配置的方法。

AWS S3

AWS S3 Bucket

const streamConfig = {
    name: '使用 API 测试的 Streams',
    network: 'ethereum-mainnet',
    dataset: 'block',
    filter_function: base64FilterFunction,
    destination: {
        s3: {
            endpoint: 'your-s3-endpoint',
            bucket: 'your-bucket-name',
            region: 'us-east-1',
            access_key: 'YOUR_ACCESS_KEY',
            secret_key: 'YOUR_SECRET_KEY',
            file_compression_type: 'gzip',
            file_type: 'json',
            max_retry: 3,
            retry_interval_sec: 1,
            use_ssl: true,
        },
    },
    status: 'active',
}

PostgreSQL

PostgreSQL

const streamConfig = {
    name: '使用 API 测试的 Streams',
    network: 'ethereum-mainnet',
    dataset: 'block',
    filter_function: base64FilterFunction,
    destination: {
        postgresql: {
            host: 'your-db-host',
            port: 5432,
            database: 'your-database',
            table_name: 'your-table-name',
            username: 'your-username',
            password: 'your-password',
            ssl_mode: 'require',
            max_retry: 3,
            retry_interval_sec: 1,
        },
    },
    status: 'active',
}

Snowflake

Snowflake

const streamConfig = {
    name: '使用 API 测试的 Streams',
    network: 'ethereum-mainnet',
    dataset: 'block',
    filter_function: base64FilterFunction,
    destination: {
        snowflake: {
            account: 'your-account',
            warehouse: 'your-warehouse',
            host: 'your-host',
            database: 'your-database',
            protocol: 'snowflake-protocol',
            schema: 'your-schema',
            table_name: 'your-table-name',
            username: 'your-username',
            max_retry: 3,
            retry_interval_sec: 1,
        },
    },
    status: 'active',
}

步骤 6: 运行 Stream

现在我们已经配置了我们的 Stream,让我们来运行它。我们将使用 setupQuickNodeStream 函数创建一个新的 Stream,并传入已验证的过滤函数。正如你所看到的,我们将开始和结束插槽作为参数传递。如果你想无限流,你可以传入 -1 作为结束插槽。

在你的 index.js 文件末尾添加以下代码。

index.js 继续

setupQuickNodeStream(1, 100000)

然后,使用以下命令运行脚本:

node index.js

然后,你应该在控制台中看到成功的消息和 Stream 的详细信息。

步骤 7: 管理 Streams

成功创建 Stream 后,你可以使用以下能力对其进行管理和交互:

这些操作使你能够动态管理你的数据 Streams,完全控制它们的生命周期。

结论

在本指南中,我们讲解了如何使用 QuickNode API 设置 Stream,实施过滤函数,并管理现有 Streams。通过利用这些 API 能力,你可以实现数据管道的自动化,自定义数据流,并确保在最少的人工干预下保持一致性。使用 QuickNode Streams,处理区块链数据变得无缝,使你能够专注于构建你的应用程序,而我们则处理实时数据管理的复杂性。

我们 ❤️ 反馈!

让我们知道 如果你有任何反馈或新主题的请求。我们很希望听到你的意见。

其他资源

  • 原文链接: quicknode.com/guides/qui...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。