rust基础知识

Basic Library

  • Cell

Cell提供了一种在不可变借用的情况下修改值的能力。这对于在借用检查器的规则下实现特定类型的内部可变性非常有用。

Cell包装了一个类型T的值,并提供了获取和设置这个值的方法,而不需要可变借用。这在编写没有运行时借用检查开销的代码时非常有用。然而,它只能用于实现Copy的类型,因为它提供了通过值而不是引用来获取和设置值的方法。

use std::cell::Cell;
 
struct Point<T> {
    x: Cell<T>,
    y: Cell<T>,
}
 
fn main() {
    let p = Point {
        x: Cell::new(1),
        y: Cell::new(1),
    };
    //获取x,y
    println!("x: {}, y: {}", p.x.get(), p.y.get());
    //修改x,y
    p.x.set(2);
    p.y.set(2);
    println!("x: {}, y: {}", p.x.get(), p.y.get());
}
  • RefCell

RefCell是Rust标准库中一个提供内部可变性的类型,与Cell不同,RefCell允许对存储在其中的值进行可变和不可变的借用,并在运行时而非编译时执行借用规则的检查。这使得你可以在不可变引用的情况下修改数据,但是你需要确保不会违反Rust的借用规则。

RefCell是Rust标准库中一个提供内部可变性的类型,与Cell不同,RefCell允许对存储在其中的值进行可变和不可变的借用,并在运行时而非编译时执行借用规则的检查。这使得你可以在不可变引用的情况下修改数据,但是你需要确保不会违反Rust的借用规则。

use std::cell::RefCell;
 
fn main() {
    let my_refcell = RefCell::new(String::from("Hello, Rust!"));
 
    // 借用RefCell中的值
    let borrowed_value = my_refcell.borrow();
    println!("Borrowed value: {}", borrowed_value);
 
    // 尝试进行另一个不可变借用(成功)
    let borrowed_value2 = my_refcell.borrow();
    println!("Second borrowed value: {}", borrowed_value2);
 
    // 尝试进行可变借用(这将导致运行时错误)
    // let mut borrowed_value_mut = my_refcell.borrow_mut();
    // println!("Mutable borrowed value: {}", borrowed_value_mut);
 
    // 当前的不可变借用超出作用域,现在我们可以进行可变借用
    drop(borrowed_value);
    drop(borrowed_value2);
    let mut borrowed_value_mut = my_refcell.borrow_mut();
    borrowed_value_mut.push_str(" Nice to meet you!");
    println!("Mutable borrowed value: {}", borrowed_value_mut);
}
 

因为RefMut<T>实现了DerefMut特征,这意味着你可以将RefMut<T>当作一个可变引用&mut T来使用。

  • Rc

Rc<T>是Rust标准库提供的一个引用计数指针类型。Rc代表”Reference Counted”,即”引用计数”。它用于在多处地方共享数据的所有权,但又不能使用普通的引用&T或可变引用&mut TRc<T>是非线程安全的,所以不能在多线程环境下使用。如果你需要在多线程环境下共享数据,可以使用Arc<T>,它是线程安全的引用计数类型。

use std::rc::Rc;
 
fn main() {
    let a = Rc::new(5);
    let b = Rc::clone(&a);
    let c = Rc::clone(&a);
 
    println!("a: {}, b: {}, c: {}", a, b, c);
    print!("Reference count: {}", Rc::strong_count(&a));
}
 
  • Arc

Arc<T>是Rust标准库提供的另一个引用计数指针类型,其全称为”Atomic Reference Counted”。和Rc<T>类似,Arc<T>提供了一种方式来在程序的多个部分之间共享数据的所有权,而不需要复制数据。不同之处在于,Arc<T>是线程安全的,这意味着你可以安全地将Arc<T>实例在多个线程之间共享。

use std::sync::{Arc, Mutex};
use std::thread;
 
fn main() {
    let counter = Arc::new(Mutex::new(0));
 
    let mut handles = vec![];
 
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
 
    for handle in handles {
        handle.join().unwrap();
    }
 
    println!("Result: {}", *counter.lock().unwrap());
}
 

Trait

多线程

线程屏障(Barrier)

在rust中,可以通过线程屏障让多个线程都执行到一个点后,再继续往下执行:

use std::{
    sync::{Arc, Barrier},
    thread,
};
 
fn main() {
    let mut handlers = Vec::with_capacity(4);
    let barrier = Arc::new(Barrier::new(4));
    for _ in 0..4 {
        let b = barrier.clone();
        handlers.push(thread::spawn(move || {
            println!("before wait");
            b.wait();
            println!("after wait");
        }));
    }
 
    for h in handlers {
        h.join().unwrap();
    }
}
//output:
before wait
before wait
before wait
before wait
after wait
after wait
after wait
after wait

线程局部变量

