Rust

2025年10月18日更新 9 人订阅
原价: ¥ 6 限时优惠
专栏简介 Rust编程语言之错误处理 Rust 语言之 flod Rust编程语言之Cargo、Crates.io详解 Rust编程语言之枚举与模式匹配 Rust语言 - 接口设计的建议之受约束(Constrained) Rust编程语言之无畏并发 Rust语言 - 接口设计的建议之灵活(flexible) Rust语言 - 接口设计的建议之显而易见(Obvious) Rust语言 - 接口设计的建议之不意外(unsurprising) Rust 实战:构建实用的 CLI 工具 HTTPie Rust编程语言学习之高级特性 Rust内存管理揭秘:深度剖析指针与智能指针 解决Rust中数组和切片的编译时大小问题 《Rust编程之道》学习笔记一 Rust Async 异步编程 简易教程 使用 Async Rust 构建简单的 P2P 节点 Rust编程语言入门之模式匹配 Rust async 编程 Rust编程语言之编写自动化测试 Rust编程语言之函数式语言特性:迭代器和闭包 《Rust编程之道》学习笔记二 Rust Tips 比较数值 使用 Rust 开发一个微型游戏 Rust编程初探:深入理解Struct结构体 深入理解Rust中的内存管理:栈、堆与静态内存详解 深入理解 Rust 结构体:经典结构体、元组结构体和单元结构体的实现 深入掌握 Rust 结构体:从模板到实例化的完整指南 深入理解Rust中的结构体:逻辑与数据结合的实战示例 深入理解 Rust 枚举:从基础到实践 掌握Rust字符串的精髓:String与&str的最佳实践 全面解析 Rust 模块系统:实战案例与应用技巧 Rust 中的 HashMap 实战指南:理解与优化技巧 掌握Rust模式匹配:从基础语法到实际应用 Rust 中的面向对象编程:特性与实现指南 深入理解 Rust 的 Pin 和 Unpin:理论与实践解析 Rust Trait 与 Go Interface:从设计到实战的深度对比 从零开始:用 Rust 和 Axum 打造高效 Web 应用 Rust 错误处理详解:掌握 anyhow、thiserror 和 snafu Rust 如何优雅实现冒泡排序 链表倒数 K 节点怎么删?Python/Go/Rust 实战 用 Rust 玩转数据存储:JSON 文件持久化实战 Rust实战:打造高效字符串分割函数 如何高效学习一门技术:从知到行的飞轮效应 Rust 编程入门:Struct 让代码更优雅 Rust 编程:零基础入门高性能开发 用 Rust 写个猜数游戏,编程小白也能上手! Rust 入门教程:变量到数据类型,轻松掌握! 深入浅出 Rust:函数、控制流与所有权核心特性解析 从零开始:用 Rust 和 Axum 打造高效 Web 服务 Rust 集合类型解析:Vector、String、HashMap 深入浅出Rust:泛型、Trait与生命周期的硬核指南 Rust实战:博物馆门票限流系统设计与实现 用 Rust 打造高性能图片处理服务器:从零开始实现类似 Thumbor 的功能 Rust 编程入门实战:从零开始抓取网页并转换为 Markdown 深入浅出 Rust:高效处理二进制数据的 Bytes 与 BytesMut 实战 Rust智能指针:解锁内存管理的进阶之道 用 Rust 打造命令行利器:从零到一实现 mini-grep 解锁Rust代码组织:轻松掌握Package、Crate与Module Rust 所有权:从内存管理到生产力释放 深入解析 Rust 的面向对象编程:特性、实现与设计模式 Rust + Protobuf:从零打造高效键值存储项目 bacon 点燃 Rust:比 cargo-watch 更爽的开发体验 用 Rust 打造微型游戏:从零开始的 Flappy Dragon 开发之旅 函数式编程的Rust之旅:闭包与迭代器的深入解析与实践 探索Rust编程之道:从设计哲学到内存安全的学习笔记 精读《Rust编程之道》:吃透语言精要,彻底搞懂所有权与借用 Rust 避坑指南:搞定数值比较,别再让 0.1 + 0.2 != 0.3 困扰你! 告别 Vec!掌握 Rust bytes 库,解锁零拷贝的真正威力 告别竞态条件:基于 Axum 和 Serde 的 Rust 并发状态管理最佳实践 Rust 异步编程实践:从 Tokio 基础到阻塞任务处理模式 Rust 网络编程实战:用 Tokio 手写一个迷你 TCP 反向代理 (minginx) 保姆级教程:Zsh + Oh My Zsh 终极配置,让你的 Ubuntu 终端效率倍增 不止于后端:Rust 在 Web 开发中的崛起之路 (2024数据解读) Rust核心利器:枚举(Enum)与模式匹配(Match),告别空指针,写出优雅健壮的代码 Rust 错误处理终极指南:从 panic! 到 Result 的优雅之道 想用 Rust 开发游戏?这份超详细的入门教程请收好! 用 Rust 实现 HTTPie:一个现代 CLI 工具的构建过程 Rust 异步实战:从0到1,用 Tokio 打造一个高性能并发聊天室 深入 Rust 核心:彻底搞懂指针、引用与智能指针 Rust 生产级后端实战:用 Axum + sqlx 打造高性能短链接服务 深入 Rust 内存模型:栈、堆、所有权与底层原理 Rust 核心概念解析:引用、借用与内部可变性 掌握 Rust 核心:生命周期与借用检查全解析 Rust 内存布局深度解析:从对齐、填充到 repr 属性 Rust Trait 分派机制:静态与动态的抉择与权衡 Rust Thread::Builder 用法详解:线程命名与栈大小设置 Rust 泛型 Trait:关联类型与泛型参数的核心区别 Rust Scoped Threads 实战:更安全、更简洁的并发编程 Rust 核心设计:孤儿规则与代码一致性解析 Rust 实战:从零构建一个多线程 Web 服务器 Rust Web 开发实战:构建教师管理 API 硬核实战:从零到一,用 Rust 和 Axum 构建高性能聊天服务后端 Rust Web 开发实战:使用 SQLx 连接 PostgreSQL 数据库 硬核入门:从零开始,用 Actix Web 构建你的第一个 Rust REST API (推荐 🔥) Rust 并发编程:详解线程间数据共享的几种核心方法 Rust并发安全基石:Mutex与RwLock深度解析 Rust Web实战:构建优雅的 Actix Web 统一错误处理 煮咖啡里的大学问:用 Rust Async/Await 告诉你如何边烧水边磨豆 深入浅出:Rust 原子类型与多线程编程实践 Rust 并发编程利器:OnceCell 与 OnceLock 深度解析 Rust 懒人编程:LazyCell 与 LazyLock 的惰性哲学 Rust 入门精髓:详解 Enum 的三种魔法,从模式匹配到状态管理 Rust 字符串魔法:String 与 &str 的深度解析与实践 Rust 模块化编程:驾驭代码结构与可见性的三大法则 Rust 实用进阶:深度剖析 Rust 生命周期的奥秘 Rust 智能指针大揭秘:Box、Rc、Arc、Cow 深度剖析与应用实践 Rust 并发编程三步曲:Join、Arc<Mutex> 与 mpsc 通道同步实战 Rust 声明宏实战进阶:从基础定义到 #[macro_export] 与多规则重载 Rust 类型转换实战:利用 From/Into Trait 实现带 Default 容错的安全转换 Rust 实战:实现 FromStr Trait,定制化字符串 parse() 与精确错误报告 Rust 实战:TryFrom Trait——如何在类型转换中强制执行业务逻辑检查 Rust 泛型编程基石:AsRef 和 AsMut 的核心作用与实战应用 揭秘 Rust Unsafe 编程:程序员接管内存安全的契约与实践 Rust FFI 入门:extern、ABI 与底层符号链接解析 Rust性能优化:零内存拷贝的链表合并技术实战 Rust 进阶:用 NonNull 裸指针实现高性能双向链表 O(N) 反转实战 Rust实战:如何用泛型和特性实现一个高性能、通用的插入排序 Rust实战:深度解析二叉搜索树(BST)的实现,掌握泛型与内存安全 用 Rust 优雅实现图搜索核心算法:广度优先搜索 (BFS) 实战 Rust 多线程的高效等待术:park() 与 unpark() 信号通信实战 Rust 并发加速器:用 Condvar 实现线程间“精确握手”与高效等待 Rust 算法精讲:用 DFS 玩转图遍历,从起点“一走到底”的秘密 Rust 并发实战:用 MPSC 通道构建线程安全的“任务指挥中心” 一行代码提速 30 倍!Rust Rayon 并行计算:告别多线程管理困境 Rust 实战:使用自定义泛型栈实现高效、严谨的括号匹配算法 Rust 并行加速:4 个实操案例,深度解析 Rayon 线程池的 Fork-Join 与广播机制

