本文介绍了Reth Execution Extensions (ExEx)框架,旨在简化以太坊节点的复杂离链基础设施构建,支持高性能的实时数据处理。文章详细探讨了ExEx的工作原理,包括如何创建和使用ExEx来构建索引器和Rollup,强调了其在性能和开发便捷性方面的优势。
Reth 是一个用于构建高性能和可定制节点的全功能工具包。我们最近发布了改进 Reth 性能超过 100 倍的 性能路线图,以及我们的测试网络 Rollup - Reth AlphaNet,用于推动 Reth 的模块化和可扩展性到极限。
今天,我们很高兴地宣布 Reth 执行扩展(ExEx)。ExEx 是一个构建高性能和复杂离链基础设施的框架,作为后执行Hook。Reth 的 ExEx 可以用来实现 Rollup、索引器、MEV 机器人等,代码量比现有方法少超过 10 倍。通过此发布,我们展示了一个生产就绪的重组追踪器,代码行数少于 20,一种索引器,代码行数少于 250,以及一种 Rollup,代码行数少于 1000。
ExEx 是与研究集体 init4 共同设计的,init4 正在构建下一代以太坊基础设施。我们期待继续与 init4 团队合作,使 Reth 成为构建加密基础设施的首选平台!
区块链是一个以固定时间间隔确认区块及其交易数据的时钟。离链基础设施订阅这些定期的区块更新,并作为响应更新其自身的内部状态。
例如,考虑以太坊索引器是如何工作的:
eth_subscribe
或通过 eth_getFilterChanges
进行轮询。这是大型数据管道中常见的提取、转换和加载(ETL)模式,像 Fivetran 这样的公司负责数据提取,Snowflake 处理数据装载到数据仓库,而客户则专注于编写转换的业务逻辑。
我们观察到同样的模式也适用于其他加密基础设施元素,例如 Rollup、MEV 搜索器,或更复杂的数据基础设施,如 Shadow Logs。
以此为动力,我们识别出在为以太坊节点构建 ETL 管道时的关键挑战:
迫切需要一个更好的 API 来构建依赖于节点状态变化的离链基础设施。该 API 必须是高性能的、集成全面的,并且能感知重组。我们需要建立以太坊 ETL 基础设施和作业编排的 Airflow 时刻。
执行扩展 (ExEx) 是构建实时、高性能以及零操作离链基础设施的后执行Hook,基于 Reth。
执行扩展是一个从 Reth 状态衍生其状态的任务。此类状态衍生的示例包括 Rollup、索引器、MEV 提取器等。我们预计,开发者将构建可重用的 ExEx,这些 ExEx 以标准化方式进行组合,类似于 Cosmos SDK 模块或 Substrate 小部件的工作方式。
我们与 init4 团队 协同设计了执行扩展(请关注他们!),这是一个新兴的研究集体,正在构建下一代以太坊基础设施。我们期待继续与他们团队合作,将 ExEx 推向生产,并使 Reth 成为构建加密基础设施的首选平台!
我们仍处于构建 ExEx 的最佳实践早期阶段,欢迎开发者加入我们,探索构建离链加密基础设施的新前沿。如果有合作想法,请随时联系。
在 Rust 术语中,ExEx 是一个与 Reth 一起无限运行的 Future。ExEx 是使用一个解构为 ExEx 的异步闭包初始化的。以下是预期的端到端流程:
ExExNotification
,其中 包含 已提交的区块和所有相关的交易和收据、状态变化以及与之相关的 Merkle 树更新列表。async
函数来消费该流,以获取状态,例如 Rollup 区块。该流在 追加 ExEx 状态 时暴露 ChainCommitted 变体,并提供 撤销任何变更 的 ChainReverted/Reorged 变体。这使 ExEx 可以在原生区块时间运行,同时提供安全处理重组的合理 API,而非不处理重组而引入延迟。ExExManager
调度,该管理者负责将 Reth 的通知路由到 ExEx,并将 ExEx 事件返回到 Reth,同时 Reth 的任务执行器推动 ExEx 完成。install_exex
API 在节点上安装。以下是从节点开发者的角度看大致的样子:
上面的 <50 行代码片段封装了定义和安装 ExEx 的过程。它极其强大,允许在没有额外基础设施的情况下扩展以太坊节点的功能。
现在让我们逐步了解一些例子。
执行扩展的 “Hello World” 是一个重组追踪器。下图所示的 ExEx 说明了记录是否出现新链或重组。人们只需解析以下 ExEx 发出的日志信息,便可以轻松地在其 Reth 节点上构建重组追踪器。
在这个例子中,old
和 new
链在该区块范围内完全访问每一个状态变化,以及有关 Chain
结构的 Merkle 树更新和其他有用信息。
既然我们已经了解了如何Hook节点事件,现在让我们构建一个更复杂的示例,例如 一个用于 OP 堆栈链中存款和取款的索引器,使用 SQlite 作为后端。
在这个案例中:
sol!
宏加载了 OP 堆栈的桥接合约,以生成类型安全的 ABI 解码器(这是一个非常强大的宏,我们鼓励开发者 深入了解)。ExExNotification
时,我们会读取每个已提交区块的日志,解码它,然后将其插入到 SQLite 中。ExExNotification
是关于链重组的,则从 SQLite 表中删除相应的条目。就这样!超级简单,可能是在 30 分钟内构建的本地托管实时索引器中性能最高的。请查看下面的代码,并查看 完整示例。
现在让我们做一些更有趣的事情,构建一个最小的 Rollup 作为 ExEx,使用 EVM 运行时和 SQLite 作为后端!
如果你 拉远一点,甚至 Rollup 也是 ETL 式的管道:
在此示例中,我们演示了一个简化的 Rollup,将其状态派生自发布到 Zenith 的 RLP 编码 EVM 交易(一个 Holesky 智能合约,用于发布我们 Rollup 的块承诺)通过一个 简单的区块构建器,这两个项目都是由 init4 团队构建的。
该示例特别:
revm
数据库特征,以使用 SQLite 作为 EVM 后端。同样,超级简单。它还可以与 blobs 一起工作!
ExEx Rollups 是极其强大的,因为我们现在可以在 Reth 上运行任意数量的 Rollups,而无需额外的基础设施,只需将其作为 ExEx 安装。
我们正在努力扩展示例以支持 blobs,并提供内置排序器,以便于完成的端到端演示。如果这是你想构建的内容,请随时联系,因为我们认为这有可能引入 L2 PBS、去中心化/共享排序器或甚至基于 SGX 的排序器等功能。
下面是 示例 代码片段。
这个问题可以重新表述为“什么可以建模为后执行Hook?”结果是,很多事情都可以!
我们看到了几种应该构建的有价值 ExEx:
目前,ExEx 需要在节点上通过在主函数中进行自定义构建来安装。我们的目标是使 ExEx 动态加载为插件,并暴露 Docker Hub 风格的 reth pull
API,以便开发者可以轻松地通过空中分发 ExEx 给节点操作员。
我们希望使 Reth 成为一个平台,在核心节点操作上提供稳定性和性能,同时也成为创新的发射台。
Reth 项目有望改变人们对构建高性能离链基础设施的方法的看法,而 ExEx 只是一个开始。我们期待继续在 Reth 上构建基础设施并对其进行投资。
如果你想构建 ExEx 项目,请联系 georgios@paradigm.xyz,或直接在 Github 上 贡献。