rust标准库对线程局部变量进行了支持。

use std::thread;
 
fn main() {
    thread_local! {
        static FOO: std::cell::RefCell<u32> = std::cell::RefCell::new(1);
    }
    //in main thread: init FOO = 1 and change FOO = 2
    FOO.with(|f| {
        assert!(*f.borrow() == 1);
        *f.borrow_mut() = 2;
    });
 
    //in child thread: init FOO = 1 and change FOO = 3
    let t = thread::spawn(move || {
        FOO.with(|f| {
            assert!(*f.borrow() == 1);
            *f.borrow_mut() = 3;
        })
    });
 
    //wait child thread
    t.join().unwrap();
 
    //in main thread: FOO = 2
    FOO.with(|f| {
        assert!(*f.borrow() == 2);
    })
}
  • 使用thread_local!这个micro创造线程局部的静态变量,这意味着每个线程都有一个独立的线程局部变量实例。
  • 这些变量不直接访问,而是通过with这个方法访问。并且这个方法接收一个闭包函数。
  • 最后,当主线程和子线程执行完毕后,FOO的值为2,是因为线程局部变量保证了每个线程看到的是自己的独立副本,因此主线程中的 FOO 不会受到新线程中操作的影响。

用条件变量控制线程的挂起和执行

use std::{
    sync::{Arc, Condvar, Mutex},
    thread,
};
 
fn main() {
    //元祖第一个值为互斥锁,第二个值为条件变量
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = pair.clone();
    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        //获取互斥锁
        let mut started = lock.lock().unwrap();
        println!("changing started");
        *started = true;
        cvar.notify_one();
    });
 
    let (lock, cvar) = &*pair;
    let mut started = lock.lock().unwrap();
    while !*started {
        println!("waiting");
        //等待条件变量
        started = cvar.wait(started).unwrap();
    }
    println!("started changed");
}
  • 这里解释一下Condvar这个条件变量的用法,他用来阻塞线程的执行直到某一个值为true。

只被调用一次的函数

有时,我们会需要某个函数在多线程环境下只被调用一次,例如初始化全局变量,无论是哪个线程先调用函数来初始化,都会保证全局变量只会被初始化一次,随后的其它线程调用就会忽略该函数:

use std::{sync::Once, thread};
 
static mut VAL: usize = 0;
static INIT: Once = Once::new();
fn main() {
    let mut handlers = Vec::with_capacity(10);
    for i in 1..=10 {
        handlers.push(thread::spawn(move || {
            INIT.call_once(|| unsafe {
                VAL = i;
            })
        }));
    }
    for handler in handlers {
        handler.join().unwrap();
    }
    println!("{}", unsafe { VAL });
}

在这段代码中,只有最先被调度的线程才会去执行INIT这个Once类型的函数。后续进来的线程会忽略这个函数。

线程同步:消息传递

发送者(sender)和接收者(reveiver)。

多发送者,单接收者

标准库提供了通道std::sync::mpsc,其中mpscmultiple producer, single consumer的缩写,代表了该通道支持多个发送者,但是只支持唯一的接收者。 当然,支持多个发送者也意味着支持单个发送者,我们先来看看单发送者、单接收者的简单例子:

use std::{sync::mpsc, thread};
 
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || tx.send(32).unwrap());
    println!("receive {}", rx.recv().unwrap());
}
  • 这里主线程阻塞接收,等待子线程发送数据到channel中。

不阻塞的try_recv方法

use std::sync::mpsc;
use std::thread;
 
fn main() {
    let (tx, rx) = mpsc::channel();
 
    thread::spawn(move || {
        tx.send(1).unwrap();
    });
 
    println!("receive {:?}", rx.try_recv());
}
//receive Err(Empty)

使用for循环接收

如何连续接收通道中的值。

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
 
fn main() {
    let (tx, rx) = mpsc::channel();
 
    thread::spawn(move || {
        let vals = vec![1, 2, 3, 4, 5, 6, 7, 8];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for received in rx {
        println!("Got: {}", received);
    }
}

多发送者

use std::sync::mpsc;
use std::thread;
 
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone();
    thread::spawn(move || {
        tx.send(String::from("hi from raw tx")).unwrap();
    });
 
    thread::spawn(move || {
        tx1.send(String::from("hi from cloned tx")).unwrap();
    });
 
    for received in rx {
        println!("Got: {}", received);
    }
}

同步和异步通道

mpsc实际上分为两种通道:同步通道和异步通道。

异步通道

我们在上面例子中使用的都是异步通道:无论接收者是否在接收,发送者的发送操作都不会被阻塞:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx)= mpsc::channel();
 
    let handle = thread::spawn(move || {
        println!("发送之前");
        tx.send(1).unwrap();
        println!("发送之后");
    });
 
    println!("睡眠之前");
    thread::sleep(Duration::from_secs(3));
    println!("睡眠之后");
 
    println!("receive {}", rx.recv().unwrap());
    handle.join().unwrap();
}

