Reth 执行扩展

  • Paradigm
  • 发布于 2024-05-04 19:36
  • 阅读 102

本文介绍了Reth Execution Extensions (ExEx)框架,旨在简化以太坊节点的复杂离链基础设施构建,支持高性能的实时数据处理。文章详细探讨了ExEx的工作原理,包括如何创建和使用ExEx来构建索引器和Rollup,强调了其在性能和开发便捷性方面的优势。

大纲

Reth 是一个用于构建高性能和可定制节点的全功能工具包。我们最近发布了改进 Reth 性能超过 100 倍的 性能路线图,以及我们的测试网络 Rollup - Reth AlphaNet,用于推动 Reth 的模块化和可扩展性到极限。

今天,我们很高兴地宣布 Reth 执行扩展(ExEx)。ExEx 是一个构建高性能和复杂离链基础设施的框架,作为后执行Hook。Reth 的 ExEx 可以用来实现 Rollup、索引器、MEV 机器人等,代码量比现有方法少超过 10 倍。通过此发布,我们展示了一个生产就绪的重组追踪器,代码行数少于 20,一种索引器,代码行数少于 250,以及一种 Rollup,代码行数少于 1000。

ExEx 是与研究集体 init4 共同设计的,init4 正在构建下一代以太坊基础设施。我们期待继续与 init4 团队合作,使 Reth 成为构建加密基础设施的首选平台!

我们今天如何构建离链基础设施?

区块链是一个以固定时间间隔确认区块及其交易数据的时钟。离链基础设施订阅这些定期的区块更新,并作为响应更新其自身的内部状态。

例如,考虑以太坊索引器是如何工作的:

  1. 它订阅以太坊的事件,如区块和日志,通常通过 eth_subscribe 或通过 eth_getFilterChanges 进行轮询。
  2. 对于每个事件,它会检索 JSON-RPC 上所需的任何额外数据,例如区块及其交易的收据。
  3. 对于每个有效负载,它会根据地址或其关心的主题等配置解码所需的日志。
  4. 它将所有解码的数据写入数据库,例如 Postgres 或 Sqlite。

这是大型数据管道中常见的提取、转换和加载(ETL)模式,像 Fivetran 这样的公司负责数据提取,Snowflake 处理数据装载到数据仓库,而客户则专注于编写转换的业务逻辑。

我们观察到同样的模式也适用于其他加密基础设施元素,例如 Rollup、MEV 搜索器,或更复杂的数据基础设施,如 Shadow Logs。

以此为动力,我们识别出在为以太坊节点构建 ETL 管道时的关键挑战:

  1. 数据新鲜度:链重组意味着大多数基础设施通常处于链尖后面,以避免在可能不再是规范链的一部分的状态上进行操作。这在实际操作中意味着构建实时加密数据产品具有挑战性,尤其是许多产品的延迟(通常为多个区块,十几秒),相对于它们可以为客户提供的信息。我们相信这发生的原因是节点在重组感知通知流上没有很好的开发者体验。
  2. 性能:在不同系统之间移动数据、转换数据并对其进行拼接意味着性能开销不可忽视。例如,一个直接接入 Reth 数据库的 Reth 基础索引器 显示出比直接接入 JSON-RPC 的其他索引器提高了 1-2 个数量级,表明通过联置工作负载和移除中介通信层可以实现的显著性能改进。
  3. 操作复杂性:以高正常运行时间运行以太坊节点本身就是一个巨大的挑战。在其上运行额外基础设施会进一步加剧此问题,并要求开发者考虑作业编排 API 或为相对简单的任务运行多个服务。

迫切需要一个更好的 API 来构建依赖于节点状态变化的离链基础设施。该 API 必须是高性能的、集成全面的,并且能感知重组。我们需要建立以太坊 ETL 基础设施和作业编排的 Airflow 时刻。

介绍 Reth 执行扩展 (ExEx)

执行扩展 (ExEx) 是构建实时、高性能以及零操作离链基础设施的后执行Hook,基于 Reth。

执行扩展是一个从 Reth 状态衍生其状态的任务。此类状态衍生的示例包括 Rollup、索引器、MEV 提取器等。我们预计,开发者将构建可重用的 ExEx,这些 ExEx 以标准化方式进行组合,类似于 Cosmos SDK 模块或 Substrate 小部件的工作方式。