Rust 并行加速:4 个实操案例,深度解析 Rayon 线程池的 Fork-Join 与广播机制

Rust并行加速:4个实操案例,深度解析Rayon线程池的Fork-Join与广播机制在现代软件开发中,充分利用多核CPU的并行计算能力是提升应用性能的关键。Rust语言通过其零成本抽象和所有权系统,在并发编程方面提供了卓越的安全保障。而Rayon库,作为Rust生态中最

Rust 并行加速:4 个实操案例,深度解析 Rayon 线程池的 Fork-Join 与广播机制

在现代软件开发中,充分利用多核 CPU 的并行计算能力是提升应用性能的关键。Rust 语言通过其零成本抽象所有权系统,在并发编程方面提供了卓越的安全保障。而 Rayon 库,作为 Rust 生态中最受欢迎的并行数据处理工具,更是将复杂的多线程编程简化为几行代码。

本文将通过 4 个精选的 Rayon 线程池实操案例,从最基本的矩阵求和到高级的线程广播和 join 模式,深入浅出地解释 Rayon 如何在底层实现高效的任务并行(Task Parallelism)、保证结构化同步(Structured Concurrency),并揭示其并行输出的非确定性特征。无论您是 Rust 初学者还是希望进一步优化 CPU 密集型任务的开发者,都将从这些实战示例中获益。

