在高性能异步系统中,任务的调度和执行策略对整体吞吐量与响应延迟起着决定性作用。Tokio是Rust社区主流的异步运行时之一,拥有成熟的任务系统和I/O模型。然而,当我们需要对任务调度进行更细粒度的控制,比如优先级、批处理、任务分组或返回结果流时,Tokio的原生抽象就显得不够灵活。
在高性能异步系统中,任务的调度和执行策略对整体吞吐量与响应延迟起着决定性作用。Tokio 是 Rust 社区主流的异步运行时之一,拥有成熟的任务系统和 I/O 模型。
然而,当我们需要对任务调度进行更细粒度的控制,比如优先级、批处理、任务分组或返回结果流时,Tokio 的原生抽象就显得不够灵活。
为此,tokio-fusion 提供了一种融合了 Tokio executor、异步任务建模与线程池调度的新机制。 本文将深入剖析 tokio-fusion 的架构、核心组件与设计思想。
tokio-fusion 的目标是提供:
Arc<ThreadPool>
可在多个任务间安全共享;架构由以下几部分组成:
ThreadPool
ThreadPool
是 tokio-fusion 的调度引擎,它持有一个 tokio::runtime::Runtime
并在内部维护一个多生产者单消费者(MPSC)的任务队列,支持配置以下参数:
worker_threads
:线程池中的 worker 数量;queue_capacity
:内部任务通道的缓冲大小;它的主要职责包括:
ResultHandle<T>
。Task<T>
Task<T>
是 tokio-fusion 的核心任务抽象,定义为:
pub struct Task<T> {
pub fut: BoxFuture<'static, ThreadPoolResult<T>>,
pub priority: usize,
...
}
该结构封装了:
fut
,必须返回 Result<T, ThreadPoolError>
;ResultHandle<T>
任务提交成功后,会返回一个 ResultHandle<T>
,支持异步地 .await_result()
来获取结果。这避免了开发者自己管理 oneshot 通道或回调闭包。
内部使用 tokio::sync::oneshot
来桥接任务执行结果和用户逻辑,确保类型安全。
以单个任务的提交流程为例:
let thread_pool = Arc::new(ThreadPool::default());
let task = Task::new(async {
// 业务逻辑
Ok(compute_something().await)
}, 1); // priority
let handle = thread_pool.submit(task).await.unwrap();
let result = handle.await_result().await;
执行流程图如下:
submit()
提交 Task<T>
;ThreadPool
将其包装进调度队列;Runtime
中的 worker 消费队列并 spawn;oneshot::Sender
传回结果;ResultHandle.await_result()
异步获取最终结果。你可一次性提交多个 Task<T>
,并获取多个 ResultHandle<T>
,配合 FuturesUnordered
可实现流式聚合处理。
let thread_pool = Arc::new(ThreadPool::default());
let mut batch = BatchExecutor::new(Arc::clone(&thread_pool));
// Add tasks to the batch
for i in 0..5 {
batch.add_task(my_task(i), i);
}
// Execute all tasks and collect results
let results = batch.execute().await;
// Or stream results as they complete
let mut stream = batch.execute_stream().await;
while let Some(result) = stream.next().await {
println!("Got result: {:?}", result);
}
目前 priority
字段尚未集成到队列排序中,但未来将支持 heap-based 优先队列,使得高优任务能先执行。
通过 ThreadPoolResult<T>
和 ThreadPoolError
,你可以对运行时错误进行分类,例如队列满、执行 panic、提前 drop 等。
tokio-fusion 非常适合以下几类应用:
Task<T>
提交执行;特性 | tokio::spawn | rayon/futures | tokio-fusion |
---|---|---|---|
支持 async task | ✅ | 部分 | ✅ |
支持线程池控制 | ❌(由 runtime 控) | ❌(线程绑定) | ✅(自定义) |
支持任务优先级 | ❌ | ❌ | ✅(支持中) |
返回结果 Handle | ❌(需手动包装) | ❌ | ✅(强类型) |
队列容量可控 | ❌ | ❌ | ✅ |
tokio-fusion 是一个面向工程实践的异步任务池,它不试图替代 Tokio,而是对其进行结构化增强,提供更强的任务建模能力、更灵活的调度入口。适用于需要异步批量任务控制、结果统一收集、线程池隔离的中大型项目。
欢迎在你的高并发系统中尝试引入 tokio-fusion,并提出 PR 或 issue 进行共建。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!