我们在这里让主线程睡眠了3s,这时候子线程一定完成了创建和向通道中发送数据的操作,可以看到子线程的发送操作并没有被阻塞。

同步通道
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::sync_channel(0);
 
    let handle = thread::spawn(move || {
        println!("发送之前");
        tx.send(1).unwrap();
        println!("发送之后");
    });
 
    println!("睡眠之前");
    thread::sleep(Duration::from_secs(3));
    println!("睡眠之后");
 
    println!("receive {}", rx.recv().unwrap());
    handle.join().unwrap();
}

mpsc::sync_channel中的参数值是同步通道中的初始化容量大小,如果为1的话这时候的发送操作就不会被阻塞。

传输多种数据类型

通过枚举类型来实现在通道中传输多种数据类型:

use std::sync::mpsc::{self, Receiver, Sender};
 
enum Fruit {
    Apple(String),
    Banana(i32),
}
fn main() {
    let (tx, rx): (Sender<Fruit>, Receiver<Fruit>) = mpsc::channel();
    tx.send(Fruit::Apple("red".to_string())).unwrap();
    tx.send(Fruit::Banana(1)).unwrap();
 
    for _ in 0..2 {
        match rx.recv().unwrap() {
            Fruit::Apple(count) => println!("received {} apples", count),
            Fruit::Banana(flavor) => println!("received {} oranges", flavor),
        }
    }
}

有一点要注意的是,rust会按照枚举中最大的成员的类型进行内存对齐,这意味着就算你传输的是枚举中占用内存最小的成员,它占用的内存依然和最大的成员相同, 因此会造成内存上的浪费。

容易遇到的坑

use std::sync::mpsc;
fn main() {
 
    use std::thread;
 
    let (send, recv) = mpsc::channel();
    let num_threads = 3;
    for i in 0..num_threads {
        let thread_send = send.clone();
        thread::spawn(move || {
            thread_send.send(i).unwrap();
            println!("thread {:?} finished", i);
        });
    }
 
    // 在这里drop send...
 
    for x in recv {
        println!("Got: {}", x);
    }
    println!("finished iterating");
}

代码发生死锁,会永远阻塞在for x in recv这句代码上,因为send的所有clone在子线程结束后被自动drop掉了,但是send本身没有被drop,但是recv会一直阻塞接收通道中的消息(因为通道没有满足关闭的条件)。

线程同步、锁、Conv、信号量

当你需要同时访问一个资源、控制不同线程的执行次序时,都需要使用到同步性。

线程同步可以有多种方式来实现,上一章提到的消息传递就是其中的一种方式。

还可以通过共享内存来实现同步性,例如通过锁和原子操作等并发原语来实现多个线程同时且安全地去访问一个资源。

共享内存可以说是同步的灵魂,因为消息传递的底层实际上也是通过共享内存来实现,两者的区别如下:

  • 共享内存相对消息传递能节省多次内存拷贝的成本
  • 共享内存的实现简洁的多
  • 共享内存的锁竞争更多

消息传递适用的场景很多,我们下面列出了几个主要的使用场景:

  • 需要可靠和简单的(简单不等于简洁)实现时
  • 需要模拟现实世界,例如用消息去通知某个目标执行相应的操作时
  • 需要一个任务处理流水线(管道)时,等等

而使用共享内存(并发原语)的场景往往就比较简单粗暴:需要简洁的实现以及更高的性能时。

互斥锁Mutex

既然是共享内存,那并发原语自然是重中之重,先来一起看看皇冠上的明珠: 互斥锁Mutex

Mutex让多个线程并发的访问同一个值变成了排队访问:同一时间,只允许一个线程A访问该值,其它线程需要等待A访问完成后才能继续。

单线程中使用Mutex
use std::sync::Mutex;
 
fn main() {
    // 使用`Mutex`结构体的关联函数创建新的互斥锁实例
    let m = Mutex::new(5);
 
    {
        // 获取锁,然后deref为`m`的引用
        // lock返回的是Result
        let mut num = m.lock().unwrap();
        *num = 6;
        // 在这里锁被释放掉
    }
 
    println!("m = {:?}", m);
}

这里你可能奇怪,m.lock明明返回一个锁,怎么就变成我们的num数值了?聪明的读者可能会想到智能指针,没错,因为Mutex<T>是一个智能指针,准确的说是m.lock()返回一个智能指针MutexGuard<T>:

  • 它实现了Deref特征,会被自动解引用后获得一个引用类型,该引用指向Mutex内部的数据
  • 它还实现了Drop特征,在超出作用域后,自动释放锁,以便其它线程能继续获取锁
