如何使用过滤器与流

  • QuickNode
  • 发布于 2024-05-29 20:45
  • 阅读 40

本文介绍了如何使用Streams和Filters构建端到端的区块链数据管道,包括如何创建Stream、过滤数据并将其发送到Webhook目的地,并通过Python脚本将Uniswap V3的交换数据聚合到一个JSON文件中。

概述

为区块链数据创建端到端的数据管道可能会很复杂。这就是 Streams 的用武之地。Streams 是一个实时区块链数据解决方案,允许用户将历史和实时区块链数据检索到多个目的地(例如,Webhook、PostgreSQL、Snowflake 等)。Streams 不仅可以将区块链数据发送到你需要的任何地方,你还可以在数据到达目的地之前对其进行过滤,避免解析或为不需要的数据付费。

在本指南中,我们将展示如何利用 Streams 检索实时区块链数据,并使用 Filters 选择性地解析你需要的数据并将其发送到 Webhook 目的地。我们将通过过滤 Uniswap V3 上的交换并将其聚合到一个 JSON 文件中来展示这一点。这对于具有指定起始和结束块的 Streams 特别有效,将所有收据合并到一个文件中。

你将做什么

  • 在 QuickNode 上创建一个 Stream
  • 过滤传入的 Stream 数据并将其发送到 Webhook 目的地
  • 将来自 Webhook 目的地的 Uniswap V3 交换聚合到一个 JSON 文件中

你需要什么

准备 Webhook

设置项目目录

导航到你想要创建项目的目录,并使用 Python 创建以下虚拟环境:

python -m venv venv
source venv/bin/activate

安装 Flask

然后,安装 Flask,这是一个 Web 开发框架,我们将使用它来设置一个 API 路由,我们的 Stream 将把区块链数据发送到该路由:

pip install Flask

准备 Python 脚本

接下来,在同一目录中创建一个名为 app.py 的文件,并输入以下代码:

from flask import Flask, request, jsonify
import json
import logging
import os

app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)

receipts_file = 'combined_receipts.json'
all_receipts = []

def load_data():
    """从文件中加载现有的收据数据到内存中。"""
    global all_receipts
    if os.path.exists(receipts_file):
        try:
            with open(receipts_file, 'r') as f:
                all_receipts = json.load(f)
            logging.info("数据成功从文件加载。")
        except Exception as e:
            logging.error(f"加载数据时出错: {str(e)}")

@app.route('/webhook', methods=['POST'])
def webhook():
    """处理传入的 webhook 请求以处理和保存收据数据。"""
    logging.info("收到请求!")
    try:
        if request.is_json:
            data = request.get_json()
            logging.debug(f"接收到的数据: {data}")
            if data and 'data' in data:
                for item in data['data']:
                    if 'filteredReceipts' in item and item['filteredReceipts']:
                        all_receipts.extend(item['filteredReceipts'])
                        logging.debug(f"处理的收据: {item['filteredReceipts']}")
            else:
                logging.warning("在 JSON 中未找到 'data' 键或 'data' 键为空")
        else:
            logging.error("请求内容类型不是 JSON")
            return jsonify({"error": "内容类型必须为 'application/json'"}), 200

        save_data()
        return jsonify({"status": "数据已接收并保存"}), 200
    except Exception as e:
        logging.error(f"处理请求时出错: {str(e)}")
        return jsonify({"error": str(e)}), 500

def save_data():
    """将合并的收据数据保存到文件中。"""
    global receipts_file
    try:
        if os.path.exists(receipts_file):
            with open(receipts_file, 'r') as f:
                existing_receipts = json.load(f)
        else:
            existing_receipts = []

        combined_receipts = existing_receipts + all_receipts

        with open(receipts_file, 'w') as f:
            json.dump(combined_receipts, f, indent=4)

        logging.info("数据成功保存到文件中。")
        all_receipts.clear()
    except Exception as e:
        logging.error(f"保存数据时出错: {str(e)}")

if __name__ == '__main__':
    load_data()
    app.run(host='0.0.0.0', port=8000)

让我们回顾一下代码。

首先,我们导入所需的依赖项,如 Flaskjsonloggingos。然后,我们初始化一个 Flask 实例和一个空数组 all_receipts。接着,我们使用 @app.route 命令定义一个名为 /webhook 的路由,并将其设置为 POST 路由。在 /webhook 路由中,我们将检查请求内容类型是否为 application/json,然后查看响应的过滤收据并将其添加到数组中(即 all_receipts)。处理完数据后,save_data 函数将累积的收据写入名为 combined_receipts.json 的文件中,并清除列表以备将来数据使用。如果有效负载不是 JSON 格式或缺少必要数据,脚本将返回错误。

启动 Flask 服务器

现在,要在同一目录中启动服务器,请运行以下命令:

