首先,我们需要了解什么是 I/O 密集型任务和 CPU 密集型任务。
CPU-bound tasks:CPU 密集型任务,指的是任务需要大量的 CPU 计算资源,而不是 I/O 操作。CPU 密集型任务的特点是 CPU 使用率高,计算时间长,I/O 操作少。
I/O-bound tasks:I/O 密集型任务,指的是任务需要大量的 I/O 操作,而不是 CPU 计算资源。I/O 密集型任务的特点是 CPU 使用率低,计算时间短,I/O 操作多。
异步编程是一种编程模型,用于处理 I/O 密集型任务。在异步编程中,任务的执行不会被阻塞,而是在等待 I/O 操作完成的过程中,可以执行其他任务。这样可以充分利用 CPU 资源,提高系统的性能。
async/await 是 Rust 语言提供的异步编程特性。通过 async/await 关键字,可以定义异步函数。异步函数可以在执行过程中暂停,等待 I/O 操作完成,然后继续执行。这样可以避免线程阻塞,提高系统的并发能力。
暂不关心底层,从高层看,#[tokio::main]
宏的作用是,启动一个 tokio 运行时,然后复制函数体给这个运行时执行。
tokio 提供了 tokio::task
模块,用于创建异步任务。异步任务是一个可以在 tokio 运行时中执行的异步函数。异步任务可以通过 tokio::spawn
函数创建。
Join handle 是一个用于等待异步任务完成的句柄。Join handle 的 await
方法会等待异步任务完成,并返回任务的执行结果。
1 2 3 4 5 6 7 8 9 10 use tokio::task::JoinHandle;async fn hello () { println! ("Hello, world!" ); } #[tokio::main] async fn main () { let handle : JoinHandle<()> = tokio::spawn (hello ()); handle.await .unwrap (); }
spwan blocking tasks CPU 密集的任务尤其需要用线程的方式去处理,例如使用 spawn_blocking 创建一个阻塞的线程去完成相应 CPU 密集任务。使用 spawn_blocking 后,会创建一个单独的 OS 线程,该线程并不会被 tokio 所调度( 被 OS 所调度 ),因此它所执行的 CPU 密集任务也不会导致 tokio 调度的那些异步任务被饿死
#[tokio::test]
宏用于定义异步测试函数。异步测试函数是一个异步函数,可以在 tokio 运行时中执行。异步测试函数可以通过 tokio::test
宏创建。因为异步函数只能从其他异步函数中调用,不能从同步函数中调用。
tokio::sync::mpsc
模块提供了多生产者单消费者通道。多生产者单消费者通道是一种异步通道,用于在多个生产者和单个消费者之间传递消息。多生产者单消费者通道可以通过 tokio::sync::mpsc
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 use tokio::sync::mpsc;use tokio::time::{sleep, Duration};#[tokio::main] async fn main () { let (tx, mut rx) = mpsc::channel (32 ); tokio::spawn (async move { for i in 0 ..5 { tx.send (i).await .unwrap (); sleep (Duration::from_secs (1 )).await ; } }); while let Some (i) = rx.recv ().await { println! ("Received: {}" , i); } }
tokio::sync::oneshot
模块提供了一次性通道。一次性通道是一种异步通道,用于在一个生产者和一个消费者之间传递单个消息。一次性通道可以通过 tokio::sync::oneshot
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 use tokio::sync::oneshot;use tokio::time::{sleep, Duration};#[tokio::main] async fn main () { let (tx, rx) = oneshot::channel (); tokio::spawn (async move { sleep (Duration::from_secs (2 )).await ; tx.send ("Hello, world!" ).unwrap (); }); let msg = rx.await .unwrap (); println! ("{}" , msg); }
tokio::sync::watch
模块提供了观察者模式。观察者模式是一种设计模式,用于在多个观察者和一个主题之间建立一对多的依赖关系。观察者模式可以通过 tokio::sync::watch
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 use tokio::sync::watch;use tokio::time::{sleep, Duration};#[tokio::main] async fn main () { let (tx, mut rx) = watch::channel ("Hello, world!" ); tokio::spawn (async move { sleep (Duration::from_secs (2 )).await ; tx.send ("Hello, tokio!" ).unwrap (); }); while let Some (msg) = rx.recv ().await { println! ("{}" , msg); } }
tokio::sync::broadcast
模块提供了广播通道。广播通道是一种异步通道,用于在一个生产者和多个消费者之间传递消息。广播通道可以通过 tokio::sync::broadcast
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 use tokio::sync::broadcast;use tokio::time::{sleep, Duration};#[tokio::main] async fn main () { let (tx, mut rx1) = broadcast::channel (32 ); let mut rx2 = tx.subscribe (); tokio::spawn (async move { for i in 0 ..5 { tx.send (i).unwrap (); sleep (Duration::from_secs (1 )).await ; } }); tokio::spawn (async move { while let Ok (i) = rx1.recv ().await { println! ("Received1: {}" , i); } }); tokio::spawn (async move { while let Ok (i) = rx2.recv ().await { println! ("Received2: {}" , i); } }); tokio::time::sleep (Duration::from_secs (10 )).await ; }
tokio::sync::Barrier
屏障是一种用于同步多个任务的同步原语。屏障可以让多个任务在某个点上等待,然后同时继续执行。屏障可以通过 tokio::sync::Barrier
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 use tokio::sync::Barrier;use tokio::time::{sleep, Duration};#[tokio::main] async fn main () { let barrier = Barrier::new (2 ); tokio::spawn (async move { println! ("Task1: before wait" ); barrier.wait ().await ; println! ("Task1: after wait" ); }); tokio::spawn (async move { println! ("Task2: before wait" ); barrier.wait ().await ; println! ("Task2: after wait" ); }); tokio::time::sleep (Duration::from_secs (10 )).await ; }
tokio::sync::RwLock
读写锁是一种用于控制并发访问的同步原语。读写锁允许多个线程同时读取共享资源,但只允许一个线程写入共享资源。读写锁可以通过 tokio::sync::RwLock
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 use tokio::sync::RwLock;use tokio::time::{sleep, Duration};#[tokio::main] async fn main () { let rwlock = RwLock::new (0 ); tokio::spawn (async move { let mut lock = rwlock.write ().await ; *lock += 1 ; println! ("Task1: write lock = {}" , *lock); }); tokio::spawn (async move { let lock = rwlock.read ().await ; println! ("Task2: read lock = {}" , *lock); }); tokio::time::sleep (Duration::from_secs (10 )).await ; }
tokio::sync::Mutex
互斥锁是一种用于控制并发访问的同步原语。互斥锁允许一个线程访问共享资源,其他线程必须等待该线程释放锁后才能访问共享资源。互斥锁可以通过 tokio::sync::Mutex
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 use tokio::sync::Mutex;use tokio::time::{sleep, Duration};async fn person (name: &str , remote_arc: Arc<Mutex<i32 >>, new_channel: i32 ) { let mut remote = remote.lock ().await ; *remote = new_channel; println! ("{} changed the channel to {}" , name, new_channel); sleep (Duration::from_secs (1 )).await ; } #[tokio::main] async fn main () { let tv_channel = 10 ; let remote = Mutex::new (tv_channel); let remote_arc = Arc::new (remote); let mut task_handles = Vec ::new (); for (name, new_channel) in vec! [("Alice" , 5 ), ("Bob" , 3 ), ("Carol" , 2 )] { task_handles.push (tokio::spawn (person ( name.to_string (), remote_arc.clone (), new_channel ))); } for task in task_handles { task.await .unwrap (); } }
tokio::sync::Semaphore
信号量是一种用于控制并发访问的同步原语。信号量可以限制同时访问共享资源的线程数量。信号量可以通过 tokio::sync::Semaphore
模块创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 use std::sync::Arc;use tokio::sync::Semaphore;use tokio::time::{sleep, Duration};async fn person (semaphore: Arc<Semaphore>, name: String ) { println! ("{} is waiting" , name); teller (&semaphore, name).await ; } async fn teller (semaphore: &Arc<Semaphore>, name: String ) { let permit = semaphore.clone ().acquire ().await .unwrap (); println! ("{} is in" , name); sleep (Duration::from_secs (1 )).await ; drop (permit); println! ("{} is out" , name); } #[tokio::main] async fn main () { let semaphore_arc = Arc::new (Semaphore::new (2 )); let mut task_handles = Vec ::new (); for name in vec! ["Alice" , "Bob" , "Carol" , "Dave" , "Eve" ] { task_handles.push (tokio::spawn (person ( semaphore_arc.clone (), name.to_string () ))); } for task in task_handles { task.await .unwrap (); } }
notify 用于通知异步任务完成。notify 会返回一个通知器,通知器可以用于通知异步任务完成。通知器是一个 Future,可以通过 await 关键字等待通知器完成。通知器完成后,会返回一个通知结果,通知结果是一个 Result 类型,包含通知的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 use tokio::sync::Notify;use tokio::time::{sleep, Duration};#[tokio::main] async fn main () { let notify = Notify::new (); let notify_clone = notify.clone (); tokio::spawn (async move { sleep (Duration::from_secs (2 )).await ; notify_clone.notify (); }); notify.notified ().await ; println! ("Notified" ); }