实操

Rust 多线程 - Rayon

示例一

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    let matrix = [
        vec![1, 2, 3],
        vec![4, 5, 6],
        vec![7, 8, 9],
        vec![10, 11, 12],
    ];

    pool.scope(|scope| {
        for (i, row) in matrix.iter().enumerate() {
            scope.spawn(move |_| {
                let sum: i32 = row.iter().sum();
                println!("Row {i} sum = {sum}");
            });
        }
    });

    println!("Main thread finished");
}

这段 Rust 代码使用了 Rayon 库来创建一个自定义的线程池并执行并行任务。

代码的详细解释如下:

  1. 创建线程池:

    let pool = rayon::ThreadPoolBuilder::new()
       .num_threads(4)
       .build()
       .unwrap();

    这行代码使用 rayon::ThreadPoolBuilder 构建了一个名为 pool 的自定义线程池。

    • .num_threads(4) 指定线程池中包含 4 个工作线程。
    • .build() 尝试创建线程池。
    • .unwrap() 处理可能出现的错误(例如线程创建失败),如果成功则返回 ThreadPool 实例。
  2. 定义数据:

    let matrix = [
       vec![1, 2, 3],
       vec![4, 5, 6],
       vec![7, 8, 9],
       vec![10, 11, 12],
    ];

    定义了一个包含 4 个向量的数组 matrix,可以将其视为一个 4 X 3 的矩阵。

  3. 使用作用域(Scoped Task)执行并行任务:

    pool.scope(|scope| {
       // ... 任务定义 ...
    });

    pool.scope(|scope| { ... }) 创建了一个 "fork-join" 作用域。在这个作用域内部(即闭包 { ... } 内部)可以安全地启动并发任务,这些任务可以借用外部栈上的局部变量(例如 matrix)。关键点在于,当 scope 闭包返回时,程序会阻塞,直到所有通过 scope.spawn() 启动的任务都完成

  4. 分发任务:

    for (i, row) in matrix.iter().enumerate() {
       scope.spawn(move |_| {
           let sum: i32 = row.iter().sum();
           println!("Row {i} sum = {sum}");
       });
    }
    • matrix.iter().enumerate() 遍历 matrix 数组,同时获取行索引 i 和行数据 row(一个向量的引用 &Vec&lt;i32>)。
    • scope.spawn(move |_| { ... }) 在线程池中创建一个新的异步任务。
      • move 关键字确保闭包获得了它所使用的变量(这里是 irow)的所有权或所有必要的拷贝。由于 row 是对 matrix 元素的引用,Rayon 的 scoped task 机制保证了在任务完成之前 matrix 不会被释放,从而使这个引用是安全的。
      • 每个任务都并行地计算一行向量的元素之和 (row.iter().sum()),并打印结果。
      • 因为有 4 行数据和 4 个线程,理论上这 4 个求和任务可以同时在 4 个线程上并行执行(但实际调度取决于 Rayon 运行时)。
  5. 主线程继续执行:

    println!("Main thread finished");

    这行代码会在 pool.scope(|...| { ... }) 完成(即所有并行任务都执行完毕)之后,由主线程执行并打印出来,这证明了 scoped task 机制确保了任务的完成性