多线程中使用Mutex
  • 无法运行的Rc
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;
 
fn main() {
    // 通过`Rc`实现`Mutex`的多所有权
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];
 
    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        // 创建子线程,并将`Mutex`的所有权拷贝传入到子线程中
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
 
            *num += 1;
        });
        handles.push(handle);
    }
 
    // 等待所有子线程完成
    for handle in handles {
        handle.join().unwrap();
    }
 
    // 输出最终的计数结果
    println!("Result: {}", *counter.lock().unwrap());
}

以上的代码会报错:

error[E0277]: `Rc<Mutex<i32>>` cannot be sent between threads safely
                // `Rc`无法在线程中安全的传输
   --> src/main.rs:11:22
    |
13  |           let handle = thread::spawn(move || {
    |  ______________________^^^^^^^^^^^^^_-
    | |                      |
    | |                      `Rc<Mutex<i32>>` cannot be sent between threads safely
14  | |             let mut num = counter.lock().unwrap();
15  | |
16  | |             *num += 1;
17  | |         });
    | |_________- within this `[closure@src/main.rs:11:36: 15:10]`
    |
    = help: within `[closure@src/main.rs:11:36: 15:10]`, the trait `Send` is not implemented for `Rc<Mutex<i32>>`
     // `Rc`没有实现`Send`特征
    = note: required because it appears within the type `[closure@src/main.rs:11:36: 15:10]`
  • 使用Arc改造上面的代码
use std::sync::{Arc, Mutex};
use std::thread;
 
fn main() {
    // 通过`Rc`实现`Mutex`的多所有权
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];
 
    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        // 创建子线程,并将`Mutex`的所有权拷贝传入到子线程中
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
 
            *num += 1;
        });
        handles.push(handle);
    }
 
    // 等待所有子线程完成
    for handle in handles {
        handle.join().unwrap();
    }
 
    // 输出最终的计数结果
    println!("Result: {}", *counter.lock().unwrap());
}

改造方法很简单,将Rc换成Arc即可。关于两者的区别,在上面已经介绍过了。

死锁

现在来了解一下关于创建死锁的方式。

单线程死锁
use std::sync::Mutex;
 
fn main() {
    let data = Mutex::new(0);
    let d1 = data.lock();
    let d2 = data.lock();
} // d1锁在此处释放

很简单的场景,当一个锁还没释放的时候去申请加锁操作会造成死锁的产生。但是当这种情况在代码量多的时候会不那么显眼。

多线程死锁

当我们拥有两个锁,两个线程在持有各自锁的同时尝试去获取另外一把锁的时候就会产生死锁:

use std::{
    sync::{Mutex, MutexGuard},
    thread,
};
 
use std::time::Duration;
 
use lazy_static::lazy_static;
lazy_static! {
    static ref MUTEX1: Mutex<i64> = Mutex::new(0);
    static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}
 
fn main() {
    // 存放子线程的句柄
    let mut children = vec![];
    for i_thread in 0..2 {
        children.push(thread::spawn(move || {
            // 线程1
            if i_thread % 2 == 0 {
                // 锁住MUTEX1
                let guard: MutexGuard<i64> = MUTEX1.lock().unwrap();
                println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread);
 
                // 当前线程睡眠一小会儿,等待线程2锁住MUTEX2
                thread::sleep(Duration::from_millis(10));
                // 去锁MUTEX2
                let guard = MUTEX2.lock().unwrap();
            // 线程2
            } else {
                let _guard = MUTEX2.lock().unwrap();
 
                println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread);
 
                let _guard = MUTEX1.lock().unwrap();
            }
        }));
    }
 
    // 等子线程完成
    for child in children {
        let _ = child.join();
    }
 
    println!("死锁没有发生");
}

以上这段代码会产生死锁。

try_lock

和上面不同,try_lock会尝试去获取锁,当获取不到的时候会抛出一个错误而不是产生死锁。

改造上面多线程死锁的例子:

	use std::{
    sync::{Mutex, MutexGuard},
    thread,
};
 
use std::time::Duration;
 
use lazy_static::lazy_static;
lazy_static! {
    static ref MUTEX1: Mutex<i64> = Mutex::new(0);
    static ref MUTEX2: Mutex<i64> = Mutex::new(0);
}
 
