使用 Yellowstone gRPC Geyser 插件(Rust)监控 Solana 程序

本文介绍了如何使用Rust和QuickNode的Yellowstone gRPC插件实时监控Solana区块链上的Raydium Launchpad交易。通过设置gRPC客户端、订阅Raydium Launchpad交易、解析交易数据以及检测和过滤特定指令类型,可以实现对Solana链上活动的超低延迟通知,并可扩展到其他Solana程序的监控。

概述

实时区块链数据对DeFi开发者、交易机器人操作员以及追踪链上活动的分析师至关重要。在本指南中,我们将编写一个Rust脚本,使用QuickNode的Yellowstone gRPC附加组件监控Raydium Launchpad交易(尽管它也能够监控任何程序)。与传统的轮询方法不同,我们基于流的方式提供了超低延迟的通知,方便你关注的事项(例如,新代币发行、AMM迁移、大额购买等)。

通过本指南的学习,你将拥有一个功能齐全的程序监控系统,能够:

  • 实时检测Raydium Launchpad交易
  • 解析交易数据以提取代币余额和账户交互
  • 处理内部指令和交易细节
  • 针对特定指令类型进行过滤(销售,AMM迁移)
  • 脚本可以轻松修改以监控你选择的任何Solana程序

先决条件

依赖项 版本
rustc 1.85.0
cargo 1.85.0
tokio 1.28.0
yellowstone-grpc-client 6.0.0
yellowstone-grpc-proto 6.0.0
futures 0.3
log 0.4
env_logger 0.11.8
bs58 0.5.0
tonic 0.12.3

什么是Raydium Launchpad?

Raydium Launchpad是Raydium在Solana上发布新代币的新平台。它为项目提供了一种结构化的方式来发布新代币、自动化AMM迁移以及从交易活动中赚取费用。

在跟踪Raydium Launchpad活动时,可能会有几种交易类型值得关注:

  • Initialize - 新代币销售设置
  • BuyExactIn/BuyExactOut - 代币购买交易
  • MigrateToAmm - 将代币迁移到AMM流动性池
  • CreateConfig/UpdateConfig - 平台或特定销售的配置更改
  • ClaimVestedToken - 代币在归属期后的分配给买家

为什么使用Yellowstone gRPC?

Yellowstone gRPC基于Solana的Geyser插件系统,提供了实时Solana数据访问的流接口。Yellowstone提供实时流传输的内容包括:

  • 账户更新
  • 交易
  • 记录
  • 区块通知
  • 插槽通知

与传统的WebSocket实现相比,Yellowstone的gRPC接口提供了更低的延迟和更高的稳定性。它还包括一元操作用于快速、一时的数据检索。gRPC的效率和类型安全使得Yellowstone特别适合用于基于云的服务和数据库更新。QuickNode通过我们的Yellowstone gRPC Marketplace附加组件支持Yellowstone。

设置项目

让我们开始创建一个新的Rust项目:

cargo init raydium-launchpad-tracker
cd raydium-launchpad-tracker

你现在应该有一个名为raydium-launchpad-tracker的新目录,里面包含基本的Rust项目结构。在这个目录中,你将找到一个src文件夹,其中包含一个main.rs文件。这就是我们将编写代码的地方。你还会有一个Cargo.toml文件,这是你Rust项目的配置文件。

接下来,使用所需的依赖项更新你的Cargo.toml文件:

[package]
name = "raydium-launchpad-tracker"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.28", features = ["rt-multi-thread", "macros"] }
yellowstone-grpc-client = "6.0.0"
yellowstone-grpc-proto = "6.0.0"
futures = "0.3"
log = "0.4"
env_logger = "0.11.8"
bs58 = "0.5.0"
tonic = "0.12.3"

构建Raydium Launchpad监控器

我们的应用程序将由几个组件组成:

  1. 客户端设置和连接到Yellowstone gRPC
  2. Raydium Launchpad交易的订阅配置
  3. 交易解析和处理
  4. 指令类型检测和过滤

让我们在src/main.rs文件中实现这些组件。

导入所需库

首先,我们需要导入必要的库和模块。在你的src/main.rs文件顶部添加以下代码:

use {
    bs58,
    futures::{sink::SinkExt, stream::StreamExt},
    log::{error, info, warn},
    std::{collections::HashMap, env, fmt},
    tokio,
    tonic::{Status, service::Interceptor, transport::ClientTlsConfig},
    yellowstone_grpc_client::GeyserGrpcClient,
    yellowstone_grpc_proto::{
        geyser::{SubscribeUpdate, SubscribeUpdatePing},
        prelude::{
            CommitmentLevel, SubscribeRequest, SubscribeRequestFilterTransactions,
            subscribe_update::UpdateOneof,
        },
    },
};

该代码导入了我们应用程序所需的库,包括gRPC客户端、日志记录和用于异步编程的futures。

定义常量和主函数

现在让我们定义一些重要的常量:

// 常量
const RUST_LOG_LEVEL: &str = "info";
const RAYDIUM_LAUNCHPAD_PROGRAM: &str = "LanMV9sAd7wArD4vJFi2qDdfnVhFxYSUg6eADduJ3uj";
const TARGET_IX_TYPES: &[RaydiumInstructionType] = &[\
    // 👇 取消注释以过滤特定指令类型\
    // RaydiumInstructionType::Initialize,\
    // RaydiumInstructionType::MigrateToAmm,\
];
// 用你的QuickNode Yellowstone gRPC端点替换
const ENDPOINT: &str = "https://your-quicknode-endpoint.grpc.solana-mainnet.quiknode.pro:10000";
const AUTH_TOKEN: &str = "your-auth-token"; // 👈 用你的Token替换

让我们阐明这些常量的作用:

  • RUST_LOG_LEVEL:设置应用程序的日志记录级别。
  • RAYDIUM_LAUNCHPAD_PROGRAM:Raydium Launchpad的程序ID。如果你想监控不同的程序,请用你选择的程序ID替换此项。
  • TARGET_IX_TYPES:用于过滤的指令类型数组。你可以通过将此数组留空来监控所有程序交易。你可以取消注释希望监控的特定指令类型(或添加其他指令)。我们稍后将定义RaydiumInstructionType枚举。 (注意:要监控其他程序,你需要用相应程序的指令类型更新此数组。)
  • ENDPOINT:你的QuickNode Yellowstone实例的gRPC端点。你可以在我们的文档中找到有关配置端点的信息,在这里
  • AUTH_TOKEN:你的QuickNode Yellowstone实例的身份验证Token。
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    setup_logging();
    info!(
        "开始监控账户:{}",
        RAYDIUM_LAUNCHPAD_PROGRAM
    );

    let mut client = setup_client().await?;
    info!("连接到gRPC端点");

    let (subscribe_tx, subscribe_rx) = client.subscribe().await?;

    send_subscription_request(subscribe_tx).await?;
    info!("订阅请求已发送。正在监听更新...");

    process_updates(subscribe_rx).await?;

    info!("流关闭");
    Ok(())
}

我们的main函数初始化日志记录系统,设置gRPC客户端,并发送订阅请求以监控指定程序的交易,使用yellowstone_grpc_client。然后它处理更新流。接下来,让我们定义每一个函数。

设置日志记录和客户端连接

我们将添加所需的日志记录和客户端设置函数。在src/main.rs文件中添加以下代码:

fn setup_logging() {
    unsafe {
        env::set_var("RUST_LOG", RUST_LOG_LEVEL);
    }
    env_logger::init();
}

这将设置日志记录系统以使用指定的日志级别。你可以将RUST_LOG_LEVEL更改为debugerror以调整日志的详细程度。

接下来,让我们添加一个函数来设置我们的gRPC客户端。添加以下代码:

async fn setup_client() -> Result<GeyserGrpcClient<impl Interceptor>, Box<dyn std::error::Error>> {
    info!("连接到gRPC端点:{}", ENDPOINT);

    // 使用TLS配置构建gRPC客户端
    let client = GeyserGrpcClient::build_from_shared(ENDPOINT.to_string())?
        .x_token(Some(AUTH_TOKEN.to_string()))?
        .tls_config(ClientTlsConfig::new().with_native_roots())?
        .connect()
        .await?;

    Ok(client)
}

我们使用GeyserGrpcClient连接到Yellowstone gRPC端点。x_token方法用于为连接设置身份验证Token。tls_config方法用于配置安全通信的TLS设置。最后,我们等待connect方法以建立连接。

设置交易订阅