总结:

这段代码利用 Rayon 库创建了一个 4 线程的线程池,然后使用一个 scoped task 机制将一个矩阵的 4 行数据的求和任务 分发给线程池中的线程进行 并行计算。程序保证在打印 "Main thread finished" 之前,所有行的求和任务都已经完成并打印了各自的结果。这是一种典型的 Fork-Join 并行模式的实现。

运行

RustJourney/rayon_examples on  main [?] is 📦 0.1.0 via 🦀 1.90.0 took 2.7s 
➜ cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.00s
     Running `target/debug/rayon_examples`
Row 3 sum = 33
Row 1 sum = 15
Row 2 sum = 24
Row 0 sum = 6
Main thread finished

RustJourney/rayon_examples on  main [?] is 📦 0.1.0 via 🦀 1.90.0 
➜ cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.01s
     Running `target/debug/rayon_examples`
Row 3 sum = 33
Row 0 sum = 6
Row 1 sum = 15
Row 2 sum = 24
Main thread finished

这段运行结果清晰地展示了 Rayon 库进行并行计算的两个关键特性:并行性(Parallelism)确定性同步(Deterministic Synchronization)

  1. 并行性与输出顺序不确定性(Non-deterministic Output Order):
    • 代码为矩阵的四行创建了四个独立的并行任务,分别计算 Row 0Row 3 的和。
    • 在两次运行中,Row 0 sumRow 3 sum 的输出顺序是不同的(第一次是 3, 1, 2, 0;第二次是 3, 0, 1, 2)。这正是使用 Rayon 线程池 进行并行计算的直接体现。由于这四个求和任务是并发执行的,它们完成的顺序取决于操作系统对线程的调度以及线程池中 4 个工作线程的可用性,因此 输出顺序是不确定的
  2. 作用域同步保证(Scoped Synchronization Guarantee):
    • 尽管并行任务的输出顺序不确定,但在两次运行中,Main thread finished 总是最后打印
    • 这证明了 pool.scope(|...| { ... }) 机制的有效性:主线程会阻塞并等待 scope 闭包内所有通过 scope.spawn() 启动的并行任务(即所有行的求和任务)全部完成后,才会继续执行 scope 之后的代码 (println!("Main thread finished"))。这确保了主程序的正确同步,即所有并行工作都已完成。

示例二

fn main() {
    let outer_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();

    outer_pool.scope(|scope| {
        for stage in 0..2 {
            scope.spawn(move |_scope| {
                println!("Stage {stage} started.");

                let inner_pool = rayon::ThreadPoolBuilder::new()
                    .num_threads(2)
                    .build()
                    .unwrap();

                inner_pool.scope(|inner_scope| {
                    for task in 0..2 {
                        inner_scope.spawn(move |_inner_scope| {
                            println!("\t-> Inner task {task} of stage {stage} started.");
                        });
                    }
                });

                println!("\t-> Stage {stage} completed.");
            });
        }
    });

    println!("-> All stages completed.");
}