fn main() {
    // 存放子线程的句柄
    let mut children = vec![];
    for i_thread in 0..2 {
        children.push(thread::spawn(move || {
            // 线程1
            if i_thread % 2 == 0 {
                // 锁住MUTEX1
                let _guard: MutexGuard<i64> = MUTEX1.lock().unwrap();
 
                println!("线程 {} 锁住了MUTEX1,接着准备去锁MUTEX2 !", i_thread);
 
                // 当前线程睡眠一小会儿,等待线程2锁住MUTEX2
                thread::sleep(Duration::from_millis(10));
 
                // 去锁MUTEX2
                let guard = MUTEX2.try_lock();
                println!("线程 {} 获取 MUTEX2 锁的结果: {:?}", i_thread, guard);
            // 线程2
            } else {
                let _guard = MUTEX2.lock().unwrap();
 
                println!("线程 {} 锁住了MUTEX2, 准备去锁MUTEX1", i_thread);
 
                let guard = MUTEX1.try_lock();
                println!("线程 {} 获取 MUTEX1 锁的结果: {:?}", i_thread, guard);
            }
        }));
    }
 
    // 等子线程完成
    for child in children {
        let _ = child.join();
    }
 
    println!("死锁没有发生");
}

输出结果如下:

线程 0 锁住了MUTEX1,接着准备去锁MUTEX2 !
线程 1 锁住了MUTEX2, 准备去锁MUTEX1
线程 1 获取 MUTEX1 锁的结果: Err("WouldBlock")
线程 0 获取 MUTEX2 锁的结果: Ok(0)
死锁没有发生
 
#或者
线程 1 锁住了MUTEX2, 准备去锁MUTEX1
线程 1 获取 MUTEX1 锁的结果: Ok(0)
线程 0 锁住了MUTEX1,接着准备去锁MUTEX2 !
线程 0 获取 MUTEX2 锁的结果: Ok(0)
死锁没有发生

根据结果推断加锁的过程并不难。

读写锁RwLock

Mutex对读写操作都会进行加锁,当遇到大量的读的情况的时候,十分低效。

use std::sync::RwLock;
 
fn main() {
    let lock = RwLock::new(4);
    {
        let a = lock.read().unwrap();
        let b = lock.read().unwrap();
        assert!(*a == 4);
        assert!(*b == 4);
    }
    //read lock drop here
 
    {
        let mut c = lock.write().unwrap();
        *c += 1;
        assert!(*c == 5);
    }
    //write lock drop here
    //we can't have read lock and write lock at the same time
}

Mutex vs RwLock

转载:rust圣经

首先简单性上Mutex完胜,因为使用RwLock你得操心几个问题:

  • 读和写不能同时发生,如果使用try_xxx解决,就必须做大量的错误处理和失败重试机制
  • 当读多写少时,写操作可能会因为一直无法获得锁导致连续多次失败(writer starvation)
  • RwLock 其实是操作系统提供的,实现原理要比Mutex复杂的多,因此单就锁的性能而言,比不上原生实现的Mutex

再来简单总结下两者的使用场景:

  • 追求高并发读取时,使用RwLock,因为Mutex一次只允许一个线程去读取
  • 如果要保证写操作的成功性,使用Mutex
  • 不知道哪个合适,统一使用Mutex

需要注意的是,RwLock虽然看上去貌似提供了高并发读取的能力,但这个不能说明它的性能比Mutex高,事实上Mutex性能要好不少,后者唯一的问题也仅仅在于不能并发读取

RwLock真正发挥作用的场景是当多个线程需要对数据进行长时间的读取操作。例如,如果你在读取数据后进行复杂的计算或其他长时间操作,那么RwLock就很有用,因为它允许多个线程同时进行这些操作。但是,对于HashMap,通常读取操作很快,没有“长时间”的操作。

使用条件变量控制线程的同步

加锁可以解决资源访问的安全性问题,那么如何解决资源访问的顺序问题呢?答案就是条件变量Condvar,他经常和Mutex一起使用。

	use std::sync::{Arc,Mutex,Condvar};
use std::thread::{spawn,sleep};
use std::time::Duration;
 
fn main() {
    let flag = Arc::new(Mutex::new(false));
    let cond = Arc::new(Condvar::new());
    let cflag = flag.clone();
    let ccond = cond.clone();
 
    let hdl = spawn(move || {
        let mut lock = cflag.lock().unwrap();
        let mut counter = 0;
 
        while counter < 3 {
            while !*lock {
                // wait方法会接收一个MutexGuard<'a, T>,且它会自动地暂时释放这个锁,使其他线程可以拿到锁并进行数据更新。
                // 同时当前线程在此处会被阻塞,直到被其他地方notify后,它会将原本的MutexGuard<'a, T>还给我们,即重新获取到了锁,同时唤醒了此线程。
                lock = ccond.wait(lock).unwrap();
            }
            
            *lock = false;
 
            counter += 1;
            println!("inner counter: {}", counter);
        }
    });
 
    let mut counter = 0;
    loop {
        sleep(Duration::from_millis(1000));
        *flag.lock().unwrap() = true;
        counter += 1;
        if counter > 3 {
            break;
        }
        println!("outside counter: {}", counter);
        cond.notify_one();
    }
    hdl.join().unwrap();
    println!("{:?}", flag);
}

