使用 gRPC 实现 Solana 数据的实时流式传输:账户、交易、区块

  • Shyft_to
  • 发布于 2024-09-17 19:55
  • 阅读 21

本文介绍了如何使用 Shyft 的 gRPC 服务高效地在 Solana 区块链上流式传输交易、账户和区块更新。通过 gRPC,开发者可以简化数据传输过程,减少延迟,并更容易地构建复杂的区块链应用程序,无需传统的专用节点和复杂的代码库。文章提供了详细的配置步骤和代码示例,包括账户、交易和区块数据的订阅和反序列化过程。

使用 Shyft 的 gRPC 服务快速流式传输交易、账户和区块更新的综合指南

使用 gRPC 在 Solana 上进行实时数据流传输

在 gRPC 发明之前,流式传输链上数据传统上具有挑战性且耗时。 然而,随着 gRPC 的引入,现在可以方便有效地流式传输链上数据,例如账户、交易和区块。 在本教程中,我们将探讨使用 gRPC 流式传输链上数据的关键特性和优势。 我们将探讨以下主题:

  • 为什么选择 gRPC?
  • 使用 gRPC 获取链上数据——账户、交易和区块
  • 反序列化我们的输出——账户和交易。

开始前的准备工作

  1. 获取你的 Shyft API 密钥、gRPC 端点和 gRPC Token

注册并在 Shyft 仪表板 中获取以上所有详细信息

  1. 一个服务器端后端(如 NodeJS)来接收 gRPC 数据

由于前端不支持 gRPC 流式传输,因此你需要一个后端应用程序来接收 gRPC 数据。 在此示例中,我们使用了 NodeJS,但也可以使用其他后端语言,例如 C#、Go、Java、Kotlin、Python 或 PHP。

  1. Git 存储库和依赖项

与本文相关的所有代码都可以在我们的 GitHub 上找到 。 请随时克隆并跟随。

要从 GitHub 克隆它,请在要保存代码的目录中打开一个终端,然后点击以下命令

$ git clone https://github.com/Shyft-to/solana-defi.git

完成后,你应该会看到存储库中涉及的所有目录,然后在终端中点击以下命令。

cd grpc-block
npm install

这将带你到此博客的代码所在的目录,并且还将安装运行项目所需的所有依赖项。 你也可以查看存储库中的其他目录,因为它们都是与我们已发布的其他博客相关的示例项目。

为什么选择 gRPC?

gRPC 以最小的延迟简化了 Solana 区块链上的数据传输,无需专用节点,并简化了开发过程。 通过利用 gRPC,由于其低延迟减少的代码库,可以更轻松地构建复杂的项目。 总之,gRPC 通过以下方面彻底改变了专用节点的时代:

  • 简化了 Solana 区块链上的数据流传输
  • 最小的延迟。
  • 易于构建代码库减少的复杂项目。
  • 低延迟流式传输。
  • 专用节点时代的革命。

获取实时链上数据

首先,我们需要定义 SUBSCRIBE REQUEST 接口,该接口指定我们感兴趣的数据。

interface SubscribeRequest {
    accounts: { [key: string]: SubscribeRequestFilterAccounts };
    slots: { [key: string]: SubscribeRequestFilterSlots };
    transactions: { [key: string]: SubscribeRequestFilterTransactions };
    transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
    blocks: { [key: string]: SubscribeRequestFilterBlocks };
    blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
    entry: { [key: string]: SubscribeRequestFilterEntry };
    commitment?: CommitmentLevel | undefined;
    accountsDataSlice: SubscribeRequestAccountsDataSlice[];
    ping?: SubscribeRequestPing | undefined;
  }

为了实现账户流式传输,我们需要以下列方式管理我们的流:此函数处理来自流的更新,还定义了我们希望流获取的数据。 这确保了我们只收到与我们的用例相关的信息。