python app.py

这将在 localhost8000 端口上启动你的服务器。

设置 ngrok

下一步是使我们的 Flask 服务器在远程服务器上运行,为此我们将使用 ngrok。首先,你需要确保已安装 ngrok 并且你的 ngrok 账户已使用你的 authtoken 进行身份验证。

你可以使用以下 ngrok 命令设置身份验证:

ngrok authtoken [your_authtoken_here]

身份验证后,启动远程服务器:

ngrok http 8000

你将看到类似以下的输出:

ngrok remote 8000

显示的 URL 将 API 调用转发到你的 localhost。现在我们的 Flask 服务器可以公开访问,我们可以继续进行下一步,即在 QuickNode 上创建我们的 Stream。

设置 Stream

导航到 QuickNode Streams 页面并点击 Create Stream 按钮。

Stream settings 页面上,将 Stream 对齐到以下配置:

  • : Ethereum
  • 网络: Mainnet
  • 数据集: Receipts
  • Stream 开始: 在此处输入起始块号
  • Stream 结束: 在此处输入结束块号
  • Stream 有效负载: 在流式传输之前修改 Stream

然后,使用以下代码过滤 Stream。该代码过滤包含 Uniswap V3 交换的交易收据。你可以根据需要修改过滤器。

function main(stream) {
        try {
            var data = stream.data[0];
            var filteredReceipts = [];
            data.forEach(receipt => {
                let relevantLogs = receipt.logs.filter(log =>
                    log.topics[0] === "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
                );
                if (relevantLogs.length > 0) {
                    filteredReceipts.push(receipt);
                }
            });

            return {
                filteredReceipts
            };
        } catch (e) {
            return {error: e.message};
        }
    }

你可以点击 Run Test 按钮以确保你的过滤器语法正确。你将看到一个示例响应,该响应在 Receipts 数据集上测试了你的过滤器代码。

为了获取历史数据,你不需要启用重组处理选项,但如果你想流式传输实时数据,可以启用 Latest block delayRestream on reorg 功能来处理重组。你可以在此处了解更多关于重组的信息 here。页面完成后,点击 Next,你需要配置你的 Stream 目的地。

然后,在 Stream destination 页面上,将目的地设置为 Webhook 并配置它以与以下详细信息对齐。如果未提及某个字段,可以保留原样。

填写详细信息后,点击 Test Destination 按钮以验证设置。这将发送来自 Streams 的真实数据样本(原始或过滤,取决于你的配置)以及你定义的任何自定义头。

最后,点击 Create a Stream 按钮。Stream 启动后,检查项目目录中的 combined_receipts.json 文件,查看数据是否正确保存。

