Skip to content

进程线程

在大部分现代操作系统中, 已执行程序的代码在一个 进程(process)中运行, 操作系统则负责管理多个进程, 在程序内部, 也可以拥有多个同时运行的独立部分, 运行这些独立部分的功能被称为 线程(threads)

可以将操作系统看做一个超级大工厂, 每一个进程就是一个独立的生产流水线, 每一个线程就是生产流水线上的工人

如果一个进程中只有一个线程, 那么就是单线程, 如果有多个就是 多线程

多进程模块了解

rust
use std::process::Command;

fn convert_vec_u8_to_string(v: Vec<u8>) -> String {
    String::from_utf8(v).expect("not utf8")
}

fn main() {
    let out = Command::new("ls")
        .arg("-l")
        .arg("-a")
        .output()
        .expect("ls command failed to start");

    let exec_res = if out.status.success() {
        convert_vec_u8_to_string(out.stdout)
    } else {
        convert_vec_u8_to_string(out.stderr)
    };

    println!("{}", exec_res);
}

多线程

创建线程

rust
use std::thread;
use std::time::Duration;

fn set_timeout(wait: u64) {
    thread::sleep(Duration::from_millis(wait));
}

fn main() {            // <-主线程
    thread::spawn(|| { // <-子线程
        for i in 0..10 {
            println!("spawn {}:", i);
            set_timeout(100);
        }
    });

    for i in 0..5 {
        println!("main {}:", i);
        set_timeout(100);
    }
}

这个代码由于主线程比子线程会先执行完, 子线程并不会阻塞主线程, 就会导致子线程还未执行完, 主线程就已经退出了

等待线程结束

thread::spawn 方法 返回一个 线程句柄 JoinHandle 用于等待线程结束

rust
use std::thread;
use std::time::Duration;

fn set_timeout(wait: u64) {
    thread::sleep(Duration::from_millis(wait));
}

fn main() {
    let handle = thread::spawn(|| {
        for i in 0..10 {
            println!("spawn {}:", i);
            set_timeout(100);
        }
    });

    for i in 0..5 {
        println!("main {}:", i);
        set_timeout(100);
    }

    // join 方法会阻塞主线程, 直到子线程执行结束
    handle.join().unwrap();
}

线程闭包与move关键字

rust
use std::thread;

fn main() {
    let items = vec![1, 3, 5];

    // 此时编译器会报错: 闭包的生命周期可能比 items 变量的声明周期要长,
    // 所以, 不能够在闭包中借用一个生命周期比闭包要短的变量
    // closure may outlive the current function,
    // but it borrows `items`, which is owned by the current function

    // 由于 items 的所有权在主线程这里, 所以子线程(闭包)只是借用
    // 但是: 主线程会比闭包先执行完, items也会随着主线程执行完而销毁
    // 这时再去借用一个被销毁的变量, 那么就会报错
    // 如果要解决这个问题:
    // 只需要让 闭包获取到这个 items 变量的所有权, 即可解决, 这样的话
    // 不管主线程什么时候执行完, 都不会影响 items 变量的销毁
    let handle = thread::spawn(|| {
        let mut i = 0;
        items.iter().for_each(|item| {
            println!("items[{}]: {}", i, item);
            i += 1;
        });
    });

    handle.join().unwrap();
}
rust
use std::thread;

fn main() {
    let items = vec![1, 3, 5];

    // 1. 使用 move 关键字可以让变量的所有权移交给闭包
    // 2. 当主线程执行完, items 变量不会随着销毁
    // 3. 只有当子线程(闭包)执行完之后, items 才会随之销毁
    let handle = thread::spawn(move || {
        let mut i = 0;
        items.iter().for_each(|item| {
            println!("items[{}]: {}", i, item);
            i += 1;
        });
    });

    handle.join().unwrap();
}

消息传递共享线程的数据(推荐)

Do not communicate by sharing memory; instead, share memory by communicating.