我们与 init4 团队 协同设计了执行扩展(请关注他们!),这是一个新兴的研究集体,正在构建下一代以太坊基础设施。我们期待继续与他们团队合作,将 ExEx 推向生产,并使 Reth 成为构建加密基础设施的首选平台!

我们仍处于构建 ExEx 的最佳实践早期阶段,欢迎开发者加入我们,探索构建离链加密基础设施的新前沿。如果有合作想法,请随时联系。

ExEx 是如何工作的?

在 Rust 术语中,ExEx 是一个与 Reth 一起无限运行的 Future。ExEx 是使用一个解构为 ExEx 的异步闭包初始化的。以下是预期的端到端流程:

  1. Reth 暴露了一个感知重组的流,称为 ExExNotification,其中 包含 已提交的区块和所有相关的交易和收据、状态变化以及与之相关的 Merkle 树更新列表。
  2. 预计开发者将通过编写 ExEx 作为 async 函数来消费该流,以获取状态,例如 Rollup 区块。该流在 追加 ExEx 状态 时暴露 ChainCommitted 变体,并提供 撤销任何变更 的 ChainReverted/Reorged 变体。这使 ExEx 可以在原生区块时间运行,同时提供安全处理重组的合理 API,而非不处理重组而引入延迟。
  3. ExEx 由 Reth 的 ExExManager 调度,该管理者负责将 Reth 的通知路由到 ExEx,并将 ExEx 事件返回到 Reth,同时 Reth 的任务执行器推动 ExEx 完成。
  4. 每个 ExEx 通过 Node Builder 的 install_exex API 在节点上安装。

以下是从节点开发者的角度看大致的样子:

上面的 <50 行代码片段封装了定义和安装 ExEx 的过程。它极其强大,允许在没有额外基础设施的情况下扩展以太坊节点的功能。

现在让我们逐步了解一些例子。

你好,ExEx!

执行扩展的 “Hello World” 是一个重组追踪器。下图所示的 ExEx 说明了记录是否出现新链或重组。人们只需解析以下 ExEx 发出的日志信息,便可以轻松地在其 Reth 节点上构建重组追踪器。

在这个例子中,oldnew 链在该区块范围内完全访问每一个状态变化,以及有关 Chain 结构的 Merkle 树更新和其他有用信息。

使用 ExEx 构建 OP 堆栈的索引器

既然我们已经了解了如何Hook节点事件,现在让我们构建一个更复杂的示例,例如 一个用于 OP 堆栈链中存款和取款的索引器,使用 SQlite 作为后端。

在这个案例中:

  1. 我们使用 Alloy 的 sol! 宏加载了 OP 堆栈的桥接合约,以生成类型安全的 ABI 解码器(这是一个非常强大的宏,我们鼓励开发者 深入了解)。
  2. 我们初始化 SQLite 连接并设置数据库表。
  3. 每当接收到 ExExNotification 时,我们会读取每个已提交区块的日志,解码它,然后将其插入到 SQLite 中。
  4. 如果 ExExNotification 是关于链重组的,则从 SQLite 表中删除相应的条目。

就这样!超级简单,可能是在 30 分钟内构建的本地托管实时索引器中性能最高的。请查看下面的代码,并查看 完整示例

使用 ExEx 构建 Rollup

现在让我们做一些更有趣的事情,构建一个最小的 Rollup 作为 ExEx,使用 EVM 运行时和 SQLite 作为后端!

如果你 拉远一点,甚至 Rollup 也是 ETL 式的管道:

  1. 提取在 L1 上发布的数据并转换为 L2 有效负载(例如 OP 堆栈派生函数)。
  2. 运行状态转换函数(例如 EVM)。
  3. 将更新后的状态写入持久存储。

在此示例中,我们演示了一个简化的 Rollup,将其状态派生自发布到 Zenith 的 RLP 编码 EVM 交易(一个 Holesky 智能合约,用于发布我们 Rollup 的块承诺)通过一个 简单的区块构建器,这两个项目都是由 init4 团队构建的。