你将看到类似于以下的数据:

  • 示例 combined_receipts.json``` [\ {\ "status": "0x1",\ "cumulativeGasUsed": "0x68c6f",\ "logs": [\ {\ "address": "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48",\ "topics": [\ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\ "0x00000000000000000000000088e6a0c2ddd26feeb64f039a2c41296fcb3f5640",\ "0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4"\ ],\ "data": "0x000000000000000000000000000000000000000000000000000000258d374bbb",\ "blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437eeaaf523553e82adf6b644",\ "blockNumber": "0x130442c",\ "blockTimestamp": "0x6650a6bf",\ "transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\ "transactionIndex": "0x2",\ "logIndex": "0xd",\ "removed": false\ },\ {\ "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\ "topics": [\ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\ "0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4",\ "0x00000000000000000000000088e6a0c2ddd26feeb64f039a2c41296fcb3f5640"\ ],\ "data": "0x000000000000000000000000000000000000000000000002594b71a4d5ee53c1",\ "blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437eeaaf523553e82adf6b644",\ "blockNumber": "0x130442c",\ "blockTimestamp": "0x6650a6bf",\ "transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\ "transactionIndex": "0x2",\ "logIndex": "0xe",\ "removed": false\ },\ {\ "address": "0x88e6a极简极繁:0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\ "topics": [\ "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67",\ "0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4",\ "极简极繁:0x0000000000000000000000009def7cde171841a9f0724124ca0b01a622d749e4"\ ],\ "data": "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffda72c8b445000000000000000000000000000000000000000000000002594b71a4d5ee53c1000000000000000000000000000000000000400388e482fbc934c0066081e9f0000000000000000000000000000000000000000000000000f1a828618b323899000000000000000000000000000000极简极繁:0x000000000000000000000000000000000000000000000000000000000002f62f",\ "blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437eeaaf523553极简极繁:e82adf6b644",\ "blockNumber": "0x130442c",\ "blockTimestamp": "0x6650a6bf",\ "transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\ "transactionIndex": "0x2",\ "logIndex": "0xf",\ "removed": false\ }\ ],\ "logsBloom": "0xtype": "0x2",\ "transactionHash": "0x6fe22c82b1cd99f7fd9182e8eb868553370849105449004b5cf9c74da5f4c443",\ "transactionIndex": "0x2",\ "blockHash": "0xc905fe077d226558d3b6a2bc639884b9781b020437极简极繁:eeaaf523553e82adf6b644",\ "blockNumber": "极简极繁:0x130442c",\ "gasUsed": "0x1d9f7",\ "effectiveGasPrice": "0xa8246f9fc",\ "from": "0x93793bd1f3e35a0efd098c30e486a860a0ef7551",\ "to": "0x9def7cde171841a9f0724124ca0b01a622d749e4",\ "contractAddress": null\ },\ {\ "status": "0x1",\ "cumulativeGasUsed": "0xc62b8",\ "logs": [\ {\ "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\ "topics": [\ "0xe1fffcc4923d04b559f4d29a8bfc6cda04eb5b0d3c460751c2402c5c5cc9109c",\ "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\ ],\ "data": "0x0000000000000000000000000000000000000000000000000de0b6b3a7640000",\ "blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\ "blockNumber": "0x130442d",\ "blockTimestamp": "0x6650a6cb",\ "transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\ "transactionIndex": "0x2",\ "logIndex": "0xf",\ "removed": false\ },\ {\ "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",\ "topics": [\ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\ "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b极简极繁:2b7fad",\ "0x000000000000000000000000ebfeb540282bbea9721c4d486f53c609b87f95da"\ ],\ "data": "0x0000000000000000000000000000000000000000000000000853a0d2313c0000",\ "blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\ "blockNumber": "0x130442d",\ "blockTimestamp": "0x6650a6cb",\ "transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\ "transactionIndex": "0x2",\ "logIndex": "0x10",\ "removed": false\ },\ {\ "address": "0x66e564819340cc2f54abceb4e49941fa07e426b4",\ "topics": [\ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\ "0x000000000000000000000000ebfeb540282bbea9721c4d486f53c609b87f95da",\ "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\ ],\ "data": "0x000000000000000000000000000000000000000000000000036697761f973fa4",\ "blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\ "blockNumber": "0x130442d",\ "blockTimestamp": "0x6650a6cb",\ "transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\ "transactionIndex": "0x2",\ "logIndex极简极繁:": "0x11",\ "removed": false\ },\ {\ "address": "0xebfeb540282bbea9721c4d486f53c609b87f95da",\ "topics": [\ "0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"\ ],\ "data": "0x00000000000000000000000000000000000000000000000050ea511347ec7722000000000000000000000000000000000000000000000000cdda1c0209b77814",\ "blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\ "blockNumber": "0x130442d",\ "blockTimestamp": "0x6650a6cb",\ "transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\ "transactionIndex": "0x2",\ "logIndex": "0x12",\ "removed": false\ },\ {\ "address": "0xebfeb540282bbea9721c4d486f53c609b87f95da",\ "topics": [\ "0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822",\ "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad",\ "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\ ],\ "data": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000853a0d2313c0000000000000000000000000000000000000000000000000000036697761f973fa40000000000000000000000000000000000000000000000000000000000000000",\ "blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\ "blockNumber": "0x130442d",\ "blockTimestamp": "0x6650极简极繁:a6cb",\ "transactionHash": "0x极简极繁:6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\ "transactionIndex": "0x2",\ "logIndex": "0x13",\ "removed": false\ },\ {\ "address": "0x68bbed6a47194eff1cf514b50ea91895597fc91e",\ "topics": [\ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\ "0x0000000000000000000000002b8574482d62d8df670dfd3be15f2f092941284e",\ "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad"\ ],\ "data": "0x0000000000000000000000000000000000000000000b4f18646f6874fb65b31d",\ "blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\ "blockNumber": "0x130442d",\ "blockTimestamp": "0x6650a6cb",\ "transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c8158712d52ae4b30cba1",\ "transactionIndex": "0x2",\ "logIndex": "0x14",\ "removed": false\ },\ {\ "address": "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2极简极繁:",\ "topics": [\ "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",\ "0x0000000000000000000000003fc91a3afd70395cd496c647d5a6cc9d4b2b7fad",\ "0x0000000000000000000000002b8574482极简极繁:d62d8df670dfd3be15f2f092941284e"\ ],\ "data": "0x00000000000000000000000000000000000000000000000004db732547630000",\ "blockHash": "0xe1ec24b04014fb62f8742d8fc65ebe190957136cf7b0b02d70122e62d1461cc6",\ "blockNumber": "0x130442d",\ "blockTimestamp": "0x6650a6cb",\ "transactionHash": "0x6a35cbca8c9beae3b0305cb615bd16ce18af5624059c815871
  • 原文链接: quicknode.com/guides/qui...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。