接下来,让我们实现订阅请求函数。这将发送包含我们监控过滤器的订阅请求到Yellowstone gRPC服务器。添加以下代码:

/// 发送带有交易过滤器的订阅请求
async fn send_subscription_request<T>(mut tx: T) -> Result<(), Box<dyn std::error::Error>>
where
    T: SinkExt<SubscribeRequest> + Unpin,
    <T as futures::Sink<SubscribeRequest>>::Error: std::error::Error + 'static,
{
    // 使用目标账户创建账户过滤器
    let mut accounts_filter = HashMap::new();
    accounts_filter.insert(
        "account_monitor".to_string(),
        SubscribeRequestFilterTransactions {
            account_include: vec![],
            account_exclude: vec![],
            account_required: vec![\
                // 根据需要替换或添加监控的其他账户\
                RAYDIUM_LAUNCHPAD_PROGRAM.to_string(),\
            ],
            vote: Some(false),
            failed: Some(false),
            signature: None,
        },
    );

    // 发送订阅请求
    tx.send(SubscribeRequest {
        transactions: accounts_filter,
        commitment: Some(CommitmentLevel::Processed as i32),
        ..Default::default()
    })
    .await?;

    Ok(())
}

该函数创建包含指定过滤器的订阅请求。accounts_requiredaccount_includeaccount_exclude 字段可用于指定要包含或排除的账户。votefailed 字段可用于过滤投票交易或失败的交易。 我们使用CommitmentLevel::Processed尽可能快地获取交易,但你可以将其更改为ConfirmedFinalized以获得更高的最终性。

处理交易更新

现在,让我们添加处理更新流的函数:

async fn process_updates<S>(mut stream: S) -> Result<(), Box<dyn std::error::Error>>
where
    S: StreamExt<Item = Result<SubscribeUpdate, Status>> + Unpin,
{
    while let Some(message) = stream.next().await {
        match message {
            Ok(msg) => handle_message(msg)?,
            Err(e) => {
                error!("接收消息时出错:{:?}", e);
                break;
            }
        }
    }

    Ok(())
}

该函数处理来自Yellowstone gRPC服务器的更新流。它使用循环持续接收消息并调用handle_message函数以处理接收到的每条消息。

Raydium指令类型检测

让我们定义Raydium Launchpad指令类型的枚举,并定义每个指令的判别器。这将帮助我们识别接收到的交易中的特定指令类型。在你的文件中添加以下代码:

#[derive(Debug, Clone, PartialEq)]
pub enum RaydiumInstructionType {
    Initialize,
    BuyExactIn,
    BuyExactOut,
    SellExactIn,
    SellExactOut,
    ClaimPlatformFee,
    ClaimVestedToken,
    CollectFee,
    CollectMigrateFee,
    CreateConfig,
    CreatePlatformConfig,
    CreateVestingAccount,
    MigrateToAmm,
    MigrateToCpswap,
    UpdateConfig,
    UpdatePlatformConfig,
    Unknown([u8; 8]),
}

pub fn parse_raydium_instruction_type(data: &[u8]) -> RaydiumInstructionType {
    if data.len() < 8 {
        return RaydiumInstructionType::Unknown([0; 8]);
    }

    let mut discriminator = [0u8; 8];
    discriminator.copy_from_slice(&data[0..8]);

    match discriminator {
        [175, 175, 109, 31, 13, 152, 155, 237] => RaydiumInstructionType::Initialize,
        [250, 234, 13, 123, 213, 156, 19, 236] => RaydiumInstructionType::BuyExactIn,
        [24, 211, 116, 40, 105, 3, 153, 56] => RaydiumInstructionType::BuyExactOut,
        [149, 39, 222, 155, 211, 124, 152, 26] => RaydiumInstructionType::SellExactIn,
        [95, 200, 71, 34, 8, 9, 11, 166] => RaydiumInstructionType::SellExactOut,
        [156, 39, 208, 135, 76, 237, 61, 72] => RaydiumInstructionType::ClaimPlatformFee,
        [49, 33, 104, 30, 189, 157, 79, 35] => RaydiumInstructionType::ClaimVestedToken,
        [60, 173, 247, 103, 4, 93, 130, 48] => RaydiumInstructionType::CollectFee,
        [255, 186, 150, 223, 235, 118, 201, 186] => RaydiumInstructionType::CollectMigrateFee,
        [201, 207, 243, 114, 75, 111, 47, 189] => RaydiumInstructionType::CreateConfig,
        [176, 90, 196, 175, 253, 113, 220, 20] => RaydiumInstructionType::CreatePlatformConfig,
        [129, 178, 2, 13, 217, 172, 230, 218] => RaydiumInstructionType::CreateVestingAccount,
        [207, 82, 192, 145, 254, 207, 145, 223] => RaydiumInstructionType::MigrateToAmm,
        [136, 92, 200, 103, 28, 218, 144, 140] => RaydiumInstructionType::MigrateToCpswap,
        [29, 158, 252, 191, 10, 83, 219, 99] => RaydiumInstructionType::UpdateConfig,
        [195, 60, 76, 129, 146, 45, 67, 143] => RaydiumInstructionType::UpdatePlatformConfig,
        _ => RaydiumInstructionType::Unknown(discriminator),
    }
}

