100 小时构建三明治机器人

100 小时构建三明治机器人

Fotor AI:“吃三明治的吉娃娃”经过 16 次迭代

今天,我们将一起构建一个三明治机器人。 🥪

是的,我知道这听起来可能有些奇怪。我理解你可能在想:

又来了。网上已经有很多资源可以学习这个了。为什么还要从你这样的人的身上再来一篇无聊的三明治机器人教程?

好问题。 🤔

老实说,实际上网上并没有那么多好的资源。而且那些资源,要么是几个月/几年前的,要么在使用案例上有限,所以不妨稍微更新一下代码。

我花了超过 100 小时来完善这个设置。无论你是刚刚接触这个生态系统,还是深入其中,都可以把这当作你的指南。

如果你还没有尝试从零开始构建一个 MEV 机器人,现在是你的机会。快速提醒一下,这个项目不会从第一天就开始盈利。如果可以的话,它就不会像这样公开分享。但它会非常接近真实的情况,随着我们逐步添加更多功能,收益会开始流入。我们将在下个月对代码进行微调,以实现最佳性能 (然后再深入研究狙击和支持Solana)。

这是最终结果的预览:

我分享了我的 Github 仓库链接,供喜欢立即深入代码的人使用:

GitHub - solidquant/sandooo

本系列将分为两个部分:

  1. (第 1 部分):识别
  2. (第 2 部分):执行

在今天的文章中,我们将主要关注识别过程,而在下一篇文章中深入探讨如何处理执行部分。

目录:

  1. 三明治机器人 101
  2. 使用 Solidity/Yul 的三明治智能合约
  3. 使用 REVM 的三明治模拟引擎

引言

我们大多数人可能都熟悉由libevm编写的 JS 三明治机器人:

GitHub - libevm/subway: 如何在以太坊上执行三明治攻击的实用示例

我仍然记得我第一次接触 MEV,所有的一切都始于这个老派的仓库——游戏中的真正经典。尽管这个项目已经有些年头,但从中可以学到很多东西。对于任何初学 MEV 的人,确保你逐行分析每一行代码。

然而,当然存在一些局限性。并不是因为这个项目不出色,而是在过去三年中,MEV 市场发生了严重的演变

我们在这里挖掘这些变化,看看如今在 MEV 领域获利到底需要什么。如果你和我一样兴奋,那就让我们继续这段旅程。🙌

👾 加入我们的 Discord 团队,成千上万的人每天都在讨论 MEV 相关的内容。单独研究可能会有些孤独,所以快来打个招呼 🏄🏄。看看其他人是如何应对这个领域的:

加入 Solid Quant Discord 服务器!

在每个 MEV 策略中,你必须采取两个步骤:识别执行。在今天的文章中,我们将重点关注识别三明治机会。在下一篇文章中,我们将向区块构建者发送真实订单,并尝试与其他三明治机器人竞争。

如果你还不太熟悉区块构建者,可以看看我之前的文章:

我不在乎我是否被夹在中间,更大的事情正在到来 介绍了 MEV 的内部运作以及行业如何转变

三明治机器人 101

我们将首先设置我们的项目。

✋✋ 请注意,使用全节点运行生产代码将为你提供最佳性能。MEV 策略通常非常依赖网络,因此最好消除任何相关的延迟,而最简单的方法就是运行一个全节点。我个人使用 Geth + Lighthouse。

我还使用Rust 🦀进行整个项目,如果你还不熟悉 Rust,也不用担心,因为这里的概念即使你不懂也能理解。但建议你在深入之前先学习 Rust。

项目设置

首先,在你的本地机器上创建一个新的 Rust 项目,方法是:

cargo new sandooo

这将在名为sandooo的新目录中创建一个 Rust 模板。

使用你选择的 IDE 打开该目录,并将以下内容复制并粘贴到Cargo.toml文件中 (sandooo/Cargo.toml)

sandooo/Cargo.toml at main · solidquant/sandooo 一个三明治机器人。通过在 GitHub 上创建帐户来为 solidquant/sandooo 的开发做出贡献。


[package]  
name = "sandooo"  
version = "0.1.0"  
edition = "2021"  

[dependencies]  
dotenv = "0.15.0"  
anyhow = "1.0.70"  
itertools = "0.11.0"  
serde = "1.0.188"  
serde_json = "1.0.107"  
bounded-vec-deque = "0.1.1"  

# Telegram  
teloxide = { version = "0.12", features = ["macros"] }  

futures = "0.3.5"  
futures-util = "*"  
tokio = { version = "1.29.0", features = ["full"] }  
tokio-stream = { version = "0.1", features = ['sync'] }  
tokio-tungstenite = "*"  
async-trait = "0.1.74"  

ethers-core = "2.0"  
ethers-providers = "2.0"  
ethers-contract = "2.0"  
ethers = { version = "2.0", features = ["abigen", "ws", "ipc"] }  

ethers-flashbots = { git = "https://github.com/onbjerg/ethers-flashbots" }  

eth-encode-packed = "0.1.0"  
rlp = { version = "0.5", features = ["derive"] }  

foundry-evm-mini = { git = "https://github.com/solidquant/foundry-evm-mini.git" }  

revm = { version = "3", default-features = false, features = [  
  "std",  
  "serde",  
  "memory_limit",  
  "optional_eip3607",  
  "optional_block_gas_limit",  
  "optional_no_base_fee",  
] }  

csv = "1.2.2"  
colored = "2.0.0"  
log = "0.4.17"  
fern = { version = "0.6.2", features = ["colored"] }  
chrono = "0.4.23"  
indicatif = "0.17.5"  