信号量Semaphore

在多线程中,另一个重要的概念就是信号量,使用它可以让我们精准的控制当前正在运行的任务最大数量。想象一下,当一个新游戏刚开服时(有些较火的老游戏也会,比如wow),往往会控制游戏内玩家的同时在线数,一旦超过某个临界值,就开始进行排队进服。而在实际使用中,也有很多时候,我们需要通过信号量来控制最大并发数,防止服务器资源被撑爆。

我们使用tokio库中实现的Semaphore

use std::sync::Arc;
use tokio::sync::Semaphore;
 
#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(3));
    let mut join_handles = Vec::new();
 
    for _ in 0..5 {
        let permit = semaphore.clone().acquire_owned().await.unwrap();
        join_handles.push(tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
            //do something
            println!("do something");
            drop(permit);
        }));
    }
 
    for handle in join_handles {
        handle.await.unwrap();
    }
}

可以很明显的看到首先输出了三次,还有两次输出明显延迟。

Atomic原子类型

从 Rust1.34 版本后,就正式支持原子类型。原子指的是一系列不可被 CPU 上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核 CPU 下,当某个 CPU 核心开始运行原子操作时,会先暂停其它 CPU 内核对内存的操作,以保证原子操作不会被其它 CPU 内核所干扰。

由于原子操作是通过指令提供的支持,因此它的性能相比锁和消息传递会好很多。相比较于锁而言,原子类型不需要开发者处理加锁和释放锁的问题,同时支持修改,读取等操作,还具备较高的并发性能,几乎所有的语言都支持原子类型。

可以看出原子类型是无锁类型,但是无锁不代表无需等待,因为原子类型内部使用了CAS循环,当大量的冲突发生时,该等待还是得等待!但是总归比锁要好。

CAS 全称是 Compare and swap, 它通过一条指令读取指定的内存地址,然后判断其中的值是否等于给定的前置值,如果相等,则将其修改为新的值

使用Atomic作为全局变量

use std::{
    ops::Sub,
    sync::atomic::{AtomicU64, Ordering},
    thread::{self, JoinHandle},
    time::Instant,
};
 
const N_TIMES: u64 = 10000000;
const N_THREADS: usize = 10;
static R: AtomicU64 = AtomicU64::new(0);
 
fn add_n_times(n: u64) -> JoinHandle<()> {
    thread::spawn(move || {
        for _ in 0..n {
            R.fetch_add(1, Ordering::Relaxed);
        }
    })
}
fn main() {
    //use instant to measure time
    let instant = Instant::now();
    let mut threads = Vec::with_capacity(N_THREADS);
 
    for _ in 0..N_THREADS {
        threads.push(add_n_times(N_TIMES));
    }
 
    for thread in threads {
        thread.join().unwrap();
    }
 
    assert_eq!(R.load(Ordering::Relaxed), N_TIMES * N_THREADS as u64);
    println!("{:?}", Instant::now().sub(instant));
}

以上代码启动了数个线程,每个线程都在疯狂对全局变量进行加 1 操作, 最后将它与线程数 * 加1次数进行比较,如果发生了因为多个线程同时修改导致了脏数据,那么这两个必将不相等。好在,它没有让我们失望,不仅快速的完成了任务,而且保证了 100%的并发安全性。

电脑配置:mac m1 16g:用时:7.435661084s(平均也是7s左右)

Ordering 是 Rust 中提供的一个枚举,用于描述原子操作的内存顺序约束。原子操作通常与并发编程有关,特别是在多线程环境中。

当我们有多个线程并行执行,并且它们的操作可能会影响到其他线程的行为时,我们需要更细粒度的控制来描述这些操作的顺序。这就是 Ordering 发挥作用的地方。

Rust 中的 Ordering 有以下几个变种:

  1. Relaxed: 不提供任何顺序保证。
  2. Release: 用于释放操作,确保在此之前的所有操作不会被重新排序到这之后。
  3. Acquire: 用于获取操作,确保在此之后的所有操作不会被重新排序到这之前。
  4. AcqRel: 同时具有 AcquireRelease 的属性。
  5. SeqCst: 提供严格的顺序一致性。

总结:Ordering用于控制原子操作的内存顺序

内存顺序

转自:rust圣经

内存顺序是指 CPU 在访问内存时的顺序,该顺序可能受以下因素的影响:

  • 代码中的先后顺序
  • 编译器优化导致在编译阶段发生改变(内存重排序 reordering)
  • 运行阶段因 CPU 的缓存机制导致顺序被打乱

原因来自多个方面,编写时、编译时、运行时。

编译阶段改变内存顺序:

对于第二点,我们举个例子:

static mut X: u64 = 0;
static mut Y: u64 = 1;
 
