介绍 Spawned:Rust 的 Erlang 风格 Actor

本文介绍了基于 Rust 语言的 Actor 框架 Spawned,该框架深受 Erlang/OTP 的 gen_server 启发。它旨在简化 Rust 的并发编程,通过宏定义协议并允许开发者编写纯顺序逻辑的业务代码,由框架自动处理消息路由和生命周期,有效解决了传统 Rust 并发中锁竞争、Arc/Mutex 复杂性以及异步编程的痛点。

Joe Armstrong 的 Programming Erlang 第 22.1 节的标题是“通往通用服务器之路”,作者本人称其为全书最重要的一节。在这一节中,他构建了一个小型服务器框架——大约 15 行 Erlang 代码——负责处理进程的派生(spawn)、接收消息并将其分发给回调模块。然后他在其基础上编写了一个名称服务器:

-module(name_server).
init() -> dict:new().
handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict)       -> {dict:find(Name, Dict), Dict}.

这就是全部的回调逻辑——它创建了一个字典,存储条目并进行查找。接着 Armstrong 阐述了他的观点:

现在停下来思考一下。回调中没有任何关于并发的代码,没有 spawn,没有 send,没有 receive,也没有 register。它纯粹是顺序执行的代码——别无他物。这意味着我们可以编写客户端-服务器模型,而无需了解底层的并发模型。

通过一系列重构,他随后演进该服务器框架——增加了容错性、热代码升级——而完全没有触及回调代码。业务逻辑保持不变;改变的只有框架。这就是 Armstrong 推广 gen_server 的方式,它是 OTP 的核心行为(behavior),自 1998 年以来一直支撑着电信基础设施的运行。

Spawned 将同样的理念引入了 Rust。它是一个 Actor 框架,你的业务逻辑只需编写普通的 Rust 方法——无需 Channel,无需 Arc<Mutex<T>>,无需并发原语——框架会处理 Mailbox、消息路由和生命周期。

快速示例

这是一个在 Spawned 中实现的键值存储——经典的 Erlang 名称服务器:

use spawned_concurrency::{protocol, Response};

#[derive(Debug, Clone, PartialEq)]
pub enum FindResult {
    Found { value: String },
    NotFound,
}

#[protocol]
pub trait NameServerProtocol: Send + Sync {
    fn add(&self, key: String, value: String) -> Response<()>;
    fn find(&self, key: String) -> Response<FindResult>;
}

这就是全部的消息接口。#[protocol] 会生成消息结构体(AddFind)、一个类型擦除的引用类型(NameServerRef),以及相关的内部连接,使得任何处理这些消息的 Actor 都可以通过该 Trait 进行调用。

Actor 的实现:

use spawned_concurrency::{actor, tasks::{Actor, Context, Handler, ActorStart}};

pub struct NameServer {
    inner: HashMap<String, String>,
}

#[actor(protocol = NameServerProtocol)]
impl NameServer {
    pub fn new() -> Self {
        NameServer { inner: HashMap::new() }
    }

    #[request_handler]
    async fn handle_add(&mut self, msg: Add, _ctx: &Context<Self>) {
        self.inner.insert(msg.key, msg.value);
    }

    #[request_handler]
    async fn handle_find(&mut self, msg: Find, _ctx: &Context<Self>) -> FindResult {
        match self.inner.get(&msg.key) {
            Some(value) => FindResult::Found { value: value.clone() },
            None => FindResult::NotFound,
        }
    }
}

以及如何使用它:

let ns = NameServer::new().start();

ns.add("Joe".into(), "At Home".into()).await.unwrap();

let result = ns.find("Joe".into()).await.unwrap();
assert_eq!(result, FindResult::Found { value: "At Home".to_string() });

ns.add(...)ns.find(...) 是对 Actor 引用进行的常规方法调用。在幕后,宏会构造消息,通过 Mailbox 发送消息,并路由回响应——但这些都不会泄露到你的代码中。

Rust 中的并发很难

Rust 为你提供了编写正确并发代码的工具。但“正确”和“简单”并不是一回事。