[patch.crates-io]  
revm = { git = "https://github.com/bluealloy/revm/", rev = "80c909d6f242886cb26e6103a01d1a4bf9468426" }  

[profile.release]  
codegen-units = 1  
lto = "fat"

完成后,在src目录中创建一个新目录,并命名为common。然后创建一个新文件,命名为:constants.rs (sandooo/src/common/constants.rs)

sandooo/src/common/constants.rs

pub static PROJECT_NAME: &str = "sandooo";  

// 加载环境变量为字符串值的函数  
pub fn get_env(key: &str) -> String {  
    std::env::var(key).unwrap_or(String::from(""))  
}  

#[derive(Debug, Clone)]  
pub struct Env {  
    pub https_url: String,  
    pub wss_url: String,  
    pub bot_address: String,  
    pub private_key: String,  
    pub identity_key: String,  
    pub telegram_token: String,  
    pub telegram_chat_id: String,  
    pub use_alert: bool,  
    pub debug: bool,  
}  

// 创建新的 Env 结构将自动加载环境变量  
impl Env {  
    pub fn new() -> Self {  
        Env {  
            https_url: get_env("HTTPS_URL"),  
            wss_url: get_env("WSS_URL"),  
            bot_address: get_env("BOT_ADDRESS"),  
            private_key: get_env("PRIVATE_KEY"),  
            identity_key: get_env("IDENTITY_KEY"),  
            telegram_token: get_env("TELEGRAM_TOKEN"),  
            telegram_chat_id: get_env("TELEGRAM_CHAT_ID"),  
            use_alert: get_env("USE_ALERT").parse::<bool>().unwrap(),  
            debug: get_env("DEBUG").parse::<bool>().unwrap(),  
        }  
    }  
}  

pub static COINBASE: &str = "0xDAFEA492D9c6733ae3d56b7Ed1ADB60692c98Bc5"; // Flashbots Builder  

pub static WETH: &str = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2";  
pub static WETH_BALANCE_SLOT: i32 = 3;  
pub static WETH_DECIMALS: u8 = 18;

这使我们能够在启动时加载环境变量。自然,下一步是在根目录中创建一个 .env 文件,即 sandooo 目录 (sandooo/.env)

sandooo/.env.example

HTTPS_URL=http://localhost:8545  
WSS_URL=ws://localhost:8546  
BOT_ADDRESS="..."  
PRIVATE_KEY="..."  
IDENTITY_KEY="..."  
TELEGRAM_TOKEN="..."  
TELEGRAM_CHAT_ID="..."  
USE_ALERT=false  
DEBUG=true  

RUST_BACKTRACE=1
  • PRIVATE_KEY: 如果你打算使用真实钱包运行三明治机器人,这就是你的实际私钥
  • IDENTITY_KEY: 这可以设置为你选择的任何私钥。接收身份密钥的构建者将使用它根据搜索者声誉优先处理某些捆绑包。你可以从这里了解更多关于搜索者声誉的信息: https://docs.flashbots.net/flashbots-auction/advanced/reputation
  • TELEGRAM_TOKEN / TELEGRAM_CHAT_ID: 如果你不想使用 Telegram 警报,可以将这些字段留空。
  • DEBUG: 我们稍后将使用此标志来支持开发/生产模式。如果 DEBUG 设置为 true,我们将只运行模拟,而不发送任何真实的捆绑包。

我们还希望美化我们的控制台日志,因此我们在 src/common 目录中添加一个 utils.rs 文件 (sandooo/src/common/utils.rs)

sandooo/src/common/utils.rs

use anyhow::Result;  
use ethers::core::rand::thread_rng;  
use ethers::prelude::*;  
use ethers::{  
    self,  
    types::{  
        transaction::eip2930::{AccessList, AccessListItem},  
        U256,  
    },  
};  
use fern::colors::{Color, ColoredLevelConfig};  
use foundry_evm_mini::evm::utils::{b160_to_h160, h160_to_b160, ru256_to_u256, u256_to_ru256};  
use log::LevelFilter;  
use rand::Rng;  
use revm::primitives::{B160, U256 as rU256};  
use std::str::FromStr;  
use std::sync::Arc;  

use crate::common::constants::{PROJECT_NAME, WETH};  

// 格式化我们的控制台日志的函数  
pub fn setup_logger() -> Result<()> {  
    let colors = ColoredLevelConfig {  
        trace: Color::Cyan,  
        debug: Color::Magenta,  
        info: Color::Green,  
        warn: Color::Red,  
        error: Color::BrightRed,  
        ..ColoredLevelConfig::new()  
    };  

    fern::Dispatch::new()  
        .format(move |out, message, record| {  
            out.finish(format_args!(  
                "{}[{}] {}",  
                chrono::Local::now().format("[%H:%M:%S]"),  
                colors.color(record.level()),  
                message  
            ))  
        })  
        .chain(std::io::stdout())  
        .level(log::LevelFilter::Error)  
        .level_for(PROJECT_NAME, LevelFilter::Info)  
        .apply()?;  

    Ok(())  
}  