这句话来自Go语言的文档, 意思是: 不要通过共享内存来通讯, 而是通过通讯来共享内存

这是一种确保安全(不会有数据竞争/脏数据等问题)的并发编程方式

Rust 中试下消息传递并发的工具是 通道 (channel), 标准库提供了实现

消息传递共享数据

rust
use std::thread;
use std::sync::mpsc;
use std::time::Duration;

fn main() {
    // 一个通道由2个部分组成: 发送端 和 接收端
    let (sender, receiver) = mpsc::channel();

    // 注意需要将 sender 的所有权移交给子线程
    thread::spawn(move || {
        // 子线程发送数据(发送端)
        let data = String::from("hello");

        // 延迟 1s 后发送
        thread::sleep(Duration::from_secs(1));
        sender.send(data).unwrap();
    });

    // 主线程接收数据(接收端)
    // recv 方法会阻塞线程, 等到接收到发送端的数据以后再往后执行
    let data = receiver.recv().unwrap();
    println!("receive data: {}", data);
}

多次发送数据

上面代码可以看出, 接收端会阻塞线程, 那如果发送多次数据, 要如何操作呢?

rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (sender, receiver) = mpsc::channel();

    thread::spawn(move || {
        let data_items = vec![
           String::from("hello"),
           String::from("rust"),
           String::from("world"),
        ];

        for item in data_items {
            sender.send(item).unwrap();
            // 每次发送都停顿1s
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 可以手动的调用 recv 方法, 但是弊端就是不知道后续
    // 是否还有数据, 应该调用几次 recv
    // let r1 = receiver.recv().unwrap();
    // println!("{}", r1);
    // let r2 = receiver.recv().unwrap();
    // println!("{}", r2);

    // 这个 receiver 其实实现了迭代器特性, 所以
    // 可以直接将 receiver 当作一个迭代器来使用
    for recv_item in receiver {
        println!("{}", recv_item);
    }
}

通过克隆发送者来创建多个生产者

mpsc 其实是 mutiple producer, single consumer 的缩写

发送端(sender) 实现了 Clone 特性, 所以可以通过 clone 方法来创建多个发送端

rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (sender, receiver) = mpsc::channel();
    let sender2 = sender.clone();

    thread::spawn(move || {
        let data_items = vec![
           String::from("hello"),
           String::from("rust"),
           String::from("world"),
        ];

        for item in data_items {
            sender.send(item).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let data_items = vec![
           String::from("from sender2 data: hello"),
           String::from("from sender2 data: rust"),
           String::from("from sender2 data: world"),
        ];

        for item in data_items {
            sender2.send(item).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    // 会同时接收 sender 和 sender2 发送过来的数据
    for recv_item in receiver {
        println!("{}", recv_item);
    }
}

共享内存共享线程的数据

为了避免数据竞争等问题, 在多个线程共享数据时, 可以考虑使用互斥器

互斥器一次只允许一个线程访问数据

  1. 在使用数据之前, 应该先获得(数据)锁
  2. 处理完被互斥器所保护的数据之后会自动解开锁, 然后其他线程才能获取这个数据的锁

什么是锁?

锁是一种机制, 用于确保在多线程编程时 同一时刻只有一个线程可以访问某个数据

什么是互斥器?

互斥器就是 这种机制的具体实现, 它可以确保在多线程编程时 同一时刻只有一个线程可以访问 要想访问某个被互斥器保护的数据, 必须先获得这个数据的互斥器锁, 只有获得这个锁之后才能访问被保护的数据

rust
use std::sync::Mutex;

fn main() {
    let m = Mutex::new(11);

    // 直接获取数据时不行的, 这个字段是私有的
    // println!("m: {:?}", m.data);

    // 直接解引用也是不行的
    // println!("m: {:?}", *m);

    // 必须先获取锁, 然后才能访问到被锁保护的数据
    // 被保护的数据是一个 Result 枚举, 所以需要 unwrap
    // 用互斥器锁就可以避免多线程的数据竞争问题
    let protected_data = m.lock().unwrap();
    println!("m_data: {:?}", protected_data);
}

多线程和多所有权

这一节主要研究 RcArc 的用法和区别

rust
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(10);
    let mut handles = vec![];

    for _ in 0..10 {
        // 编译器报错: value moved into closure here, in previous iteration of loop
        // 意思是, 第一次循环的时候, counter 的所有权就被移交到第一个子线程了,
        // 所以后续的无法获取到 counter 的所有权, 在 Rust 中默认是单所有权的,
        // 所以, 第一个子线程获取到所有权之后, 别的子线程就无法获取到所有权了
        // 1. 要解决这个问题就必须: 让一个数据有多个所有权
        // 2. 此时你可能会想到用引用计数智能指针Rc<T>, 但引用计数只能用于单线程
        // 3. 所以需要多线程版本的引用计数智能指针 Arc<T>, 这个智能指针可以看做
        //    Rc<T> 的多线程版本, 他们的用法基本一致, API设计的也很像
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();
            *num += 1;
            println!("thread-num: {}", *num);
        });
        handles.push(handle);
    }

    // 阻塞主线程, 等待所有子线程结束
    for handle in handles {
        handle.join().unwrap();
    }
}
rust
use std::sync::Mutex;
use std::thread;
use std::rc::Rc;

fn main() {
    let counter = Rc::new(Mutex::new(10));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            // 编译器报错: `Rc<Mutex<i32>>` cannot be sent between threads safely
            // 翻译为中文就是: Rc<Mutex> 无法安全的在不同的线程之间发送(数据)
            // 虽然不能解决问题, 但是也可以看到错误, 避免后续遇到同样错误不知道如何解决
            let mut data = counter.lock().unwrap();
            *data += 1;
            println!("thread-num: {}", *data);
        });
        handles.push(handle);
    }


    // 阻塞主线程, 等待所有子线程结束
    for handle in handles {
        handle.join().unwrap();
    }
}
rust
use std::sync::Mutex;
use std::thread;
use std::sync::Arc;

