我们非常高兴地宣布:tokio-mpmcv0.2.0正式发布!这个版本新增了简洁的channelAPI,使你可以像使用tokio::sync::mpsc::channel一样,轻松创建支持多生产者/多消费者的并发信道。🔍为什么选择tokio-mpmc?Rust
我们非常高兴地宣布:
tokio-mpmc
v0.2 正式发布!
这个版本新增了简洁的 channel
API,使你可以像使用 tokio::sync::mpsc::channel
一样,轻松创建支持 多生产者 / 多消费者 的并发信道。
tokio-mpmc
?Rust 社区中已有多个异步信道实现,例如 tokio::sync::mpsc
(MPSC 模式)、flume
等。
但在高并发场景下,传统的单消费者模型常常成为瓶颈。tokio-mpmc
专为此类瓶颈设计,提供原生的 MPMC(Multi-producer, Multi-consumer)支持,并与 Tokio 完美集成。
channel
API 统一风格你现在可以使用以下方式创建一个带缓冲的多消费者信道:
use tokio_mpmc::channel;
这使得你几乎可以一行替换 tokio::sync::mpsc::channel
,并立即享受 MPMC 带来的并发性能提升。
use tokio_mpmc::channel;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().init();
let (tx, rx) = channel(10); // 创建容量为10的channel
let num_receivers = 3;
let mut receiver_tasks = Vec::new();
for i in 0..num_receivers {
let rx = rx.clone();
let task = tokio::spawn(async move {
let mut count = 0;
tracing::info!("Receiver {} started.", i);
while let Ok(Some(value)) = rx.recv().await {
tracing::info!("Receiver {} received value: {}", i, value);
count += 1;
}
tracing::info!("Receiver {} completed. Received count: {}", i, count);
});
receiver_tasks.push(task);
}
for i in 0..10 {
tx.send(i).await.unwrap();
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
drop(tx);
for task in receiver_tasks {
task.await.unwrap();
}
tracing::info!("All receivers completed.");
}
Receiver 2 started.
Receiver 2 received value: 0
Receiver 2 received value: 1
Receiver 2 received value: 2
Receiver 0 started.
Receiver 1 started.
Receiver 1 received value: 5
Receiver 1 received value: 6
Receiver 1 received value: 7
Receiver 0 received value: 4
Receiver 2 received value: 3
Receiver 1 received value: 8
Receiver 0 received value: 9
Receiver 2 completed. Received count: 4
Receiver 1 completed. Received count: 4
Receiver 0 completed. Received count: 2
All receivers completed.
我们对 tokio-mpmc
的两个 API(channel
与底层 queue
)与社区常用的 [flume
] 进行了基准对比:
tokio-mpsc-channel |
tokio-mpmc-channel |
tokio-mpmc-queue |
flume |
|
---|---|---|---|---|
non-io |
1.39 ms (✅ 1.00x) |
65.38 us (🚀 21.21x faster) |
168.86 us (🚀 8.21x faster) |
773.68 us (✅ 1.79x faster) |
io |
197.97 ms (✅ 1.00x) |
46.32 ms (🚀 4.27x faster) |
46.83 ms (🚀 4.23x faster) |
197.76 ms (✅ 1.00x faster) |
Note:
non-io
表示无 IO 操作(纯计算环境)io
表示有 IO 操作(模拟实际异步场景)
这些结果说明:
✅ tokio-mpmc-channel
在 IO 与非 IO 场景下均表现优越,在多个消费者处理并发数据时具有明显性能优势。
在你的 Cargo.toml
中加入:
tokio-mpmc = "0.2"
项目主页 👉 tokio-mpmc
我们欢迎你试用 tokio-mpmc
,并在 GitHub 提交 issue 或 PR。
这个项目正在积极发展中,你的每一条建议都可能推动它变得更好。
Rust 异步通信,从
mpsc
到mpmc
,只需一行代码的切换。现在就试试tokio-mpmc v0.2
,为你的 Tokio 项目注入并发加速度吧!🚀
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!