// 计算下一个区块的基础费用,给定上一个区块的 gas 使用量/限制  
// 参考: https://www.blocknative.com/blog/eip-1559-fees  
pub fn calculate_next_block_base_fee(  
    gas_used: U256,  
    gas_limit: U256,  
    base_fee_per_gas: U256,  
) -> U256 {  
    let gas_used = gas_used;  

    let mut target_gas_used = gas_limit / 2;  
    target_gas_used = if target_gas_used == U256::zero() {  
        U256::one()  
    } else {  
        target_gas_used  
    };  

    let new_base_fee = {  
        if gas_used > target_gas_used {  
            base_fee_per_gas  
                + ((base_fee_per_gas * (gas_used - target_gas_used)) / target_gas_used)  
                    / U256::from(8u64)  
        } else {  
            base_fee_per_gas  
                - ((base_fee_per_gas * (target_gas_used - gas_used)) / target_gas_used)  
                    / U256::from(8u64)  
        }  
    };  

    let seed = rand::thread_rng().gen_range(0..9);  
    new_base_fee + seed  
}  

pub fn access_list_to_ethers(access_list: Vec<(B160, Vec<rU256>)>) -> AccessList {  
    AccessList::from(  
        access_list  
            .into_iter()  
            .map(|(address, slots)| AccessListItem {  
                address: b160_to_h160(address),  
                storage_keys: slots  
                    .into_iter()  
                    .map(|y| H256::from_uint(&ru256_to_u256(y)))  
                    .collect(),  
            })  
            .collect::<Vec<AccessListItem>>(),  
    )  
}  

pub fn access_list_to_revm(access_list: AccessList) -> Vec<(B160, Vec<rU256>)> {  
    access_list  
        .0  
        .into_iter()  
        .map(|x| {  
            (  
                h160_to_b160(x.address),  
                x.storage_keys  
                    .into_iter()  
                    .map(|y| u256_to_ru256(y.0.into()))  
                    .collect(),  
            )  
        })  
        .collect()  
}  

abigen!(  
    IERC20,  
    r#"[  
        function balanceOf(address) external view returns (uint256)  
    ]"#,  
);  

// 实用函数  

pub async fn get_token_balance(  
    provider: Arc<Provider<Ws>>,  
    owner: H160,  
    token: H160,  
) -> Result<U256> {  
    let contract = IERC20::new(token, provider);  
    let token_balance = contract.balance_of(owner).call().await?;  
    Ok(token_balance)  
}  