共享状态

在 Rust 中处理共享可变状态的标准方法是 Arc<Mutex<T>>。它虽然有效,但增加了每个调用点的复杂度:

let state = Arc::new(Mutex::new(HashMap::new()));

// 每次访问:克隆 Arc,加锁,处理中毒 (poisoning)
let state = state.clone();
tokio::spawn(async move {
    let mut guard = state.lock().unwrap_or_else(|p| p.into_inner());
    guard.insert("key".into(), "value".into());
});

这只是一个保护单个 HashMap 的锁。在一个真实的系统中,你会拥有几十个这样的锁,现在你必须去推敲锁的顺序、竞争情况,以及当一个线程在持有锁的情况下发生 panic 时会发生什么。借用检查器(Borrow checker)在编译时防止了数据竞争,但它无法防止逻辑死锁,也无法告诉你锁的粒度是否错误。

Async 复杂度

Async Rust 解决了吞吐量问题——在小型线程池上运行成千上万个并发 Task——但它引入了自身的复杂度。Future 是必须满足 Send + 'static 才能跨越 Task 边界的状态机,这意味着要以新的方式与借用检查器抗争。在同步代码中运行良好的生命周期在加入 .await 的瞬间就会变成错误。Pin 出现在 Trait 签名中,让新手和资深开发者都感到困惑。你会遇到有色函数(colored functions)——异步和同步代码不易组合,一旦一层代码变成了异步,它往往会带动其他所有代码也变成异步。

此外还有运行时依赖问题。大多数异步 Rust 代码都假设使用 tokio——但并非每个项目都想用或能用 tokio。嵌入式系统、CLI 工具以及具有特定线程要求的代码库可能需要不同的选择。

自行实现 Channel 的方式

使用 Tokio 实现 Actor 模式是一个受欢迎的折中方案:给每个“Actor”一个 mpsc Channel,派生一个循环处理传入消息的 Task,并在该循环内管理状态。这避免了锁,但你最终会在各处编写相同的脚手架代码——Channel 设置、消息 Enum、接收循环、停机逻辑。每个 Actor 都是一个定制的基础设施。

Actor 能带给你什么

Actor 模型通过将隔离作为默认设置来规避这些问题。每个 Actor 拥有自己的状态,一次处理一条消息,并且完全通过消息传递与其他 Actor 通信。没有需要保护的共享内存,也没有需要排序的锁。

在处理程序内部,你的代码是顺序执行的——只有 &mut self 和消息。这正是 Armstrong 所描述的:回调是纯粹的顺序代码,而框架提供并发行为。在 Spawned 中,生成的调度使用与你手动构建的相同的基于 Channel 的架构——宏消除的是样板代码,而不是性能。

我们为什么要构建 Spawned

Rust 已经存在一些 Actor 框架——尤其是 ActixRactor。我们构建 Spawned 是因为我们希望完全控制框架的功能和方向。我们还有一些特定设计目标,而现有的选项并未完全与之契合。

紧贴 Erlang/OTP 惯例。 我们的团队在 Erlang 和 Elixir 构建系统方面拥有多年经验,并且在这些项目中获得了很高的成功率。这些经验教会了我们重视 OTP 背后的核心理念——业务逻辑与并发的分离、gen_server 回调结构、协议定义 Actor 之间清晰接口的方式。Spawned 的 API 直接模仿了 gen_server:Protocol 映射到模块导出,#[request_handler] 映射到 handle_call#[send_handler] 映射到 handle_cast,而调用 Actor 就是对引用的方法调用——就像在 Erlang 模块中调用客户端函数一样。

更简洁的 API 表面。 Ractor 要求 Actor 的所有消息都存在于单个 Enum 中,且每个变体中都要嵌入回复 Channel 作为 RpcReplyPort<T>。处理程序是对所有变体的单个 match。Actix 通过为每个消息类型提供单独的 Handler<M> 实现来避免这种情况,但每个消息仍需要 #[derive(Message)]#[rtype(result = "...")] 注解。在 Spawned 中,你编写一个带有方法的 Trait 并用 #[protocol] 注解——消息结构体、调度逻辑和类型擦除的引用都会自动生成。

