🚀 重大发布:tokio-mpmc v0.2 引入 channel 特性,轻松替换 mpsc::channel

  • King
  • 发布于 1天前
  • 阅读 72

我们非常高兴地宣布:tokio-mpmcv0.2.0正式发布!这个版本新增了简洁的channelAPI,使你可以像使用tokio::sync::mpsc::channel一样,轻松创建支持多生产者/多消费者的并发信道。🔍为什么选择tokio-mpmc?Rust

我们非常高兴地宣布:tokio-mpmc v0.2 正式发布!

这个版本新增了简洁的 channel API,使你可以像使用 tokio::sync::mpsc::channel 一样,轻松创建支持 多生产者 / 多消费者 的并发信道。


🔍 为什么选择 tokio-mpmc

Rust 社区中已有多个异步信道实现,例如 tokio::sync::mpsc(MPSC 模式)、flume 等。

但在高并发场景下,传统的单消费者模型常常成为瓶颈。tokio-mpmc 专为此类瓶颈设计,提供原生的 MPMC(Multi-producer, Multi-consumer)支持,并与 Tokio 完美集成。

tokio-mpmc channel


🌟 新特性:channel API 统一风格

你现在可以使用以下方式创建一个带缓冲的多消费者信道:

use tokio_mpmc::channel;

这使得你几乎可以一行替换 tokio::sync::mpsc::channel,并立即享受 MPMC 带来的并发性能提升。


✅ 示例代码

use tokio_mpmc::channel;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().init();

    let (tx, rx) = channel(10); // 创建容量为10的channel

    let num_receivers = 3;
    let mut receiver_tasks = Vec::new();

    for i in 0..num_receivers {
        let rx = rx.clone();
        let task = tokio::spawn(async move {
            let mut count = 0;
            tracing::info!("Receiver {} started.", i);
            while let Ok(Some(value)) = rx.recv().await {
                tracing::info!("Receiver {} received value: {}", i, value);
                count += 1;
            }
            tracing::info!("Receiver {} completed. Received count: {}", i, count);
        });
        receiver_tasks.push(task);
    }

    for i in 0..10 {
        tx.send(i).await.unwrap();
    }

    tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
    drop(tx);

    for task in receiver_tasks {
        task.await.unwrap();
    }

    tracing::info!("All receivers completed.");
}

输出结果

Receiver 2 started.
Receiver 2 received value: 0
Receiver 2 received value: 1
Receiver 2 received value: 2
Receiver 0 started.
Receiver 1 started.
Receiver 1 received value: 5
Receiver 1 received value: 6
Receiver 1 received value: 7
Receiver 0 received value: 4
Receiver 2 received value: 3
Receiver 1 received value: 8
Receiver 0 received value: 9
Receiver 2 completed. Received count: 4
Receiver 1 completed. Received count: 4
Receiver 0 completed. Received count: 2
All receivers completed.

📊 Benchmark Results

我们对 tokio-mpmc 的两个 API(channel 与底层 queue)与社区常用的 [flume] 进行了基准对比:

性能测试对比(越快越好)

tokio-mpsc-channel tokio-mpmc-channel tokio-mpmc-queue flume
non-io 1.39 ms (✅ 1.00x) 65.38 us (🚀 21.21x faster) 168.86 us (🚀 8.21x faster) 773.68 us (✅ 1.79x faster)
io 197.97 ms (✅ 1.00x) 46.32 ms (🚀 4.27x faster) 46.83 ms (🚀 4.23x faster) 197.76 ms (✅ 1.00x faster)

Note:

  • non-io 表示无 IO 操作(纯计算环境)
  • io 表示有 IO 操作(模拟实际异步场景)

这些结果说明: ✅ tokio-mpmc-channel 在 IO 与非 IO 场景下均表现优越,在多个消费者处理并发数据时具有明显性能优势


🔧 使用场景

  • 异步工作池(async worker pool)
  • 高并发任务调度
  • 并行处理流水线(streaming pipelines)
  • 游戏引擎、区块链、Web 服务中的消息并发处理场景

📦 如何使用

在你的 Cargo.toml 中加入:

tokio-mpmc = "0.2"

项目主页 👉 tokio-mpmc


📣 欢迎反馈与参与

我们欢迎你试用 tokio-mpmc,并在 GitHub 提交 issue 或 PR。 这个项目正在积极发展中,你的每一条建议都可能推动它变得更好。


Rust 异步通信,从 mpscmpmc,只需一行代码的切换。现在就试试 tokio-mpmc v0.2,为你的 Tokio 项目注入并发加速度吧!🚀

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
King
King
0x56af...a0dd
擅长Rust/Solidity/FunC/Move开发