RustAsync简易教程课程内容异步编程的概念同步、多线程、异步的例子理解Async理解Future最后一个例子一、异步编程的概念并发与并行并发(concurrency)是指程序不同部分可以同时不按顺序的执行且不影响最终结果的能力。而同时执行多个任务是并行(par
本课中,并发一次代表 concurrency 和 parallelism
软件程序处理任务的两种类型:
CPU 密集型的任务:通常可以利用多 CPU 或多核进行处理。
IO 密集型任务:
Synchronous 同步
Multi-Threading 多线程
Asynchronous 异步
Task 1 包含三个步骤:
多线程,针对每个请求都开启一个原生的系统线程。提供性能,但却引入了新的复杂性:
多线程的两种模型:
而 Rust 标准库实现的是 1:1 模型
每个 HTTP 请求被异步 WEB Server 接收,Web Server 会生成一个任务来处理它,并由异步运行时安排各个异步任务在可用的CPU上执行
同步例子
use std::thread::sleep;
use std::time::Duration;
fn main() {
println!("Hello before reading file!");
let file1_contents = read_from_file1();
println!("{:?}", file1_contents);
println!("Hello after reading file1!");
let file2_contents = read_from_file2();
println!("{:?}", file2_contents);
println!("Hello after reading file2!");
}
fn read_from_file1() -> String {
sleep(Duration::new(4, 0));
String::from("Hello, there from file 1")
}
fn read_from_file2() -> String {
sleep(Duration::new(2, 0));
String::from("Hello, there from file 2")
}
运行
asc on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base
➜ cargo run
Compiling asc v0.1.0 (/Users/qiaopengjun/rust/asc)
Finished dev [unoptimized + debuginfo] target(s) in 0.53s
Running `target/debug/asc`
Hello before reading file!
"Hello, there from file 1"
Hello after reading file1!
"Hello, there from file 2"
Hello after reading file2!
asc on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 7.3s
➜
多线程例子
use std::thread;
use std::thread::sleep;
use std::time::Duration;
fn main() {
println!("Hello before reading file!");
let handle1 = thread::spawn(|| {
let file1_contents = read_from_file1();
println!("{:?}", file1_contents);
});
let handle2 = thread::spawn(|| {
let file2_contents = read_from_file2();
println!("{:?}", file2_contents);
});
handle1.join().unwrap();
handle2.join().unwrap();
}
fn read_from_file1() -> String {
sleep(Duration::new(4, 0));
String::from("Hello, there from file 1")
}
fn read_from_file2() -> String {
sleep(Duration::new(2, 0));
String::from("Hello, there from file 2")
}
运行
asc on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 7.3s
➜ cargo run
Compiling asc v0.1.0 (/Users/qiaopengjun/rust/asc)
Finished dev [unoptimized + debuginfo] target(s) in 0.25s
Running `target/debug/asc`
Hello before reading file!
"Hello, there from file 2"
"Hello, there from file 1"
asc on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 4.7s
➜
异步例子
use std::thread::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let _file1_contents = read_from_file1().await;
});
let h2 = tokio::spawn(async {
let _file2_contents = read_from_file2().await;
});
let _ = tokio::join!(h1, h2);
}
async fn read_from_file1() -> String {
sleep(Duration::new(4, 0));
println!("{:?}", "Processing file 1");
String::from("Hello, there from file 1")
}
async fn read_from_file2() -> String {
sleep(Duration::new(2, 0));
println!("{:?}", "Processing file 2");
String::from("Hello, there from file 2")
}
运行
asc on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 3.3s
➜ cargo run
Compiling asc v0.1.0 (/Users/qiaopengjun/rust/asc)
Finished dev [unoptimized + debuginfo] target(s) in 0.28s
Running `target/debug/asc`
Hello before reading file!
"Processing file 2"
"Processing file 1"
asc on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 4.9s
➜
异步编程,诀窍就是当 CPU 等待外部事件或动作时,异步运行时会安排其他可继续执行的任务在 CPU 上执行。而当从磁盘或 I/O 子系统的系统中断到达的时候,异步运行时会知道识别这事,并安排原来的任务继续执行。
一般来说,I/O 受限(I/O Bound)的程序(程序执行的速度依赖于I/O 子系统的速度)比起 CPU受限(CPU Bound)的任务(程序执行的速度依赖于CPU的速度)可能更适合于异步任务的执行。
async、.await 关键字是Rust标准库里用于异步编程的内置核心原语集的代表。就是语法糖。
Rust 异步的核心其实是 Future
Future 是由异步计算或函数产生的单一最终值。
Rust 的异步函数都会返回 Future,Future 基本上就是代表着延迟的计算
谁来调用poll 方法呢?
是异步执行器,它是异步运行时的一部分。异步执行器会管理一个 Future 的集合,并通过调用 Future 上的 poll 方法来驱动它们完成。所以函数或代码块在前面加上 async 关键字之后,就相当于告诉异步执行器他会返回 Future,这个Future需要被驱动直到完成。
但是异步执行器怎么知道异步已经准备好可以取得进展(可以产生值)了呢?它会持续不断的调用 poll 方法吗?
use std::thread::sleep;
use std::time::Duration;
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let _file1_contents = read_from_file1().await;
});
let h2 = tokio::spawn(async {
let _file2_contents = read_from_file2().await;
});
let _ = tokio::join!(h1, h2);
}
// async fn read_from_file1() -> String {
// sleep(Duration::new(4, 0));
// println!("{:?}", "Processing file 1");
// String::from("Hello, there from file 1")
// }
// async fn read_from_file2() -> String {
// sleep(Duration::new(2, 0));
// println!("{:?}", "Processing file 2");
// String::from("Hello, there from file 2")
// }
// Poll::Ready
// Poll::Pending
use std::future::Future;
fn read_from_file1() -> impl Future<Output = String> {
async {
sleep(Duration::new(4, 0));
println!("{:?}", "Processing file 1");
String::from("Hello, there from file 1")
}
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(3, 0));
println!("{:?}", "Processing file 2");
String::from("Hello, there from file 2")
}
}
Tokio 运行时就是管理异步任务并安排它们在 CPU上执行的组件。
一个程序可能生成多个任务,每个任务可能包含一个或多个Future
main()
任务1 ReadFileFuture
任务2 read_file2 返回的 Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;
struct ReadFileFuture {}
impl Future for ReadFileFuture {
type Output = String;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("Tokio! Stop polling me");
Poll::Pending
}
}
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let future1 = ReadFileFuture {};
future1.await
});
let h2 = tokio::spawn(async {
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
});
let _ = tokio::join!(h1, h2);
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
println!("{:?}", "Processing file 2");
String::from("Hello, there from file 2")
}
}
运行
asc on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 4.9s
➜ cargo run
Compiling asc v0.1.0 (/Users/qiaopengjun/rust/asc)
Finished dev [unoptimized + debuginfo] target(s) in 0.51s
Running `target/debug/asc`
Hello before reading file!
Tokio! Stop polling me
"Processing file 2"
"Hello, there from file 2"
它是一直不断的对它进行 poll 吗?肯定不会一直 poll
Tokio(Rust的异步设计)是使用一个 Waker 组件来处理这件事的。
当被异步执行器 poll 过的任务还没有准备好产生值的时候,这个任务就被注册到一个 Waker。Waker 会有一个处理程序(handle),它会被存储在任务管理的 Context 对象中。
Waker 有一个 wake() 方法,可以用来告诉异步执行器关联的任务应该被唤醒了。当 wake() 方法被调用了,Tokio 执行器就会被通知是时候再次 poll 这个异步的任务了,具体方式就是调用任务上的 poll() 函数。
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;
struct ReadFileFuture {}
impl Future for ReadFileFuture {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("Tokio! Stop polling me");
cx.waker().wake_by_ref();
Poll::Pending
}
}
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let future1 = ReadFileFuture {};
future1.await
});
let h2 = tokio::spawn(async {
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
});
let _ = tokio::join!(h1, h2);
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
println!("{:?}", "Processing file 2");
String::from("Hello, there from file 2")
}
}
Tokio 运行时需要理解操作系统(内核)的方法来开启I/O操作(读取网络数据,读写文件等)。
Tokio 运行时会注册异步的处理程序,以便在事件发生时作为I/O操作的一部分进行调用。而在 Tokio 运行时里面,从内核监听这些事件并与Tokio 其他部分通信的组件就是反应器(reactor)。
Tokio 执行器,它会把一个 Future,当其可取得更多进展时,通过调用 Future 的poll() 方法来驱动其完成。
那么Future是如何告诉执行器它们准备好取得进展了呢?
就是 Future 调用 Waker 组件上的 wake() 方法。
Waker 组件就会通知执行器,然后再把 Future 放回队列,并再次调用 poll() 方法,直到 Future 完成。
Future1 Future2
Tokio Waker Tokio Waker
Tokio 执行器
Tokio 反应器
异步 kernel (epoll、kqueue)
操作系统
硬件/CPU...
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::Duration;
struct ReadFileFuture {}
impl Future for ReadFileFuture {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Self::Output> {
println!("Tokio! Stop polling me");
cx.waker().wake_by_ref();
Poll::Ready(String::from("Hello, there from file 1"))
}
}
#[tokio::main]
async fn main() {
println!("Hello before reading file!");
let h1 = tokio::spawn(async {
let future1 = ReadFileFuture {};
future1.await
});
let h2 = tokio::spawn(async {
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
});
let _ = tokio::join!(h1, h2);
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
println!("{:?}", "Processing file 2");
String::from("Hello, there from file 2")
}
}
创建一个自定义的 Future,它是一个定时器,具有以下功能:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread::sleep;
use std::time::{Duration, Instant};
struct AsyncTimer {
expiration_time: Instant,
}
impl Future for AsyncTimer {
type Output = String;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if Instant::now() >= self.expiration_time {
println!("Hello, it's time for Future 1");
Poll::Ready(String::from("Future 1 has completed"))
} else {
println!("Hello, it's not yet time for Future 1. Going to sleep");
let waker = cx.waker().clone();
let expiration_time = self.expiration_time;
std::thread::spawn(move || {
let current_time = Instant::now();
if current_time < expiration_time {
std::thread::sleep(expiration_time - current_time);
}
waker.wake();
});
Poll::Pending
}
}
}
#[tokio::main]
async fn main() {
let h1 = tokio::spawn(async {
let future1 = AsyncTimer {
expiration_time: Instant::now() + Duration::from_millis(4000),
};
println!("{:?}", future1.await);
});
let h2 = tokio::spawn(async {
let file2_contents = read_from_file2().await;
println!("{:?}", file2_contents);
});
let _ = tokio::join!(h1, h2);
}
fn read_from_file2() -> impl Future<Output = String> {
async {
sleep(Duration::new(2, 0));
String::from("Future 2 has completed")
}
}
运行
asynctimer on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base
➜ cargo run
Compiling cfg-if v1.0.0
Compiling scopeguard v1.1.0
Compiling smallvec v1.10.0
Compiling pin-project-lite v0.2.9
Compiling bytes v1.4.0
Compiling libc v0.2.144
Compiling log v0.4.17
Compiling lock_api v0.4.9
Compiling parking_lot_core v0.9.7
Compiling num_cpus v1.15.0
Compiling signal-hook-registry v1.4.1
Compiling socket2 v0.4.9
Compiling mio v0.8.6
Compiling parking_lot v0.12.1
Compiling tokio v1.28.1
Compiling asynctimer v0.1.0 (/Users/qiaopengjun/rust/asynctimer)
Finished dev [unoptimized + debuginfo] target(s) in 2.84s
Running `target/debug/asynctimer`
Hello, it's not yet time for Future 1. Going to sleep
"Future 2 has completed"
Hello, it's time for Future 1
"Future 1 has completed"
asynctimer on master [?] is 📦 0.1.0 via 🦀 1.67.1 via 🅒 base took 7.4s
➜
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!