协议级的类型擦除,而不仅仅是消息级。 Actix 提供了 Recipient<M>——一个作用域限于单个消息类型的类型擦除引用。如果你的 Protocol 有五个方法,你需要五个独立的 Recipient 值。Spawned 会生成一个单独的 Arc<dyn Protocol> 引用,通过一个值暴露所有方法。Ractor 通过 DerivedActorRef 提供了有限的类型擦除,但它要求每个 Actor 显式选择加入,而不是从框架中免费获得。下一节将展示这在实践中是什么样子的。

运行时独立性。 Spawned 中所有特定于运行时的代码都被隔离在 spawned-rt 这一薄抽象层之后。目前的实现使用 tokio,但这种架构设计使得更换不同的异步运行时,或者为 Actor 工作负载构建专用运行时,都不需要更改 Actor 或 Protocol 代码。

类型擦除的 Protocol 引用

考虑一个聊天应用。聊天室需要向用户传递消息,而用户需要向聊天室发送消息。使用具体类型时,这将在 ChatRoomUser 之间产生循环依赖。使用 Protocol 后,每一侧仅依赖于另一侧的接口:

#[protocol]
pub trait RoomProtocol: Send + Sync {
    fn say(&self, from: String, text: String) -> Result<(), ActorError>;
    fn add_member(&self, name: String, user: UserRef) -> Result<(), ActorError>;
    fn members(&self) -> Response<Vec<String>>;
}

#[protocol]
pub trait UserProtocol: Send + Sync {
    fn deliver(&self, from: String, text: String) -> Result<(), ActorError>;
    fn say(&self, text: String) -> Result<(), ActorError>;
    fn join_room(&self, room: RoomRef) -> Result<(), ActorError>;
}

请注意返回类型——这是 请求(Request)发送(Send) 之间的区别。返回 Response<T> 的方法是请求:调用者发送一条消息并等待回复。返回 Result<(), ActorError>(或没有返回类型)的方法是发送:即发即弃的消息,不会阻塞调用者。单个 Protocol 可以混合这两种类型,#[actor] 宏使用 #[request_handler]#[send_handler] 注解为每种类型生成正确的调度。

RoomRefUserRef 会自动生成为 Arc<dyn RoomProtocol>Arc<dyn UserProtocol>。Actor 持有 Protocol 引用而不是具体类型:

pub struct User {
    name: String,
    room: Option<RoomRef>,  // 任何实现了 RoomProtocol 的 Actor
}

pub struct ChatRoom {
    members: Vec<(String, UserRef)>,  // 任何实现了 UserProtocol 的 Actor
}

任何一个 Actor 都不需要知道另一个 Actor 的具体类型。它们纯粹通过 Protocol 接口通信——这正是 Erlang 通过 PID 和消息传递自然实现的模式。

let room = ChatRoom::new().start();
let alice = User::new("Alice".into()).start();
let bob = User::new("Bob".into()).start();

alice.join_room(room.to_room_ref()).unwrap();
bob.join_room(room.to_room_ref()).unwrap();

alice.say("Hello everyone!".into()).unwrap();
bob.say("Hey Alice!".into()).unwrap();

异步或线程 —— 同样的 API,由你选择

Spawned 可以在异步运行时或普通 OS 线程上运行 Actor,且使用相同的 API。

异步模式tasks):

use spawned_concurrency::tasks::{Actor, Context, Handler, ActorStart};
use spawned_rt::tasks as rt;

fn main() {
    rt::run(async {
        let ns = NameServer::new().start();
        let result = ns.find("Joe".into()).await.unwrap();
    })
}

线程模式threads):

use spawned_concurrency::threads::{Actor, Context, Handler, ActorStart};
use spawned_rt::threads as rt;

fn main() {
    rt::run(|| {
        let ns = NameServer::new().start();
        let result = ns.find("Joe".into()).unwrap();
    })
}