```rust
pub fn create_new_wallet() -> (LocalWallet, H160) {  
    let wallet = LocalWallet::new(&mut thread_rng());  
    let address = wallet.address();  
    (wallet, address)  
}  

pub fn to_h160(str_address: &'static str) -> H160 {  
    H160::from_str(str_address).unwrap()  
}  

pub fn is_weth(token_address: H160) -> bool {  
    token_address == to_h160(WETH)  
}

setup_logger 函数将负责格式化我们的日志,我们还添加了一些额外的函数以便在整个项目中使用。我们将在它们出现时看到它们的用法。

我们快完成了。我们只需处理导入我们的新文件和函数,以便它们可以在我们的项目中使用。

为此:

  1. 创建 sandooo/src/lib.rs
pub mod common;
  1. 创建 sandooo/src/common/mod.rs
pub mod constants;  
pub mod utils;

项目设置已完成。现在我们可以进入三明治机器人更有趣的方面。

Mempool 流

例如,假设有人将 Uniswap 订单交易发送到公共内存池。任何直接发送到区块链的交易通常会进入公共内存池。

任何人都可以通过与节点提供者建立 websocket 连接来访问这些数据。为此,我们将在 sandooo/src/common/streams.rs 中创建一个新文件:

sandooo/src/common/streams.rs


use ethers::{  
    providers::{Middleware, Provider, Ws},  
    types::*,  
};  
use std::sync::Arc;  
use tokio::sync::broadcast::Sender;  
use tokio_stream::StreamExt;  

use crate::common::utils::calculate_next_block_base_fee;  

#[derive(Default, Debug, Clone)]  
pub struct NewBlock {  
    pub block_number: U64,  
    pub base_fee: U256,  
    pub next_base_fee: U256,  
}  

#[derive(Debug, Clone)]  
pub struct NewPendingTx {  
    pub added_block: Option<U64>,  
    pub tx: Transaction,  
}  

impl Default for NewPendingTx {  
    fn default() -> Self {  
        Self {  
            added_block: None,  
            tx: Transaction::default(),  
        }  
    }  
}  

#[derive(Debug, Clone)]  
pub enum Event {  
    Block(NewBlock),  
    PendingTx(NewPendingTx),  
}  

// 建立一个 websocket 连接以获取新创建的区块  
pub async fn stream_new_blocks(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {  
    let stream = provider.subscribe_blocks().await.unwrap();  
    let mut stream = stream.filter_map(|block| match block.number {  
        Some(number) => Some(NewBlock {  
            block_number: number,  
            base_fee: block.base_fee_per_gas.unwrap_or_default(),  
            next_base_fee: U256::from(calculate_next_block_base_fee(  
                block.gas_used,  
                block.gas_limit,  
                block.base_fee_per_gas.unwrap_or_default(),  
            )),  
        }),  
        None => None,  
    });  

    while let Some(block) = stream.next().await {  
        match event_sender.send(Event::Block(block)) {  
            Ok(_) => {}  
            Err(_) => {}  
        }  
    }  
}  

// 建立一个 websocket 连接以获取新的待处理交易  
pub async fn stream_pending_transactions(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {  
    let stream = provider.subscribe_pending_txs().await.unwrap();  
    let mut stream = stream.transactions_unordered(256).fuse();  

    while let Some(result) = stream.next().await {  
        match result {  
            Ok(tx) => match event_sender.send(Event::PendingTx(NewPendingTx {  
                added_block: None,  
                tx,  
            })) {  
                Ok(_) => {}  
                Err(_) => {}  
            },  
            Err(_) => {}  
        };  
    }  
}

并更新 sandooo/src/common/mod.rs

pub mod constants;  
pub mod streams;  
pub mod utils;

这样我们就可以在 streams.rs 中使用这些函数。

现在有了这两个函数,我们将能够实时获取新块和待处理交易。然而,我们仍然没有定义一个事件处理程序来处理 Block 和 PendingTx 事件。

为此,我们将在 src 目录中创建一个新目录,我们将其命名为:sandooo/src/sandwich。在此目录中创建两个新文件:

  • sandooo/src/sandwich/strategy.rs
    
    use bounded_vec_deque::BoundedVecDeque;  
    use ethers::signers::{LocalWallet, Signer};  
    use ethers::{  
    providers::{Middleware, Provider, Ws},  
    types::{BlockNumber, H160, H256, U256, U64},  
    };  
    use log::{info, warn};  
    use std::{collections::HashMap, str::FromStr, sync::Arc};  
    use tokio::sync::broadcast::Sender;  

// 我们稍后会更新这部分,目前只需导入必要的组件
use crate::common::constants::{Env, WETH};
use crate::common::streams::{Event, NewBlock};
use crate::common::utils::{calculate_next_block_base_fee, to_h160};

pub async fn run_sandwich_strategy(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {
let mut event_receiver = event_sender.subscribe();

loop {  
    match event_receiver.recv().await {  
        Ok(event) => match event {  
            Event::Block(block) => {  
                info!("{:?}", block);  
            }  
            Event::PendingTx(mut pending_tx) => {  
                info!("{:?}", pending_tx);  
            }  
        },  
        _ => {}  
    }  
}  

}

*   **sandooo/src/sandwich/mod.rs**:

pub mod strategy;


*   **sandooo/src/lib.rs**:

pub mod common;
pub mod sandwich;

希望你现在明白,每次我们在 **src** 目录中添加一个新目录时,我们都会在 **sandooo/src/lib.rs** 中更新它,并且这些目录应该有一个 **mod.rs** 文件。每次我们在该目录中添加新文件时,我们都必须在 **mod.rs** 文件中添加它。从现在开始请不要忘记这样做,因为我将不再描述这个过程,并假设它总是会完成。

前往 **main.rs** 并更新代码:

use anyhow::Result;
use ethers::providers::{Provider, Ws};
use log::info;
use std::sync::Arc;
use tokio::sync::broadcast::{self, Sender};
use tokio::task::JoinSet;

use sandooo::common::constants::Env;
use sandooo::common::streams::{stream_new_blocks, stream_pending_transactions, Event};
use sandooo::common::utils::setup_logger;
use sandooo::sandwich::strategy::run_sandwich_strategy;

[tokio::main]

async fn main() -> Result<()> {
dotenv::dotenv().ok();
setup_logger().unwrap();

info!("Starting Sandooo");  

let env = Env::new();  

let ws = Ws::connect(env.wss_url.clone()).await.unwrap();  
let provider = Arc::new(Provider::new(ws));  

let (event_sender, _): (Sender&lt;Event>, _) = broadcast::channel(512);  

let mut set = JoinSet::new();  

set.spawn(stream_new_blocks(provider.clone(), event_sender.clone()));  
set.spawn(stream_pending_transactions(  
    provider.clone(),  
    event_sender.clone(),  
));  

set.spawn(run_sandwich_strategy(
provider.clone(),
event_sender.clone(),
));

while let Some(res) = set.join_next().await {
info!("{:?}", res);
}

Ok(())
}


主函数是我们整个系统的入口点,它将使用 Tokio 的 JoinSet 运行三个异步函数。

通过以下命令运行当前的 Rust 程序:

cargo run

将会在你的终端上显示大量待处理的交易:

![](https://img.learnblockchain.cn/attachments/migrate/1731979870411)

好吧,这太多了,光看着就让我眼睛疼。抱歉让你看到这个。但至少我们知道代码现在是有效的。所以我们只需要一种方法来弄清楚 **这些待处理交易中哪些值得关注** 和 **它们是否可以进行夹击**。我们将尝试逐一解决。

### 🔎 哪些待处理交易值得关注?

我们能想到的第一个答案可能是解码这些待处理交易中的输入数据。如果它们是对 Uniswap 池或 Uniswap 路由器的直接调用,我们应该能够仅通过这些数据弄清楚交易意图买入或卖出哪个代币,以及数量。

然而,这种方法并不是很可扩展。是的,我们可以捕获如下交易:

![](https://img.learnblockchain.cn/attachments/migrate/1731979870427)

这些是直接与 Uniswap 通用路由器交互的交易。

但是,我们无法捕获那些复杂得多但可能可以夹击的交易。这些可能是来自聚合器如 1inch 和 0x 的交易,或者是智能合约在 Uniswap 中外部调用的交换。

此外,如果你想添加更多的 DEX,你需要找到一种方法,通过阅读它们的函数规范来解码所有交易。

**这就是我们需要一种更可扩展的方法的原因。**

❓ 你知道在交易在下一个区块确认之前,有可能弄清楚它将对区块链状态做什么吗?

**我们可以通过追踪交易调用来实现这一点。** 追踪调用将尝试在调用者指定的区块状态上运行交易,并返回诸如:使用的 gas、调用栈、返回的日志等值。

我们将尝试使用 **eth_traceCall** 方法在 Geth 上弄清楚待处理交易涉及哪些 Uniswap V2 池——通过 _“涉及”_ 我们的意思是哪些池的状态因调用而改变。

[ debug_traceCall | 以太坊](https://docs.chainstack.com/reference/ethereum-tracecall?source=post_page-----a89235281da3--------------------------------) 以太坊 API 方法,在特定区块执行的上下文中追踪 eth_call 的执行。

让我们在 **sandwich** 目录中创建另一个文件:**sandooo/src/sandwich/simulation.rs**:

use anyhow::Result;
use eth_encode_packed::ethabi::ethereum_types::{H160 as eH160, U256 as eU256};
use eth_encode_packed::{SolidityDataType, TakeLastXBytes};
use ethers::abi::ParamType;
use ethers::prelude::*;
use ethers::providers::{Provider, Ws};
use ethers::types::{transaction::eip2930::AccessList, Bytes, H160, H256, I256, U256, U64};
use log::info;
use revm::primitives::{Bytecode, U256 as rU256};
use std::{collections::HashMap, default::Default, str::FromStr, sync::Arc};

use crate::common::constants::{WETH, WETH_BALANCE_SLOT};
use crate::common::streams::{NewBlock, NewPendingTx};
use crate::common::utils::{create_new_wallet, is_weth, to_h160};

[derive(Debug, Clone, Default)]

pub struct PendingTxInfo {
pub pending_tx: NewPendingTx,
pub touched_pairs: Vec<SwapInfo>,
}

[derive(Debug, Clone)]

pub enum SwapDirection {
Buy,
Sell,
}

[derive(Debug, Clone)]

pub struct SwapInfo {
pub tx_hash: H256,
pub target_pair: H160,
pub main_currency: H160,
pub target_token: H160,
pub version: u8,
pub token0_is_main: bool,
pub direction: SwapDirection,
}

pub static V2_SWAP_EVENT_ID: &str = "0xd78ad95f";

pub async fn debug_trace_call(
provider: &Arc<Provider<Ws>>,
new_block: &NewBlock,
pending_tx: &NewPendingTx,
) -> Result<Option<CallFrame>> {
let mut opts = GethDebugTracingCallOptions::default();
let mut call_config = CallConfig::default();
call_config.with_log = Some(true); // 👈 确保我们获取日志

opts.tracing_options.tracer = Some(GethDebugTracerType::BuiltInTracer(  
    GethDebugBuiltInTracerType::CallTracer,  
));  
opts.tracing_options.tracer_config = Some(GethDebugTracerConfig::BuiltInTracer(  
    GethDebugBuiltInTracerConfig::CallTracer(call_config),  
));  

let block_number = new_block.block_number;  
let mut tx = pending_tx.tx.clone();  
let nonce = provider  
    .get_transaction_count(tx.from, Some(block_number.into()))  
    .await  
    .unwrap_or_default();  
tx.nonce = nonce;  

let trace = provider  
    .debug_trace_call(&tx, Some(block_number.into()), opts)  
    .await;  

match trace {  
    Ok(trace) => match trace {  
        GethTrace::Known(call_tracer) => match call_tracer {  
            GethTraceFrame::CallTracer(frame) => Ok(Some(frame)),  
            _ => Ok(None),  
        },  
        _ => Ok(None),  
    },  
    _ => Ok(None),  
}  

**debug_trace_call** 函数将返回在追踪待处理交易后返回的调用帧。我们可以在稍微调整策略函数后尝试运行这个 **(sandooo/src/sandwich/strategy.rs)**:

// ... 导入

pub async fn run_sandwich_strategy(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {
let block = provider
.get_block(BlockNumber::Latest)
.await
.unwrap()
.unwrap();
let mut new_block = NewBlock {
block_number: block.number.unwrap(),
base_fee: block.base_fee_per_gas.unwrap(),
next_base_fee: calculate_next_block_base_fee(
block.gas_used,
block.gas_limit,
block.base_fee_per_gas.unwrap(),
),
};

let mut event_receiver = event_sender.subscribe();  

loop {  
    match event_receiver.recv().await {  
        Ok(event) => match event {  
            Event::Block(block) => {  
                new_block = block;  
                info!("[Block #{:?}]", new_block.block_number);  
            }  
            Event::PendingTx(mut pending_tx) => {  
                let frame = debug_trace_call(&provider, &new_block, &pending_tx).await;  
                match frame {  
                    Ok(frame) => info!("{:?}", frame),  
                    Err(e) => info!("{e:?}"),  
                }  
            }  
        },  
        _ => {}  
    }  
}  

}

运行:

cargo run


将会给你一个调用帧,看起来像这样:

![](https://img.learnblockchain.cn/attachments/migrate/1731979870426)

我们感兴趣的部分是日志。然而,作为追踪结果返回的调用栈是递归的,因此一个调用帧可以有多个其他调用,这是一系列其他调用帧。每个调用帧可以包含日志。

为了递归地从调用帧中提取日志,我们使用另一个辅助函数,该函数将在 **sandooo/src/sandwich/simulation.rs** 中定义:

```rust
pub fn extract_logs(call_frame: &CallFrame, logs: &mut Vec&lt;CallLogFrame>) {  
    if let Some(ref logs_vec) = call_frame.logs {  
        logs.extend(logs_vec.iter().cloned());  
    }  

    if let Some(ref calls_vec) = call_frame.calls {  
        for call in calls_vec {  
            extract_logs(call, logs);  
        }  
    }  
}

