概述
为区块链数据创建端到端的数据管道可能会很复杂。这就是 Streams 的用武之地。Streams 是一个实时区块链数据解决方案,允许用户将历史和实时区块链数据检索到多个目的地(例如,Webhook、PostgreSQL、Snowflake 等)。Streams 不仅可以将区块链数据发送到你需要的任何地方,你还可以在数据到达目的地之前对其进行过滤,避免解析或为不需要的数据付费。
在本指南中,我们将展示如何利用 Streams 检索实时区块链数据,并使用 Filters 选择性地解析你需要的数据并将其发送到 Webhook 目的地。我们将通过过滤 Uniswap V3 上的交换并将其聚合到一个 JSON 文件中来展示这一点。这对于具有指定起始和结束块的 Streams 特别有效,将所有收据合并到一个文件中。
你将做什么
- 在 QuickNode 上创建一个 Stream
- 过滤传入的 Stream 数据并将其发送到 Webhook 目的地
- 将来自 Webhook 目的地的 Uniswap V3 交换聚合到一个 JSON 文件中
你需要什么
准备 Webhook
设置项目目录
导航到你想要创建项目的目录,并使用 Python 创建以下虚拟环境:
python -m venv venv
source venv/bin/activate
安装 Flask
然后,安装 Flask,这是一个 Web 开发框架,我们将使用它来设置一个 API 路由,我们的 Stream 将把区块链数据发送到该路由:
pip install Flask
准备 Python 脚本
接下来,在同一目录中创建一个名为 app.py 的文件,并输入以下代码:
from flask import Flask, request, jsonify
import json
import logging
import os
app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)
receipts_file = 'combined_receipts.json'
all_receipts = []
def load_data():
"""从文件中加载现有的收据数据到内存中。"""
global all_receipts
if os.path.exists(receipts_file):
try:
with open(receipts_file, 'r') as f:
all_receipts = json.load(f)
logging.info("数据成功从文件加载。")
except Exception as e:
logging.error(f"加载数据时出错: {str(e)}")
@app.route('/webhook', methods=['POST'])
def webhook():
"""处理传入的 webhook 请求以处理和保存收据数据。"""
logging.info("收到请求!")
try:
if request.is_json:
data = request.get_json()
logging.debug(f"接收到的数据: {data}")
if data and 'data' in data:
for item in data['data']:
if 'filteredReceipts' in item and item['filteredReceipts']:
all_receipts.extend(item['filteredReceipts'])
logging.debug(f"处理的收据: {item['filteredReceipts']}")
else:
logging.warning("在 JSON 中未找到 'data' 键或 'data' 键为空")
else:
logging.error("请求内容类型不是 JSON")
return jsonify({"error": "内容类型必须为 'application/json'"}), 200
save_data()
return jsonify({"status": "数据已接收并保存"}), 200
except Exception as e:
logging.error(f"处理请求时出错: {str(e)}")
return jsonify({"error": str(e)}), 500
def save_data():
"""将合并的收据数据保存到文件中。"""
global receipts_file
try:
if os.path.exists(receipts_file):
with open(receipts_file, 'r') as f:
existing_receipts = json.load(f)
else:
existing_receipts = []
combined_receipts = existing_receipts + all_receipts
with open(receipts_file, 'w') as f:
json.dump(combined_receipts, f, indent=4)
logging.info("数据成功保存到文件中。")
all_receipts.clear()
except Exception as e:
logging.error(f"保存数据时出错: {str(e)}")
if __name__ == '__main__':
load_data()
app.run(host='0.0.0.0', port=8000)
让我们回顾一下代码。
首先,我们导入所需的依赖项,如 Flask
、json
、logging
和 os
。然后,我们初始化一个 Flask 实例和一个空数组 all_receipts
。接着,我们使用 @app.route
命令定义一个名为 /webhook
的路由,并将其设置为 POST
路由。在 /webhook
路由中,我们将检查请求内容类型是否为 application/json,然后查看响应的过滤收据并将其添加到数组中(即 all_receipts)。处理完数据后,save_data
函数将累积的收据写入名为 combined_receipts.json
的文件中,并清除列表以备将来数据使用。如果有效负载不是 JSON 格式或缺少必要数据,脚本将返回错误。
启动 Flask 服务器
现在,要在同一目录中启动服务器,请运行以下命令:
python app.py
这将在 localhost
的 8000
端口上启动你的服务器。
设置 ngrok
下一步是使我们的 Flask 服务器在远程服务器上运行,为此我们将使用 ngrok。首先,你需要确保已安装 ngrok 并且你的 ngrok 账户已使用你的 authtoken 进行身份验证。
你可以使用以下 ngrok 命令设置身份验证:
ngrok authtoken [your_authtoken_here]
身份验证后,启动远程服务器:
ngrok http 8000
你将看到类似以下的输出:

显示的 URL 将 API 调用转发到你的 localhost。现在我们的 Flask 服务器可以公开访问,我们可以继续进行下一步,即在 QuickNode 上创建我们的 Stream。
设置 Stream
导航到 QuickNode Streams 页面并点击 Create Stream 按钮。
在 Stream settings 页面上,将 Stream 对齐到以下配置:
- 链: Ethereum
- 网络: Mainnet
- 数据集: Receipts
- Stream 开始: 在此处输入起始块号
- Stream 结束: 在此处输入结束块号
- Stream 有效负载: 在流式传输之前修改 Stream
然后,使用以下代码过滤 Stream。该代码过滤包含 Uniswap V3 交换的交易收据。你可以根据需要修改过滤器。
function main(stream) {
try {
var data = stream.data[0];
var filteredReceipts = [];
data.forEach(receipt => {
let relevantLogs = receipt.logs.filter(log =>
log.topics[0] === "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
);
if (relevantLogs.length > 0) {
filteredReceipts.push(receipt);
}
});
return {
filteredReceipts
};
} catch (e) {
return {error: e.message};
}
}
你可以点击 Run Test 按钮以确保你的过滤器语法正确。你将看到一个示例响应,该响应在 Receipts 数据集上测试了你的过滤器代码。
为了获取历史数据,你不需要启用重组处理选项,但如果你想流式传输实时数据,可以启用 Latest block delay 或 Restream on reorg 功能来处理重组。你可以在此处了解更多关于重组的信息 here。页面完成后,点击 Next,你需要配置你的 Stream 目的地。
然后,在 Stream destination 页面上,将目的地设置为 Webhook 并配置它以与以下详细信息对齐。如果未提及某个字段,可以保留原样。
填写详细信息后,点击 Test Destination 按钮以验证设置。这将发送来自 Streams 的真实数据样本(原始或过滤,取决于你的配置)以及你定义的任何自定义头。
最后,点击 Create a Stream 按钮。Stream 启动后,检查项目目录中的 combined_receipts.json
文件,查看数据是否正确保存。
你将看到类似于以下的数据:
- 示例 combined_receipts.json```
[\
{\
"status": "0x1",\
"cumulativeGasUsed": "0x68c6f",\
"logs": [\
{\
"address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",\
"topics": [\
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\
"0x00000000000000000000000088e6a0c2ddd26feeb64f039a2c41296fcb3f5640",\
"0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4"\
],\
"data": "0x000000000000000000000000000000000000000000000000000000258d374bbb",\
"blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437eeaaf523553e82adf6b644",\
"blockNumber": "0x130442c",\
"blockTimestamp": "0x6650a6bf",\
"transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\
"transactionIndex": "0x2",\
"logIndex": "0xd",\
"removed": false\
},\
{\
"address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\
"topics": [\
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\
"0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4",\
"0x00000000000000000000000088e6a0c2ddd26feeb64f039a2c41296fcb3f5640"\
],\
"data": "0x000000000000000000000000000000000000000000000002594b71a4d5ee53c1",\
"blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437eeaaf523553e82adf6b644",\
"blockNumber": "0x130442c",\
"blockTimestamp": "0x6650a6bf",\
"transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\
"transactionIndex": "0x2",\
"logIndex": "0xe",\
"removed": false\
},\
{\
"address": "0x88e6a极简极繁:0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\
"topics": [\
"0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",\
"0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4",\
"极简极繁:0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4"\
],\
"data": "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffda72c8b445000000000000000000000000000000000000000000000002594b71a4d5ee53c1000000000000000000000000000000000000400388e482fbc934c0066081e9f0000000000000000000000000000000000000000000000000f1a828618b323899000000000000000000000000000000极简极繁:0x000000000000000000000000000000000000000000000000000000000002f62f",\
"blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437eeaaf523553极简极繁:e82adf6b644",\
"blockNumber": "0x130442c",\
"blockTimestamp": "0x6650a6bf",\
"transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\
"transactionIndex": "0x2",\
"logIndex": "0xf",\
"removed": false\
}\
],\
"logsBloom": "0x
"type": "0x2",\
"transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\
"transactionIndex": "0x2",\
"blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437极简极繁:eeaaf523553e82adf6b644",\
"blockNumber": "极简极繁:0x130442c",\
"gasUsed": "0x1d9f7",\
"effectiveGasPrice": "0xa8246f9fc",\
"from": "0x93793bd1f3e35a0efd098c30e486a860a0ef7551",\
"to": "0x9def7cde171841a9f0724124ca0b01a622d749e4",\
"contractAddress": null\
},\
{\
"status": "0x1",\
"cumulativeGasUsed": "0xc62b8",\
"logs": [\
{\
"address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\
"topics": [\
"0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c",\
"0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\
],\
"data": "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000",\
"blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\
"blockNumber": "0x130442d",\
"blockTimestamp": "0x6650a6cb",\
"transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\
"transactionIndex": "0x2",\
"logIndex": "0xf",\
"removed": false\
},\
{\
"address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\
"topics": [\
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\
"0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b极简极繁:2b7fad",\
"0x000000000000000000000000ebfeb540282bbea9721c4d486f53c609b87f95da"\
],\
"data": "0x0000000000000000000000000000000000000000000000000853a0d2313c0000",\
"blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\
"blockNumber": "0x130442d",\
"blockTimestamp": "0x6650a6cb",\
"transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\
"transactionIndex": "0x2",\
"logIndex": "0x10",\
"removed": false\
},\
{\
"address": "0x66e564819340cc2f54abceb4e49941fa07e426b4",\
"topics": [\
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\
"0x000000000000000000000000ebfeb540282bbea9721c4d486f53c609b87f95da",\
"0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\
],\
"data": "0x000000000000000000000000000000000000000000000000036697761f973fa4",\
"blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\
"blockNumber": "0x130442d",\
"blockTimestamp": "0x6650a6cb",\
"transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\
"transactionIndex": "0x2",\
"logIndex极简极繁:": "0x11",\
"removed": false\
},\
{\
"address": "0xebfeb540282bbea9721c4d486f53c609b87f95da",\
"topics": [\
"0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"\
],\
"data": "0x00000000000000000000000000000000000000000000000050ea511347ec7722000000000000000000000000000000000000000000000000cdda1c0209b77814",\
"blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\
"blockNumber": "0x130442d",\
"blockTimestamp": "0x6650a6cb",\
"transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\
"transactionIndex": "0x2",\
"logIndex": "0x12",\
"removed": false\
},\
{\
"address": "0xebfeb540282bbea9721c4d486f53c609b87f95da",\
"topics": [\
"0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822",\
"0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad",\
"0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\
],\
"data": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000853a0d2313c0000000000000000000000000000000000000000000000000000036697761f973fa40000000000000000000000000000000000000000000000000000000000000000",\
"blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\
"blockNumber": "0x130442d",\
"blockTimestamp": "0x6650极简极繁:a6cb",\
"transactionHash": "0x极简极繁:6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\
"transactionIndex": "0x2",\
"logIndex": "0x13",\
"removed": false\
},\
{\
"address": "0x68bbed6a47194eff1cf514b50ea91895597fc91e",\
"topics": [\
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\
"0x0000000000000000000000002b8574482d62d8df670dfd3be15f2f092941284e",\
"0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\
],\
"data": "0x0000000000000000000000000000000000000000000b4f18646f6874fb65b31d",\
"blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\
"blockNumber": "0x130442d",\
"blockTimestamp": "0x6650a6cb",\
"transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\
"transactionIndex": "0x2",\
"logIndex": "0x14",\
"removed": false\
},\
{\
"address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2极简极繁:",\
"topics": [\
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\
"0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad",\
"0x0000000000000000000000002b8574482极简极繁:d62d8df670dfd3be15f2f092941284e"\
],\
"data": "0x00000000000000000000000000000000000000000000000004db732547630000",\
"blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\
"blockNumber": "0x130442d",\
"blockTimestamp": "0x6650a6cb",\
"transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c815871