fn main() {
    let counter = Arc::new(Mutex::new(10));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            // unbelievable!!! 代码终于通过编译了
            // 既然Rust是内存安全的语言, 为什么还区分 Rc 和 Arc, 都使用 Arc 不就好了吗?
            // 虽然 Arc 可以让数据在多线程的情况下有多所有权, 但是也需要做更多的事情
            // 性能肯定是不如 Rc 的, 所以在单线程的情况下, 还是需要用性能更好的 Rc
            let mut data = counter.lock().unwrap();
            *data += 1;
            println!("thread-num: {}", *data);
        });
        handles.push(handle);
    }


    // 阻塞主线程, 等待所有子线程结束
    for handle in handles {
        handle.join().unwrap();
    }
}

Refcell 和 Mutex 与内部可变性

由上面的代码可以看出, counter 是一个由互斥器(数据锁)保护的数据, 但是我们发现 counter 是一个只读的互斥器, 但是却可以用 counter.lock().unwrap() 来获取被保护数据的可变引用(指针), 所以才能直接修改他的值

rust
use std::sync::Mutex;
use std::cell::RefCell;

fn main() {
    let counter = Mutex::new(10);

    // 手动解引用并修改被保护数据的值, 那么这就说明
    // counter.lock().unwrap() 返回的是一个指针(或者叫可变引用)
    // 所以才可以这样操作: *指针 = 值
    *counter.lock().unwrap() = 20;
    println!("counter: {}", *counter.lock().unwrap());

    // RefCell 不让编译器在编译时决定 x 是否是可变的
    // 而是手动在运行时决定数据是否可变
    let nums = RefCell::new(vec![1, 2, 3]);
    nums.borrow_mut().push(4);
    println!("nums: {:?}", nums.borrow());
}