这段 Rust 代码演示了 Rayon 线程池的嵌套使用,但其实现方式在性能上是低效且不推荐的,因为它在并行任务内部反复创建新的线程池

代码结构与逻辑解释

  1. 外部线程池创建:

    代码首先创建了一个名为 outer_pool 的 2 线程 Rayon 线程池。这个线程池用于执行顶层的并行任务。

  2. 外部作用域(Outer Scope):

    outer_pool.scope(|scope| { ... }) 创建了一个外部 "fork-join" 作用域。在这个作用域内,代码通过循环执行了两次 scope.spawn(),启动了 2 个并行任务,分别对应 stage 0 和 stage 1。

  3. 内部任务逻辑(低效部分):

    在每个 stage 的任务内部,代码执行了以下操作:

    • 打印 Stage {stage} started.
    • 在运行时动态创建了一个名为 inner_pool新的 2 线程 Rayon 线程池
    • 使用 inner_pool.scope(|inner_scope| { ... }) 再次创建了一个作用域,并在其中启动了 2 个更小的并行任务(inner task 0inner task 1)。
    • inner_pool.scope阻塞,直到这两个内部任务完成并打印 -> Inner task ... started.
    • 内部作用域完成后,inner_pool 被销毁(当 stage 任务结束时),然后打印 -> Stage {stage} completed.
  4. 同步机制:

    最外层的 outer_pool.scope 会阻塞主线程,直到 stage 0 和 stage 1 这两个并行任务全部完成。当所有工作完成后,主线程才会打印 -> All stages completed.。

核心要点和性能问题

这段代码的核心问题在于它没有利用 Rayon 的工作窃取(Work Stealing)机制。Rayon 的设计宗旨是使用一个全局线程池,通过 rayon::scope 或并行迭代器 (par_iter) 在这个单一线程池内高效地调度任务。

然而,这段代码的实现方式是:

  • Stage 0 任务启动后,它会在 outer_pool 的一个线程上运行。
  • 在该线程上,它又创建了 4 个全新的系统线程(通过 inner_pool)来处理内部任务。
  • 最终,程序在运行过程中可能同时拥有 6 个或更多的系统线程(主线程 1 个 + outer_pool 2 个 + 两个 inner_pool 各 2 个),这造成了额外的线程创建和销毁开销,浪费了资源。

正确的 Rayon 实践是在一个线程池内部,直接使用 rayon::scoperayon::spawn 来分发任务,而不是在任务内部创建新的线程池

运行

RustJourney/rayon_examples on  main [?] is 📦 0.1.0 via 🦀 1.90.0 
➜ cargo run     
   Compiling rayon_examples v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/rayon_examples)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.63s
     Running `target/debug/rayon_examples`
Stage 1 started.
Stage 0 started.
        -> Inner task 1 of stage 1 started.
        -> Inner task 0 of stage 1 started.
        -> Inner task 1 of stage 0 started.
        -> Inner task 0 of stage 0 started.
        -> Stage 0 completed.
        -> Stage 1 completed.
-> All stages completed.

RustJourney/rayon_examples on  main [?] is 📦 0.1.0 via 🦀 1.90.0 
➜ cargo run
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.06s
     Running `target/debug/rayon_examples`
Stage 1 started.
Stage 0 started.
        -> Inner task 1 of stage 1 started.
        -> Inner task 0 of stage 1 started.
        -> Inner task 1 of stage 0 started.
        -> Inner task 0 of stage 0 started.
        -> Stage 0 completed.
        -> Stage 1 completed.
-> All stages completed.

