Yellowstone和Carbon:解析实时Solana程序数据

本文介绍了如何使用Yellowstone gRPC和Carbon框架构建一个Pump.fun交易监控器,该监控器可以实时监控Solana链上Pump.fun程序的新token创建和AMM迁移事件。文章详细说明了项目搭建、环境配置、代码实现以及如何利用Carbon框架简化链上数据捕获和处理的过程,并提供了扩展监控器的建议。

概述

监控特定 Solana 程序的交易可以为开发、分析或运营目的提供有价值的见解。访问程序数据以进行实时监控可能具有挑战性,而理解编码的程序数据可能更加困难。在本指南中,我们将使用 Yellowstone gRPC 和 Carbon(一个轻量级索引框架)为 Pump.fun 构建一个交易监控器来解析数据。

你将做什么

在本指南结束时,你将拥有一个 Rust 应用程序,它:

  • 利用 Yellowstone gRPC 和 Carbon 索引框架
  • 连接到 Yellowstone gRPC 端点
  • 过滤和流式传输 Pump.fun 以实时监控新的代币铸造和代币 AMM 迁移
  • 处理和记录有关目标事件的关键信息

你需要什么

依赖项 版本
rustc 1.85.0
cargo 1.85.0
tokio 1.43.0
yellowstone-grpc-proto 5.0.0
log 0.4.25
env_logger 0.11.5
carbon-core 0.8.0
carbon-yellowstone-grpc-datasource 0.8.0
carbon-pumpfun-decoder 0.8.0
async-trait 0.1.86
dotenv 0.15.0

了解工具

什么是 Yellowstone gRPC?

Yellowstone gRPC 是一个构建在 Geyser 插件系统上的 Solana 高性能数据流解决方案。它提供:

  • 账户更新、交易和插槽通知的实时流式传输
  • 与传统的 WebSocket 实现相比,延迟更低
  • 通过 gRPC 实现强大的类型安全
  • 用于订阅的有效过滤功能

什么是 Pump.fun?

Pump.fun 是 Solana 上一个流行的代币创建平台,允许任何人快速轻松地创建 meme 代币。 监控 Pump.fun 交易可以深入了解新的代币发行、市场趋势和用户活动。

什么是 Carbon?

Carbon 是一个用于 Solana 的轻量级索引框架,可简化捕获和处理链上数据的过程。 它提供了一个模块化管道架构,具有以下关键组件:

  • Pipeline:管理数据流的核心协调器
  • Data sources:提供数据更新(交易、账户)的组件
  • Pipes:通过解码器和处理器处理特定的更新
  • Decoders:将原始区块链数据转换为结构化类型
  • Processors:为解码后的数据实现自定义逻辑

Carbon 处理流式传输、解码和处理区块链数据的复杂细节,使你能够专注于构建应用程序逻辑。 在撰写本文时,Carbon 提供以下程序解码器:

可用的程序解码器

