本文介绍了基于 Rust 语言的 Actor 框架 Spawned,该框架深受 Erlang/OTP 的 gen_server 启发。它旨在简化 Rust 的并发编程,通过宏定义协议并允许开发者编写纯顺序逻辑的业务代码,由框架自动处理消息路由和生命周期,有效解决了传统 Rust 并发中锁竞争、Arc/Mutex 复杂性以及异步编程的痛点。
Joe Armstrong 的 Programming Erlang 第 22.1 节的标题是“通往通用服务器之路”,作者本人称其为全书最重要的一节。在这一节中,他构建了一个小型服务器框架——大约 15 行 Erlang 代码——负责处理进程的派生(spawn)、接收消息并将其分发给回调模块。然后他在其基础上编写了一个名称服务器:
-module(name_server).
init() -> dict:new().
handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict) -> {dict:find(Name, Dict), Dict}.
这就是全部的回调逻辑——它创建了一个字典,存储条目并进行查找。接着 Armstrong 阐述了他的观点:
现在停下来思考一下。回调中没有任何关于并发的代码,没有 spawn,没有 send,没有 receive,也没有 register。它纯粹是顺序执行的代码——别无他物。这意味着我们可以编写客户端-服务器模型,而无需了解底层的并发模型。
通过一系列重构,他随后演进该服务器框架——增加了容错性、热代码升级——而完全没有触及回调代码。业务逻辑保持不变;改变的只有框架。这就是 Armstrong 推广 gen_server 的方式,它是 OTP 的核心行为(behavior),自 1998 年以来一直支撑着电信基础设施的运行。
Spawned 将同样的理念引入了 Rust。它是一个 Actor 框架,你的业务逻辑只需编写普通的 Rust 方法——无需 Channel,无需 Arc<Mutex<T>>,无需并发原语——框架会处理 Mailbox、消息路由和生命周期。
这是一个在 Spawned 中实现的键值存储——经典的 Erlang 名称服务器:
use spawned_concurrency::{protocol, Response};
#[derive(Debug, Clone, PartialEq)]
pub enum FindResult {
Found { value: String },
NotFound,
}
#[protocol]
pub trait NameServerProtocol: Send + Sync {
fn add(&self, key: String, value: String) -> Response<()>;
fn find(&self, key: String) -> Response<FindResult>;
}
这就是全部的消息接口。#[protocol] 会生成消息结构体(Add,Find)、一个类型擦除的引用类型(NameServerRef),以及相关的内部连接,使得任何处理这些消息的 Actor 都可以通过该 Trait 进行调用。
Actor 的实现:
use spawned_concurrency::{actor, tasks::{Actor, Context, Handler, ActorStart}};
pub struct NameServer {
inner: HashMap<String, String>,
}
#[actor(protocol = NameServerProtocol)]
impl NameServer {
pub fn new() -> Self {
NameServer { inner: HashMap::new() }
}
#[request_handler]
async fn handle_add(&mut self, msg: Add, _ctx: &Context<Self>) {
self.inner.insert(msg.key, msg.value);
}
#[request_handler]
async fn handle_find(&mut self, msg: Find, _ctx: &Context<Self>) -> FindResult {
match self.inner.get(&msg.key) {
Some(value) => FindResult::Found { value: value.clone() },
None => FindResult::NotFound,
}
}
}
以及如何使用它:
let ns = NameServer::new().start();
ns.add("Joe".into(), "At Home".into()).await.unwrap();
let result = ns.find("Joe".into()).await.unwrap();
assert_eq!(result, FindResult::Found { value: "At Home".to_string() });
ns.add(...) 和 ns.find(...) 是对 Actor 引用进行的常规方法调用。在幕后,宏会构造消息,通过 Mailbox 发送消息,并路由回响应——但这些都不会泄露到你的代码中。
Rust 为你提供了编写正确并发代码的工具。但“正确”和“简单”并不是一回事。
在 Rust 中处理共享可变状态的标准方法是 Arc<Mutex<T>>。它虽然有效,但增加了每个调用点的复杂度:
let state = Arc::new(Mutex::new(HashMap::new()));
// 每次访问:克隆 Arc,加锁,处理中毒 (poisoning)
let state = state.clone();
tokio::spawn(async move {
let mut guard = state.lock().unwrap_or_else(|p| p.into_inner());
guard.insert("key".into(), "value".into());
});
这只是一个保护单个 HashMap 的锁。在一个真实的系统中,你会拥有几十个这样的锁,现在你必须去推敲锁的顺序、竞争情况,以及当一个线程在持有锁的情况下发生 panic 时会发生什么。借用检查器(Borrow checker)在编译时防止了数据竞争,但它无法防止逻辑死锁,也无法告诉你锁的粒度是否错误。
Async Rust 解决了吞吐量问题——在小型线程池上运行成千上万个并发 Task——但它引入了自身的复杂度。Future 是必须满足 Send + 'static 才能跨越 Task 边界的状态机,这意味着要以新的方式与借用检查器抗争。在同步代码中运行良好的生命周期在加入 .await 的瞬间就会变成错误。Pin 出现在 Trait 签名中,让新手和资深开发者都感到困惑。你会遇到有色函数(colored functions)——异步和同步代码不易组合,一旦一层代码变成了异步,它往往会带动其他所有代码也变成异步。
此外还有运行时依赖问题。大多数异步 Rust 代码都假设使用 tokio——但并非每个项目都想用或能用 tokio。嵌入式系统、CLI 工具以及具有特定线程要求的代码库可能需要不同的选择。
使用 Tokio 实现 Actor 模式是一个受欢迎的折中方案:给每个“Actor”一个 mpsc Channel,派生一个循环处理传入消息的 Task,并在该循环内管理状态。这避免了锁,但你最终会在各处编写相同的脚手架代码——Channel 设置、消息 Enum、接收循环、停机逻辑。每个 Actor 都是一个定制的基础设施。
Actor 模型通过将隔离作为默认设置来规避这些问题。每个 Actor 拥有自己的状态,一次处理一条消息,并且完全通过消息传递与其他 Actor 通信。没有需要保护的共享内存,也没有需要排序的锁。
在处理程序内部,你的代码是顺序执行的——只有 &mut self 和消息。这正是 Armstrong 所描述的:回调是纯粹的顺序代码,而框架提供并发行为。在 Spawned 中,生成的调度使用与你手动构建的相同的基于 Channel 的架构——宏消除的是样板代码,而不是性能。
Rust 已经存在一些 Actor 框架——尤其是 Actix 和 Ractor。我们构建 Spawned 是因为我们希望完全控制框架的功能和方向。我们还有一些特定设计目标,而现有的选项并未完全与之契合。
紧贴 Erlang/OTP 惯例。 我们的团队在 Erlang 和 Elixir 构建系统方面拥有多年经验,并且在这些项目中获得了很高的成功率。这些经验教会了我们重视 OTP 背后的核心理念——业务逻辑与并发的分离、gen_server 回调结构、协议定义 Actor 之间清晰接口的方式。Spawned 的 API 直接模仿了 gen_server:Protocol 映射到模块导出,#[request_handler] 映射到 handle_call,#[send_handler] 映射到 handle_cast,而调用 Actor 就是对引用的方法调用——就像在 Erlang 模块中调用客户端函数一样。
更简洁的 API 表面。 Ractor 要求 Actor 的所有消息都存在于单个 Enum 中,且每个变体中都要嵌入回复 Channel 作为 RpcReplyPort<T>。处理程序是对所有变体的单个 match。Actix 通过为每个消息类型提供单独的 Handler<M> 实现来避免这种情况,但每个消息仍需要 #[derive(Message)] 和 #[rtype(result = "...")] 注解。在 Spawned 中,你编写一个带有方法的 Trait 并用 #[protocol] 注解——消息结构体、调度逻辑和类型擦除的引用都会自动生成。
协议级的类型擦除,而不仅仅是消息级。 Actix 提供了 Recipient<M>——一个作用域限于单个消息类型的类型擦除引用。如果你的 Protocol 有五个方法,你需要五个独立的 Recipient 值。Spawned 会生成一个单独的 Arc<dyn Protocol> 引用,通过一个值暴露所有方法。Ractor 通过 DerivedActorRef 提供了有限的类型擦除,但它要求每个 Actor 显式选择加入,而不是从框架中免费获得。下一节将展示这在实践中是什么样子的。
运行时独立性。 Spawned 中所有特定于运行时的代码都被隔离在 spawned-rt 这一薄抽象层之后。目前的实现使用 tokio,但这种架构设计使得更换不同的异步运行时,或者为 Actor 工作负载构建专用运行时,都不需要更改 Actor 或 Protocol 代码。
考虑一个聊天应用。聊天室需要向用户传递消息,而用户需要向聊天室发送消息。使用具体类型时,这将在 ChatRoom 和 User 之间产生循环依赖。使用 Protocol 后,每一侧仅依赖于另一侧的接口:
#[protocol]
pub trait RoomProtocol: Send + Sync {
fn say(&self, from: String, text: String) -> Result<(), ActorError>;
fn add_member(&self, name: String, user: UserRef) -> Result<(), ActorError>;
fn members(&self) -> Response<Vec<String>>;
}
#[protocol]
pub trait UserProtocol: Send + Sync {
fn deliver(&self, from: String, text: String) -> Result<(), ActorError>;
fn say(&self, text: String) -> Result<(), ActorError>;
fn join_room(&self, room: RoomRef) -> Result<(), ActorError>;
}
请注意返回类型——这是 请求(Request) 和 发送(Send) 之间的区别。返回 Response<T> 的方法是请求:调用者发送一条消息并等待回复。返回 Result<(), ActorError>(或没有返回类型)的方法是发送:即发即弃的消息,不会阻塞调用者。单个 Protocol 可以混合这两种类型,#[actor] 宏使用 #[request_handler] 和 #[send_handler] 注解为每种类型生成正确的调度。
RoomRef 和 UserRef 会自动生成为 Arc<dyn RoomProtocol> 和 Arc<dyn UserProtocol>。Actor 持有 Protocol 引用而不是具体类型:
pub struct User {
name: String,
room: Option<RoomRef>, // 任何实现了 RoomProtocol 的 Actor
}
pub struct ChatRoom {
members: Vec<(String, UserRef)>, // 任何实现了 UserProtocol 的 Actor
}
任何一个 Actor 都不需要知道另一个 Actor 的具体类型。它们纯粹通过 Protocol 接口通信——这正是 Erlang 通过 PID 和消息传递自然实现的模式。
let room = ChatRoom::new().start();
let alice = User::new("Alice".into()).start();
let bob = User::new("Bob".into()).start();
alice.join_room(room.to_room_ref()).unwrap();
bob.join_room(room.to_room_ref()).unwrap();
alice.say("Hello everyone!".into()).unwrap();
bob.say("Hey Alice!".into()).unwrap();
Spawned 可以在异步运行时或普通 OS 线程上运行 Actor,且使用相同的 API。
异步模式(tasks):
use spawned_concurrency::tasks::{Actor, Context, Handler, ActorStart};
use spawned_rt::tasks as rt;
fn main() {
rt::run(async {
let ns = NameServer::new().start();
let result = ns.find("Joe".into()).await.unwrap();
})
}
线程模式(threads):
use spawned_concurrency::threads::{Actor, Context, Handler, ActorStart};
use spawned_rt::threads as rt;
fn main() {
rt::run(|| {
let ns = NameServer::new().start();
let result = ns.find("Joe".into()).unwrap();
})
}
在这两种模式下,Actor 的实现是完全相同的。Response<T> 是桥梁——在 Task 模式下你对它使用 .await,在线程模式下你直接调用 .unwrap()。
这具有实际意义:
即使在异步模式下,Spawned 也允许你为每个 Actor 选择执行后端:
Backend::Async —— 在异步任务池上运行(默认,最适合 IO 密集型工作)Backend::Blocking —— 在运行时的阻塞线程池上运行(用于会使异步任务饥饿的 CPU 密集型处理程序)Backend::Thread —— 在专用 OS 线程上运行(用于需要线程亲和性或实时保证的 Actor)当 Actor 的处理程序发生 panic 时,Spawned 会捕获 panic,通过 tracing::error! 记录它,并停止该 Actor。发送给该 Actor 的后续消息将返回 ActorError::ActorStopped。系统中的其他 Actor 不受影响——panic 被限制在引发它的 Actor 内部。
这同样适用于生命周期Hook:如果 #[started] Hook发生 panic,Actor 会立即退出而不运行 #[stopped]。
路线图的下一步是 监督树(Supervision trees),它将使恢复变得声明式且自动化。
如果你曾使用过 Erlang/OTP,Spawned 的概念可以直接对应:
| Erlang/OTP | Spawned | 描述 |
|---|---|---|
| 模块导出 (客户端 API) | #[protocol] trait |
公共消息接口 |
-behaviour(gen_server) |
#[actor] |
声明一个 Actor 实现 |
handle_call/3 |
#[request_handler] |
同步请求处理程序 |
handle_cast/2 |
#[send_handler] |
即发即弃处理程序 |
init/1 |
#[started] |
初始化回调 |
terminate/2 |
#[stopped] |
清理回调 |
gen_server:call/2 |
ns.find(...) |
直接方法调用 |
gen_server:cast/2 |
ns.notify(...) |
直接方法调用 (发送) |
Pid |
ActorRef<T> |
运行中 Actor 的Handle |
register/2 |
registry::register(name, ref) |
按名称注册 |
whereis/1 |
registry::whereis(name) |
按名称查找 |
spawned-rt 抽象层旨在使这种切换变得无缝。cargo add spawned-concurrency spawned-rt
如果你有任何问题或反馈,请在 GitHub 上提交 Issue 或在 X 上找到我们。
- 原文链接: blog.lambdaclass.com/int...
- 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!