本文介绍了如何在Solana上使用Yellowstone gRPC动态修改订阅请求,而无需断开流或丢失数据。通过stream.write()
方法,可以在连接中发送新的订阅请求,从而实现实时更新订阅,同时保持流的活动状态,避免断开和重新连接的开销,适用于需要根据用户行为或外部事件进行调整的应用场景,例如追踪新上线的Token、监控钱包活动或响应治理提议。
Solana Yellowstone gRPCs 提供了一种强大的方式来使用基于 gRPC 的订阅流式传输实时区块链事件,如交易、账户更新和区块。从流接收到的数据类型通过订阅请求指定,这允许开发者筛选特定的更新,如交易、账户或 slot。
虽然 Yellowstone gRPCs 是依赖连续、实时数据流的生产应用中的可靠技术,但实际用例通常需要动态适应性。在一些用例中,当你开始获取数据时,你最初请求的信息并不完全是你后来需要的,尤其是当你的应用或其用户发生变化时。动态修改订阅请求——无需断开和重新连接流——对于几个用例来说至关重要。
在本文中,我们将探讨如何在不中断流的情况下修改订阅请求。
要开始,我们需要一些东西。
身份验证:gRPC 端点和 gRPC Token Shyft 的 gRPC 节点遍布欧盟和美国地区。要访问,我们需要特定区域的 gRPC 端点和访问Token,这些Token可以在你的 Shyft 仪表板上购买。
一个服务器端后端(如 NodeJS)来接收 gRPC 数据 由于 web 浏览器不支持 gRPC 服务,因此你需要一个后端应用程序如 C#、Go、Java、Python 等来接收 gRPC 数据。
本文的完整代码可在 我们的文档 和 Replit 中找到——可以随意探索和测试。我们还在 GitHub 上分享了一系列涵盖 gRPC 和 DeFi 的示例用例,你可以克隆并进行实验。
这是一个示例,它使用 client.subscribe()
请求发送订阅,然后使用另一个 client.write()
来写入相同的订阅请求。
import Client, {
CommitmentLevel,
SubscribeRequestAccountsDataSlice,
SubscribeRequestFilterAccounts,
SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterEntry,
SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
} from "@triton-one/yellowstone-grpc";
import { SubscribeRequestPing } from "@triton-one/yellowstone-grpc/dist/grpc/geyser";
interface SubscribeRequest {
accounts: { [key: string]: SubscribeRequestFilterAccounts };
slots: { [key: string]: SubscribeRequestFilterSlots };
transactions: { [key: string]: SubscribeRequestFilterTransactions };
transactionsStatus: { [key: string]: SubscribeRequestFilterTransactions };
blocks: { [key: string]: SubscribeRequestFilterBlocks };
blocksMeta: { [key: string]: SubscribeRequestFilterBlocksMeta };
entry: { [key: string]: SubscribeRequestFilterEntry };
commitment?: CommitmentLevel;
accountsDataSlice: SubscribeRequestAccountsDataSlice[];
ping?: SubscribeRequestPing;
}
const subscribedWalletsA: string[] = [\
"EPjFWdd5AufqSSqeM2qN1xzybapC8G4wEGGkZwyTDt1v",\
"5n2WeFEQbfV65niEP63sZc3VA7EgC4gxcTzsGGuXpump",\
"4oJh9x5Cr14bfaBtUsXN1YUZbxRhuae9nrkSyWGSpump",\
"GBpE12CEBFY9C74gRBuZMTPgy2BGEJNCn4cHbEPKpump",\
"oraim8c9d1nkfuQk9EzGYEUGxqL3MHQYndRw1huVo5h",\
];
const subscribedWalletsB: string[] = [\
"6EF8rrecthR5Dkzon8Nwu78hRvfCKubJ14M5uBEwF6P",\
];
const subscribeRequest1: SubscribeRequest = {
accounts: {},
slots: {},
transactions: {
modifying_A: {
vote: false,
failed: false,
signature: undefined,
accountInclude: subscribedWalletsA,
accountExclude: [],
accountRequired: [],
},
},
transactionsStatus: {},
entry: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
ping: undefined,
commitment: CommitmentLevel.PROCESSED,
};
// Subscribes to account changes for program-owned accounts of subscribedWalletsB
// 订阅 subscribedWalletsB 的程序拥有的账户的账户更改
const subscribeRequest2: SubscribeRequest = {
accounts: {
modifying_B: {
account: [],
filters: [],
owner: subscribedWalletsB,
},
},
slots: {},
transactions: {},
transactionsStatus: {},
blocks: {},
blocksMeta: {
block: [],
},
entry: {},
accountsDataSlice: [],
ping: undefined,
commitment: CommitmentLevel.PROCESSED,
};
/**
* Dynamically updates the current stream subscription with new request parameters.
* 使用新的请求参数动态更新当前流订阅。
*/
async function updateSubscription(stream: any, args: SubscribeRequest) {
try {
stream.write(args);
} catch (error) {
console.error("Failed to send updated subscription request:", error);
}
}
/**
* Handles a single streaming session.
* 处理单个流会话。
* Automatically switches to a second subscription request after a timeout.
* 超时后自动切换到第二个订阅请求。
*/
async function handleStream(client: Client, args: SubscribeRequest) {
const stream = await client.subscribe();
// Waits for the stream to close or error out
// 等待流关闭或出错
const streamClosed = new Promise<void>((resolve, reject) => {
stream.on("error", (error) => {
console.error("Stream Error:", error);
reject(error);
stream.end();
});
stream.on("end", resolve);
stream.on("close", resolve);
});
// Automatically switch subscription after 10 seconds
// 10 秒后自动切换订阅
setTimeout(async () => {
console.log("🔁 Switching to second subscription request...");
await updateSubscription(stream, subscribeRequest2);
}, 10000);
// Handle incoming data
// 处理传入数据
stream.on("data", async (data) => {
try {
console.log("📦 Streamed Data:", data);
// You can add more processing logic here
// 你可以在此处添加更多处理逻辑
} catch (error) {
console.error("Error processing stream data:", error);
}
});
// Send initial subscription request
// 发送初始订阅请求
await new Promise<void>((resolve, reject) => {
stream.write(args, (err: any) => (err ? reject(err) : resolve()));
}).catch((reason) => {
console.error("Initial stream write failed:", reason);
throw reason;
});
await streamClosed;
}
/**
* Starts the stream and continuously attempts to reconnect on errors.
* 启动流并不断尝试在错误时重新连接。
*/
async function subscribeCommand(client: Client, args: SubscribeRequest) {
while (true) {
try {
await handleStream(client, args);
} catch (error) {
console.error("Stream error. Retrying in 1 second...", error);
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}
}
const client = new Client("YOUR-GRPC-URL", "YOUR-GRPC-TOKEN", undefined);
// Start streaming with the first subscription
// 从第一个订阅开始流式传输
subscribeCommand(client, subscribeRequest1);
在上面演示的示例中,我们可以看到我们有两个不同的订阅请求。流以 subscribeRequest1
开始,它侦听涉及预定义的一组钱包地址(subscribedWalletsA
)的交易。
动态更新 10 秒后,代码使用 subscribeRequest2
调用 updateSubscription()
,这将流切换到监视由另一个钱包集(subscribedWalletsB
)拥有的帐户,并开始侦听 blocksMeta
。
这是在不结束或重新启动流的情况下完成的,这要归功于 stream.write()
方法,该方法在连接中发送新的 SubscribeRequest
。
强大的错误处理 handleStream()
函数侦听 error
、close
或 end
事件。如果流中断,subscribeCommand()
包装器会在 1 秒延迟后重试连接,从而确保在不稳定的网络条件下或后端中断期间的弹性。
在 Solana Yellowstone gRPC 中间流修改 SubscribeRequest
是构建依赖于实时区块链数据的响应式、可用于生产的应用程序的关键功能。以下是这种方法如此强大的原因:
如果你错过了,本文的完整代码可在 我们的文档 和 Replit 中找到——可以随意探索和测试。我们还在 GitHub 上分享了一系列涵盖 gRPC 和 DeFi 的示例用例,你可以克隆并进行实验。
你可以浏览我们的其他相关文章: 在 Solana 上流式传输实时数据 , 使用 gRPC 进行实时数据流式传输:账户、交易、区块 , 如何在 Solana 上流式传输实时 Pump.fun 更新 , 和 跟踪 Raydium 上的新池 .
- 原文链接: blogs.shyft.to/how-to-mo...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!