这段运行结果清晰地展示了 Rayon 线程池的并行和同步机制,即使是在这种不推荐的线程池嵌套场景中。

  1. 顶层任务并行性(不确定顺序):

    两次运行的输出都以 Stage 1 started. 和 Stage 0 started. 交替开始,例如第一次运行是 Stage 1 先于 Stage 0 启动。这表明最外层 outer_pool 将 stage 0 和 stage 1 任务作为并行任务分发给了其两个工作线程,它们的启动顺序是不确定的,体现了并发执行。

  2. 内部任务并行性:

    一旦某个 stage 启动,它会立即在其内部创建并激活一个新的 inner_pool,然后并行地启动 inner task 0 和 inner task 1。因此,可以看到来自不同 Stage 的内部任务(如 Inner task 1 of stage 1 和 Inner task 1 of stage 0)的启动信息是混合交错在一起的,证实了它们也是并发执行的。

  3. Scoped Task 的同步保证(确定性完成):

    尽管所有的 started 消息都是不确定的交错输出,但程序的完成顺序是严格确定的:

    • 首先,每个 inner_pool.scope 保证其内部的两个 Inner task 结束后,才能打印相应的 -> Stage X completed.
    • 其次,最外层 outer_pool.scope 保证所有 Stage 任务Stage 0Stage 1)都打印了 completed 消息后,主线程才会继续执行,最终打印 -> All stages completed.

因此,结果表明:并行任务的执行顺序是随机的,但 Rayon 的 "fork-join" 作用域机制保证了程序会等待所有子任务完成后,才允许流程进入下一阶段,从而实现正确的程序同步。

示例三

线程广播

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    pool.scope(|scope| {
        scope.spawn_broadcast(|_scope, ctx| {
            let id = ctx.index();
            println!("Thread {id}.");
        });
    });
}

这段 Rust 代码利用 Rayon 库展示了线程广播(Thread Broadcast)这一高级功能,它用于在自定义线程池的所有工作线程上运行相同的任务,通常用于线程本地初始化或状态同步

  1. 线程池创建:

    代码首先使用 rayon::ThreadPoolBuilder::new().num_threads(4).build().unwrap() 创建了一个包含 4 个 工作线程的自定义线程池 pool。

  2. Scoped Task(作用域):

    pool.scope(|scope| { ... }) 创建了一个 "fork-join" 作用域,确保在主线程继续执行之前,作用域内所有派生的并行任务都将完成。

  3. 线程广播任务:

    核心是 scope.spawn_broadcast(|_scope, ctx| { ... })。这个方法不是像 spawn 那样创建一个任务让任一空闲线程去执行,而是特意为线程池中的 每一个 工作线程都安排一个相同的任务去执行。

    • 闭包接收一个 BroadcastContext 结构体 ctx
    • ctx.index() 方法会返回当前正在执行此广播任务的线程在线程池中的唯一索引(从 0 到线程数减 1)。

总结:

这段代码的功能是:在创建的 4 线程线程池的每个线程上运行一个任务,每个任务打印出自己线程的索引。因此,代码的运行结果会打印出 4 行消息,内容是 Thread 0.Thread 1.Thread 2.Thread 3.,但由于并行执行,这 4 行的输出顺序是不确定的。这个模式非常适合用于在并行工作开始前,对每个 Rayon 工作线程的本地状态进行设置进行一次性操作

运行

➜ cargo run
   Compiling rayon_examples v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/rayon_examples)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.57s
     Running `target/debug/rayon_examples`
Thread 0.
Thread 1.
Thread 3.
Thread 2.

这段运行结果完美地证实了 scope.spawn_broadcast() 线程广播功能的行为以及 Rayon 的并行特性

  1. 广播执行: 代码创建了一个包含 4 个线程的线程池,并使用 spawn_broadcast 精确地在线程池的每个工作线程上执行了一次任务。因此,程序输出了 4 条 Thread X. 消息,分别对应线程索引 0 到 3,这证明了广播任务确实在所有线程上运行了。
  2. 并行性与输出顺序不确定性: 尽管这 4 个线程任务是同时被启动的,但由于操作系统的线程调度机制,这些任务完成和打印输出的顺序是不确定的。运行结果中的输出顺序是 0, 1, 3, 2,而不是严格的升序 0, 1, 2, 3,这充分体现了 Rayon 在多个核心上并行执行任务时,任务完成顺序的非确定性

结论: 运行结果表明,线程广播任务在线程池的所有 4 个线程上都成功执行,并且输出顺序的不确定性是并发编程的典型特征。

示例四

线程池 JOIN

fn main() {
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(4)
        .build()
        .unwrap();

    let func = || println!("Hello, world!");

    pool.join(func, func);
}