通过这个新函数,我们可以轻松地将日志扁平化为一个单一的向量。你可以尝试再次更新 strategy.rs 函数:

loop {  
    match event_receiver.recv().await {  
        Ok(event) => match event {  
            Event::Block(block) => {  
                new_block = block;  
                info!("[Block #{:?}]", new_block.block_number);  
            }  
            // 只需更新这一部分 👇  
            Event::PendingTx(mut pending_tx) => {  
                let frame = debug_trace_call(&provider, &new_block, &pending_tx).await;  
                match frame {  
                    Ok(frame) => match frame {  
                        Some(frame) => {  
                            let mut logs = Vec::new();  
                            extract_logs(&frame, &mut logs);  
                            info!("{:?}", logs);  
                        }  
                        _ => {}  
                    },  
                    Err(e) => info!("{e:?}"),  
                }  
            }  
        },  
        _ => {}  
    }  
}

尝试运行这个,你现在会看到所有的日志都被扁平化为一个单一的向量:

当然,一些追踪将没有日志。

下一步是过滤这些日志,并找出哪些待处理交易正在尝试在 Uniswap V2 DEX 上进行交换。(我们现在将重点放在 Uniswap V2 上,并在本系列的最后部分添加 V3。)

我们可以通过过滤出如下的 Swap 日志来实现:

通过访问 Etherscan,你可以知道 Swap 事件的 4 字节选择器是 0xd78ad95f,可以从 topic0 中看到:

Uniswap V2: USDT | 地址 0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852 | Etherscan

因此,我们将在 sandooo/src/sandwich/simulation.rs 中添加另一个函数:

pub async fn extract_swap_info(  
    provider: &Arc&lt;Provider&lt;Ws>>,  
    new_block: &NewBlock,  
    pending_tx: &NewPendingTx,  
    pools_map: &HashMap&lt;H160, Pool>,  
) -> Result&lt;Vec&lt;SwapInfo>> {  
    let tx_hash = pending_tx.tx.hash;  
    let mut swap_info_vec = Vec::new();  

    let frame = debug_trace_call(provider, new_block, pending_tx).await?;  
    if frame.is_none() {  
        return Ok(swap_info_vec);  
    }  
    let frame = frame.unwrap();  

    let mut logs = Vec::new();  
    extract_logs(&frame, &mut logs);  

    for log in &logs {  
        match &log.topics {  
            Some(topics) => {  
                if topics.len() > 1 {  
                    let selector = &format!("{:?}", topics[0])[0..10];  
                    let is_v2_swap = selector == V2_SWAP_EVENT_ID;  
                    if is_v2_swap {  
                        let pair_address = log.address.unwrap();  

                        // 仅过滤我们在内存中保留的池  
                        let pool = pools_map.get(&pair_address);  
                        if pool.is_none() {  
                            continue;  
                        }  
                        let pool = pool.unwrap();  

                        let token0 = pool.token0;  
                        let token1 = pool.token1;  

                        let token0_is_weth = is_weth(token0);  
                        let token1_is_weth = is_weth(token1);  

                        // 仅过滤 WETH 交易对  
                        if !token0_is_weth && !token1_is_weth {  
                            continue;  
                        }  

                        let (main_currency, target_token, token0_is_main) = if token0_is_weth {  
                            (token0, token1, true)  
                        } else {  
                            (token1, token0, false)  
                        };  

                        let (in0, _, _, out1) = match ethers::abi::decode(  
                            &[  
                                ParamType::Uint(256),  
                                ParamType::Uint(256),  
                                ParamType::Uint(256),  
                                ParamType::Uint(256),  
                            ],  
                            log.data.as_ref().unwrap(),  
                        ) {  
                            Ok(input) => {  
                                let uints: Vec&lt;U256> = input  
                                    .into_iter()  
                                    .map(|i| i.to_owned().into_uint().unwrap())  
                                    .collect();  
                                (uints[0], uints[1], uints[2], uints[3])  
                            }  
                            _ => {  
                                let zero = U256::zero();  
                                (zero, zero, zero, zero)  
                            }  
                        };  

                        let zero_for_one = (in0 > U256::zero()) && (out1 > U256::zero());  

                        let direction = if token0_is_main {  
                            if zero_for_one {  
                                SwapDirection::Buy  
                            } else {  
                                SwapDirection::Sell  
                            }  
                        } else {  
                            if zero_for_one {  
                                SwapDirection::Sell  
                            } else {  
                                SwapDirection::Buy  
                            }  
                        };  

                        let swap_info = SwapInfo {  
                            tx_hash,  
                            target_pair: pair_address,  
                            main_currency,  
                            target_token,  
                            version: 2,  
                            token0_is_main,  
                            direction,  
                        };  
                        swap_info_vec.push(swap_info);  
                    }  
                }  
            }  
            _ => {}  
        }  
    }  

    Ok(swap_info_vec)  
}

我们在两个步骤中过滤日志:

  1. 首先,仅过滤我们在 pools_map 中保留的池。我们还没有添加这一部分,但我们将在下一节中添加。
  2. 其次,仅过滤 WETH 交易对池。

一旦我们完成了这个,我们通过以下方式解码日志数据:

ethers::abi::decode(  
    &[  
        ParamType::Uint(256),  
        ParamType::Uint(256),  
        ParamType::Uint(256),  
        ParamType::Uint(256),  
    ],  
    log.data.as_ref().unwrap(),  
)

并提取出 amount0In, amount1In, amount0Out, amount1Out 值。

我们还可以利用这些数据确定交易是用于购买还是出售目标代币。我们将 目标代币 定义为与 WETH 代币(主货币)配对的代币。

现在让我们添加一种方法,在程序启动时更新 Uniswap V2 池及其相关的 ERC-20 代币,以便追踪 + 日志提取能够正常工作。

sandooo/src/common 中添加两个新文件:

  1. pools.rs

sandooo/src/common/pools.rs at main · solidquant/sandooo

