本文深入探讨了区块链开发中的实时数据处理,比较了WebSocket与QuickNode Streams的异同。文章提供了Streams的详细介绍,包括其关键特性、如何使用自定义过滤器和函数来处理区块链数据、性能比较以及使用场景。通过对WebSocket的局限性和Streams的优势进行分析,读者可以根据自身应用的需求选择合适的解决方案。
在区块链开发中,实时数据对于构建响应式、交互式应用程序至关重要。传统上,开发者使用 WebSocket 来处理实时通信,尽管 WebSocket 在许多场景下有效,但在应用程序发展壮大时,它存在的局限可能会使得可扩展性和可靠性挑战重重。这就是 Streams 的用武之地。
QuickNode 的 Streams 是一个专门设计的解决方案,旨在应对大规模处理实时和历史区块链数据的挑战,同时提供附加功能以简化你的开发过程。
在本指南中,我们将探索 WebSocket 与 Streams 之间的相似点和差异,并帮助你理解 Streams 如何解决开发者在使用 WebSocket 时常遇到的一些问题。到最后,你将掌握做出明智决策所需的知识,以选择最适合你的应用程序实时数据需求的解决方案。
WebSocket 是一种通信协议,提供客户端和服务器之间的实时双向数据传输。与传统的 HTTP 请求不同,WebSocket 维护持久连接,允许无需持续发起请求就能进行连续数据交换。
在区块链环境中,WebSocket 通常用于订阅实时事件,如新块、交易或智能合约事件。它们能够实现低延迟通信,非常适合需要快速响应区块链事件的应用程序。
以下是简化的过程:
Streams 是一个区块链数据流和 ETL(提取、转换、加载)服务,旨在使实时和历史区块链数据对开发者易于访问。它允许你可靠地将实时区块链数据流或回填历史数据到你想要的目的地,无论它是 API(例如,Webhook)、兼容 S3 的存储桶还是数据库(例如,PostgreSQL、Snowflake)。
Streams 旨在简化区块链数据的管理,无论是实时还是历史,允许开发者专注于其应用逻辑。
带过滤器的 Streams 让你仅接收所需的区块链数据,减少数据量和成本。通过 JavaScript 过滤器,你可以在数据到达目的地之前对其进行修改。过滤器还包括对 关键值存储 的访问,允许你存储来自 Stream 的数据并通过 REST API 访问。
示例过滤函数
function main(stream) {
const data = stream.data
var numberDecimal = parseInt(data[0].number, 16);
var filteredData = {
hash: data[0].hash,
number: numberDecimal
};
return filteredData;
}
此过滤器确保你仅接收区块编号和区块哈希,而不是整个区块数据集。你可以在 此处 找到其他多个过滤器示例(例如,ERC-20 转账、跟踪特定地址等)。
虽然 Streams 提供实时和历史区块链数据,将其与 Functions 结合可以使你动态处理和增强这些数据。无论你是在创建 API 还是在区块链数据上执行复杂操作,Functions 都能增加另一层灵活性和自定义性,无需管理服务器。
以下是 Functions 与 Streams 的完美互补原因:
API 准备就绪:Functions 会自动曝光为 API,使你可以轻松将其集成到前端或其他服务中。你可以将实时区块链数据转换为立即可用的 API。
无服务器灵活性:使用 Functions,你可以在无需担心管理服务器或扩展的情况下,在区块链数据上执行复杂操作。Functions 随着需求自我扩展,这意味着你只需支付所使用的部分。
外部库支持:Functions 使你可以在运行时环境中访问外部库,包括与 web3 相关的和实用工具相关的库。查看 此处 的完整支持库列表。
过滤和转换数据:Functions 允许你从 Streams 中过滤特定数据,如交易或事件,并将编码的区块链数据转换为适合你应用需求的格式。
增强数据:需要在将数据路由到目标之前增强数据吗?Functions 使你能够进行额外的 API 调用或检索补充数据,在将区块链数据传递之前提供额外的上下文。
关键值存储:访问 关键值存储,使你能根据 Functions 的操作存储和检索数据。
以下是 Streams 和 Functions 如何协同工作的示例。有关详细的 Functions 库,请单击 此处。
Defi 或 NFT 平台的自定义 API:通过结合 Streams 和 Functions,你可以创建提供实时或聚合数据的自定义 API,例如 DeFi 平台的最新交易统计数据或 NFT 交易的实时更新。
自动通知:设置一个 Stream 来监控重要的区块链事件,并使用一个 Function 来实时触发通知,将其路由到如 PagerDuty、Discord 或 Google Sheets 等服务。这些用例在 Functions 库 中得到了很好的体现,在那里你会找到类似 Stream 到 Discord 或 Stream 到 Google Sheets 的示例。
数据分析:通过分析 Stream 事件实时处理区块链数据,将结果存储在关键值存储中,并在你需要进一步处理或通过 REST API 提供服务时检索它们。此工作流非常适合跟踪和提供指标及交易或构建自定义数据增强的 API 解决方案。
Streams 基于发布-订阅模型运作,但具有额外的可靠性和功能:
在本节中,我们将逐个主题比较 WebSocket 和 Streams,逐一细分它们的关键功能和使用场景,重点关注 WebSocket 的常见问题。
特征 | WebSocket | Streams |
---|---|---|
数据传递模型 | 订阅驱动模型,客户端维持开放连接。 | 推送驱动模型,数据发送到指定的目的地而无需维持持续连接。 |
数据类型和范围 | 设计用于实时数据。 | 同时支持实时和历史数据。 |
历史数据 | 需要单独的 API 调用来获取历史数据。 | 支持实时数据的回填和历史数据。 |
可靠性和保证 | 尽力而为交付。连接中断时数据可能会丢失。 | 保证交付,确保即使在断开连接的情况下也不会丢失数据。 |
数据转换和过滤 | 限于客户端对接收数据的过滤。 | 强大的服务器端过滤和转换以及通过 Functions 的增强能力。 |
区块链重组处理 | 不支持链重组的内置处理。 | 自动检测和处理区块链重组。 |
多链支持 | 通常需要为每个区块链建立单独的连接。 | 需要多个 Streams 处理多个链,但提供易于设置的工作流和重复使用的支持。 |
目的地灵活性 | 数据直接由客户端应用接收。 | 支持多种目的地类型,包括 Webhook、S3 桶和数据库,并且通过 Functions 提供额外的路由逻辑。 |
双向通信 | 适合需要双向通信的应用。 | 设计为基于推送的单向数据流,即面向多个目的地。 |
压缩 | 没有内置的压缩功能。 | Streams 提供内置数据压缩,显著减少某些目的地的有效负载大小和成本。 |
定价 | 基于开放连接或请求数量进行定价。 | 基于处理和交付的数据量进行定价。 |
API 准备就绪 | 没有内置的 API。 | 自动曝光为 API。 |
性能考虑 | WebSocket | Streams |
---|---|---|
延迟 | 通常在小规模、简单用例下提供较低延迟。 | 针对高流量数据场景进行了优化,可以轻松处理每秒数千条消息。 |
可扩展性和吞吐量 | 受到打开连接数量的限制。 | 高吞吐量(每秒可超过 3000 条消息)。 |
资源利用 | 客户端需要维护开放连接并处理所有传入数据。 | 将大部分工作转移到 QuickNode 的基础设施上,减少客户端资源消耗。 |
示例使用场景 | WebSocket | Streams |
---|---|---|
交互式应用 | 适合依赖双向通信的交互式应用,如游戏或协作工具。 | 专注于单向数据流。 |
数据分析应用 | 不适用,因为 WebSocket 不支持历史数据。 | 适合需要实时和历史数据的数据分析应用。 |
实时警报 | 适合实时警报系统。 | 适合实时警报系统。 |
区块链浏览器 | 不适用,因为缺乏历史数据,并且不支持内置的重组处理。 | 适合浏览器,因为它们支持历史数据并能处理区块链重组。 |
自定义 API | 实现复杂逻辑和实时更新较为困难。 | 使用 Functions 从 Streams 中聚合、过滤和增强数据,构建自定义 API。 |
尽管 WebSocket 是实时通信的强大工具,但它也存在一些挑战:
由于网络问题或服务器故障,WebSocket 可能会意外断开。重新连接并确保在停机期间没有数据丢失需要手动实现,例如重连逻辑和排队错过的事件。Streams 提供内置的可靠性,有可配置的超时设置(例如:30 秒阈值)、可自定义的重试次数及在指定重试次数后暂停机制。
建议
如果你需要处理这种断开连接,请查看 处理 WebSocket 断开连接 文章。
由于 WebSocket 通常交付超过所需的更多数据,可能会导致带宽使用过度。开发者必须在客户端过滤这些数据,这在高流量应用中特别低效。Streams 提供 服务器端过滤。
WebSocket 通常仅提供实时数据。如果你需要将实时数据与历史数据同步(例如,在区块链浏览器中),那么为历史和实时数据管理单独的数据管道会增加你的架构复杂性。Streams 提供 实时和历史数据。
现在我们已经讨论了一些开发者在使用 WebSocket 时面临的常见挑战,接下来我们将快速探讨如何实现 WebSocket 和 Streams。
下面的代码是代码示例,旨在为你快速概述每种设置的工作方式。请注意,这些并不是完整的、生产就绪的解决方案,而是帮助你快速启动 WebSocket 和 Streams 的起点。
确保将占位符 YOUR-QUICKNODE-WSS-URL
和 YOUR-QUICKNODE-API-KEY
替换为你自己的。
此示例演示如何使用 ethers.js 设置简单的 WebSocket 连接,以监听以太坊网络上的新块。你需要你的 QuickNode WebSocket 端点,可以从 QuickNode 仪表板 获取。
const { WebSocketProvider } = require('ethers')
const wsProvider = new WebSocketProvider('YOUR-QUICKNODE-WSS-URL')
wsProvider.on('block', blockNumber => {
console.log('新块:', blockNumber)
})
// 处理断开连接
wsProvider.websocket.on('close', () => {
console.log('WebSocket 断开连接')
// 在此实现重连逻辑
})
此基本示例展示了如何实时监听新块。如果 WebSocket 连接断开,你需要实现自定义的重连逻辑,因为 WebSocket 不会自动处理这一点。
在本示例中,我们将使用 QuickNode Streams API 设置一个 Stream。该 Stream 监听以太坊主网并将数据发送到指定的 webhook。你需要配置你的 API 密钥、webhook URL 和用于流式传输的区块范围。
在下面的代码中,我们首先定义一个 main()
函数,这是我们的过滤函数,处理数据并将区块编号从十六进制转换为十进制格式。逻辑的核心围绕通过 QuickNode 的 API 创建一个 Stream,但在创建 Stream 之前,我们使用 testFilterFunction()
进行预检查。
testFilterFunction()
向 QuickNode API 发送过滤函数的 base64 编码版本(在这种情况下是 main()
函数),以验证其行为。它检查过滤器与示例区块数据的匹配,以确保它正常工作。
如果测试通过,setupQuickNodeStream()
函数将继续创建带有已验证过滤器的新 Stream。如果测试失败,将不会创建 Stream,并记录错误。
点击这里以展开代码示例
const axios = require("axios");
const QUICKNODE_API_KEY = "YOUR-QUICKNODE-API-KEY"; // 替换为你的实际 API 密钥
const WEBHOOK_URL = "YOUR-WEBHOOK-URL"; // 替换为你的 ngrok URL
function main(stream) {
try {
const data = stream.data
const numberDecimal = parseInt(data[0].number, 16);
const filteredData = {
hash: data[0].hash,
number: numberDecimal,
};
return filteredData;
} catch (error) {
return { error: error.message, stack: error.stack };
}
}
async function testFilterFunction(base64FilterFunction) {
const hash =
"0xb72704063570e4b5a5f972f380fad5e43e1e8c9a1b0e36f204b9282c89adc677"; // 测试区块的哈希
const number = "17811625"; // 测试区块的编号
let data = JSON.stringify({
network: "ethereum-mainnet",
dataset: "block",
filter_function: base64FilterFunction,
block: number,
});
try {
const response = await axios.post(
"https://api.quicknode.com/streams/rest/v1/streams/test_filter",
data,
{
headers: {
accept: "application/json",
"Content-Type": "application/json",
"x-api-key": QUICKNODE_API_KEY,
},
}
);
if (
response.status === 201 &&
response.data.hash == hash &&
response.data.number == number
) {
console.log("过滤函数测试成功!");
return true;
} else {
console.error("测试过滤函数时出错:", response.status);
return false;
}
} catch (error) {
console.error(
"测试过滤函数时出错:",
error.response ? error.response.data : error.message
);
throw error;
}
}
async function setupQuickNodeStream(startSlot, endSlot) {
const filterFunctionString = main.toString();
const base64FilterFunction =
Buffer.from(filterFunctionString).toString("base64");
// 首先测试过滤函数
const testResult = await testFilterFunction(base64FilterFunction);
if (!testResult) {
console.error("过滤函数失败。Stream 未创建。");
return;
}
console.log("过滤函数通过。开始创建 Stream。");
const streamConfig = {
name: "通过 API 测试的 Streams",
network: "ethereum-mainnet",
dataset: "block",
filter_function: base64FilterFunction,
region: "usa_east",
start_range: startSlot,
end_range: endSlot,
dataset_batch_size: 1,
include_stream_metadata: "header",
destination: "webhook",
fix_block_reorgs: 0,
keep_distance_from_tip: 0,
destination_attributes: {
url: WEBHOOK_URL,
compression: "none",
headers: {
"Content-Type": "application/json",
},
max_retry: 3,
retry_interval_sec: 1,
post_timeout_sec: 10,
},
status: "active",
};
try {
const response = await axios.post(
"https://api.quicknode.com/streams/rest/v1/streams",
streamConfig,
{
headers: {
accept: "application/json",
"Content-Type": "application/json",
"x-api-key": QUICKNODE_API_KEY,
},
}
);
console.log("Stream 创建成功:", response.data);
return response.data.id;
} catch (error) {
console.error(
"创建 Stream 时出错:",
error.response ? error.response.data : error.message
);
throw error;
}
}
setupQuickNodeStream(1, 5).then((res) => console.log(res));
此示例展示了如何通过编程创建一个 Stream。你可以通过调整过滤器和目的地设置进一步自定义。
你也可以直接从 QuickNode 仪表板创建和管理 Streams,而无需编写代码。以下是在仪表板上设置 Stream 的方法:
这种方法允许你快速部署一个 Stream,而无需任何代码。
尽管 WebSocket 在某些情况下可能适用于实时区块链数据,但其通常需要高额的维护劳动,尤其是在处理重连逻辑和断开连接时。另一方面,Streams 提供了一个更健壮、可扩展且功能丰富的替代方案,特别适合复杂或高流量的应用。
通过理解两者的优缺点,你可以对哪个解决方案最适合你的项目做出明智决定。无论你是选择 WebSocket 以简化开发,还是选择 Streams 以增强可靠性和可扩展性,QuickNode 都具备支持你所有需求的工具和基础设施。
如有任何问题、建议或想法,请直接 联系我。同时,通过关注我们的 Twitter 并加入我们的 Discord 和 Telegram 公告频道 来了解最新动态。
告知我们你是否有任何反馈或新的主题请求。我们期待你的来信。
- 原文链接: quicknode.com/guides/qui...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!