该示例特别:

  1. 配置一个 EVM,并实例化一个 SQLite 数据库,并实施所需的 revm 数据库特征,以使用 SQLite 作为 EVM 后端。
  2. 过滤发送到已部署 Rollup 合约的交易,ABI 解码 calldata,然后将其 RLP 解码为 Rollup 区块,该区块将由配置的 EVM 执行。
  3. 将 EVM 执行的结果插入 SQLite。

同样,超级简单。它还可以与 blobs 一起工作!

ExEx Rollups 是极其强大的,因为我们现在可以在 Reth 上运行任意数量的 Rollups,而无需额外的基础设施,只需将其作为 ExEx 安装

我们正在努力扩展示例以支持 blobs,并提供内置排序器,以便于完成的端到端演示。如果这是你想构建的内容,请随时联系,因为我们认为这有可能引入 L2 PBS、去中心化/共享排序器或甚至基于 SGX 的排序器等功能。

下面是 示例 代码片段。

我可以用执行扩展构建什么?

这个问题可以重新表述为“什么可以建模为后执行Hook?”结果是,很多事情都可以!

我们看到了几种应该构建的有价值 ExEx:

  1. Rollup 派生管道,如 Kona,用于配置为 L2 使用的 EVM,类似于 Reth Alphanet 的 EVM 的设置。我们还预测,组合 L2 ExEx 将提供通往第 2 阶段 Rollup 去中心化的最快路径。期待我们在这里的更多内容。我们迫不及待想在 Reth 上运行 OP Mainnet、Base、Zora 和其他 Rollups 作为 ExEx。
  2. 与节点服务紧密集成的进程外 ExEx,通过 gRPC 铺平了多租户的道路,让 Reth 作为离链服务的控制平面。
  3. 替代的 VM 集成(例如 MoveVM 或 Arbitrum Stylus),或类似 ArtemisSGX RevmShadow Logs 的复杂执行管道。
  4. 基础设施与重新抵押结合的基础设施,如预言机和桥梁,或任何其他主动验证服务(AVS)。这得益于 ExEx 可以通过以太坊的 DiscV5 P2P 网络彼此对接,并具有除节点“最终化”通知以外的写权限选定参与者。
  5. 下一代辅助基础设施,如 AI 协处理器或去中心化/共享排序器。

执行扩展的下一步是什么?

目前,ExEx 需要在节点上通过在主函数中进行自定义构建来安装。我们的目标是使 ExEx 动态加载为插件,并暴露 Docker Hub 风格的 reth pull API,以便开发者可以轻松地通过空中分发 ExEx 给节点操作员。

我们希望使 Reth 成为一个平台,在核心节点操作上提供稳定性和性能,同时也成为创新的发射台。

Reth 项目有望改变人们对构建高性能离链基础设施的方法的看法,而 ExEx 只是一个开始。我们期待继续在 Reth 上构建基础设施并对其进行投资。

如果你想构建 ExEx 项目,请联系 georgios@paradigm.xyz,或直接在 Github 上 贡献

use futures::Future;
use reth_exex::{ExExContext, ExExEvent, ExExNotification};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;

// `ExExContext` 可在每个 ExEx 中使用,以与节点的其余部分接口。
//
// pub struct ExExContext<Node: FullNodeComponents> {
//     /// 用于与区块链交互的配置提供者。
//     pub provider: Node::Provider,
//     /// 节点的任务执行器。
//     pub task_executor: TaskExecutor,
//     /// 节点的交易池。
//     pub pool: Node::Pool,
//     /// 接收 [`ExExNotification`] 的通道。
//     pub notifications: Receiver<ExExNotification>,
//     // .. 其他有用的上下文字段
// }
async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
    while let Some(notification) = ctx.notifications.recv().await {
        match &notification {
            ExExNotification::ChainCommitted { new } => {
                // 做一些事情
            }
            ExExNotification::ChainReorged { old, new } => {
                // 做一些事情
            }
            ExExNotification::ChainReverted { old } => {
                // 做一些事情
            }
        };
    }
    Ok(())
}