2. tokens.rs

sandooo/src/common/tokens.rs at main

3. bytecode.rs

sandooo/src/common/bytecode.rs at main

完成后,让我们再次更新 sandooo/src/sandwich/strategy.rs 文件:

pub async fn run_sandwich_strategy(provider: Arc&lt;Provider&lt;Ws>>, event_sender: Sender&lt;Event>) {  
    let env = Env::new();  

    // load_all_pools:  
    // this will load all Uniswap V2 pools that was deployed after the block #10000000  
    let (pools, prev_pool_id) = load_all_pools(env.wss_url.clone(), 10000000, 50000)  
        .await  
        .unwrap();  

    // load_all_tokens:  
    // this will get all the token information including: name, symbol, symbol, totalSupply  
    let block_number = provider.get_block_number().await.unwrap();  
    let tokens_map = load_all_tokens(&provider, block_number, &pools, prev_pool_id)  
        .await  
        .unwrap();  
    info!("Tokens map count: {:?}", tokens_map.len());  

    // filter pools that don't have both token0 / token1 info  
    let pools_vec: Vec&lt;Pool> = pools  
        .into_iter()  
        .filter(|p| {  
            let token0_exists = tokens_map.contains_key(&p.token0);  
            let token1_exists = tokens_map.contains_key(&p.token1);  
            token0_exists && token1_exists  
        })  
        .collect();  
    info!("Filtered pools by tokens count: {:?}", pools_vec.len());  

    let pools_map: HashMap&lt;H160, Pool> = pools_vec  
        .clone()  
        .into_iter()  
        .map(|p| (p.address, p))  
        .collect();  

    let block = provider  
        .get_block(BlockNumber::Latest)  
        .await  
        .unwrap()  
        .unwrap();  
    let mut new_block = NewBlock {  
        block_number: block.number.unwrap(),  
        base_fee: block.base_fee_per_gas.unwrap(),  
        next_base_fee: calculate_next_block_base_fee(  
            block.gas_used,  
            block.gas_limit,  
            block.base_fee_per_gas.unwrap(),  
        ),  
    };  

    let mut event_receiver = event_sender.subscribe();  

    loop {  
        match event_receiver.recv().await {  
            Ok(event) => match event {  
                Event::Block(block) => {  
                    new_block = block;  
                    info!("\[Block #{:?}\]", new_block.block_number);  
                }  
                Event::PendingTx(mut pending_tx) => {  
                    let swap_info =  
                        extract_swap_info(&provider, &new_block, &pending_tx, &pools_map).await;  
                    info!("{:?}", swap_info);  
                }  
            },  
            _ => {}  
        }  
    }  
}

👏👏👏 现在从根目录创建一个新目录:sandooo/cache. 这一部分很重要,因为 pools.rstokens.rs 将创建一个所有现有 Uniswap V2 池和代币的文件缓存,以便在我们重启系统时快速加载。

当你在创建 cache 目录后启动系统时,你会看到程序开始使用 RPC 节点端点加载池:

在让程序运行一段时间后(我使用全节点大约花了 30 分钟),它将开始打印我们从 geth 跟踪中提取的交换信息。

如你所见,我们得到了 target_pair, main_currency, target_token, 和正确的交换方向 的待处理交易。

我们终于准备好进入分析中更有趣的部分:理解三明治捆绑的利润和成本结构。

🥪 这些交易中哪一个是可以进行三明治交易的?

要回答这个问题,我们必须理解三明治捆绑的利润和成本分析是如何进行的。

我们将考虑最基本的三明治捆绑类型,如下所示:

  • 前置交易: WETH → 目标代币 (购买)
  • 受害者交易: WETH → 目标代币 (购买)
  • 后置交易: 目标代币 → WETH (出售)

理解三明治策略的简单形式对于深入了解更复杂的变体至关重要。我们最终会在将 V3 集成到我们的机器人时处理这些高级策略。但现在,让我们掌握基础知识。

三明治的概念非常简单:你在某人之前购买,并在那个人之后立即出售,以确保获得利润。 如果更多人购买某个代币,价格就会上涨,这就是三明治策略的利润部分如何运作。

由于我们现在能够监控来自 Uniswap V2 池的所有买入和卖出交易,因此我们可以尝试将它们分组为前置、受害者和后置交易的捆绑,并找出以下内容:

  1. 在受害者之前我们可以购买的最大代币数量, 确保受害者的交易不会回滚(交易可能由于用户在使用 Uniswap V2 路由合约时设置的滑点容忍度而回滚)
  2. 如果所有三笔交易都顺利进行而不回滚,我们可以预期获得的最大利润

通过我们的模拟引擎。

与其理论化计算如何运作,不如实时构建一些捆绑并模拟它们,以查看我们是否真的能获利。让我们立即开始。

使用 Solidity/Yul 的三明治智能合约

链上交易与在币安、Bybit 等中心化交易所的链下交易有所不同。我不会对哪种更具挑战性发表意见,因为这是一个主观问题。有些人认为在中心化交易所构建策略更困难,因为价格波动迅速,而另一些人则认为链上交易更复杂,因为较长的区块构建时间带来了新的挑战。这两种观点都有其合理性,在任何平台上实现盈利都不是一件容易的事。

然而,有一点是确定的:MEV 的执行方面比 CEX 交易复杂得多

如果你想在 MEV 中获利,了解如何开发一个安全高效的智能合约至关重要。

对 Yul 不熟悉的读者可以参考我之前关于该主题的文章:

如何在你的 MEV 项目中使用 Yul 一份关于降低 gas 成本和使用汇编处理错误、转移代币、交换代币等的 A 到 Z 指南

我们需要一个合约来模拟我们的交易 (当然在实际交易中也是如此),帮助我们确定潜在收益和相关的 gas 成本。鉴于每个人都会有一个针对其策略量身定制的合约,预计会遇到各种利润和成本分析。

合约在这里提供:

sandooo/contracts/src/Sandooo.sol at main

这是一个非常简单的合约,使用 Foundry 编写,采用 Yul 语言。

我们将快速查看 fallback 函数:

fallback() external payable {  
    // We check that the msg.sender is the owner who deployed the contract  
    require(msg.sender == owner, "NOT_OWNER");  

    assembly {  
        let ptr := mload(0x40)  
        let end := calldatasize()  

        // the first 8 bytes (64 bits, uint64) of the calldata is the block_number  
        // we want to make sure that our transactions are valid only on  
        // the block that we've specified  
        let block_number := shr(192, calldataload(0))  
        if iszero(eq(block_number, number())) {  
            revert(0, 0)  
        }  

        // we can pass in multiple swap instructions  
        // which we'll use later when we group multiple sandwiches together  
        for {  
            let offset := 8  
        } lt(offset, end) {  

        } {  
            let zeroForOne := shr(248, calldataload(offset)) // 1 byte  
            let pair := shr(96, calldataload(add(offset, 1))) // 20 bytes  
            let tokenIn := shr(96, calldataload(add(offset, 21))) // 20 bytes  
            let amountIn := calldataload(add(offset, 41)) // 32 bytes  
            let amountOut := calldataload(add(offset, 73)) // 32 bytes  
            offset := add(offset, 105) // 1 + 20 + 20 + 32 + 32  

            // transfer tokenIn to pair contract first  
            mstore(ptr, TOKEN_TRANSFER_ID)  
            mstore(add(ptr, 4), pair)  
            mstore(add(ptr, 36), amountIn)  

            if iszero(call(gas(), tokenIn, 0, ptr, 68, 0, 0)) {  
                revert(0, 0)  
            }  

            // call swap function in UniswapV2Pair contract  
            // zeroForOne means the transaction is a swap going from token0 to token1  
            // Uniswap V2 swap function expects us to pass it in the amountOut value  
            // so if zeroForOne == 1 (true), the out token is token1  
            // and if zeroForOne == 0 (false), the out token is token0  
            mstore(ptr, V2_SWAP_ID)  
            switch zeroForOne  
            case 0 {  
                mstore(add(ptr, 4), amountOut)  
                mstore(add(ptr, 36), 0)  
            }  
            case 1 {  
                mstore(add(ptr, 4), 0)  
                mstore(add(ptr, 36), amountOut)  
            }  
            mstore(add(ptr, 68), address())  
            mstore(add(ptr, 100), 0x80)  

            if iszero(call(gas(), pair, 0, ptr, 164, 0, 0)) {  
                revert(0, 0)  
            }  
        }  
    }  
}

使用 REVM 的三明治模拟引擎

现在我们已经准备好合约,终于可以进行一些真实的模拟了。

我们将总共运行三个模拟步骤:

  1. 开胃菜 模拟
  2. 输入金额 优化 模拟
  3. 主菜 模拟

你可能会想,为什么我们需要这么多模拟步骤,但你会看到它们在生态系统中都有其作用。

开胃菜模拟

sandooo/src/sandwich/appetizer.rs at main

在开胃菜模拟中,我们尝试将 0.1 WETH 作为我们第一次购买交易(前置交易)的 amountIn,并尝试查看它是否有利可图。

这是为了减少我们在后续步骤中运行的模拟数量。如果三明治捆绑包甚至无法接受 0.1 WETH 作为输入,那么继续进行优化步骤就没有意义。

输入金额 优化 模拟

如果我们的开胃菜模拟通过了,那么我们想要在进入下一步之前找出优化后的 WETH amountIn 值。

你可以在这个文件中查看优化过程:

sandooo/src/sandwich/simulation.rs

通过二次搜索,我们可以找出可以用 WETH 购买的代币数量,以最大化我们的收益。

主菜 模拟

sandooo/src/sandwich/main_dish.rs

在我们完成优化步骤后,我们使用该结果运行另一个模拟并计算准确的收入值。

收入计算如下:

  • 利润 = 合约 WETH 余额之后 - WETH 余额之前
  • 成本 = 用户 ETH 余额之后 - ETH 余额之前
  • 收入 = 利润 - 成本

如果收入大于 0,则意味着三明治捆绑包可以覆盖我们的 gas 成本,因此我们使用此步骤的收入值计算贿赂金额。

运行包含所有这些组件的系统将产生以下 gif。尝试运行:

cargo run

接下来期待什么

这是一段相当的旅程。🙏 非常感谢你们的陪伴。

最初,我计划将其分解为几篇小文章,但我想强调逐行深入 GitHub 代码的重要性。自己运行程序并亲眼见证其成功运行是我推荐给每个人的绝佳实践。

在本系列的后续文章中,我们将深入探讨如何将捆绑包发送给像 Flashbots 这样的构建者。我们将比较我们的捆绑包与当前系统中竞争对手的表现,并探索我们可以在设置中编织的潜在优化。

下次见,大家!💥

我是 AI 翻译官,为大家转译优秀英文文章,如有翻译不通的地方,在这里修改,还请包涵~

点赞 5
收藏 5
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

1 条评论

请先 登录 后评论
Solid Quant
Solid Quant
江湖只有他的大名,没有他的介绍。