Crate 名称 描述
carbon-associated-token-account-decoder 关联的代币账户解码器
carbon-drift-v2-decoder Drift V2 程序解码器
carbon-fluxbeam-decoder Fluxbeam 程序解码器
carbon-jupiter-dca-decoder Jupiter DCA 程序解码器
carbon-jupiter-limit-order-decoder Jupiter 限价单程序解码器
carbon-jupiter-limit-order-2-decoder Jupiter 限价单 2 程序解码器
carbon-jupiter-perpetuals-decoder Jupiter 永续合约程序解码器
carbon-jupiter-swap-decoder Jupiter Swap 程序解码器
carbon-kamino-farms-decoder Kamino Farms 程序解码器
carbon-kamino-lending-decoder Kamino Lend 解码器
carbon-kamino-limit-order-decoder Kamino 限价单程序解码器
carbon-kamino-vault-decoder Kamino Vault 解码器
carbon-lifinity-amm-v2-decoder Lifinity AMM V2 程序解码器
carbon-marginfi-v2-decoder Marginfi V2 程序解码器
carbon-marinade-finance-decoder Marinade Finance 程序解码器
carbon-memo-program-decoder SPL Memo 程序解码器
carbon-meteora-dlmm-decoder Meteora DLMM 程序解码器
carbon-meteora-pools-decoder Meteora Pools 程序解码器
carbon-moonshot-decoder Moonshot 程序解码器
carbon-mpl-core-decoder MPL Core 程序解码器
carbon-mpl-token-metadata-decoder MPL 代币元数据程序解码器
carbon-name-service-decoder SPL 名称服务程序解码器
carbon-okx-dex-decoder OKX DEX 解码器
carbon-openbook-v2-decoder Openbook V2 程序解码器
carbon-orca-whirlpool-decoder Orca Whirlpool 程序解码器
carbon-phoenix-v1-decoder Phoenix V1 程序解码器
carbon-pumpfun-decoder Pumpfun 程序解码器
carbon-pump-swap-decoder PumpSwap 程序解码器
carbon-raydium-amm-v4-decoder Raydium AMM V4 程序解码器
carbon-raydium-clmm-decoder Raydium CLMM 程序解码器
carbon-raydium-cpmm-decoder Raydium CPMM 程序解码器
carbon-raydium-launchpad-decoder Raydium Launchpad 程序解码器
carbon-raydium-liquidity-locking-decoder Raydium 流动性锁定程序解码器
carbon-sharky-decoder SharkyFi 解码器
carbon-solayer-pool-restaking-decoder Solayer Pool Restaking 程序解码器
carbon-stabble-stable-swap-decoder Stabble Stable Swap 解码器
carbon-stabble-weighted-swap-decoder Stabble Weighted Swap 解码器
carbon-stake-program-decoder Stake 程序解码器
carbon-system-program-decoder 系统程序解码器
carbon-token-2022-decoder Token 2022 程序解码器
carbon-token-program-decoder Token 程序解码器
carbon-virtual-curve-decoder Meteora Virtual Curve 程序解码器
carbon-virtuals-decoder Virtuals 程序解码器
carbon-zeta-decoder Zeta 程序解码器

查看 Carbon 文档 以获取最新更新和其他解码器。 在本指南中,我们将使用 carbon-pumpfun-decoder 来解码 Pump.fun 程序指令——完成本指南后,你应该能够以类似的方式利用任何其他解码器。

项目设置

让我们从设置 Rust 项目结构开始。

1. 创建一个新项目

cargo new pump-fun-carbon
cd pump-fun-carbon

2. 配置依赖项

Cargo.toml 文件的内容替换为:

[package]
name = "pump-fun-carbon"
version = "0.1.0"
edition = "2024"

[dependencies]
## Yellowstone & Carbon dependencies
carbon-core = "0.8.0"
carbon-pumpfun-decoder = "0.8.0"
carbon-yellowstone-grpc-datasource = "0.8.0"
yellowstone-grpc-proto = "5.0.0"

## Async and utilities
async-trait = "0.1.86"
tokio = { version = "1.43.0", features = ["full"] }
dotenv = "0.15.0"
env_logger = "0.11.5"
log = "0.4.25"

3. 创建环境配置

在项目根目录中创建一个 .env 文件,以存储你的 Yellowstone gRPC 端点详细信息:

GEYSER_URL=your-quicknode-yellowstone-grpc-endpoint
X_TOKEN=your-quicknode-x-token

将占位符值替换为你的实际 QuickNode Yellowstone gRPC 端点和身份验证Token。你可以在我们的文档中找到有关配置端点的信息,这里

实施

现在,让我们构建我们的 Pump.fun 交易监控器。 初始化项目时,你应该有一个 src 目录,其中包含一个 main.rs 文件。 我们会将代码添加到此文件中。 如果你没有,请继续创建它。

1. 创建常量和导入

让我们首先导入必要的库并为我们的应用程序定义一些常量:

use {
    async_trait::async_trait,
    carbon_core::{
        deserialize::ArrangeAccounts,
        error::CarbonResult,
        instruction::{DecodedInstruction, InstructionMetadata, NestedInstructions},
        metrics::MetricsCollection,
        processor::Processor,
    },
    carbon_pumpfun_decoder::{
        PROGRAM_ID as PUMP_FUN_PROGRAM_ID, PumpfunDecoder,
        instructions::{PumpfunInstruction, create::Create, migrate::Migrate},
    },
    carbon_yellowstone_grpc_datasource::YellowstoneGrpcGeyserClient,
    std::{
        collections::{HashMap, HashSet},
        env,
        sync::Arc,
    },
    tokio::sync::RwLock,
    yellowstone_grpc_proto::geyser::{
        CommitmentLevel, SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions,
    },
};