fn main() -> eyre::Result<()> {
    reth::cli::Cli::parse_args().run(|builder, _| async move {
        let handle = builder
            .node(EthereumNode::default())
            .install_exex("Minimal", |ctx| async move { exex(ctx) } )
            .launch()
            .await?;

        handle.wait_for_node_exit().await
    })
}
async fn exex<Node: FullNodeComponents>(mut ctx: ExExContext<Node>) -> eyre::Result<()> {
    while let Some(notification) = ctx.notifications.recv().await {
        match &notification {
            ExExNotification::ChainCommitted { new } => {
                info!(committed_chain = ?new.range(), "Received commit");
            }
            ExExNotification::ChainReorged { old, new } => {
                info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg");
            }
            ExExNotification::ChainReverted { old } => {
                info!(reverted_chain = ?old.range(), "Received revert");
            }
        };

        if let Some(committed_chain) = notification.committed_chain() {
            ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
        }
    }
    Ok(())
}
use alloy_sol_types::{sol, SolEventInterface};
use futures::Future;
use reth_exex::{ExExContext, ExExEvent};
use reth_node_api::FullNodeComponents;
use reth_node_ethereum::EthereumNode;
use reth_primitives::{Log, SealedBlockWithSenders, TransactionSigned};
use reth_provider::Chain;
use reth_tracing::tracing::info;
use rusqlite::Connection;

sol!(L1StandardBridge, "l1_standard_bridge_abi.json");
use crate::L1StandardBridge::{ETHBridgeFinalized, ETHBridgeInitiated, L1StandardBridgeEvents};

fn create_tables(connection: &mut Connection) -> rusqlite::Result<()> {
    connection.execute(
        r#"
            CREATE TABLE IF NOT EXISTS deposits (
                id               INTEGER PRIMARY KEY,
                block_number     INTEGER NOT NULL,
                tx_hash          TEXT NOT NULL UNIQUE,
                contract_address TEXT NOT NULL,
                "from"           TEXT NOT NULL,
                "to"             TEXT NOT NULL,
                amount           TEXT NOT NULL
            );
            "#,
        (),
    )?;
    // .. 其余的数据库初始化

    Ok(())
}

/// 一个 ExEx 示例,监听来自 OP 堆栈链的 ETH 桥接事件
/// 并在 SQLite 数据库中存储存款和取款。
async fn op_bridge_exex<Node: FullNodeComponents>(
    mut ctx: ExExContext<Node>,
    connection: Connection,
) -> eyre::Result<()> {
    // 处理所有新的链状态通知
    while let Some(notification) = ctx.notifications.recv().await {
        // 撤销所有存款和取款
        if let Some(reverted_chain) = notification.reverted_chain() {
            // ..
        }

        // 插入所有新的存款和取款
        if let Some(committed_chain) = notification.committed_chain() {
            // ..
        }
    }

    Ok(())
}

/// 将区块链解码为扁平的收据日志列表,并仅过滤
/// [L1StandardBridgeEvents]。
fn decode_chain_into_events(
    chain: &Chain,
) -> impl Iterator<Item = (&SealedBlockWithSenders, &TransactionSigned, &Log, L1StandardBridgeEvents)>
{
    chain
        // 获取所有区块和收据
        .blocks_and_receipts()
        // .. 继续解码
}

fn main() -> eyre::Result<()> {
    reth::cli::Cli::parse_args().run(|builder, _| async move {
        let handle = builder
            .node(EthereumNode::default())
            .install_exex("OPBridge", |ctx| async move {
                let connection = Connection::open("op_bridge.db")?;
              	create_tables(&mut connection)?;
                Ok(op_bridge_exex(ctx, connection))
            })
            .launch()
            .await?;

        handle.wait_for_node_exit().await
    })
}

use alloy_rlp::Decodable;
use alloy_sol_types::{sol, SolEventInterface, SolInterface};
use db::Database;
use eyre::OptionExt;
use once_cell::sync::Lazy;
use reth_exex::{ExExContext, ExExEvent};
use reth_interfaces::executor::BlockValidationError;
use reth_node_api::{ConfigureEvm, ConfigureEvmEnv, FullNodeComponents};
use reth_node_ethereum::{EthEvmConfig, EthereumNode};
use reth_primitives::{
    address, constants,
    revm::env::fill_tx_env,
    revm_primitives::{CfgEnvWithHandlerCfg, EVMError, ExecutionResult, ResultAndState},
    Address, Block, BlockWithSenders, Bytes, ChainSpec, ChainSpecBuilder, Genesis, Hardfork,
    Header, Receipt, SealedBlockWithSenders, TransactionSigned, U256,
};
use reth_provider::Chain;
use reth_revm::{
    db::{states::bundle_state::BundleRetention, BundleState},
    DatabaseCommit, StateBuilder,
};
use reth_tracing::tracing::{debug, error, info};
use rusqlite::Connection;
use std::sync::Arc;