use futures::Future;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
// `ExExContext` 可在每个 ExEx 中使用,以与节点的其余部分接口。
//
// pub struct ExExContext<Node: FullNodeComponents> {
// /// 用于与区块链交互的配置提供者。
// pub provider: Node::Provider,
// /// 节点的任务执行器。
// pub task_executor: TaskExecutor,
// /// 节点的交易池。
// pub pool: Node::Pool,
// /// 接收 [`ExExNotification`] 的通道。
// pub notifications: Receiver<ExExNotification>,
// // .. 其他有用的上下文字段
// }
async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
match ¬ification {
ExExNotification::ChainCommitted { new } => {
// 做一些事情
}
ExExNotification::ChainReorged { old, new } => {
// 做一些事情
}
ExExNotification::ChainReverted { old } => {
// 做一些事情
}
};
}
Ok(())
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("Minimal", |ctx| async move { exex(ctx) } )
.launch()
.await?;
handle.wait_for_node_exit().await
})
}

async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
while let Some(notification) = ctx.notifications.recv().await {
match ¬ification {
ExExNotification::ChainCommitted { new } => {
info!(committed_chain = ?new.range(), "Received commit");
}
ExExNotification::ChainReorged { old, new } => {
info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
}
ExExNotification::ChainReverted { old } => {
info!(reverted_chain = ?old.range(), "Received revert");
}
};
if let Some(committed_chain) = notification.committed_chain() {
ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Ok(())
}

use alloy_sol_types::{sol, SolEventInterface};
use futures::Future;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_primitives::{Log, SealedBlockWithSenders, TransactionSigned};
use reth_provider::Chain;
use reth_tracing::tracing::info;
use rusqlite::Connection;
sol!(L1StandardBridge, "l1_standard_bridge_abi.json");
use crate::L1StandardBridge::{ETHBridgeFinalized, ETHBridgeInitiated, L1StandardBridgeEvents};
fn create_tables(connection: &mut Connection) -> rusqlite::Result<()> {
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS deposits (
id INTEGER PRIMARY KEY,
block_number INTEGER NOT NULL,
tx_hash TEXT NOT NULL UNIQUE,
contract_address TEXT NOT NULL,
"from" TEXT NOT NULL,
"to" TEXT NOT NULL,
amount TEXT NOT NULL
);
"#,
(),
)?;
// .. 其余的数据库初始化
Ok(())
}
/// 一个 ExEx 示例,监听来自 OP 堆栈链的 ETH 桥接事件
/// 并在 SQLite 数据库中存储存款和取款。
async fn op_bridge_exex<Node: FullNodeComponents>(
mut ctx: ExExContext<Node>,
connection: Connection,
) -> eyre::Result<()> {
// 处理所有新的链状态通知
while let Some(notification) = ctx.notifications.recv().await {
// 撤销所有存款和取款
if let Some(reverted_chain) = notification.reverted_chain() {
// ..
}
// 插入所有新的存款和取款
if let Some(committed_chain) = notification.committed_chain() {
// ..
}
}
Ok(())
}
/// 将区块链解码为扁平的收据日志列表,并仅过滤
/// [L1StandardBridgeEvents]。
fn decode_chain_into_events(
chain: &Chain,
) -> impl Iterator<Item = (&SealedBlockWithSenders, &TransactionSigned, &Log, L1StandardBridgeEvents)>
{
chain
// 获取所有区块和收据
.blocks_and_receipts()
// .. 继续解码
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("OPBridge", |ctx| async move {
let connection = Connection::open("op_bridge.db")?;
create_tables(&mut connection)?;
Ok(op_bridge_exex(ctx, connection))
})
.launch()
.await?;
handle.wait_for_node_exit().await
})
}