fn main() {
    ...     // A
 
    unsafe {
        ... // B
        X = 1;
        ... // C
        Y = 3;
        ... // D
        X = 2;
        ... // E
    }
}

假如在CD代码片段中,根本没有用到X = 1,那么编译器很可能会将X = 1X = 2进行合并:

 ...     // A
 
unsafe {
    ... // B
    X = 2;
    ... // C
    Y = 3;
    ... // D
    ... // E
}

若代码A中创建了一个新的线程用于读取全局静态变量X,则该线程将无法读取到X = 1的结果,因为在编译阶段就已经被优化掉。

cpu缓存导致内存顺序改变

假设之前的X = 1没有被优化掉,并且在代码片段A中有一个新的线程:

initial state: X = 0, Y = 1
 
THREAD Main     THREAD A
X = 1;          if X == 1 {
Y = 3;              Y *= 2;
X = 2;          }

我们来讨论下以上线程状态,Y最终的可能值(可能性依次降低):

  • Y = 3: 线程Main运行完后才运行线程A,或者线程A运行完后再运行线程Main
  • Y = 6: 线程MainY = 3运行完,但X = 2还没被运行, 此时线程 A 开始运行Y *= 2, 最后才运行Main线程的X = 2
  • Y = 2: 线程Main正在运行Y = 3还没结束,此时线程A正在运行Y *= 2, 因此Y取到了值 1,然后Main的线程将Y设置为 3, 紧接着就被线程AY = 2所覆盖
  • Y = 2: 上面的还只是一般的数据竞争,这里虽然产生了相同的结果2,但是背后的原理大相径庭: 线程Main运行完Y = 3,但是 CPU 缓存中的Y = 3还没有被同步到其它 CPU 缓存中,此时线程A中的Y *= 2就开始读取Y,结果读到了值1,最终计算出结果2

甚至更改成:

initial state: X = 0, Y = 1
 
THREAD Main     THREAD A
X = 1;          if X == 2 {
Y = 3;              Y *= 2;
X = 2;          }

还是可能出现Y = 2,因为Main线程中的XY被同步到其它 CPU 缓存中的顺序未必一致。

限定内存顺序的五个规则

  • Relaxed, 这是最宽松的规则,它对编译器和 CPU 不做任何限制,可以乱序
  • Release 释放,设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它后面的操作可能被重排到它前面
  • Acquire 获取, 设定内存屏障,保证在它之后的访问永远在它之后,但是它之前的操作却有可能被重排到它后面,往往和Release在不同线程中联合使用
  • AcqRel, 是 AcquireRelease 的结合,同时拥有它们俩提供的保证。比如你要对一个 atomic 自增 1,同时希望该操作之前和之后的读取或写入操作不会被重新排序
  • SeqCst 顺序一致性SeqCst就像是AcqRel的加强版,它不管原子操作是属于读取还是写入的操作,只要某个线程有用到SeqCst的原子操作,线程中该SeqCst操作前的数据操作绝对不会被重新排在该SeqCst操作之后,且该SeqCst操作后的数据操作也绝对不会被重新排在SeqCst操作前。

从上到下越来越严格,良好的安全性是用牺牲性能为代价的。

原则上,Acquire用于读取,而Release用于写入。但是由于有些原子操作同时拥有读取和写入的功能,此时就需要使用AcqRel来设置内存顺序了。在内存屏障中被写入的数据,都可以被其它线程读取到,不会有 CPU 缓存的问题。

内存顺序的选择

  1. 不知道怎么选择时,优先使用SeqCst,虽然会稍微减慢速度,但是慢一点也比出现错误好
  2. 多线程只计数fetch_add而不使用该值触发其他逻辑分支的简单使用场景,可以使用Relaxed 参考 Which std::sync::atomic::Ordering to use?

多线程中使用Atomic

在多线程环境下使用Atomic要配合Arc:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::{hint, thread};
 
fn main() {
    let spinlock = Arc::new(AtomicUsize::new(1));
 
    let spinlock_clone = Arc::clone(&spinlock);
    let thread = thread::spawn(move || {
        spinlock_clone.store(0, Ordering::SeqCst);
    });
 
    // 等待其它线程释放锁
    while spinlock.load(Ordering::SeqCst) != 0 {
        println!("Waiting for lock to be released");
        hint::spin_loop();
    }
 
    if let Err(panic) = thread.join() {
        println!("Thread had an error: {:?}", panic);
    }
}
 

hint::spin_loop 是一个用于通知 CPU 该线程正在进行忙等待(busy-waiting)的函数。它允许处理器在等待其他线程释放锁时执行更有效的能量管理。

在这个例子中,hint::spin_loop 用于告诉 CPU 当前线程正在等待锁被释放。这样,CPU 可以在等待期间降低功耗,提高能效。当锁被其他线程释放时,该线程将立即恢复执行。

基于Send后Sync的线程安全