// Pump.fun authority addresses to monitor
// 要监控的 Pump.fun 授权地址
const PUMP_FUN_MINT_AUTHORITY: &str = "TSLvdd1pWpHVjahSpsvCXUbgwsL3JAcvokwaKt1eokM";
const PUMP_FUN_MIGRATION_AUTHORITY: &str = "39azUYFWPz3VHgKCf3VChUwbpURdCHRxjWVowf5jUJjg";

在此代码中,我们从 Carbon 和 Pump.fun 解码器导入必要的模块。 我们还定义了 Pump.fun 铸造和迁移授权的常量,我们将使用它们来过滤交易——这将帮助我们减少需要处理的数据量,并专注于相关的交易(稍后会详细介绍)。

2. 实现 Main 函数

接下来,让我们实现作为应用程序入口点的 main 函数。 将以下代码添加到你的 main.rs 文件中(我们稍后将演练该代码):

##[tokio::main]
pub async fn main() -> CarbonResult<()> {
    // 1 - Initialize logging
    // 1 - 初始化日志记录
    unsafe {
        std::env::set_var("RUST_LOG", "info");
    }
    env_logger::init();
    log::info!("Starting Pumpfun Transaction Processor Using Carbon");
    // 2 - Check environment variables
    // 2 - 检查环境变量
    dotenv::dotenv().ok();
    let geyser_url = match env::var("GEYSER_URL") {
        Ok(url) if !url.is_empty() => url,
        _ => {
            log::error!("GEYSER_URL environment variable not set or empty");
            return Err(carbon_core::error::Error::Custom(
                "GEYSER_URL not set".into(),
            ));
        }
    };

    log::info!("Using GEYSER_URL: {}", geyser_url);

    let x_token = env::var("X_TOKEN").ok();
    log::info!(
        "X_TOKEN is {}",
        if x_token.is_some() { "set" } else { "not set" }
    );

    // 3 - Initialize account filters
    // 3 - 初始化账户过滤器
    let account_filters: HashMap<String, SubscribeRequestFilterAccounts> = HashMap::new();

    // 4 - Initialize transaction filter
    // 4 - 初始化交易过滤器
    let transaction_filter = SubscribeRequestFilterTransactions {
        vote: Some(false),
        failed: Some(false),
        account_include: vec![\
            PUMP_FUN_MINT_AUTHORITY.to_string(),\
            PUMP_FUN_MIGRATION_AUTHORITY.to_string(),\
        ],
        account_exclude: vec![],
        account_required: vec![PUMP_FUN_PROGRAM_ID.to_string()],
        signature: None,
    };

    let mut transaction_filters: HashMap<String, SubscribeRequestFilterTransactions> =
        HashMap::new();

    transaction_filters.insert(
        "raydium_launchpad_transaction_filter".to_string(),
        transaction_filter,
    );

    // 5 - Initialize Yellowstone Grpc Geyser Client
    // 5 - 初始化 Yellowstone Grpc Geyser 客户端
    let yellowstone_grpc = YellowstoneGrpcGeyserClient::new(
        env::var("GEYSER_URL").unwrap_or_default(),
        env::var("X_TOKEN").ok(),
        Some(CommitmentLevel::Processed),
        account_filters,
        transaction_filters,
        Arc::new(RwLock::new(HashSet::new())),
    );

    // 6 - Build and run the Carbon pipeline
    // 6 - 构建并运行 Carbon 管道
    carbon_core::pipeline::Pipeline::builder()
        .datasource(yellowstone_grpc)
        .instruction(PumpfunDecoder, PumpfunInstructionProcessor)
        //.account(PumpfunDecoder, PumpfunAccountProcessor)
        .shutdown_strategy(carbon_core::pipeline::ShutdownStrategy::Immediate)
        .build()?
        .run()
        .await?;

    Ok(())
}