循环引用 和 线程死锁

  1. 使用 Refcell + Rc 可能会导致循环引用
  2. 使用 Mutex + Arc 可能会导致线程死锁

循环引用(Reference Cycles): 两个Rc变量互相引用, 导致引用计数器无法清零, 导致内存泄漏

线程死锁(deadlock): 当一个操作需要锁住两个资源而两个线程各持一个锁, 这会造成们永远相互等待

rust
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let a = Arc::new(Mutex::new(0));
    let b = Arc::new(Mutex::new(0));

    // 克隆 Arc 智能指针
    let a_clone = Arc::clone(&a);
    let b_clone = Arc::clone(&b);

    let h1 = thread::spawn(move || {
        // 获取 a 的锁
        let a_data = a.lock().unwrap();
        println!("h1-a: {}", *a_data);

        // 模拟耗时的操作
        thread::sleep(std::time::Duration::from_secs(1));

        // 获取 b 的锁
        let b_data = b.lock().unwrap();
        println!("h1-b: {}", *b_data);
    });

    let h2 = thread::spawn(move || {
        // 获取 b_clone 的锁
        let b_clone_data = b_clone.lock().unwrap();
        println!("h2-b: {}", *b_clone_data);

        // 模拟耗时的操作
        thread::sleep(std::time::Duration::from_secs(1));

        // 获取 a_clone 的锁
        let a_clone_data = a_clone.lock().unwrap();
        println!("h2-a: {}", *a_clone_data);
    });

    // 1. 创建连个线程 h1, h2
    // 2. h1 执行(有个耗费时间的操作), h2 不会等待 h1 执行完再执行, 他们是并行的
    // 3. h1 执行时获取了 a 的锁, h2 执行时获取了 b_clone 的锁
    // 4. 当 h1 耗时的操作执行完之后, 想要获取 b 的锁, 发现 b 的锁已经被h2线程获取了(b_clone), 就需要等着 b 的锁解开
    // 5. 当 h2 耗时的操作执行完之后, 想要获取 a_clone 的锁, 发现 a_clone 的锁已经被h1线程获取了(a), 就需要等着 a_clone 的锁解开
    // 6. h1 等 h2 解锁, h2 等 h1 解锁, 这就没完没了了, 就会导致程序卡住, 不会结束了
    // 7. 要想解决这个问题: 必须要让一个线程让步(先执行完,自动解锁), 那么另外一个线程才能获得锁然后执行完
    //    比如: 注释34,35行, 让 h2 直接执行完, 那么 h1 就可以顺利的获取 b 的锁, 然后也执行完

    /*
      主线程: 被 h1 h2 阻塞了, 需要两个线程都执行完才能结束
      h1线程:
        1. 获取 a 的锁
        2. 执行耗时操作
        3. 获取 b 的锁, 发现锁被h2线程获取了, 需等待解锁后才能获取

      h2线程:
        1. 获取 b_clone 的锁
        2. 执行耗时操作
        3. 获取 a_clone 的锁, 发现锁被h1线程获取了, 需等待解锁后才能获取
    */

    h1.join().unwrap();
    h2.join().unwrap();
}

虽然说 Rust 是号称内存安全的编程语言, 但这只是相对的, 内存是否安全这取决于使用的人, 而不是编程语言, 熟练使用 C/C++ 的人写的代码可能也是 "内存安全" 的, 只是说, 太过自由的语法 容易让人忽略掉错误, 而绑住手脚的编译器检查可以减少忽略错误的几率

Sync 和 Send 特性

这两个 trait 的作用:

  • Sync 可以在线程之间安全的共享引用的类型
  • Send 可以跨线程传输的类型

注意这两个 trait 的特殊性:

不要手动的实现这两个 trait, Rust程序设计语言这本书中也说了: 手动实现 Send 和 Sync 是不安全的

标准库文档上也说: 当编译器确定适当时, 会自动实现此 trait

Released under the MIT License.