impl std::fmt::Display for RaydiumInstructionType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RaydiumInstructionType::Initialize => write!(f, "Initialize"),
            RaydiumInstructionType::BuyExactIn => write!(f, "BuyExactIn"),
            RaydiumInstructionType::BuyExactOut => write!(f, "BuyExactOut"),
            RaydiumInstructionType::SellExactIn => write!(f, "SellExactIn"),
            RaydiumInstructionType::SellExactOut => write!(f, "SellExactOut"),
            RaydiumInstructionType::ClaimPlatformFee => write!(f, "ClaimPlatformFee"),
            RaydiumInstructionType::ClaimVestedToken => write!(f, "ClaimVestedToken"),
            RaydiumInstructionType::CollectFee => write!(f, "CollectFee"),
            RaydiumInstructionType::CollectMigrateFee => write!(f, "CollectMigrateFee"),
            RaydiumInstructionType::CreateConfig => write!(f, "CreateConfig"),
            RaydiumInstructionType::CreatePlatformConfig => write!(f, "CreatePlatformConfig"),
            RaydiumInstructionType::CreateVestingAccount => write!(f, "CreateVestingAccount"),
            RaydiumInstructionType::MigrateToAmm => write!(f, "MigrateToAmm"),
            RaydiumInstructionType::MigrateToCpswap => write!(f, "MigrateToCpswap"),
            RaydiumInstructionType::UpdateConfig => write!(f, "UpdateConfig"),
            RaydiumInstructionType::UpdatePlatformConfig => write!(f, "UpdatePlatformConfig"),
            RaydiumInstructionType::Unknown(discriminator) => {
                write!(f, "Unknown(discriminator={:?})", discriminator)
            }
        }
    }
}

我们通过程序的IDL获取指令及其判别器。我们使用指令数据的前8个字节作为判别器来识别指令类型。parse_raydium_instruction_type函数接受一个字节切片并返回相应的RaydiumInstructionType。我们还为该枚举添加了Display实现,以便更容易地打印指令类型。

交易解析结构

让我们定义处理交易解析所需的结构。这可能看起来有点复杂,因为我们将解析很多数据,并且为了这次演示——我们将解析所有交易数据——你可以根据需要限制解析的内容。具体来说,让我们定义ParsedTransactionParsedInstructionParsedInnerInstructionParsedTokenBalance结构。添加以下代码:

#[derive(Debug, Default)]
struct ParsedTransaction {
    signature: String,
    is_vote: bool,
    account_keys: Vec<String>,
    recent_blockhash: String,
    instructions: Vec<ParsedInstruction>,
    success: bool,
    fee: u64,
    pre_token_balances: Vec<ParsedTokenBalance>,
    post_token_balances: Vec<ParsedTokenBalance>,
    logs: Vec<String>,
    inner_instructions: Vec<ParsedInnerInstruction>,
    slot: u64,
}

impl fmt::Display for ParsedTransaction {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        writeln!(f, "交易:{}", self.signature)?;
        writeln!(
            f,
            "状态:{}",
            if self.success { "成功" } else { "失败" }
        )?;
        writeln!(f, "插槽:{}", self.slot)?;
        writeln!(f, "费用:{} lamports", self.fee)?;

        writeln!(f, "\n账户密钥:")?;
        for (i, key) in self.account_keys.iter().enumerate() {
            writeln!(f, "  [{}] {}", i, key)?;
        }