mod db;

sol!(RollupContract, "rollup_abi.json");
use RollupContrac:{RollupContractCalls, RollupContractEvents};

const DATABASE_PATH: &str = "rollup.db";
const ROLLUP_CONTRACT_ADDRESS: Address = address!("74ae65DF20cB0e3BF8c022051d0Cdd79cc60890C");
const ROLLUP_SUBMITTER_ADDRESS: Address = address!("B01042Db06b04d3677564222010DF5Bd09C5A947");
const CHAIN_ID: u64 = 17001;
static CHAIN_SPEC: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
    Arc::new(
        ChainSpecBuilder::default()
            .chain(CHAIN_ID.into())
            .genesis(Genesis::clique_genesis(CHAIN_ID, ROLLUP_SUBMITTER_ADDRESS))
            .shanghai_activated()
            .build(),
    )
});

struct Rollup<Node: FullNodeComponents> {
    ctx: ExExContext<Node>,
    db: Database,
}

impl<Node: FullNodeComponents> Rollup<Node> {
    fn new(ctx: ExExContext<Node>, connection: Connection) -> eyre::Result<Self> {
        let db = Database::new(connection)?;
        Ok(Self { ctx, db })
    }

    async fn start(mut self) -> eyre::Result<()> {
        // 处理所有新的链状态通知
        while let Some(notification) = self.ctx.notifications.recv().await {
            if let Some(reverted_chain) = notification.reverted_chain() {
                self.revert(&reverted_chain)?;
            }

            if let Some(committed_chain) = notification.committed_chain() {
                self.commit(&committed_chain)?;
                self.ctx.events.send(ExExEvent::FinishedHeight(committed_chain.tip().number))?;
            }
        }

        Ok(())
    }

    /// 处理新的链提交。
    ///
    /// 此函数将所有交易解码到 Rollup 合约中为事件,执行相应的动作,并将结果插入数据库。
    fn commit(&mut self, chain: &Chain) -> eyre::Result<()> {
        let events = decode_chain_into_rollup_events(chain);

        for (_, tx, event) in events {
            match event {
                // 一个新的区块提交到 Rollup 合约。
                // 此区块在现有 Rollup 状态之上执行,并提交到数据库中。
                RollupContractEvents::BlockSubmitted(_) => {
                    // ..
                }
                // 对 Rollup 合约的 ETH 存款。此存款将添加到接收者的余额中并提交到数据库中。
                RollupContractEvents::Enter(RollupContract::Enter {
                    token,
                    rollupRecipient,
                    amount,
                }) => {
                    // ..
                }
                _ => (),
            }
        }

        Ok(())
    }

    /// 处理链撤销。
    ///
    /// 此函数将所有交易解码到 Rollup 合约中为事件,撤销相应的动作并更新数据库。
    fn revert(&mut self, chain: &Chain) -> eyre::Result<()> {
        let mut events = decode_chain_into_rollup_events(chain);
        // 逆转事件顺序以从提示开始撤销
        events.reverse();

        for (_, tx, event) in events {
            match event {
                // 从数据库中撤销区块。
                RollupContractEvents::BlockSubmitted(_) => {
                    // ..
                }
                // 从接收者的余额中减去存款。
                RollupContractEvents::Enter(RollupContract::Enter {
                    token,
                    rollupRecipient,
                    amount,
                }) => {
                    // ..
                }
                _ => (),
            }
        }

        Ok(())
    }
}

fn main() -> eyre::Result<()> {
    reth::cli::Cli::parse_args().run(|builder, _| async move {
        let handle = builder
            .node(EthereumNode::default())
            .install_exex("Rollup", move |ctx| async {
                let connection = Connection::open(DATABASE_PATH)?;
                Ok(Rollup::new(ctx, connection)?.start())
            })
            .launch()
            .await?;

        handle.wait_for_node_exit().await
    })
}
  • 原文链接: paradigm.xyz/2024/05/ret...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论