如何实时流式传输Solana程序数据

  • QuickNode
  • 发布于 2025-02-04 15:52
  • 阅读 30

本文介绍了如何使用QuickNode的Streams工具来获取和过滤Solana区块链中的实时数据,具体步骤包括创建流、过滤数据、建立Webhook服务器,以及如何利用ngrok使本地服务器公开可访问。文章提供了详细的代码示例和相关配置,适合希望获取Solana特定区块数据的开发者。

概述

管理 Solana 生成的大量数据可能是一个挑战,通常你只需要一小部分区块数据。Streams 是一个强大的工具,它可以让你从 Solana 中实时或历史数据中检索并过滤数据,并将其发送到你选择的目标。这个指南将向你展示如何使用 Streams 检索实时 Solana 程序数据,以及如何使用 Filters 精确解析你需要的数据并将其发送到你的 Webhook 目标。

查看快速视频演示,以查看 Solana Streams 的实际应用。

Solana Streams 演示 - YouTube

QuickNode

131K 订阅者

Solana Streams 演示

QuickNode

搜索

信息

购物

点击解除静音

如果播放没有立即开始,请尝试重启你的设备。

你已登出

你观看的视频可能会添加到电视的观看历史,并影响电视推荐。为避免此情况,请在你的计算机上取消并登录 YouTube。

取消确认

分享

包括播放列表

检索共享信息时出现错误。请稍后再试。

稍后观看

分享

复制链接

在上观看

0:00

/ •实时

在 YouTube 上观看

订阅我们的 YouTube 频道以获得更多视频! 订阅

让我们开始吧!

你将要做的事情

  • 在 QuickNode 上创建 Stream
  • 过滤进入的 Stream 数据,以检索 Pump.fun 上的新代币创建
  • 创建一个本地服务器以接收过滤后的数据
  • 将过滤后的数据路由到你的 Webhook 目标

最终,我们的服务器将以如下方式记录新的 Pump.fun 创建:

Pump.fun 创建

你将需要的东西

步骤 1:创建 Stream

从你的 QuickNode 仪表板,导航到 Streams 页面并点击 "+ 创建 Stream" 按钮,或者在 这里 创建一个。

从设置卡片中选择 "Solana" 和 "Mainnet":

Solana 主网设置

可以给你的 Stream 添加一个独特的名称,或者直接使用随机生成的名称。

接下来,向下滚动到 "Stream Start" 选项,并确保选择 "最新区块"(用于实时数据)。或者,如果你想从特定的区块高度开始,可以选择 "区块 #" 并输入想要开始的区块高度。如果你正在构建一个有固定结束区块的 Stream,也可以输入 "Stream End"(我们将保持为空,因为我们只想持续 streaming 新创建的代币)。

接下来,选择 "在流之前修改负载"。这是一个非常重要的步骤,因为你只会根据传送到目标的数据量收费。我们的过滤器将允许我们:

  1. 只过滤你需要的数据(在这种情况下,我们将过滤使用 Pump.fun 程序创建的新代币数据)
  2. 根据你的喜好修改返回的数据(在这种情况下,我们将只返回代币铸造地址、交易签名和时间戳)

Solana 主网设置

或者,如果你不过滤你的数据,你将会收到每个区块的所有区块数据,可能非常大且费用昂贵。返回的区块数据来自 Solana 的 getBlock RPC 方法,并返回如下 JSON 对象:

{
  "blockHeight": 270646103,
  "blockTime": 1727197869,
  "blockhash": "52egMfezPu8MfzMc1ZPzZAaR6o7ZuJ4633VQPCyDbotJ",
  "parentSlot": 291763038,
  "previousBlockhash": "GG8Y7BEvZf3CRixX5k8GraDSkofcMLC9mXAqteuwKR7d",
  "transactions": [\
    /* ... 所有区块中的交易 ... */\
  ],
}

接下来,你应该会看到一个 JavaScript 代码编辑器,用于你的 main 函数。这是你 Stream 过滤器的入口点(请查看我们的 文档 获取有关过滤器的更多信息)。默认代码如下:

function main(stream) {
  const data = stream.data

  return data;
}

stream 对象包含 datametadata 字段。data 字段包含负载数据,metadata 字段包含 Stream 的元数据。继续点击 "▶️ 运行测试" 来测试你的 Stream。由于 Solana 区块相当大,这将需要几秒钟加载。最终,它应该有效地返回指定 "测试区块" 的区块数据:

测试区块

请随意浏览响应。那里面有很多内容——让我们看看能否过滤到我们需要的数据。

步骤 2:过滤 Stream 数据

我们在区块中有大量数据。让我们想想如何过滤出仅显示我们想看到的数据。对于本示例,我们计划将我们的负载包括一个 matchedTransactions 数组,其中包含每个交易的 signatureblockTimemint

{
  "matchedTransactions": [\
    {\
      "signature": "字符串",\
      "blockTime": "字符串",\
      "accounts": {\
        "mint": "字符串"\
      }\
    }\
  ]
}

让我们想想我们的过滤器需要做什么:

  1. 过滤掉失败的交易(原始区块数据将包括失败的交易)
  2. 过滤包含调用 Pump.fun 程序的指令的交易( 6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P
  3. 过滤这些交易,只包括调用 Pump.fun create 指令的交易
  4. 最后,过滤完数据后,我们需要将交易数据格式化以适应我们的负载

让我们创建它。

创建入口点

在你的 Streams 页面内,将现有的 main 函数替换为以下内容:

function main(stream) {
    try {
        const data = stream.data[0];
        if (!data?.transactions?.length) {
            return { error: '无效或缺失的流' };
        }

        const matchedTransactions = data.transactions
            .filter(matchesFilter)
            .map(tx => formatTransaction(tx, data));

        if (matchedTransactions.length === 0) {
            return null;
        }
        return { matchedTransactions };
    } catch (error) {
        console.error('主函数中的错误:', error);
        return { error: error.message, stack: error.stack };
    }
}

这里是一个简单的函数,正好执行我们刚刚描述的操作:

  1. 首先,我们通过在未接收到数据时抛出错误来确保 Stream 数据有效
  2. 然后我们使用 matchesFilter 函数过滤交易
  3. 接下来我们对过滤交易进行映射,使用 formatTransaction 函数进行新的格式化
  4. 最后,如果有交易,我们将返回已格式化的交易

定义常量

在定义我们的实用函数之前,让我们定义一些常量。这些将用于定义我们的过滤器和账户的包含(你可以稍后调整这些以支持你自己的用例)。在 main 函数上方,添加以下常量:

const BASE58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz';
const PUMP_FUN_PROGRAM_ID = '6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P';
const PUMP_FUN_CREATE_IX_DISCRIMINATOR = [24, 30, 200, 40, 5, 28, 7, 119];
const FILTER_CONFIG = {
    programIds: [PUMP_FUN_PROGRAM_ID],
    skipFailed: true,
    instructionDiscriminators: [PUMP_FUN_CREATE_IX_DISCRIMINATOR]
};
const ACCOUNTS_TO_INCLUDE = [{\
    name: "mint",\
    index: 0\
}];

让我们解释一下这些是什么:

  • BASE58_ALPHABET: 这个常量定义了 Base58 字母表。Solana 使用 base58 编码来表示 Solana 地址,这些地址是字符字符串。我们将利用这一点解码指令鉴别符。
  • PUMP_FUN_PROGRAM_ID: 这是 Pump.fun 程序地址。
  • PUMP_FUN_CREATE_IX_DISCRIMINATOR: 这是 Pump.fun 程序中 create 指令的 指令鉴别符。如果你尚未拥有针对目标指令的鉴别符,可以查看这个有益的 Solana StackExchange 答复 来了解如何找到它。
  • FILTER_CONFIG: 这个对象定义了我们 Stream 的过滤配置。它包括要过滤的程序 ID、是否跳过失败的交易和要过滤的指令鉴别符。我们已设置好,这样你可以按需向你的过滤器添加程序或指令。
  • ACCOUNTS_TO_INCLUDE: 这是一个对象数组,定义了在交易数据中要包含的账户。在这种情况下,我们只关心交易的 mint 账户。我们指定了在指令数据中账户的预期索引。在这种情况下,我们知道 mintcreate 指令中的第一个账户(在程序的 IDL 中找到, 这里)。

很好。现在我们只需要定义一些实用函数来使我们的 Stream 工作。

定义实用函数

首先,我们将需要我们的 matchesFilter 函数。这个函数将根据我们的过滤配置过滤交易。我们还将创建一个可重用的 matchesInstructionDiscriminator 函数,以匹配我们过滤配置中的指令鉴别符。在你的 main 函数下方添加以下内容:

function matchesFilter(tx) {
    if (FILTER_CONFIG.skipFailed && tx.meta?.err !== null) {
        return false;
    }

    const programIds = new Set(tx.transaction.message.instructions.map(ix => ix.programId));
    if (!FILTER_CONFIG.programIds.some(id => programIds.has(id))) {
        return false;
    }

    return tx.transaction.message.instructions.some(matchesInstructionDiscriminator);
}

function matchesInstructionDiscriminator(ix) {
    if (!ix?.data) return false;
    const decodedData = decodeBase58(ix.data);
    return FILTER_CONFIG.instructionDiscriminators.some(discriminator =>
        discriminator.length === 8 && discriminator.every((byte, index) => byte === decodedData[index])
    );
}

matchesFilter 进行三项检查:

  1. 如果 skipFailed 标志设置为 true 且交易有错误,我们将返回 false 以跳过交易
  2. 如果交易不匹配过滤配置中的程序 ID,我们将返回 false 以跳过交易
  3. 如果交易匹配程序 ID和指令鉴别符,我们将返回 true 以包含交易

接下来,我们将定义我们的 matchesInstructionDiscriminator 函数。该函数将匹配我们过滤配置中的指令鉴别符。首先通过 decodeBase58 函数解码指令数据,然后检查鉴别符是否匹配预期鉴别符。让我们向下定义 decodeBase58 函数。在你的 matchesFilter 函数下方添加以下内容:

function decodeBase58(encoded) {
    if (typeof encoded !== 'string') return [];
    const result = [];
    for (let i = 0; i < encoded.length; i++) {
        let carry = BASE58_ALPHABET.indexOf(encoded[i]);
        if (carry < 0) return []; // 无效字符,返回空数组
        for (let j = 0; j < result.length; j++) {
            carry += result[j] * 58;
            result[j] = carry & 0xff;
            carry >>= 8;
        }
        while (carry > 0) {
            result.push(carry & 0xff);
            carry >>= 8;
        }
    }
    // 添加前导零
    for (let i = 0; i < encoded.length && encoded[i] === '1'; i++) {
        result.push(0);
    }
    return result.reverse();
}

decodeBase58 函数将 Base58 编码的字符串转换回其原始字节数组。其工作原理如下:

  1. 遍历输入字符串中的每个字符。
  2. 对于每个字符,查找在 Base58 字母表中的数值。
  3. 使用基于进位的算法构建原始数据的字节。
  4. 通过在末尾添加零字节来处理前导零(表示为 Base58 中的 '1')。
  5. 最后,反转结果数组以获得正确的字节顺序。

此功能对于解码 Solana 交易中的指令数据至关重要,使我们能够将其与过滤配置中的预期鉴别符进行比较。

最后,让我们定义我们的 formatTransaction 函数来整理我们的负载。在你的代码块底部添加以下内容:

function formatTransaction(tx, stream) {
    const matchingInstruction = tx.transaction.message.instructions.find(matchesInstructionDiscriminator);
    const includedAccounts = ACCOUNTS_TO_INCLUDE.reduce((acc, { name, index }) => {
        acc[name] = matchingInstruction.accounts[index];
        return acc;
    }, {});

    return {
        signature: tx.transaction.signatures[0],
        blockTime: stream.blockTime,
        accounts: includedAccounts
    }
}

此函数接受一个事务和 Stream 数据,然后使用 matchesInstructionDiscriminator 函数在交易中查找匹配的指令。接着它提取 ACCOUNTS_TO_INCLUDE 数组中指定的账户(这意味着你可以进一步包括额外的账户),并返回一个包含签名、区块时间和账户的新的对象。

测试你的 Stream

如果你向上滚动到代码块上方,你会看到一个字段“测试区块”。这是我们将用于测试我们的 Stream 的区块号。在这种情况下,我找到了一个我知道包含成功的 create 交易的区块。将字段更改为 “291788725”,然后点击 “▶️ 运行测试”。你应该会看到以下响应:

{
  "matchedTransactions": [\
    {\
      "accounts": {\
        "mint": "AWcvL1GSNX8VDLm1nFWzB9u2o4guAmXM341imLaHpump"\
      },\
      "blockTime": 1727209327,\
      "signature": "4vzEaCkQnKym4TdDv67JF9VYMbvoMRwWU5E6TMZPSAbHJh4tXhsbcU8dkaFey1kFn6ZLQ2PMVzxb8zaexuFrii7q"\
    }\
  ]
}

干得好!这展示了该区块的负载将是什么样子!点击右下角的“下一步”继续到下一步。你会注意到这里的返回数据大小约为 ~274 B,而未过滤的区块数据约为 ~4.5 MB!

步骤 3:创建 Webhook 服务器

经过设置,我们需要一种方式来接收和处理数据。我们将创建一个简单的 Express 服务器,监听来自我们 Stream 的输入 webhooks。

设置你的项目

首先,让我们设置一个新的 Node.js 项目:

  1. 创建一个新目录用于你的项目并切换到该目录:
mkdir pump-fun-stream && cd pump-fun-stream
  1. 初始化一个新的 Node.js 项目:
npm init -y
  1. 安装所需的依赖:
npm install express body-parser

创建服务器

现在,创建我们的服务器。新建一个名为 server.js 的文件,并添加以下代码:

const express = require('express');
const bodyParser = require('body-parser');

const app = express();
const port = 3000;

app.use(bodyParser.json());

app.post('/webhook', (req, res) => {
    if (req.body && req.body.matchedTransactions && Array.isArray(req.body.matchedTransactions)) {
        req.body.matchedTransactions.forEach(transaction => {
                const { accounts: { mint }, blockTime, signature } = transaction;
                const date = new Date(blockTime * 1000).toLocaleString();
                console.log('\n' + '='.repeat(110) + '\n')
                console.log(`   💊 新代币铸造: ${mint}`);
                console.log(`      区块时间: ${date}`);
                console.log(`      TXID: ${signature}`);
        });
    }

    res.status(200).send('Webhook 收到');
});

app.listen(port, () => {
    console.log(`Webhook 服务器监听在 http://localhost:${port}`);
});

让我们分解一下这个服务器的作用:

  1. 我们使用 Express 创建一个简单的 Web 服务器。
  2. 服务器在 /webhook 端点上监听 POST 请求。
  3. 当收到请求时,它检查请求的主体是否包含 matchedTransactions 数组。
  4. 对于数组中的每个交易,它记录铸币地址、区块时间和交易签名。
  5. 最后,它发送 200 OK 响应以确认收到 webhook。

运行服务器

要启动你的服务器,运行:

node server.js

你应该看到消息:Webhook 服务器监听在 http://localhost:3000

步骤 4:公开你的本地服务器

为了让 QuickNode 将 webhooks 发送到你的服务器,需要使其可公开访问。我们将使用 ngrok 创建一个至你的本地服务器的安全隧道。

  1. 如果你尚未下载并安装 ngrok,请 下载并安装 ngrok

  2. 在一个新的终端窗口中,运行:

ngrok http 3000
  1. ngrok 将提供一个公共 URL,转发到你的本地服务器。它的样子可能是 https://1234-56-78-910-11.ngrok.io

复制该 URL。我们将在下一步中需要它。

步骤 5:配置你的 Stream 的目标

现在我们已经运行并公开了服务器,接下来让我们配置我们的 Stream,使其向服务器发送数据:

  1. 返回到你的 Stream 配置页面。
  2. 在“目标类型”部分,选择“Webhook”。
  3. 在“目标 URL”字段中,输入你在上一步获取的 ngrok URL,后面加上 /webhook。例如: https://1234-56-78-910-11.ngrok.io/webhook
  4. 添加一个自定义头,以指定负载的 Content-Type。例如: Content-Type: application/json

你的 Stream 配置应如下所示:

Stream 配置

点击 "▶️ 测试目标" 将测试 webhook 发送到你的服务器。然后点击 "创建一个 Stream" 完成你的配置。你应看到交易开始在你服务器的控制台中出现:

服务器控制台

就是这样!你的 Stream 现在已设置为过滤新的 Pump.fun 代币创建并将数据发送到你的本地服务器。干得好!

继续构建!

你现在拥有使用 Streams 为自己的应用程序服务的工具。请记住,我们创建的过滤器只是一个示例——你可以想出更多想法。欢迎随意修改过滤逻辑以查找不同的程序、账户或指令。你还可以向你的服务器添加更复杂的处理逻辑。快乐的 streaming!

找点灵感继续构建?查看这些资源:

无论你在构建什么,我们都希望听到你的声音。可以在 DiscordTwitter 上与我们联系,告诉我们你正在做什么!

我们 ❤️ 反馈!

让我们知道 如果你有任何反馈或对新主题的请求。我们很乐意听取你的意见。

  • 原文链接: quicknode.com/guides/qui...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。