        writeln!(f, "\n指令:")?;
        for (i, ix) in self.instructions.iter().enumerate() {
            writeln!(f, "  指令 {}:", i)?;
            writeln!(
                f,
                "    程序:{} (索引:{})",
                ix.program_id, ix.program_id_index
            )?;
            writeln!(f, "    账户:")?;
            for (idx, acc) in &ix.accounts {
                writeln!(f, "      [{}] {}", idx, acc)?;
            }
            writeln!(f, "    数据:{} 字节", ix.data.len())?;
        }

        if !self.inner_instructions.is_empty() {
            writeln!(f, "\n内部指令:")?;
            for inner_ix in &self.inner_instructions {
                writeln!(f, "  指令索引:{}", inner_ix.instruction_index)?;
                for (i, ix) in inner_ix.instructions.iter().enumerate() {
                    writeln!(f, "    内部指令 {}:", i)?;
                    writeln!(
                        f,
                        "      程序:{} (索引:{})",
                        ix.program_id, ix.program_id_index
                    )?;
                    writeln!(f, "      账户:")?;
                    for (idx, acc) in &ix.accounts {
                        writeln!(f, "        [{}] {}", idx, acc)?;
                    }
                    writeln!(f, "      数据:{} 字节", ix.data.len())?;
                }
            }
        }

        if !self.pre_token_balances.is_empty() || !self.post_token_balances.is_empty() {
            writeln!(f, "\n代币余额:")?;

            let mut balance_changes = HashMap::new();

            for balance in &self.pre_token_balances {
                let key = (balance.account_index, balance.mint.clone());
                balance_changes.insert(key, (balance.amount.clone(), "".to_string()));
            }

            for balance in &self.post_token_balances {
                let key = (balance.account_index, balance.mint.clone());

                if let Some((_, post)) = balance_changes.get_mut(&key) {
                    *post = balance.amount.clone();
                } else {
                    balance_changes.insert(key, ("".to_string(), balance.amount.clone()));
                }
            }

            for ((account_idx, mint), (pre_amount, post_amount)) in balance_changes {
                let account_key = if (account_idx as usize) < self.account_keys.len() {
                    &self.account_keys[account_idx as usize]
                } else {
                    "未知"
                };

                if pre_amount.is_empty() {
                    writeln!(
                        f,
                        "  账户 {} ({}): 新余额 {} (铸造:{})",
                        account_idx, account_key, post_amount, mint
                    )?;
                } else if post_amount.is_empty() {
                    writeln!(
                        f,
                        "  账户 {} ({}): 移除余额 {} (铸造:{})",
                        account_idx, account_key, pre_amount, mint
                    )?;
                } else {
                    writeln!(
                        f,
                        "  账户 {} ({}): {} → {} (铸造:{})",
                        account_idx, account_key, pre_amount, post_amount, mint
                    )?;
                }
            }
        }

        if !self.logs.is_empty() {
            writeln!(f, "\n交易日志:")?;
            for (i, log) in self.logs.iter().enumerate() {
                writeln!(f, "  [{}] {}", i, log)?;
            }
        }

        Ok(())
    }
}

#[derive(Debug)]
struct ParsedInstruction {
    program_id: String,
    program_id_index: u8,
    accounts: Vec<(usize, String)>, // (索引, pubkey)
    data: Vec<u8>,
}

#[derive(Debug)]
struct ParsedInnerInstruction {
    instruction_index: u8,
    instructions: Vec<ParsedInstruction>,
}

#[derive(Debug)]
struct ParsedTokenBalance {
    account_index: u32,
    mint: String,
    owner: String,
    amount: String,
}

除了定义我们的结构外,我们还为ParsedTransaction实现了Display特性,以方便打印交易细节。除了记录交易的每个元素外,它还处理对向量和映射的迭代,以可读的格式显示账户密钥、指令、内部指令、代币余额和日志。

交易解析器实现

现在,让我们添加交易解析器逻辑,该逻辑将解析交易数据并填充我们的ParsedTransaction结构。该函数将负责从接收到的交易更新中提取相关信息。添加以下代码:

struct TransactionParser;

