多线程的使用
在大部分现代操作系统中,已执行程序的代码在一个 进程(process)
中运行,操作系统则会负责管理多个进程。在程序内部,也可以拥有多个同时运行的独立部分。这些运行这些独立部分的功能被称为 线程(threads)
。
将程序中的计算拆分进多个线程可以改善性能,因为程序可以同时进行多个任务,不过这也会增加复杂性。因为线程是同时运行的,所以无法预先保证不同线程中的代码的执行顺序。这会导致诸如此类的问题:
编程语言提供的线程叫做绿色线程,如go
语言,在底层实现了M:N
的模型,即M
个绿色线程对应N
个OS
线程。但是,Rust
标准库只提供1:1
的线程模型的实现,即一个Rust
线程对应一个OS
线程。
为了创建一个新线程,需要调用thread::spawn
函数并传递一个闭包:
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("{} in spawn thread", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("{} in main thread", i);
thread::sleep(Duration::from_millis(1));
}
}
注意当 Rust 程序的主线程结束时,新线程也会结束,而不管其是否执行完毕。这个程序的输出可能每次都略有不同,不过它大体上看起来像这样:
1 in main thread
1 in spawn thread
2 in main thread
2 in spawn thread
3 in main thread
3 in spawn thread
4 in main thread
4 in spawn thread
5 in spawn thread
thread::sleep
调用强制线程停止执行一小段时间,这会允许其他不同的线程运行。这些线程可能会轮流运行,不过并不保证如此:这依赖操作系统如何调度线程。
由于主线程结束,大部分时候不光会提早结束新建线程,因为无法保证线程运行的顺序,我们甚至不能实际保证新建线程会被执行!
可以通过将thread::spawn
的返回值储存在变量中来修复新建线程部分没有执行或者完全没有执行的问题。thread::spawn
的返回值类型是 JoinHandle
。JoinHandle
是一个拥有所有权的值,当对其调用 join
方法时,它会等待其线程结束:
use std::{thread, time::Duration};
fn main() {
let t1 = thread::spawn(|| {
for i in 1..10 {
println!("{} in spawn thread", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("{} in main thread", i);
thread::sleep(Duration::from_millis(1));
}
t1.join().unwrap();
}
通过调用 t1
的 join
会阻塞当前线程直到 t1
所代表的线程结束。阻塞线程意味着阻止该线程执行工作或退出。因为我们将 join
调用放在了主线程的 for
循环之后,运行输出:
1 in main thread
1 in spawn thread
2 in main thread
2 in spawn thread
3 in main thread
3 in spawn thread
4 in main thread
4 in spawn thread
5 in spawn thread
6 in spawn thread
7 in spawn thread
8 in spawn thread
9 in spawn thread
这两个线程仍然会交替执行,不过主线程会由于 t1.join()
调用会等待直到新建线程执行完毕。
不过让我们看看将 t1.join()
移动到main
中 for
循环之前会发生什么,如下:
use std::{thread, time::Duration};
fn main() {
let t1 = thread::spawn(|| {
for i in 1..10 {
println!("{} in spawn thread", i);
thread::sleep(Duration::from_millis(1));
}
});
t1.join().unwrap();
for i in 1..5 {
println!("{} in main thread", i);
thread::sleep(Duration::from_millis(1));
}
}
运行输出:
1 in spawn thread
2 in spawn thread
3 in spawn thread
4 in spawn thread
5 in spawn thread
6 in spawn thread
7 in spawn thread
8 in spawn thread
9 in spawn thread
1 in main thread
2 in main thread
3 in main thread
4 in main thread
将 join
放置于何处,会影响线程是否同时运行。
在之前示例thread::spawn
的闭包并没有任何参数,并没有在新建线程代码中使用任何主线程的数据。为了在新建线程中使用来自于主线程的数据,需要新建线程的闭包获取它需要的值:
use std::thread;
fn main() {
let s = "hello".to_string();
let t1 = thread::spawn(|| {
println!("{}", s);
});
t1.join().unwrap();
}
闭包使用了s
,所以闭包会捕获 s
并使其成为闭包环境的一部分。因为 thread::spawn
在一个新线程中运行这个闭包,所以可以在新线程中访问s
。然而当编译这个例子时,会得到如下错误:
$ cargo run
Compiling multithreading v0.1.0 (/rsut/multithreading)
error[E0373]: closure may outlive the current function, but it borrows `s`, which is owned by the current function
--> src/main.rs:4:28
|
4 | let t1 = thread::spawn(|| {
| ^^ may outlive borrowed value `s`
5 | println!("{}", s);
| - `s` is borrowed here
|
note: function requires argument type to outlive `'static`
--> src/main.rs:4:14
|
4 | let t1 = thread::spawn(|| {
| ______________^
5 | | println!("{}", s);
6 | | });
| |______^
help: to force the closure to take ownership of `s` (and any other referenced variables), use the `move` keyword
|
4 | let t1 = thread::spawn(move || {
| ++++
For more information about this error, try `rustc --explain E0373`.
error: could not compile `multithreading` due to previous error
Rust
会 推断 如何捕获 s
,因为 println!
只需要 s
的引用,闭包尝试借用s
。然而这有一个问题:Rust
不知道这个新建线程会执行多久,所以无法知晓 s
的引用是否一直有效。
看错误信息的建议在闭包之前增加move
关键字,我们强制闭包获取其使用的值的所有权,而不是任由 Rust
推断它应该借用值:
use std::thread;
fn main() {
let s = "hello".to_string();
let t1 = thread::spawn(move || {
println!("{}", s);
});
t1.join().unwrap();
}
Rust
是保守的并只会为线程借用 s
,这意味着主线程理论上可能使新建线程的引用无效。通过告诉Rust
将s
的所有权移动到新建线程,我们向 Rus
t 保证主线程不会再使用s
。
Rust
中一个实现消息传递并发的主要工具是通道,Rust
的通道(channel)
可以把一个线程的消息(数据)传递到另一个线程,从而让信息在不同的线程中流动,从而实现协作。通道由两部分组成,一个是发送端,一个是接收端,发送端用来发送消息,接收端用来接收消息。发送者或者接收者任一被丢弃时就可以认为通道被关闭了。
mpsc::channel
,创建通道,mpsc
是多个生产者,单个消费者。spmc::channel
,创建通道,spmc
是一个生产者,多个消费者。首先,创建了一个通道但没有做任何事。注意这还不能编译,因为 Rust
不知道我们想要在信道中发送什么类型:
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
这里使用 mpsc::channel
函数创建一个新的通道;mpsc
是 多个生产者,单个消费者。Rust
标准库实现通道的方式意味着一个通道可以有多个产生值的 发送端,但只能有一个消费这些值的 接收端。
mpsc::channel
函数返回一个元组:第一个元素是发送端,而第二个元素是接收端。
让我们将发送端移动到一个新建线程中并发送一个字符串,这样新建线程就可以和主线程通讯了:
use std::{sync::mpsc, thread};
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("你好");
tx.send(val).unwrap();
});
}
这里再次使用 thread::spawn
来创建一个新线程并使用move
将 tx
移动到闭包中这样新建线程就拥有 tx
了。新建线程需要拥有通道的发送端以便能向信道发送消息。通道的发送端有一个 send
方法用来获取需要放入通道的值。
我们在主线程中从通道的接收者获取值:
use std::{sync::mpsc, thread};
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("你好");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("rx: {}", received);
}
通道的接收者有两个有用的方法:recv
和 try_recv
。这里,我们使用了recv
,这个方法会阻塞主线程执行直到从信道中接收一个值。一旦发送了一个值,recv
会在一个 Result<T, E>
中返回它。当通道发送端关闭,recv
会返回一个错误表明不会再有新的值到来了。
try_recv
不会阻塞,相反它立刻返回一个 Result<T, E>:Ok
值包含可用的信息,而 Err
值代表此时没有任何消息。如果线程在等待消息过程中还有其他工作时使用 try_recv
很有用:可以编写一个循环来频繁调用 try_recv
,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。
我们在新建线程中的通道中发送完 val
值 之后 再使用它:
use std::{sync::mpsc, thread};
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("tx : {}", val);
});
let received = rx.recv().unwrap();
println!("rx: {}", received);
}
这里尝试在通过 tx.send
发送 val
到通道中之后将其打印出来。在调用 send
的时候,会发生move
动作,所以此处不能再使用val
:
$ cargo run
Compiling multithreading v0.1.0 (/rsut/multithreading)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:9:29
|
7 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
8 | tx.send(val).unwrap();
| --- value moved here
9 | println!("tx : {}", val);
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `multithreading` due to previous error
send
函数获取其参数的所有权并移动这个值归接收者所有。这可以防止在发送后再次意外地使用这个值;
新建线程现在会发送多个消息并在每个消息之间暂停一秒钟:
use std::{sync::mpsc, thread, time::Duration};
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("A"),
String::from("B"),
String::from("C"),
String::from("D"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("rx: {}", received);
}
}
这一次,在新建线程中有一个字符串vector
希望发送到主线程。我们遍历他们,单独的发送每一个字符串并通过一个Duration
值调用 thread::sleep
函数来暂停一秒。
在主线程中,不再显式调用 recv
函数:而是将 rx
当作一个迭代器。对于每一个接收到的值,我们将其打印出来。当信道被关闭时,迭代器也将结束。
看到如下输出,每一行都会暂停一秒:
rx: A
rx: B
rx: C
rx: D
因为主线程中的 for
循环里并没有任何暂停或等待的代码,所以可以说主线程是在等待从新建线程中接收值。
之前有向同一接收者发送值的多个线程。这可以通过复制发送者来做到:
use std::{sync::mpsc, thread, time::Duration};
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("A1"),
String::from("B1"),
String::from("C1"),
String::from("D1"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("A2"),
String::from("B2"),
String::from("C2"),
String::from("D2"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("rx: {}", received);
}
}
这一次,在创建新线程之前,我们对发送者调用了 clone
方法。这会给我们一个可以传递给第一个新建线程的发送端句柄。我们会将原始的信道发送端传递给第二个新建线程。这样就会有两个线程,每个线程将向信道的接收端发送不同的消息。
如果运行这些代码输出:
rx: A1
rx: A2
rx: B2
rx: B1
rx: C1
rx: C2
rx: D1
rx: D2
虽然你可能会看到这些值以不同的顺序出现;
互斥器任意时刻,其只允许一个线程访问某些数据。为了访问互斥器中的数据,线程首先需要通过获取互斥器的锁(lock)
来表明其希望访问数据。锁是一个作为互斥器一部分的数据结构,它记录谁有数据的排他访问权。因此,我们描述互斥器为通过锁系统 保护(guarding)
其数据。
互斥器以难以使用著称,因为你不得不记住:
互斥器的单线程例子:
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
像很多类型一样,我们使用关联函数new
来创建一个 Mutex<T>
。使用 lock
方法获取锁,以访问互斥器中的数据。这个调用会阻塞当前线程,直到我们拥有锁为止。
如果另一个线程拥有锁,并且那个线程 panic
了,则 lock
调用会失败。在这种情况下,没人能够再获取锁,所以这里选择 unwrap
并在遇到这种情况时使线程 panic
。
一旦获取了锁,就可以将返回值(在这里是num)视为一个其内部数据的可变引用了。类型系统确保了我们在使用m
中的值之前获取锁。m
的类型是 Mutex<i32>
而不是 i32
,所以 必须
获取锁才能使用这个 i32
值。我们是不会忘记这么做的,因为反之类型系统不允许访问内部的 i32
值。
Mutex<T>
其实一个智能指针。更准确的说,lock
调用 返回 一个叫做MutexGuard
的智能指针。这个智能指针实现了 Deref
来指向其内部数据;其也提供了一个 Drop 实现当 MutexGuard 离开作用域时自动释放锁。丢弃了锁之后,可以打印出互斥器的值,并发现能够将其内部的 i32
改为6
。
现在尝试使用Mutex<T>
在多个线程间共享值。我们将启动十个线程,并在各个线程中对同一个计数器值加一,这样计数器将从 0
变为10
。
use std::{sync::Mutex, thread};
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("counter = {}", *counter.lock().unwrap());
}
这里创建了一个 counter
变量来存放内含 i32
的 Mutex<T>
,接下来遍历 range
创建了10
个线程。使用了 thread::spawn
并对所有线程使用了相同的闭包:他们每一个都将调用 lock
方法来获取 Mutex<T>
上的锁,接着将互斥器中的值加一。当一个线程结束执行,num
会离开闭包作用域并释放锁,这样另一个线程就可以获取它了。
在主线程中,我们收集了所有的 join
句柄,调用它们的 join
方法来确保所有线程都会结束。这时,主线程会获取锁并打印出程序的结果。
这个例子不能编译:
$ cargo run
Compiling multithreading v0.1.0 (/rsut/multithreading)
error[E0382]: use of moved value: `counter`
--> src/main.rs:8:36
|
4 | let counter = Mutex::new(0);
| ------- move occurs because `counter` has type `Mutex<i32>`, which does not implement the `Copy` trait
...
8 | let handle = thread::spawn(move || {
| ^^^^^^^ value moved into closure here, in previous iteration of loop
9 | let mut num = counter.lock().unwrap();
| ------- use occurs due to use in closure
For more information about this error, try `rustc --explain E0382`.
error: could not compile `multithreading` due to previous error
错误信息表明 counter
值在上一次循环中被移动了。所以 Rust
告诉我们不能将counter
锁的所有权移动到多个线程中。
通过使用智能指针Rc<T>
来创建引用计数的值,以便拥有多所有者。让我们在这也这么做看看会发生什么。将Mutex<T>
封装进 Rc<T>
中并在将所有权移入线程之前克隆了Rc<T>
。
use std::{rc::Rc, sync::Mutex, thread};
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("counter = {}", *counter.lock().unwrap());
}
再一次编译并出现了不同的错误:
`Rc<Mutex<i32>>` cannot be sent between threads safely
Rc<T>
并不能安全的在线程间共享。当Rc<T>
管理引用计数时,它必须在每一个 clone
调用时增加计数,并在每一个克隆被丢弃时减少计数。Rc<T>
并没有使用任何并发原语,来确保改变计数的操作不会被其他线程打断。在计数出错时可能会导致诡异的 bug
,比如可能会造成内存泄漏,或在使用结束之前就丢弃一个值。我们所需要的是一个完全类似 Rc<T>
,又以一种线程安全的方式改变引用计数的类型。
所幸 Arc<T>
正是 这么一个类似Rc<T>
并可以安全的用于并发环境的类型。
use std::{
sync::{Arc, Mutex},
thread,
};
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("counter = {}", *counter.lock().unwrap());
}
运行:
counter = 10
你可能注意到了,因为 counter
是不可变的,不过可以获取其内部值的可变引用;这意味着 Mutex<T>
提供了内部可变性,就像 Cell
系列类型那样。RefCell<T>
可以改变Rc<T>
中的内容那样,同样的可以使用 Mutex<T>
来改变 Arc<T>
中的内容。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!