这段 Rust 代码利用 Rayon 库展示了结构化并行中的 "分叉-汇合" (Fork-Join) 模式,旨在高效地并行执行两个独立的任务。

  1. 线程池初始化:

    代码首先创建了一个名为 pool 的自定义 Rayon 线程池,并使用 .num_threads(4) 明确指定该线程池拥有 4 个 工作线程。这确保了后续的并行任务会在这个受控的环境中执行。

  2. 定义任务:

    定义了一个简单的闭包 func,其唯一的副作用是打印 "Hello, world!"。

  3. 分叉-汇合操作:

    核心操作是 pool.join(func, func)。join 是 Rayon 提供的最基本且最高效的并行操作之一,它将两个闭包(oper_a 和 oper_b)作为参数:

    • 分叉 (Fork): join 函数会尝试同时启动这两个任务。具体来说,当前调用 join 的线程会立即执行第一个闭包 (func),同时将第二个闭包 (func) 作为一个新的并行任务提交给线程池。
    • 汇合 (Join): join 是一个阻塞式调用。它会一直等待,直到这两个闭包(无论它们在哪个线程上执行)都彻底完成并返回结果后,join 才会返回一个包含两个闭包返回值的元组。

总结:

这段代码通过自定义的 4 线程 Rayon 线程池,并行执行了两次打印 "Hello, world!" 的操作。由于这两个任务是并发运行的,它们在控制台输出的顺序将是不确定和交错的(但最终会输出两次 "Hello, world!")。pool.join() 确保了主线程会等待这两个并行任务完成后,程序才继续向下或结束。这种模式常用于递归算法(如快速排序)的分治并行化,具有极高的效率,因为它主要利用栈分配来管理任务,避免了复杂的堆分配开销。

运行

➜ cargo run
   Compiling rayon_examples v0.1.0 (/Users/qiaopengjun/Code/Rust/RustJourney/rayon_examples)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.45s
     Running `target/debug/rayon_examples`
Hello, world!
Hello, world!

这段运行结果是前述 Rust 代码执行 pool.join(func, func) 的直接体现,结果是程序输出了两行 "Hello, world!"

  1. 并行任务执行: pool.join(func, func) 机制启动了两个完全相同的任务(即两次调用打印 "Hello, world!" 的闭包 func),并在创建的 4 线程 Rayon 线程池中并行执行。
  2. 任务完成和同步: 由于每个任务都只执行了一个简单的打印操作,它们几乎瞬间完成。join 操作保证了主程序会等待这两个并行任务都完成,因此确保了两次 "Hello, world!" 消息都成功输出。
  3. 确定性输出(偶然): 尽管 Rayon 是并行执行任务的,且任务的完成顺序通常是不确定的,但由于这两个任务的代码完全相同且执行速度极快,系统调度器在本次运行中恰好使得这两个打印操作紧接着完成并输出了两次 "Hello, world!"

结论: 运行结果证实了 rayon::join 成功地在线程池中并行执行了两个任务,并且主线程在所有并行工作完成后才结束。

总结

通过对这四个 Rayon 核心 API 的实操和结果分析,我们深刻理解了 Rayon 在 Rust 多线程编程中的强大和优雅:

  1. 结构化同步的保证(scopejoin: Rayon 的 scopejoin API 实现了经典的 Fork-Join 模式。它保证了父任务在所有子并行任务彻底完成之前不会结束,从而消除了传统线程中常见的生命周期和数据安全问题。
  2. 并行执行的非确定性: 无论是使用 scope.spawn() 还是 spawn_broadcast,任务在线程池中的执行顺序都由操作系统调度和 Rayon 的工作窃取机制决定,因此输出顺序是随机的,这是并行编程的固有特征。
  3. 高级控制能力: ThreadPoolBuilder 允许我们精确控制线程池大小;spawn_broadcast 则提供了在每个工作线程上执行一次任务的独特能力,非常适合复杂的线程本地状态初始化。

Rayon 使得 Rust 开发者能够以安全、高效且易于理解的方式,释放现代多核 CPU 的全部潜能,是进行 CPU 密集型任务优化的首选工具。

参考

点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论