为什么Rc,RefCell裸指针无法在多线程之间使用。

在多线程中使用Rc

use std::{rc::Rc, thread};
 
fn main() {
    let r = Rc::new(1);
    let t = thread::spawn(move || {
        println!("r: {}", r);
    });
    t.join().unwrap();
}

这段代码看似没有问题,实际上在编译的时候就会报错:

`Rc<i32>` cannot be sent between threads safely
within `[closure@src/main.rs:5:27: 5:34]`, the trait `Send` is not implemented for `Rc<i32>`

他们说的是Rc没有实现Send这个特征,看起来这个特征是在多线程之间传递值的关键。

从源码上分析

我们知道Arc是可以安全地在多线程的环境下传递的,因此,对比一下RcArc的源码应还可以找到问题的关键所在:

// Rc源码片段
impl<T: ?Sized> !marker::Send for Rc<T> {}
impl<T: ?Sized> !marker::Sync for Rc<T> {}
 
// Arc源码片段
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
unsafe impl<T: ?Sized + Sync + Send> Sync for Arc<T> {}

!代表移除特征的相应实现,上面代码中Rc<T>SendSync特征被特地移除了实现,而Arc<T>则相反,实现了Sync + Send,再结合之前的编译器报错,大概可以明白了:SendSync是在线程间安全使用一个值的关键。

Send & Sync

SendSync是 Rust 安全并发的重中之重,但是实际上它们只是标记特征(marker trait,该特征未定义任何行为,因此非常适合用于标记), 来看看它们的作用:

  • 实现Send的类型可以在线程间安全的传递其所有权
  • 实现Sync的类型可以在线程间安全的共享(通过引用)

这里还有一个潜在的依赖:一个类型要在线程间安全的共享的前提是,指向它的引用必须能在线程间传递。因为如果引用都不能被传递,我们就无法在多个线程间使用引用去访问同一个数据了。

由上可知,若类型 T 的引用&TSend,则TSync

手动实现 SendSync 是不安全的,通常并不需要手动实现 Send 和 Sync trait,实现者需要使用unsafe小心维护并发安全保证。

为裸指针实现Send

裸指针没有实现Send,因此下面的代码会报错:

use std::thread;
 
fn main() {
    let a = 5 as *mut u8;
    let t = thread::spawn(move || {
        println!("a: {:?}", a);
    });
    t.join().unwrap();
}

现在有一个问题,我们无法直接对裸指针实现对应的特征,但是我们可以使用元祖结构体:还记得之前的规则吗:复合类型中有一个成员没实现Send,该复合类型就不是Send,因此我们需要手动为它实现:

use std::thread;
 
#[derive(Debug)]
struct Mybox(*mut u8);
unsafe impl Send for Mybox {}
 
fn main() {
    let p = Mybox(5 as *mut u8);
 
    let t = thread::spawn(move || {
        println!("{:?}", p);
    });
    t.join().unwrap();
}

这样我们就可以在线程之间传递裸指针了。

但是注意:SendSync 的实现都是不安全的,因此实现的时候要使用unsafe进行包裹起来。

为裸指针实现Sync

由于Sync是多线程间共享一个值,大家可能会想这么实现:

use std::thread;
fn main() {
    let v = 5;
    let t = thread::spawn(|| {
        println!("{:?}",&v);
    });
 
    t.join().unwrap();
}

关于这种用法,在多线程章节也提到过,线程如果直接去借用其它线程的变量,会报错:closure may outlive the current function,, 原因在于编译器无法确定主线程main和子线程t谁的生命周期更长,特别是当两个线程都是子线程时,没有任何人知道哪个子线程会先结束,包括编译器!

因此我们得配合Arc去使用:

use std::thread;
use std::sync::Arc;
use std::sync::Mutex;
 
#[derive(Debug)]
struct MyBox(*const u8);
unsafe impl Send for MyBox {}
 
fn main() {
    let b = &MyBox(5 as *const u8);
    let v = Arc::new(Mutex::new(b));
    let t = thread::spawn(move || {
        let _v1 =  v.lock().unwrap();
    });
 
    t.join().unwrap();
}

上面代码将智能指针v的所有权转移给新线程,同时v包含了一个引用类型b,当在新的线程中试图获取内部的引用时,会报错:

error[E0277]: `*const u8` cannot be shared between threads safely
--> src/main.rs:25:13
|
25  |     let t = thread::spawn(move || {
|             ^^^^^^^^^^^^^ `*const u8` cannot be shared between threads safely
|
= help: within `MyBox`, the trait `Sync` is not implemented for `*const u8`

因为我们访问的引用实际上还是对主线程中的数据的借用,转移进来的仅仅是外层的智能指针引用。要解决很简单,为MyBox实现Sync:

unsafe impl Sync for MyBox {}