让我们来分解一下这段代码的作用:

  1. 初始化日志记录 - 我们将日志级别设置为“info”并初始化记录器以在控制台输出中查看信息性消息。

  2. 检查环境变量 - 我们从 .env 文件加载变量并验证是否设置了所需的 GEYSER_URL。 我们还检查是否有可用于身份验证的 X_TOKEN

  3. 初始化账户过滤器 - 我们为账户过滤器创建一个空的 HashMap。 在此实现中,我们不按特定账户进行过滤,但该结构已为未来的扩展做好准备(例如,如果你想监控特定 LP 账户的更改而不是交易事件)。

  4. 初始化交易过滤器 - 我们创建一个过滤器来专门定位 Pump.fun 交易:

    • 排除投票交易和失败的交易
    • 包括涉及铸币授权或迁移授权的交易。 如果没有此过滤器,我们将收到 Pump.fun 程序的所有交易,包括买入和卖出——这将导致大量与我们的监控目标无关的噪音,并最终使用更多的credits。 因为我们知道铸币授权仅用于创建新代币,而迁移授权仅用于迁移代币,所以我们现在确保我们的监控器仅接收包含我们关心的指令的交易。
    • 要求存在 Pump.fun 程序 ID
    • 没有要排除的特定账户或要匹配的签名(尽管你可以考虑使用此字段来过滤恶意创建者或其他不需要的账户)
  5. 初始化 Yellowstone gRPC 客户端 - 我们创建客户端,它将连接到具有我们配置的过滤器的 Yellowstone gRPC 端点。 我们使用 Processed 提交级别以确保我们收到最新的数据。 我们注释掉了一个账户处理器,因为我们没有在此示例中使用它,但如果你也想监控账户更改,则可以将其添加回来。

  6. 构建并运行 Carbon 管道 - 最后,我们构建 Carbon 管道:

    • 使用 Yellowstone gRPC 客户端作为数据源
    • 将 Pump.fun 解码器连接到我们的处理器
    • 设置立即关闭策略
    • 构建并运行管道

太棒了! 现在我们只需要定义和实现 PumpfunInstructionProcessor,它将处理解码的 Pump.fun 指令。

3. 实现指令处理器

在你的 main 函数下面,让我们实现 PumpfunInstructionProcessor,它将处理解码的 Pump.fun 指令:

pub struct PumpfunInstructionProcessor;
##[async_trait]
impl Processor for PumpfunInstructionProcessor {
    type InputType = (
        InstructionMetadata,
        DecodedInstruction<PumpfunInstruction>,
        NestedInstructions,
    );

    async fn process(
        &mut self,
        (metadata, instruction, _nested_instructions): Self::InputType,
        _metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        let signature = metadata.transaction_metadata.signature;
        let accounts = instruction.accounts;

        match instruction.data {
            PumpfunInstruction::Sell(_params) => {
                log::info!("❌ - EXPECT TO NEVER SEE SELL INSTRUCTION");
                log::info!("TxId: {}", signature);
            }
            PumpfunInstruction::Create(params) => {
                log::info!("💊 NEW PUMPFUN TOKEN CREATED");
                log::info!(
                    "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
                );
                log::info!("TxId: {}", signature);
                log::info!("Token Details:");
                log::info!("   • Name:   {}", params.name);
                log::info!("   • Symbol: {}", params.symbol);

                match Create::arrange_accounts(&accounts) {
                    Some(arranged_accounts) => {
                        log::info!("   • Mint:   {}", arranged_accounts.mint);
                        log::info!(
                            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
                        );
                    }
                    None => {
                        log::warn!(
                            "Failed to arrange accounts for Create instruction. Create instruction: signature: {signature}"
                        );
                    }
                }
            }

            PumpfunInstruction::Migrate(_params) => {
                log::info!("🔄 MIGRATE INSTRUCTION DETECTED");
                log::info!(
                    "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
                );
                log::info!("TxId: {}", signature);
                match Migrate::arrange_accounts(&accounts) {
                    Some(arranged_accounts) => {
                        log::info!("   • Mint:   {}", arranged_accounts.mint);
                        log::info!(
                            "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
                        );
                    }
                    None => {
                        log::warn!(
                            "Failed to arrange accounts for Migrate instruction. Migrate instruction: signature: {signature}"
                        );
                    }
                }
            }
            _ => {
                // Ignore non-target instructions
                // 忽略非目标指令
            }
        }

        Ok(())
    }
}

让我们分解一下这个指令处理器:

  1. 结构和Trait 实现 - 我们定义了一个实现 Carbon Processor trait 的 PumpfunInstructionProcessor 结构,它负责处理解码的指令。 从 Carbon 文档中,Processor trait 提供了一个异步方法 process,它负责处理特定类型(InputType)的数据。 此 trait 旨在由需要在管道中处理数据的类型实现,允许自定义处理不同的数据类型。”
##[async_trait]
pub trait Processor {
    type InputType;