在这两种模式下,Actor 的实现是完全相同的。Response<T> 是桥梁——在 Task 模式下你对它使用 .await,在线程模式下你直接调用 .unwrap()

这具有实际意义:

  • 一个通过派生 Actor 来并行化工作的 CLI 工具 不需要设置异步运行时——线程模式为你提供基于普通 OS 线程的 Actor。
  • 一个具有固定帧率(tick)主循环的 游戏服务器 可以将线程模式的 Actor 用于后台系统(库存、匹配),而无需将游戏循环拉入异步。
  • 一个执行压缩、证明生成或物理模拟的 CPU 密集型 Actor 可以在专用 OS 线程上运行,异步运行时的调度程序无法干扰它——没有协作式让出,不与 IO 绑定任务共享线程池,只有对 CPU 核心的独占访问。

即使在异步模式下,Spawned 也允许你为每个 Actor 选择执行后端:

  • Backend::Async —— 在异步任务池上运行(默认,最适合 IO 密集型工作)
  • Backend::Blocking —— 在运行时的阻塞线程池上运行(用于会使异步任务饥饿的 CPU 密集型处理程序)
  • Backend::Thread —— 在专用 OS 线程上运行(用于需要线程亲和性或实时保证的 Actor)

错误处理和故障隔离

当 Actor 的处理程序发生 panic 时,Spawned 会捕获 panic,通过 tracing::error! 记录它,并停止该 Actor。发送给该 Actor 的后续消息将返回 ActorError::ActorStopped。系统中的其他 Actor 不受影响——panic 被限制在引发它的 Actor 内部。

这同样适用于生命周期Hook:如果 #[started] Hook发生 panic,Actor 会立即退出而不运行 #[stopped]

路线图的下一步是 监督树(Supervision trees),它将使恢复变得声明式且自动化。

Erlang 开发者会感到宾至如归

如果你曾使用过 Erlang/OTP,Spawned 的概念可以直接对应:

Erlang/OTP Spawned 描述
模块导出 (客户端 API) #[protocol] trait 公共消息接口
-behaviour(gen_server) #[actor] 声明一个 Actor 实现
handle_call/3 #[request_handler] 同步请求处理程序
handle_cast/2 #[send_handler] 即发即弃处理程序
init/1 #[started] 初始化回调
terminate/2 #[stopped] 清理回调
gen_server:call/2 ns.find(...) 直接方法调用
gen_server:cast/2 ns.notify(...) 直接方法调用 (发送)
Pid ActorRef<T> 运行中 Actor 的Handle
register/2 registry::register(name, ref) 按名称注册
whereis/1 registry::whereis(name) 按名称查找

下一步计划

  • 监督树 —— 当一个 Actor 崩溃时,谁来重启它?监督树使这变得声明式:你定义一个 Actor 树和一种重启策略(one-for-one、one-for-all、rest-for-one),框架会处理其余的工作。这一特性使得 Erlang 系统能够连续运行多年而无停机,它是我们路线图上优先级最高的项目。
  • 可观测性 —— 为 Actor Mailbox、消息延迟和生命周期事件内置仪表盘,这样你就可以在生产环境中查看 Actor 的运行状况,而无需为每个处理程序添加临时日志。
  • 自定义运行时 —— 一个为 Actor 工作负载量身定制的专用运行时,为那些需要更轻量级或更专业调度程序的团队替换当抢跑时。spawned-rt 抽象层旨在使这种切换变得无缝。
  • 确定性运行时 —— 一个能产生可复现执行轨迹的运行时,因此你可以精确地回放和调试 Actor 交互。灵感来自 commonware

开始使用

cargo add spawned-concurrency spawned-rt

如果你有任何问题或反馈,请在 GitHub 上提交 Issue 或在 X 上找到我们。

  • 原文链接: blog.lambdaclass.com/int...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
lambdaclass
lambdaclass
LambdaClass是一家风险投资工作室,致力于解决与分布式系统、机器学习、编译器和密码学相关的难题。