impl TransactionParser {
    pub fn parse_transaction(
        tx_update: &yellowstone_grpc_proto::geyser::SubscribeUpdateTransaction,
    ) -> Result<ParsedTransaction, Box<dyn std::error::Error>> {
        let mut parsed_tx = ParsedTransaction::default();
        parsed_tx.slot = tx_update.slot;

        if let Some(tx_info) = &tx_update.transaction {
            parsed_tx.is_vote = tx_info.is_vote;

            parsed_tx.signature = bs58::encode(&tx_info.signature).into_string();

            if let Some(tx) = &tx_info.transaction {
                if let Some(msg) = &tx.message {
                    for key in &msg.account_keys {
                        parsed_tx.account_keys.push(bs58::encode(key).into_string());
                    }

                    if let Some(meta) = &tx_info.meta {
                        for addr in &meta.loaded_writable_addresses {
                            let base58_addr = bs58::encode(addr).into_string();
                            parsed_tx.account_keys.push(base58_addr);
                        }

                        for addr in &meta.loaded_readonly_addresses {
                            let base58_addr = bs58::encode(addr).into_string();
                            parsed_tx.account_keys.push(base58_addr);
                        }
                    }

                    parsed_tx.recent_blockhash = bs58::encode(&msg.recent_blockhash).into_string();

                    for ix in &msg.instructions {
                        let program_id_index = ix.program_id_index;
                        let program_id =
                            if (program_id_index as usize) < parsed_tx.account_keys.len() {
                                parsed_tx.account_keys[program_id_index as usize].clone()
                            } else {
                                "未知".to_string()
                            };

                        let mut accounts = Vec::new();
                        for &acc_idx in &ix.accounts {
                            let account_idx = acc_idx as usize;
                            if account_idx < parsed_tx.account_keys.len() {
                                accounts.push((
                                    account_idx,
                                    parsed_tx.account_keys[account_idx].clone(),
                                ));
                            }
                        }

                        parsed_tx.instructions.push(ParsedInstruction {
                            program_id,
                            program_id_index: program_id_index as u8,
                            accounts,
                            data: ix.data.clone(),
                        });
                    }
                }
            }

            if let Some(meta) = &tx_info.meta {
                parsed_tx.success = meta.err.is_none();
                parsed_tx.fee = meta.fee;

                for balance in &meta.pre_token_balances {
                    if let Some(amount) = &balance.ui_token_amount {
                        parsed_tx.pre_token_balances.push(ParsedTokenBalance {
                            account_index: balance.account_index,
                            mint: balance.mint.clone(),
                            owner: balance.owner.clone(),
                            amount: amount.ui_amount_string.clone(),
                        });
                    }
                }

                for balance in &meta.post_token_balances {
                    if let Some(amount) = &balance.ui_token_amount {
                        parsed_tx.post_token_balances.push(ParsedTokenBalance {
                            account_index: balance.account_index,
                            mint: balance.mint.clone(),
                            owner: balance.owner.clone(),
                            amount: amount.ui_amount_string.clone(),
                        });
                    }
                }

                for inner_ix in &meta.inner_instructions {
                    let mut parsed_inner_ixs = Vec::new();

                    for ix in &inner_ix.instructions {
                        let program_id_index = ix.program_id_index;

                        let program_id =
                            if (program_id_index as usize) < parsed_tx.account_keys.len() {
                                parsed_tx.account_keys[program_id_index as usize].clone()
                            } else {
                                "未知".to_string()
                            };

                        let mut accounts = Vec::new();
                        for &acc_idx in &ix.accounts {
                            let account_idx = acc_idx as usize;
                            if account_idx < parsed_tx.account_keys.len() {
                                accounts.push((
                                    account_idx,
                                    parsed_tx.account_keys[account_idx].clone(),
                                ));
                            }
                        }

                        parsed_inner_ixs.push(ParsedInstruction {
                            program_id,
                            program_id_index: program_id_index as u8,
                            accounts,
                            data: ix.data.clone(),
                        });
                    }

                    parsed_tx.inner_instructions.push(ParsedInnerInstruction {
                        instruction_index: inner_ix.index as u8,
                        instructions: parsed_inner_ixs,
                    });
                }

                parsed_tx.logs = meta.log_messages.clone();
            }
        }

        Ok(parsed_tx)
    }
}

这里的内容很多,因为交易数据相当复杂。parse_transaction函数接受一个SubscribeUpdateTransaction对象,并将相关信息提取到我们的ParsedTransaction结构中。它处理解析交易签名、账户密钥、最近的区块哈希、指令、内部指令、代币余额和日志。