async function handleStream(client: Client, args: SubscribeRequest) {
    // Subscribe for events
    const stream = await client.subscribe();

    // Create `error` / `end` handler
    const streamClosed = new Promise<void>((resolve, reject) => {
      stream.on("error", (error) => {
        console.log("ERROR", error);
        reject(error);
        stream.end();
      });
      stream.on("end", () => {
        resolve();
      });
      stream.on("close", () => {
        resolve();
      });
    });

    // Handle updates
    stream.on("data", async (data) => {
      try{
       console.log(data)
     }catch(error){
    if(error){
      console.log(error)
    }
  }
});

    // Send subscribe request
    await new Promise<void>((resolve, reject) => {
      stream.write(args, (err: any) => {
        if (err === null || err === undefined) {
          resolve();
        } else {
          reject(err);
        }
      });
    }).catch((reason) => {
      console.error(reason);
      throw reason;
    });

    await streamClosed;
  }

接下来,我们需要定义我们的 subscribe 命令函数和 Request。 这一步是代码中的点睛之笔,并结束了我们的流实现。 每种流类型可能对流式传输有不同的要求。 例如,HANDLE STREAM 函数和反序列化的需求对于账户和交易流可能类似。 但是,区块流不需要反序列化,并且遵循不同的模式。

用于流式传输账户的订阅请求_