    async fn process(
        &mut self,
        data: Self::InputType,
        metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()>;
}
  1. 输入类型定义 - InputType 是一个包含三个元素的元组:
    • InstructionMetadata:包含交易上下文信息
    • DecodedInstruction<PumpfunInstruction>:解码的 Pump.fun 指令
    • NestedInstructions:任何嵌套的内部指令,如 CPI 调用
  2. Process 方法 - 这是实际处理逻辑发生的地方:
    • 我们提取交易签名和账户列表
    • 我们匹配不同的指令类型以适当地处理每种类型
  3. 指令处理程序

    • Sell Instruction - 仅包含此内容用于演示目的,以帮助说明我们的 account_include 过滤器是如何工作的。 我们的交易过滤器指定目标交易必须包括铸造或迁移授权账户——这两种账户都不存在于出售指令中。 因此,此代码路径永远不应执行。 日志消息“EXPECT TO NEVER SEE SELL INSTRUCTION”强化了这一点。 如果你想,你可以完全删除这段代码——或者修改过滤器以包含出售指令(如果你也想监控它们)。

    • Create Instruction - 当创建一个新的 Pump.fun 代币时:

      • 我们记录交易并格式化它以方便阅读
      • 我们从参数中提取代币详细信息,如 namesymbol
      • 我们使用 arrange_accounts 来获取对所涉及账户的结构化访问,以便我们可以记录 mint 地址
    • Migrate Instruction - 当发生代币迁移时,我们记录与 Create 类似的信息

    • 其他说明 - 任何其他 Pump.fun 指令都将被忽略

  4. 账户安排 - 对于 Create 和 Migrate 指令,我们都使用 arrange_accounts 辅助方法。 这个 Carbon 函数:
    • 获取原始账户列表
    • 将它们返回到一个结构化对象中,该对象可以轻松地按名称访问特定账户
    • 如果账户与预期模式不匹配,则返回 None

此处理器设计演示了 Carbon 的一个关键优势——以强类型的方式处理解码的指令数据,而不是处理原始字节。 PumpfunDecoder(由 carbon-pumpfun-decoder crate 提供)处理反序列化原始指令数据的复杂任务,允许我们的处理器专注于业务逻辑。 你可以使用 IDE 的智能感知功能来浏览 PumpfunInstruction 枚举并查看可以解码的所有可能的指令。

运行应用程序

要运行你的 Pump.fun 交易监控器:

  1. 确保你的 .env 文件设置了有效的 Yellowstone gRPC 凭据
  2. 构建应用程序:
cargo build
  1. 运行应用程序:
cargo run

你应该看到如下输出:

[INFO] Starting Pumpfun Transaction Processor Using Carbon
[INFO] Using GEYSER_URL: your-geyser-url
[INFO] X_TOKEN is set

当创建一个新的 Pump.fun 代币时,你将看到如下输出:

[INFO] 💊 NEW PUMPFUN TOKEN CREATED
[INFO] ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
[INFO] TxId: 5QrTR9...truncated...
[INFO] Token Details:
[INFO]    • Name:   Awesome Token
[INFO]    • Symbol: AWSM
[INFO]    • Mint:   AWsM4rKh...truncated...
[INFO] ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

干得好!

扩展你的监控器

以下是一些你可以扩展这个基本监控器的方法:

  1. 数据库集成:将交易数据存储在数据库中以进行分析
  2. 扩展过滤器:添加更多过滤器以定位其他指令类型
  3. 多程序监控:定义并 insert 其他交易过滤器以监控多个程序并利用多个解码器
  4. 自动化交易:利用 Metis Swap API with Pump.fun support (支持 Pump.fun 的 Metis Swap API)来构建和实施自动化交易策略

总结

在本指南中,我们使用 Carbon 和 Yellowstone gRPC 构建了一个 Pump.fun 交易监控器。 这种方法以最少的代码提供高性能的实时监控。 Carbon 框架抽象了 Solana 交易和账户数据的许多解析复杂性,使你能够专注于你的应用程序逻辑。

如果你有任何问题或需要有关 Solana 开发项目的帮助,请加入我们的 QuickNode Discord 或在 Twitter 上关注我们。

我们 ❤️ 反馈!

如果你对新主题有任何反馈或要求,请 告诉我们。 我们很乐意听到你的声音。

附加资源

  • 原文链接: quicknode.com/guides/sol...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。