use alloy_rlp::Decodable;
use alloy_sol_types::{sol, SolEventInterface, SolInterface};
use db::Database;
use eyre::OptionExt;
use once_cell::sync::Lazy;
use reth_exex::{ExExContext, ExExEvent};
use reth_interfaces::executor::BlockValidationError;
use reth_node_api::{ConfigureEvm, ConfigureEvmEnv, FullNodeComponents};
use reth_node_ethereum::{EthEvmConfig, EthereumNode};
use reth_primitives::{
address, constants,
revm::env::fill_tx_env,
revm_primitives::{CfgEnvWithHandlerCfg, EVMError, ExecutionResult, ResultAndState},
Address, Block, BlockWithSenders, Bytes, ChainSpec, ChainSpecBuilder, Genesis, Hardfork,
Header, Receipt, SealedBlockWithSenders, TransactionSigned, U256,
};
use reth_provider::Chain;
use reth_revm::{
db::{states::bundle_state::BundleRetention, BundleState},
DatabaseCommit, StateBuilder,
};
use reth_tracing::tracing::{debug, error, info};
use rusqlite::Connection;
use std::sync::Arc;
mod db;
sol!(RollupContract, "rollup_abi.json");
use RollupContrac:{RollupContractCalls, RollupContractEvents};
const DATABASE_PATH: &str = "rollup.db";
const ROLLUP_CONTRACT_ADDRESS: Address = address!("74ae65DF20cB0e3BF8c022051d0Cdd79cc60890C");
const ROLLUP_SUBMITTER_ADDRESS: Address = address!("B01042Db06b04d3677564222010DF5Bd09C5A947");
const CHAIN_ID: u64 = 17001;
static CHAIN_SPEC: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
Arc::new(
ChainSpecBuilder::default()
.chain(CHAIN_ID.into())
.genesis(Genesis::clique_genesis(CHAIN_ID, ROLLUP_SUBMITTER_ADDRESS))
.shanghai_activated()
.build(),
)
});
struct Rollup<Node: FullNodeComponents> {
ctx: ExExContext<Node>,
db: Database,
}
impl<Node: FullNodeComponents> Rollup<Node> {
fn new(ctx: ExExContext<Node>, connection: Connection) -> eyre::Result<Self> {
let db = Database::new(connection)?;
Ok(Self { ctx, db })
}
async fn start(mut self) -> eyre::Result<()> {
// 处理所有新的链状态通知
while let Some(notification) = self.ctx.notifications.recv().await {
if let Some(reverted_chain) = notification.reverted_chain() {
self.revert(&reverted_chain)?;
}
if let Some(committed_chain) = notification.committed_chain() {
self.commit(&committed_chain)?;
self.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
}
}
Ok(())
}
/// 处理新的链提交。
///
/// 此函数将所有交易解码到 Rollup 合约中为事件,执行相应的动作,并将结果插入数据库。
fn commit(&mut self, chain: &Chain) -> eyre::Result<()> {
let events = decode_chain_into_rollup_events(chain);
for (_, tx, event) in events {
match event {
// 一个新的区块提交到 Rollup 合约。
// 此区块在现有 Rollup 状态之上执行,并提交到数据库中。
RollupContractEvents::BlockSubmitted(_) => {
// ..
}
// 对 Rollup 合约的 ETH 存款。此存款将添加到接收者的余额中并提交到数据库中。
RollupContractEvents::Enter(RollupContract::Enter {
token,
rollupRecipient,
amount,
}) => {
// ..
}
_ => (),
}
}
Ok(())
}
/// 处理链撤销。
///
/// 此函数将所有交易解码到 Rollup 合约中为事件,撤销相应的动作并更新数据库。
fn revert(&mut self, chain: &Chain) -> eyre::Result<()> {
let mut events = decode_chain_into_rollup_events(chain);
// 逆转事件顺序以从提示开始撤销
events.reverse();
for (_, tx, event) in events {
match event {
// 从数据库中撤销区块。
RollupContractEvents::BlockSubmitted(_) => {
// ..
}
// 从接收者的余额中减去存款。
RollupContractEvents::Enter(RollupContract::Enter {
token,
rollupRecipient,
amount,
}) => {
// ..
}
_ => (),
}
}
Ok(())
}
}
fn main() -> eyre::Result<()> {
reth::cli::Cli::parse_args().run(|builder, _| async move {
let handle = builder
.node(EthereumNode::default())
.install_exex("Rollup", move |ctx| async {
let connection = Connection::open(DATABASE_PATH)?;
Ok(Rollup::new(ctx, connection)?.start())
})
.launch()
.await?;
handle.wait_for_node_exit().await
})
}
- 原文链接: paradigm.xyz/2024/05/ret...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!