async function subscribeCommand(client: Client, args: SubscribeRequest) {
    while (true) {
      try {
        await handleStream(client, args);
      } catch (error) {
        console.error("Stream error, restarting in 1 second...", error);
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }
  }

  const client = new Client(
    'YOUR X REGION URL',
    'YOUR X TOKEN',
    undefined,
  );
  const req: SubscribeRequest = {
    slots: {},
    accounts: {
      "spl": {
        account: ["5oVNBeEEQvYi1cX3ir8Dx5n1P7pdxydbGF2X4TxVusJm"],
        owner: [],
        filters: [],
      },
    },
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {},
    accountsDataSlice: [],
    commitment: CommitmentLevel.CONFIRMED,
  };

用于流式传输某个程序的账户的订阅请求

const req: SubscribeRequest = {
  slots: {},
  accounts: {
    meteora: {
      owner: [METEORA_PROGRAM_ID.toBase58()],
      //program id (base58) for which we want to stream accounts
      account: [],
      filters: [],
    },
  },
  transactions: {},
  transactionsStatus: {},
  blocks: {},
  blocksMeta: {},
  accountsDataSlice: [],
  commitment: CommitmentLevel.PROCESSED,
  entry: {},
};

这与之前的请求非常相似,但我们不是过滤特定账户的更新,而是订阅与特定程序相关的更新。 这可以通过在订阅请求的账户参数中将程序 ID 指定为 owner 来完成,如上面的示例所示。

// Handling updates from the stream we setup above
  stream.on("data", (data) => {
    if (data.account?.account) {
      const account: AccountInfo<Buffer> = data.account.account;
      const accountData = account.data;
      const parsedAccountData = ACCOUNTS_PARSER.parseAccounts(
        new PublicKey(account.owner).toBase58(),
        accountData,
      );
      const response = {
        slot: data.slot,
        account: {
          executable: account.executable,
          owner: new PublicKey(account.owner).toBase58(),
          lamports: account.lamports,
        } as unknown as AccountInfo<any>,
        pubkey: new PublicKey(data.account?.account?.pubkey).toBase58(),
      };
      if (account.rentEpoch) {
        response.account.rentEpoch = account.rentEpoch;
      }
      if (parsedAccountData) {
        response.account.data = parsedAccountData;
      } else {
        response.account.data = utils.bytes.bs58.encode(accountData);
      }

      console.log(
        "Account Name: ",
        ACCOUNTS_PARSER.getAccountName(accountData),
      );
      console.log("Account Data: ", JSON.stringify(response, null, 2) + "\n");
    }
  });

收到更新后,我们可以使用 ACCOUNT_PARSER 分析每个更新并提取有价值的信息以供应用程序使用。 你可以在 这里 找到与流式传输账户数据相关的示例代码。

_流式传输交易的订阅请求

const req: SubscribeRequest = {
    slots: {},
    accounts: {},
    transactions: {
      alltxs: {
        vote: true,
        failed: true,
        signature: undefined,
        accountInclude: [],
        accountExclude: [],
        accountRequired: [],
      },
    },
    transactionsStatus: {},
    blocks: {},
    blocksMeta: {},
    entry: {},
    accountsDataSlice: [],
    commitment: CommitmentLevel.FINALIZED,
  };

_流式传输区块的订阅请求

async function handleStream(client: Client, args: SubscribeRequest) {
    // Subscribe for events
    const stream = await client.subscribe();

    // Create `error` / `end` handler
    const streamClosed = new Promise<void>((resolve, reject) => {
      stream.on("error", (error) => {
        console.log("ERROR", error);
        reject(error);
        stream.end();
      });
      stream.on("end", () => {
        resolve();
      });
      stream.on("close", () => {
        resolve();
      });
    });

    // Handle updates
    stream.on("data", async (data) => {
      try{
        const blockhash = data.blockMeta.blockhash;
        const parentBlockhash = data.blockMeta.parentBlockhash;
        const blockTime = data.blockMeta.blockTime.timestamp;
        const slot = data.blockMeta.slot;
        console.log(`
            Blockhash : ${blockhash}
            Parent Blockhash : ${parentBlockhash}
            Block Time : ${blockTime}
            Slot : ${slot}
            `)
  }catch(error){
    if(error){
      console.log(error)
    }
  }
});

    // Send subscribe request
    await new Promise<void>((resolve, reject) => {
      stream.write(args, (err: any) => {
        if (err === null || err === undefined) {
          resolve();
        } else {
          reject(err);
        }
      });
    }).catch((reason) => {
      console.error(reason);
      throw reason;
    });

    await streamClosed;
  }

  async function subscribeCommand(client: Client, args: SubscribeRequest) {
    while (true) {
      try {
        await handleStream(client, args);
      } catch (error) {
        console.error("Stream error, restarting in 1 second...", error);
        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }
  }

  const client = new Client(
    'YOUR X URL',
    'YOUR X TOKEN',
    undefined,
  );
  const req: SubscribeRequest = {
    slots: {},
    accounts: {},
    transactions: {},
    transactionsStatus: {},
    blocks: {},
    blocksMeta: { blockmetadata: {} },
    entry: {},
    accountsDataSlice: [],
  };

最后,我们发出调用

subscribeCommand(client, req);

这是一个输出可能是什么样子的示例:

{
  filters: [ 'spl' ],
  account: {
    account: {
      pubkey: <Buffer 47 57 89 9f b8 be db a2 87 78 aa cd 67 e5 68 e7 34 70 cc e9 0b cd 53 2b 6c b6 18 29 76 28 82 4e>,
      lamports: '31461600',
      owner: <Buffer 06 dd f6 e1 d7 65 a1 93 d9 cb e1 46 ce eb 79 ac 1c b4 85 ed 5f 5b 37 91 3a 8c f5 85 7e ff 00 a9>,
      executable: false,
      rentEpoch: '18446744073709551615',
      data: <Buffer 01 00 00 00 8d d8 72 a3 b7 15 de d1 d4 60 34 3f f5 ba 4a 28 10 2e 39 02 47 25 89 5f eb c7 a9 c7 97 21 33 d3 f6 4d
40 b3 ae 18 04 00 09 01 00 00 00 00 ... 32 more bytes>,
      writeVersion: '1376590771003',
      txnSignature: <Buffer 92 2d ee 51 37 6c b4 c6 a3 46 8f d8 16 01 dc 1c cc c7 09 8e 7e 1d 96 43 8f 28 48 7b c7 b4 43 22 13 58 b2 9e 34 09 27 31 8d 67 4e c7 b7 6a 2f 2e a1 ce ... 14 more bytes>
    },
    slot: '282339432',
    isStartup: false
  },
  slot: undefined,
  transaction: undefined,
  block: undefined,
  ping: undefined,
  pong: undefined,
  blockMeta: undefined,
  entry: undefined
}

不要惊慌——这是我们反序列化流数据的地方! 以下代码块演示了如何反序列化从流接收的数据。 这一步对于解释原始数据和提取有意义的信息至关重要。

反序列化账户数据

要将缓冲账户数据的输出转换为更易读的格式,我们需要反序列化我们的数据。 下面是一个示例,说明如何编写一个函数来反序列化类中的数据

import * as base58 from "bs58";
import {
  AccountMeta,
  CompiledInstruction,
  ConfirmedTransactionMeta,
  LoadedAddresses,
  Message,
  MessageCompiledInstruction,
  MessageV0,
  PublicKey,
  TransactionInstruction,
  VersionedMessage,
  VersionedTransactionResponse,
} from "@solana/web3.js";
import { accountDeserialize } from "./deserializingAccount";
export class TransactionFormatter {
  public async formTransactionFromJson(
    data: any,
  ) {
    const signatures = base58.encode(
      Buffer.from(data.txnSignature,"base64")
    )
   const publicKey = base58.encode(
    Buffer.from(data.pubkey,'base64')
     )
   const owner = base58.encode(
    Buffer.from(data.owner,'base64')
   )
  const info = await accountDeserialize(data.data)    return {
      publicKey,
      signatures,
      owner,
      info
    };
  }

请注意,对于你需要反序列化的每个其他数据,你都需要指定你希望解码的数据。 你需要以这种方式编写你的 TransactionFormatter 类。

const TXN_FORMATTER = new TransactionFormatter();

然后你编辑你的 handleStream 函数以适应此

async function handleStream(client: Client, args: SubscribeRequest) {
    // Subscribe for events
    const stream = await client.subscribe();

    // Create `error` / `end` handler
    const streamClosed = new Promise<void>((resolve, reject) => {
      stream.on("error", (error) => {
        console.log("ERROR", error);
        reject(error);
        stream.end();
      });
      stream.on("end", () => {
        resolve();
      });
      stream.on("close", () => {
        resolve();
      });
    });

    // Handle updates
    stream.on("data", async (data) => {
      try{
     if(data?.account?.account){
      const txn = await TXN_FORMATTER.formTransactionFromJson(
        data?.account?.account,
      )
      console.log(txn)
     }

  }catch(error){
    if(error){
      console.log(error)
    }
  }
});

    // Send subscribe request
    await new Promise<void>((resolve, reject) => {
      stream.write(args, (err: any) => {
        if (err === null || err === undefined) {
          resolve();
        } else {
          reject(err);
        }
      });
    }).catch((reason) => {
      console.error(reason);
      throw reason;
    });

    await streamClosed;
  }

正确解码数据后。 你的输出应该是可读的,如下所示:

{
  publicKey: '5oVNBeEEQvYi1cX3ir8Dx5n1P7pdxydbGF2X4TxVusJm',
  signatures: 'bNya1bzHFUQ9VppF3E5rMwu4MtzCeATwDm98gb9EgxGzWkQGkJzFa5K51WqutJSjfTNSmRRDhDNVjEC5guAfU7S',
  owner: 'TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA',
  info: {
    mintAuthorityOption: 1,
    mintAuthority: PublicKey [PublicKey(AYhux5gJzCoeoc1PoJ1VxwPDe22RwcvpHviLDD1oCGvW)] {
      _bn: <BN: 8dd872a3b715ded1d460343ff5ba4a28102e39024725895febc7a9c7972133d3>
    },
    supply: 1153019595443727n,
    decimals: 9,
    isInitialized: true,
    freezeAuthorityOption: 0,
    freezeAuthority: PublicKey [PublicKey(11111111111111111111111111111111)] {
      _bn: <BN: 0>
    }
  }
}

结论

使用专用节点在 Solana 区块链上流式传输数据确实可能是一项复杂且资源密集型的任务,需要更大且更复杂的代码库。 然而,借助 gRPC,开发人员现在可以方便地轻松流式传输链上数据,简化他们的代码,并使他们能够专注于构建强大的应用程序,从而挖掘 Solana 区块链的强大功能。 在 Shyft,我们很高兴向开发人员社区推出这种创新的解决方案。 我们的团队致力于提供必要的工具和支持,以使此过程无缝衔接。 我们鼓励开发人员加入我们的 Discord 服务器或在 Twitter 上关注我们以获取更新和进一步的帮助。 借助 gRPC,开发人员可以利用低延迟和减少的代码库来轻松构建复杂的项目。 如需进一步的帮助,请加入我们的 Discord 服务器 或在 Twitter 上关注我们以获取更新。

你可以在 GitHub 上的此处 找到与本文相关的所有代码,请随时克隆并跟随。

资源

  • 原文链接: blogs.shyft.to/real-time...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
Shyft_to
Shyft_to
在 Solana上更快更智能地构建,使用Shyft的SuperIndexer、gRPC、RPC、API和SDK