消息处理器实现

最后,让我们实现消息处理函数,该函数在process_updates中调用。此函数将处理来自Yellowstone gRPC流的传入消息并根据需要处理它们。添加以下代码:

fn handle_message(msg: SubscribeUpdate) -> Result<(), Box<dyn std::error::Error>> {
    match msg.update_oneof {
        Some(UpdateOneof::Transaction(transaction_update)) => {
            match TransactionParser::parse_transaction(&transaction_update) {
                Ok(parsed_tx) => {
                    let mut has_raydium_ix = false;
                    let mut found_target_ix = false;
                    let mut found_ix_types = Vec::new();

                    if TARGET_IX_TYPES.is_empty() {
                        found_target_ix = true;
                    }

                    for (i, ix) in parsed_tx.instructions.iter().enumerate() {
                        if ix.program_id == RAYDIUM_LAUNCHPAD_PROGRAM {
                            has_raydium_ix = true;
                            let raydium_ix_type = parse_raydium_instruction_type(&ix.data);
                            found_ix_types.push(raydium_ix_type.clone());

                            if TARGET_IX_TYPES.contains(&raydium_ix_type) {
                                found_target_ix = true;
                            }
                            if found_target_ix {
                                info!(
                                    "找到目标指令:{} 在索引 {}",
                                    raydium_ix_type, i
                                );
                            }
                        }
                    }

                    for inner_ix_group in &parsed_tx.inner_instructions {
                        for (i, inner_ix) in inner_ix_group.instructions.iter().enumerate() {
                            if inner_ix.program_id == RAYDIUM_LAUNCHPAD_PROGRAM {
                                has_raydium_ix = true;
                                let raydium_ix_type =
                                    parse_raydium_instruction_type(&inner_ix.data);
                                found_ix_types.push(raydium_ix_type.clone());

                                if TARGET_IX_TYPES.contains(&raydium_ix_type) {
                                    found_target_ix = true;
                                }
                                if found_target_ix && !matches!(raydium_ix_type, RaydiumInstructionType::Unknown(_)) {
                                    info!(
                                        "找到目标指令:{} 在内部索引 {}.{}",
                                        raydium_ix_type, inner_ix_group.instruction_index, i
                                    );
                                }
                            }
                        }
                    }

                    if found_target_ix && has_raydium_ix {
                        info!("发现Raydium Launchpad交易!");
                        info!("解析的交易:\n{}", parsed_tx);
                    }
                }
                Err(e) => {
                    error!("解析交易失败:{:?}", e);
                }
            }
        }
        Some(UpdateOneof::Ping(SubscribeUpdatePing {})) => {
            // 忽略ping
        }
        Some(other) => {
            info!("收到意外更新。更新类型:{:?}", other);
        }
        None => {
            warn!("收到空更新");
        }
    }

    Ok(())
}

这里有很多事情发生,让我们分解一下:

  • handle_message函数接受一个SubscribeUpdate消息并检查它是否包含交易更新。
  • 如果包含,它调用TransactionParser::parse_transaction函数来解析交易数据。
  • 它检查交易(包括内部指令)是否包含Raydium Launchpad指令,以及它们是否与在TARGET_IX_TYPES中指定的目标指令类型匹配。
  • 如果找到目标指令,它记录交易和指令的详细信息。
  • 它还以优雅的方式处理ping和意外更新。

运行应用程序

要运行该应用程序,你需要:

  1. 用你的QuickNode Yellowstone gRPC端点信息更新ENDPOINTAUTH_TOKEN常量
  2. 根据需要自定义TARGET_IX_TYPES数组以过滤特定的指令类型
  3. 构建并运行应用程序
cargo build

然后运行应用程序:

cargo run

你应该开始看到日志,显示应用程序正在连接到Yellowstone gRPC端点并处理交易。它应该看起来像这样:

[2025-04-17T23:58:26Z INFO  raydium_launchpad_tracker] 发现Raydium Launchpad交易!
[2025-04-17T23:58:26Z INFO  raydium_launchpad_tracker] 找到目标指令:BuyExactIn 在索引 5
[2025-04-17T23:58:26Z INFO  raydium_launchpad_tracker] 解析的交易:
    交易:4s5xG2Gf25aBxVr8ahdTvChT1ioWzNTqbCNFkchfHokHUxzyGDKTzFadsn9xAsgBgqr4EWT87NZKRncbq1ZGYvkG
    状态:成功
    插槽:334162911
    费用:53060 lamports

    账户密钥:
      [0] 2VSCxjXzykbjVBKrBa3we4yjxWtKAUkd8hAgNwpPkKP7
      ## ...
      [17] SysvarRent111111111111111111111111111111111

    指令:
      指令 0:
        程序:ComputeBudget111111111111111111111111111111 (索引:6)
        账户:
        数据:9字节
      指令 1:
        程序:ComputeBudget111111111111111111111111111111 (索引:6)
        账户:
        数据:5字节
      指令 2:
        程序:ATokenGPvbdGVxr1b2hvZbsiqW5xWH25efTNsLJA8knL (索引:7)
        账户:
          [0] 2VSCxjXzykbjVBKrBa3we4yjxWtKAUkd8hAgNwpPkKP7
          [1] ALuscPsdi4rS2u47hSZ8vxmGDWb9csm49Sg63FSCW1gj
      ## ...

    代币余额:
      账户 5 (7W1...Yujxc): 22.384714494 → 23.184714494 (铸造:So1...112)
      ## ...

    交易日志:
      [0] 程序ComputeBudget111111111111111111111111111111调用[1]
      ## ...
      [49] 程序TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA成功

该日志旨在提供综合信息——你可以自定义ParsedTransaction结构和handle_message函数,只记录你感兴趣的信息。

自定义交易过滤

该应用程序可以自定义以关注特定的Raydium Launchpad指令类型。要实现此目的,请修改TARGET_IX_TYPES数组:

const TARGET_IX_TYPES: &[RaydiumInstructionType] = &[\
    RaydiumInstructionType::Initialize,      // 跟踪新代币销售\
    RaydiumInstructionType::MigrateToAmm,    // 跟踪迁移到AMM\
];

如果你将数组留空,应用程序将监控所有Raydium Launchpad指令。

额外增强

以下是你可以增强此应用程序的一些方法:

  1. 将数据存储到数据库中 - 连接到数据库以存储交易信息以供以后分析
  2. 创建警报 - 为特定交易类型或阈值设置通知
  3. 添加Web界面 - 创建仪表板以可视化实时交易数据
  4. 跟踪特定代币 - 过滤与特定代币铸币相关的交易
  5. 分析价格影响 - 计算代币销售和迁移的价格影响

故障排除

以下是你可能遇到的常见问题的解决方案:

  1. 连接问题 - 确保你的端点URL和身份验证Token是正确的(请查看我们的Yellowstone文档以获取更多信息)
  2. 高消息量 - 考虑添加更具体的服务器端过滤器,以减少处理的交易数量—— 请注意,我们的脚本包括服务器和客户端过滤。在我们的示例中,这意味着更新send_subscription_request中的accounts_filter以包括特定账户或交易类型。如果你的目标指令包括一个唯一账户,你可以将其添加到account_required字段。例如,如果你想跟踪Initialize指令,可以添加Metaplex Token Metadata程序账户,该账户在初始化新代币时是必需的,但在交换时不是必需的( metaqbxxUerdq28cj1RbAWkYQm3ybzjb6a8bt518x1s)。
  3. 错过的交易 - 检查你的承诺级别(目前设置为Processed);你可能希望使用ConfirmedFinalized以获得更可靠的结果。
  4. 流或账户限制 - 对于需要多个流或更大过滤数组的应用程序,请查看我们的Velocity Tier联系企业专家以获取定制解决方案。

总结

在本指南中,我们构建了一个Rust脚本,利用QuickNode的Yellowstone gRPC附加组件实时监控Raydium Launchpad交易。这种方法相比传统WebSocket具有显著优势,包括更低的延迟和更可靠的交易检测。

通过了解和监控Raydium Launchpad交易,你可以获得有关代币发行、销售活动和平台配置的宝贵见解。更重要的是,你现在拥有构建自己自定义监控解决方案的工具,适用于其他Solana程序!

我们 ❤️ 反馈!

告诉我们如果你有任何反馈或对新主题的请求。我们很想听到你的意见。

额外资源

  • 原文链接: quicknode.com/guides/sol...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
QuickNode
QuickNode
江湖只有他的大名,没有他的介绍。