100 小时构建三明治机器人
- 原文链接:medium.com/@solidquant...
- 译者:AI翻译官,校对:翻译小组
- 本文链接:learnblockchain.cn/article…
Fotor AI:“吃三明治的吉娃娃”经过 16 次迭代
今天,我们将一起构建一个三明治机器人。 🥪
是的,我知道这听起来可能有些奇怪。我理解你可能在想:
又来了。网上已经有很多资源可以学习这个了。为什么还要从你这样的人的身上再来一篇无聊的三明治机器人教程?
好问题。 🤔
老实说,实际上网上并没有那么多好的资源。而且那些资源,要么是几个月/几年前的,要么在使用案例上有限,所以不妨稍微更新一下代码。
我花了超过 100 小时来完善这个设置。无论你是刚刚接触这个生态系统,还是深入其中,都可以把这当作你的指南。
如果你还没有尝试从零开始构建一个 MEV 机器人,现在是你的机会。快速提醒一下,这个项目不会从第一天就开始盈利。如果可以的话,它就不会像这样公开分享。但它会非常接近真实的情况,随着我们逐步添加更多功能,收益会开始流入。我们将在下个月对代码进行微调,以实现最佳性能 (然后再深入研究狙击和支持Solana)。
这是最终结果的预览:
我分享了我的 Github 仓库链接,供喜欢立即深入代码的人使用:
本系列将分为两个部分:
在今天的文章中,我们将主要关注识别过程,而在下一篇文章中深入探讨如何处理执行部分。
我们大多数人可能都熟悉由libevm编写的 JS 三明治机器人:
GitHub - libevm/subway: 如何在以太坊上执行三明治攻击的实用示例
我仍然记得我第一次接触 MEV,所有的一切都始于这个老派的仓库——游戏中的真正经典。尽管这个项目已经有些年头,但从中可以学到很多东西。对于任何初学 MEV 的人,确保你逐行分析每一行代码。
然而,当然存在一些局限性。并不是因为这个项目不出色,而是在过去三年中,MEV 市场发生了严重的演变。
我们在这里挖掘这些变化,看看如今在 MEV 领域获利到底需要什么。如果你和我一样兴奋,那就让我们继续这段旅程。🙌
👾 加入我们的 Discord 团队,成千上万的人每天都在讨论 MEV 相关的内容。单独研究可能会有些孤独,所以快来打个招呼 🏄🏄。看看其他人是如何应对这个领域的:
在每个 MEV 策略中,你必须采取两个步骤:识别和执行。在今天的文章中,我们将重点关注识别三明治机会。在下一篇文章中,我们将向区块构建者发送真实订单,并尝试与其他三明治机器人竞争。
如果你还不太熟悉区块构建者,可以看看我之前的文章:
我不在乎我是否被夹在中间,更大的事情正在到来 介绍了 MEV 的内部运作以及行业如何转变
我们将首先设置我们的项目。
✋✋ 请注意,使用全节点运行生产代码将为你提供最佳性能。MEV 策略通常非常依赖网络,因此最好消除任何相关的延迟,而最简单的方法就是运行一个全节点。我个人使用 Geth + Lighthouse。
我还使用Rust 🦀进行整个项目,如果你还不熟悉 Rust,也不用担心,因为这里的概念即使你不懂也能理解。但建议你在深入之前先学习 Rust。
首先,在你的本地机器上创建一个新的 Rust 项目,方法是:
cargo new sandooo
这将在名为sandooo的新目录中创建一个 Rust 模板。
使用你选择的 IDE 打开该目录,并将以下内容复制并粘贴到Cargo.toml文件中 (sandooo/Cargo.toml):
sandooo/Cargo.toml at main · solidquant/sandooo 一个三明治机器人。通过在 GitHub 上创建帐户来为 solidquant/sandooo 的开发做出贡献。
[package]
name = "sandooo"
version = "0.1.0"
edition = "2021"
[dependencies]
dotenv = "0.15.0"
anyhow = "1.0.70"
itertools = "0.11.0"
serde = "1.0.188"
serde_json = "1.0.107"
bounded-vec-deque = "0.1.1"
# Telegram
teloxide = { version = "0.12", features = ["macros"] }
futures = "0.3.5"
futures-util = "*"
tokio = { version = "1.29.0", features = ["full"] }
tokio-stream = { version = "0.1", features = ['sync'] }
tokio-tungstenite = "*"
async-trait = "0.1.74"
ethers-core = "2.0"
ethers-providers = "2.0"
ethers-contract = "2.0"
ethers = { version = "2.0", features = ["abigen", "ws", "ipc"] }
ethers-flashbots = { git = "https://github.com/onbjerg/ethers-flashbots" }
eth-encode-packed = "0.1.0"
rlp = { version = "0.5", features = ["derive"] }
foundry-evm-mini = { git = "https://github.com/solidquant/foundry-evm-mini.git" }
revm = { version = "3", default-features = false, features = [
"std",
"serde",
"memory_limit",
"optional_eip3607",
"optional_block_gas_limit",
"optional_no_base_fee",
] }
csv = "1.2.2"
colored = "2.0.0"
log = "0.4.17"
fern = { version = "0.6.2", features = ["colored"] }
chrono = "0.4.23"
indicatif = "0.17.5"
[patch.crates-io]
revm = { git = "https://github.com/bluealloy/revm/", rev = "80c909d6f242886cb26e6103a01d1a4bf9468426" }
[profile.release]
codegen-units = 1
lto = "fat"
完成后,在src目录中创建一个新目录,并命名为common。然后创建一个新文件,命名为:constants.rs (sandooo/src/common/constants.rs):
sandooo/src/common/constants.rs
pub static PROJECT_NAME: &str = "sandooo";
// 加载环境变量为字符串值的函数
pub fn get_env(key: &str) -> String {
std::env::var(key).unwrap_or(String::from(""))
}
#[derive(Debug, Clone)]
pub struct Env {
pub https_url: String,
pub wss_url: String,
pub bot_address: String,
pub private_key: String,
pub identity_key: String,
pub telegram_token: String,
pub telegram_chat_id: String,
pub use_alert: bool,
pub debug: bool,
}
// 创建新的 Env 结构将自动加载环境变量
impl Env {
pub fn new() -> Self {
Env {
https_url: get_env("HTTPS_URL"),
wss_url: get_env("WSS_URL"),
bot_address: get_env("BOT_ADDRESS"),
private_key: get_env("PRIVATE_KEY"),
identity_key: get_env("IDENTITY_KEY"),
telegram_token: get_env("TELEGRAM_TOKEN"),
telegram_chat_id: get_env("TELEGRAM_CHAT_ID"),
use_alert: get_env("USE_ALERT").parse::<bool>().unwrap(),
debug: get_env("DEBUG").parse::<bool>().unwrap(),
}
}
}
pub static COINBASE: &str = "0xDAFEA492D9c6733ae3d56b7Ed1ADB60692c98Bc5"; // Flashbots Builder
pub static WETH: &str = "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2";
pub static WETH_BALANCE_SLOT: i32 = 3;
pub static WETH_DECIMALS: u8 = 18;
这使我们能够在启动时加载环境变量。自然,下一步是在根目录中创建一个 .env 文件,即 sandooo 目录 (sandooo/.env):
HTTPS_URL=http://localhost:8545
WSS_URL=ws://localhost:8546
BOT_ADDRESS="..."
PRIVATE_KEY="..."
IDENTITY_KEY="..."
TELEGRAM_TOKEN="..."
TELEGRAM_CHAT_ID="..."
USE_ALERT=false
DEBUG=true
RUST_BACKTRACE=1
我们还希望美化我们的控制台日志,因此我们在 src/common 目录中添加一个 utils.rs 文件 (sandooo/src/common/utils.rs):
use anyhow::Result;
use ethers::core::rand::thread_rng;
use ethers::prelude::*;
use ethers::{
self,
types::{
transaction::eip2930::{AccessList, AccessListItem},
U256,
},
};
use fern::colors::{Color, ColoredLevelConfig};
use foundry_evm_mini::evm::utils::{b160_to_h160, h160_to_b160, ru256_to_u256, u256_to_ru256};
use log::LevelFilter;
use rand::Rng;
use revm::primitives::{B160, U256 as rU256};
use std::str::FromStr;
use std::sync::Arc;
use crate::common::constants::{PROJECT_NAME, WETH};
// 格式化我们的控制台日志的函数
pub fn setup_logger() -> Result<()> {
let colors = ColoredLevelConfig {
trace: Color::Cyan,
debug: Color::Magenta,
info: Color::Green,
warn: Color::Red,
error: Color::BrightRed,
..ColoredLevelConfig::new()
};
fern::Dispatch::new()
.format(move |out, message, record| {
out.finish(format_args!(
"{}[{}] {}",
chrono::Local::now().format("[%H:%M:%S]"),
colors.color(record.level()),
message
))
})
.chain(std::io::stdout())
.level(log::LevelFilter::Error)
.level_for(PROJECT_NAME, LevelFilter::Info)
.apply()?;
Ok(())
}
// 计算下一个区块的基础费用,给定上一个区块的 gas 使用量/限制
// 参考: https://www.blocknative.com/blog/eip-1559-fees
pub fn calculate_next_block_base_fee(
gas_used: U256,
gas_limit: U256,
base_fee_per_gas: U256,
) -> U256 {
let gas_used = gas_used;
let mut target_gas_used = gas_limit / 2;
target_gas_used = if target_gas_used == U256::zero() {
U256::one()
} else {
target_gas_used
};
let new_base_fee = {
if gas_used > target_gas_used {
base_fee_per_gas
+ ((base_fee_per_gas * (gas_used - target_gas_used)) / target_gas_used)
/ U256::from(8u64)
} else {
base_fee_per_gas
- ((base_fee_per_gas * (target_gas_used - gas_used)) / target_gas_used)
/ U256::from(8u64)
}
};
let seed = rand::thread_rng().gen_range(0..9);
new_base_fee + seed
}
pub fn access_list_to_ethers(access_list: Vec<(B160, Vec<rU256>)>) -> AccessList {
AccessList::from(
access_list
.into_iter()
.map(|(address, slots)| AccessListItem {
address: b160_to_h160(address),
storage_keys: slots
.into_iter()
.map(|y| H256::from_uint(&ru256_to_u256(y)))
.collect(),
})
.collect::<Vec<AccessListItem>>(),
)
}
pub fn access_list_to_revm(access_list: AccessList) -> Vec<(B160, Vec<rU256>)> {
access_list
.0
.into_iter()
.map(|x| {
(
h160_to_b160(x.address),
x.storage_keys
.into_iter()
.map(|y| u256_to_ru256(y.0.into()))
.collect(),
)
})
.collect()
}
abigen!(
IERC20,
r#"[
function balanceOf(address) external view returns (uint256)
]"#,
);
// 实用函数
pub async fn get_token_balance(
provider: Arc<Provider<Ws>>,
owner: H160,
token: H160,
) -> Result<U256> {
let contract = IERC20::new(token, provider);
let token_balance = contract.balance_of(owner).call().await?;
Ok(token_balance)
}
```rust
pub fn create_new_wallet() -> (LocalWallet, H160) {
let wallet = LocalWallet::new(&mut thread_rng());
let address = wallet.address();
(wallet, address)
}
pub fn to_h160(str_address: &'static str) -> H160 {
H160::from_str(str_address).unwrap()
}
pub fn is_weth(token_address: H160) -> bool {
token_address == to_h160(WETH)
}
setup_logger 函数将负责格式化我们的日志,我们还添加了一些额外的函数以便在整个项目中使用。我们将在它们出现时看到它们的用法。
我们快完成了。我们只需处理导入我们的新文件和函数,以便它们可以在我们的项目中使用。
为此:
pub mod common;
pub mod constants;
pub mod utils;
项目设置已完成。现在我们可以进入三明治机器人更有趣的方面。
例如,假设有人将 Uniswap 订单交易发送到公共内存池。任何直接发送到区块链的交易通常会进入公共内存池。
任何人都可以通过与节点提供者建立 websocket 连接来访问这些数据。为此,我们将在 sandooo/src/common/streams.rs 中创建一个新文件:
use ethers::{
providers::{Middleware, Provider, Ws},
types::*,
};
use std::sync::Arc;
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use crate::common::utils::calculate_next_block_base_fee;
#[derive(Default, Debug, Clone)]
pub struct NewBlock {
pub block_number: U64,
pub base_fee: U256,
pub next_base_fee: U256,
}
#[derive(Debug, Clone)]
pub struct NewPendingTx {
pub added_block: Option<U64>,
pub tx: Transaction,
}
impl Default for NewPendingTx {
fn default() -> Self {
Self {
added_block: None,
tx: Transaction::default(),
}
}
}
#[derive(Debug, Clone)]
pub enum Event {
Block(NewBlock),
PendingTx(NewPendingTx),
}
// 建立一个 websocket 连接以获取新创建的区块
pub async fn stream_new_blocks(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {
let stream = provider.subscribe_blocks().await.unwrap();
let mut stream = stream.filter_map(|block| match block.number {
Some(number) => Some(NewBlock {
block_number: number,
base_fee: block.base_fee_per_gas.unwrap_or_default(),
next_base_fee: U256::from(calculate_next_block_base_fee(
block.gas_used,
block.gas_limit,
block.base_fee_per_gas.unwrap_or_default(),
)),
}),
None => None,
});
while let Some(block) = stream.next().await {
match event_sender.send(Event::Block(block)) {
Ok(_) => {}
Err(_) => {}
}
}
}
// 建立一个 websocket 连接以获取新的待处理交易
pub async fn stream_pending_transactions(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {
let stream = provider.subscribe_pending_txs().await.unwrap();
let mut stream = stream.transactions_unordered(256).fuse();
while let Some(result) = stream.next().await {
match result {
Ok(tx) => match event_sender.send(Event::PendingTx(NewPendingTx {
added_block: None,
tx,
})) {
Ok(_) => {}
Err(_) => {}
},
Err(_) => {}
};
}
}
并更新 sandooo/src/common/mod.rs:
pub mod constants;
pub mod streams;
pub mod utils;
这样我们就可以在 streams.rs 中使用这些函数。
现在有了这两个函数,我们将能够实时获取新块和待处理交易。然而,我们仍然没有定义一个事件处理程序来处理 Block 和 PendingTx 事件。
为此,我们将在 src 目录中创建一个新目录,我们将其命名为:sandooo/src/sandwich。在此目录中创建两个新文件:
use bounded_vec_deque::BoundedVecDeque;
use ethers::signers::{LocalWallet, Signer};
use ethers::{
providers::{Middleware, Provider, Ws},
types::{BlockNumber, H160, H256, U256, U64},
};
use log::{info, warn};
use std::{collections::HashMap, str::FromStr, sync::Arc};
use tokio::sync::broadcast::Sender;
// 我们稍后会更新这部分,目前只需导入必要的组件
use crate::common::constants::{Env, WETH};
use crate::common::streams::{Event, NewBlock};
use crate::common::utils::{calculate_next_block_base_fee, to_h160};
pub async fn run_sandwich_strategy(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {
let mut event_receiver = event_sender.subscribe();
loop {
match event_receiver.recv().await {
Ok(event) => match event {
Event::Block(block) => {
info!("{:?}", block);
}
Event::PendingTx(mut pending_tx) => {
info!("{:?}", pending_tx);
}
},
_ => {}
}
}
}
* **sandooo/src/sandwich/mod.rs**:
pub mod strategy;
* **sandooo/src/lib.rs**:
pub mod common;
pub mod sandwich;
希望你现在明白,每次我们在 **src** 目录中添加一个新目录时,我们都会在 **sandooo/src/lib.rs** 中更新它,并且这些目录应该有一个 **mod.rs** 文件。每次我们在该目录中添加新文件时,我们都必须在 **mod.rs** 文件中添加它。从现在开始请不要忘记这样做,因为我将不再描述这个过程,并假设它总是会完成。
前往 **main.rs** 并更新代码:
use anyhow::Result;
use ethers::providers::{Provider, Ws};
use log::info;
use std::sync::Arc;
use tokio::sync::broadcast::{self, Sender};
use tokio::task::JoinSet;
use sandooo::common::constants::Env;
use sandooo::common::streams::{stream_new_blocks, stream_pending_transactions, Event};
use sandooo::common::utils::setup_logger;
use sandooo::sandwich::strategy::run_sandwich_strategy;
async fn main() -> Result<()> {
dotenv::dotenv().ok();
setup_logger().unwrap();
info!("Starting Sandooo");
let env = Env::new();
let ws = Ws::connect(env.wss_url.clone()).await.unwrap();
let provider = Arc::new(Provider::new(ws));
let (event_sender, _): (Sender<Event>, _) = broadcast::channel(512);
let mut set = JoinSet::new();
set.spawn(stream_new_blocks(provider.clone(), event_sender.clone()));
set.spawn(stream_pending_transactions(
provider.clone(),
event_sender.clone(),
));
set.spawn(run_sandwich_strategy(
provider.clone(),
event_sender.clone(),
));
while let Some(res) = set.join_next().await {
info!("{:?}", res);
}
Ok(())
}
主函数是我们整个系统的入口点,它将使用 Tokio 的 JoinSet 运行三个异步函数。
通过以下命令运行当前的 Rust 程序:
cargo run
将会在你的终端上显示大量待处理的交易:
![](https://img.learnblockchain.cn/attachments/migrate/1731979870411)
好吧,这太多了,光看着就让我眼睛疼。抱歉让你看到这个。但至少我们知道代码现在是有效的。所以我们只需要一种方法来弄清楚 **这些待处理交易中哪些值得关注** 和 **它们是否可以进行夹击**。我们将尝试逐一解决。
### 🔎 哪些待处理交易值得关注?
我们能想到的第一个答案可能是解码这些待处理交易中的输入数据。如果它们是对 Uniswap 池或 Uniswap 路由器的直接调用,我们应该能够仅通过这些数据弄清楚交易意图买入或卖出哪个代币,以及数量。
然而,这种方法并不是很可扩展。是的,我们可以捕获如下交易:
![](https://img.learnblockchain.cn/attachments/migrate/1731979870427)
这些是直接与 Uniswap 通用路由器交互的交易。
但是,我们无法捕获那些复杂得多但可能可以夹击的交易。这些可能是来自聚合器如 1inch 和 0x 的交易,或者是智能合约在 Uniswap 中外部调用的交换。
此外,如果你想添加更多的 DEX,你需要找到一种方法,通过阅读它们的函数规范来解码所有交易。
**这就是我们需要一种更可扩展的方法的原因。**
❓ 你知道在交易在下一个区块确认之前,有可能弄清楚它将对区块链状态做什么吗?
**我们可以通过追踪交易调用来实现这一点。** 追踪调用将尝试在调用者指定的区块状态上运行交易,并返回诸如:使用的 gas、调用栈、返回的日志等值。
我们将尝试使用 **eth_traceCall** 方法在 Geth 上弄清楚待处理交易涉及哪些 Uniswap V2 池——通过 _“涉及”_ 我们的意思是哪些池的状态因调用而改变。
[ debug_traceCall | 以太坊](https://docs.chainstack.com/reference/ethereum-tracecall?source=post_page-----a89235281da3--------------------------------) 以太坊 API 方法,在特定区块执行的上下文中追踪 eth_call 的执行。
让我们在 **sandwich** 目录中创建另一个文件:**sandooo/src/sandwich/simulation.rs**:
use anyhow::Result;
use eth_encode_packed::ethabi::ethereum_types::{H160 as eH160, U256 as eU256};
use eth_encode_packed::{SolidityDataType, TakeLastXBytes};
use ethers::abi::ParamType;
use ethers::prelude::*;
use ethers::providers::{Provider, Ws};
use ethers::types::{transaction::eip2930::AccessList, Bytes, H160, H256, I256, U256, U64};
use log::info;
use revm::primitives::{Bytecode, U256 as rU256};
use std::{collections::HashMap, default::Default, str::FromStr, sync::Arc};
use crate::common::constants::{WETH, WETH_BALANCE_SLOT};
use crate::common::streams::{NewBlock, NewPendingTx};
use crate::common::utils::{create_new_wallet, is_weth, to_h160};
pub struct PendingTxInfo {
pub pending_tx: NewPendingTx,
pub touched_pairs: Vec<SwapInfo>,
}
pub enum SwapDirection {
Buy,
Sell,
}
pub struct SwapInfo {
pub tx_hash: H256,
pub target_pair: H160,
pub main_currency: H160,
pub target_token: H160,
pub version: u8,
pub token0_is_main: bool,
pub direction: SwapDirection,
}
pub static V2_SWAP_EVENT_ID: &str = "0xd78ad95f";
pub async fn debug_trace_call(
provider: &Arc<Provider<Ws>>,
new_block: &NewBlock,
pending_tx: &NewPendingTx,
) -> Result<Option<CallFrame>> {
let mut opts = GethDebugTracingCallOptions::default();
let mut call_config = CallConfig::default();
call_config.with_log = Some(true); // 👈 确保我们获取日志
opts.tracing_options.tracer = Some(GethDebugTracerType::BuiltInTracer(
GethDebugBuiltInTracerType::CallTracer,
));
opts.tracing_options.tracer_config = Some(GethDebugTracerConfig::BuiltInTracer(
GethDebugBuiltInTracerConfig::CallTracer(call_config),
));
let block_number = new_block.block_number;
let mut tx = pending_tx.tx.clone();
let nonce = provider
.get_transaction_count(tx.from, Some(block_number.into()))
.await
.unwrap_or_default();
tx.nonce = nonce;
let trace = provider
.debug_trace_call(&tx, Some(block_number.into()), opts)
.await;
match trace {
Ok(trace) => match trace {
GethTrace::Known(call_tracer) => match call_tracer {
GethTraceFrame::CallTracer(frame) => Ok(Some(frame)),
_ => Ok(None),
},
_ => Ok(None),
},
_ => Ok(None),
}
**debug_trace_call** 函数将返回在追踪待处理交易后返回的调用帧。我们可以在稍微调整策略函数后尝试运行这个 **(sandooo/src/sandwich/strategy.rs)**:
// ... 导入
pub async fn run_sandwich_strategy(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {
let block = provider
.get_block(BlockNumber::Latest)
.await
.unwrap()
.unwrap();
let mut new_block = NewBlock {
block_number: block.number.unwrap(),
base_fee: block.base_fee_per_gas.unwrap(),
next_base_fee: calculate_next_block_base_fee(
block.gas_used,
block.gas_limit,
block.base_fee_per_gas.unwrap(),
),
};
let mut event_receiver = event_sender.subscribe();
loop {
match event_receiver.recv().await {
Ok(event) => match event {
Event::Block(block) => {
new_block = block;
info!("[Block #{:?}]", new_block.block_number);
}
Event::PendingTx(mut pending_tx) => {
let frame = debug_trace_call(&provider, &new_block, &pending_tx).await;
match frame {
Ok(frame) => info!("{:?}", frame),
Err(e) => info!("{e:?}"),
}
}
},
_ => {}
}
}
}
运行:
cargo run
将会给你一个调用帧,看起来像这样:
![](https://img.learnblockchain.cn/attachments/migrate/1731979870426)
我们感兴趣的部分是日志。然而,作为追踪结果返回的调用栈是递归的,因此一个调用帧可以有多个其他调用,这是一系列其他调用帧。每个调用帧可以包含日志。
为了递归地从调用帧中提取日志,我们使用另一个辅助函数,该函数将在 **sandooo/src/sandwich/simulation.rs** 中定义:
```rust
pub fn extract_logs(call_frame: &CallFrame, logs: &mut Vec<CallLogFrame>) {
if let Some(ref logs_vec) = call_frame.logs {
logs.extend(logs_vec.iter().cloned());
}
if let Some(ref calls_vec) = call_frame.calls {
for call in calls_vec {
extract_logs(call, logs);
}
}
}
通过这个新函数,我们可以轻松地将日志扁平化为一个单一的向量。你可以尝试再次更新 strategy.rs 函数:
loop {
match event_receiver.recv().await {
Ok(event) => match event {
Event::Block(block) => {
new_block = block;
info!("[Block #{:?}]", new_block.block_number);
}
// 只需更新这一部分 👇
Event::PendingTx(mut pending_tx) => {
let frame = debug_trace_call(&provider, &new_block, &pending_tx).await;
match frame {
Ok(frame) => match frame {
Some(frame) => {
let mut logs = Vec::new();
extract_logs(&frame, &mut logs);
info!("{:?}", logs);
}
_ => {}
},
Err(e) => info!("{e:?}"),
}
}
},
_ => {}
}
}
尝试运行这个,你现在会看到所有的日志都被扁平化为一个单一的向量:
当然,一些追踪将没有日志。
下一步是过滤这些日志,并找出哪些待处理交易正在尝试在 Uniswap V2 DEX 上进行交换。(我们现在将重点放在 Uniswap V2 上,并在本系列的最后部分添加 V3。)
我们可以通过过滤出如下的 Swap 日志来实现:
通过访问 Etherscan,你可以知道 Swap 事件的 4 字节选择器是 0xd78ad95f,可以从 topic0 中看到:
Uniswap V2: USDT | 地址 0x0d4a11d5eeaac28ec3f61d100daf4d40471f1852 | Etherscan
因此,我们将在 sandooo/src/sandwich/simulation.rs 中添加另一个函数:
pub async fn extract_swap_info(
provider: &Arc<Provider<Ws>>,
new_block: &NewBlock,
pending_tx: &NewPendingTx,
pools_map: &HashMap<H160, Pool>,
) -> Result<Vec<SwapInfo>> {
let tx_hash = pending_tx.tx.hash;
let mut swap_info_vec = Vec::new();
let frame = debug_trace_call(provider, new_block, pending_tx).await?;
if frame.is_none() {
return Ok(swap_info_vec);
}
let frame = frame.unwrap();
let mut logs = Vec::new();
extract_logs(&frame, &mut logs);
for log in &logs {
match &log.topics {
Some(topics) => {
if topics.len() > 1 {
let selector = &format!("{:?}", topics[0])[0..10];
let is_v2_swap = selector == V2_SWAP_EVENT_ID;
if is_v2_swap {
let pair_address = log.address.unwrap();
// 仅过滤我们在内存中保留的池
let pool = pools_map.get(&pair_address);
if pool.is_none() {
continue;
}
let pool = pool.unwrap();
let token0 = pool.token0;
let token1 = pool.token1;
let token0_is_weth = is_weth(token0);
let token1_is_weth = is_weth(token1);
// 仅过滤 WETH 交易对
if !token0_is_weth && !token1_is_weth {
continue;
}
let (main_currency, target_token, token0_is_main) = if token0_is_weth {
(token0, token1, true)
} else {
(token1, token0, false)
};
let (in0, _, _, out1) = match ethers::abi::decode(
&[
ParamType::Uint(256),
ParamType::Uint(256),
ParamType::Uint(256),
ParamType::Uint(256),
],
log.data.as_ref().unwrap(),
) {
Ok(input) => {
let uints: Vec<U256> = input
.into_iter()
.map(|i| i.to_owned().into_uint().unwrap())
.collect();
(uints[0], uints[1], uints[2], uints[3])
}
_ => {
let zero = U256::zero();
(zero, zero, zero, zero)
}
};
let zero_for_one = (in0 > U256::zero()) && (out1 > U256::zero());
let direction = if token0_is_main {
if zero_for_one {
SwapDirection::Buy
} else {
SwapDirection::Sell
}
} else {
if zero_for_one {
SwapDirection::Sell
} else {
SwapDirection::Buy
}
};
let swap_info = SwapInfo {
tx_hash,
target_pair: pair_address,
main_currency,
target_token,
version: 2,
token0_is_main,
direction,
};
swap_info_vec.push(swap_info);
}
}
}
_ => {}
}
}
Ok(swap_info_vec)
}
我们在两个步骤中过滤日志:
一旦我们完成了这个,我们通过以下方式解码日志数据:
ethers::abi::decode(
&[
ParamType::Uint(256),
ParamType::Uint(256),
ParamType::Uint(256),
ParamType::Uint(256),
],
log.data.as_ref().unwrap(),
)
并提取出 amount0In, amount1In, amount0Out, amount1Out 值。
我们还可以利用这些数据确定交易是用于购买还是出售目标代币。我们将 目标代币 定义为与 WETH 代币(主货币)配对的代币。
现在让我们添加一种方法,在程序启动时更新 Uniswap V2 池及其相关的 ERC-20 代币,以便追踪 + 日志提取能够正常工作。
在 sandooo/src/common 中添加两个新文件:
sandooo/src/common/pools.rs at main · solidquant/sandooo
2. tokens.rs
sandooo/src/common/tokens.rs at main
3. bytecode.rs
sandooo/src/common/bytecode.rs at main
完成后,让我们再次更新 sandooo/src/sandwich/strategy.rs 文件:
pub async fn run_sandwich_strategy(provider: Arc<Provider<Ws>>, event_sender: Sender<Event>) {
let env = Env::new();
// load_all_pools:
// this will load all Uniswap V2 pools that was deployed after the block #10000000
let (pools, prev_pool_id) = load_all_pools(env.wss_url.clone(), 10000000, 50000)
.await
.unwrap();
// load_all_tokens:
// this will get all the token information including: name, symbol, symbol, totalSupply
let block_number = provider.get_block_number().await.unwrap();
let tokens_map = load_all_tokens(&provider, block_number, &pools, prev_pool_id)
.await
.unwrap();
info!("Tokens map count: {:?}", tokens_map.len());
// filter pools that don't have both token0 / token1 info
let pools_vec: Vec<Pool> = pools
.into_iter()
.filter(|p| {
let token0_exists = tokens_map.contains_key(&p.token0);
let token1_exists = tokens_map.contains_key(&p.token1);
token0_exists && token1_exists
})
.collect();
info!("Filtered pools by tokens count: {:?}", pools_vec.len());
let pools_map: HashMap<H160, Pool> = pools_vec
.clone()
.into_iter()
.map(|p| (p.address, p))
.collect();
let block = provider
.get_block(BlockNumber::Latest)
.await
.unwrap()
.unwrap();
let mut new_block = NewBlock {
block_number: block.number.unwrap(),
base_fee: block.base_fee_per_gas.unwrap(),
next_base_fee: calculate_next_block_base_fee(
block.gas_used,
block.gas_limit,
block.base_fee_per_gas.unwrap(),
),
};
let mut event_receiver = event_sender.subscribe();
loop {
match event_receiver.recv().await {
Ok(event) => match event {
Event::Block(block) => {
new_block = block;
info!("\[Block #{:?}\]", new_block.block_number);
}
Event::PendingTx(mut pending_tx) => {
let swap_info =
extract_swap_info(&provider, &new_block, &pending_tx, &pools_map).await;
info!("{:?}", swap_info);
}
},
_ => {}
}
}
}
👏👏👏 现在从根目录创建一个新目录:sandooo/cache. 这一部分很重要,因为 pools.rs 和 tokens.rs 将创建一个所有现有 Uniswap V2 池和代币的文件缓存,以便在我们重启系统时快速加载。
当你在创建 cache 目录后启动系统时,你会看到程序开始使用 RPC 节点端点加载池:
在让程序运行一段时间后(我使用全节点大约花了 30 分钟),它将开始打印我们从 geth 跟踪中提取的交换信息。
如你所见,我们得到了 target_pair, main_currency, target_token, 和正确的交换方向 的待处理交易。
我们终于准备好进入分析中更有趣的部分:理解三明治捆绑的利润和成本结构。
要回答这个问题,我们必须理解三明治捆绑的利润和成本分析是如何进行的。
我们将考虑最基本的三明治捆绑类型,如下所示:
理解三明治策略的简单形式对于深入了解更复杂的变体至关重要。我们最终会在将 V3 集成到我们的机器人时处理这些高级策略。但现在,让我们掌握基础知识。
三明治的概念非常简单:你在某人之前购买,并在那个人之后立即出售,以确保获得利润。 如果更多人购买某个代币,价格就会上涨,这就是三明治策略的利润部分如何运作。
由于我们现在能够监控来自 Uniswap V2 池的所有买入和卖出交易,因此我们可以尝试将它们分组为前置、受害者和后置交易的捆绑,并找出以下内容:
通过我们的模拟引擎。
与其理论化计算如何运作,不如实时构建一些捆绑并模拟它们,以查看我们是否真的能获利。让我们立即开始。
链上交易与在币安、Bybit 等中心化交易所的链下交易有所不同。我不会对哪种更具挑战性发表意见,因为这是一个主观问题。有些人认为在中心化交易所构建策略更困难,因为价格波动迅速,而另一些人则认为链上交易更复杂,因为较长的区块构建时间带来了新的挑战。这两种观点都有其合理性,在任何平台上实现盈利都不是一件容易的事。
然而,有一点是确定的:MEV 的执行方面比 CEX 交易复杂得多。
如果你想在 MEV 中获利,了解如何开发一个安全高效的智能合约至关重要。
对 Yul 不熟悉的读者可以参考我之前关于该主题的文章:
如何在你的 MEV 项目中使用 Yul 一份关于降低 gas 成本和使用汇编处理错误、转移代币、交换代币等的 A 到 Z 指南
我们需要一个合约来模拟我们的交易 (当然在实际交易中也是如此),帮助我们确定潜在收益和相关的 gas 成本。鉴于每个人都会有一个针对其策略量身定制的合约,预计会遇到各种利润和成本分析。
合约在这里提供:
sandooo/contracts/src/Sandooo.sol at main
这是一个非常简单的合约,使用 Foundry 编写,采用 Yul 语言。
我们将快速查看 fallback 函数:
fallback() external payable {
// We check that the msg.sender is the owner who deployed the contract
require(msg.sender == owner, "NOT_OWNER");
assembly {
let ptr := mload(0x40)
let end := calldatasize()
// the first 8 bytes (64 bits, uint64) of the calldata is the block_number
// we want to make sure that our transactions are valid only on
// the block that we've specified
let block_number := shr(192, calldataload(0))
if iszero(eq(block_number, number())) {
revert(0, 0)
}
// we can pass in multiple swap instructions
// which we'll use later when we group multiple sandwiches together
for {
let offset := 8
} lt(offset, end) {
} {
let zeroForOne := shr(248, calldataload(offset)) // 1 byte
let pair := shr(96, calldataload(add(offset, 1))) // 20 bytes
let tokenIn := shr(96, calldataload(add(offset, 21))) // 20 bytes
let amountIn := calldataload(add(offset, 41)) // 32 bytes
let amountOut := calldataload(add(offset, 73)) // 32 bytes
offset := add(offset, 105) // 1 + 20 + 20 + 32 + 32
// transfer tokenIn to pair contract first
mstore(ptr, TOKEN_TRANSFER_ID)
mstore(add(ptr, 4), pair)
mstore(add(ptr, 36), amountIn)
if iszero(call(gas(), tokenIn, 0, ptr, 68, 0, 0)) {
revert(0, 0)
}
// call swap function in UniswapV2Pair contract
// zeroForOne means the transaction is a swap going from token0 to token1
// Uniswap V2 swap function expects us to pass it in the amountOut value
// so if zeroForOne == 1 (true), the out token is token1
// and if zeroForOne == 0 (false), the out token is token0
mstore(ptr, V2_SWAP_ID)
switch zeroForOne
case 0 {
mstore(add(ptr, 4), amountOut)
mstore(add(ptr, 36), 0)
}
case 1 {
mstore(add(ptr, 4), 0)
mstore(add(ptr, 36), amountOut)
}
mstore(add(ptr, 68), address())
mstore(add(ptr, 100), 0x80)
if iszero(call(gas(), pair, 0, ptr, 164, 0, 0)) {
revert(0, 0)
}
}
}
}
现在我们已经准备好合约,终于可以进行一些真实的模拟了。
我们将总共运行三个模拟步骤:
你可能会想,为什么我们需要这么多模拟步骤,但你会看到它们在生态系统中都有其作用。
sandooo/src/sandwich/appetizer.rs at main
在开胃菜模拟中,我们尝试将 0.1 WETH 作为我们第一次购买交易(前置交易)的 amountIn,并尝试查看它是否有利可图。
这是为了减少我们在后续步骤中运行的模拟数量。如果三明治捆绑包甚至无法接受 0.1 WETH 作为输入,那么继续进行优化步骤就没有意义。
如果我们的开胃菜模拟通过了,那么我们想要在进入下一步之前找出优化后的 WETH amountIn 值。
你可以在这个文件中查看优化过程:
sandooo/src/sandwich/simulation.rs
通过二次搜索,我们可以找出可以用 WETH 购买的代币数量,以最大化我们的收益。
sandooo/src/sandwich/main_dish.rs
在我们完成优化步骤后,我们使用该结果运行另一个模拟并计算准确的收入值。
收入计算如下:
如果收入大于 0,则意味着三明治捆绑包可以覆盖我们的 gas 成本,因此我们使用此步骤的收入值计算贿赂金额。
运行包含所有这些组件的系统将产生以下 gif。尝试运行:
cargo run
这是一段相当的旅程。🙏 非常感谢你们的陪伴。
最初,我计划将其分解为几篇小文章,但我想强调逐行深入 GitHub 代码的重要性。自己运行程序并亲眼见证其成功运行是我推荐给每个人的绝佳实践。
在本系列的后续文章中,我们将深入探讨如何将捆绑包发送给像 Flashbots 这样的构建者。我们将比较我们的捆绑包与当前系统中竞争对手的表现,并探索我们可以在设置中编织的潜在优化。
下次见,大家!💥
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!