进程线程
在大部分现代操作系统中, 已执行程序的代码在一个 进程(process)中运行, 操作系统则负责管理多个进程, 在程序内部, 也可以拥有多个同时运行的独立部分, 运行这些独立部分的功能被称为 线程(threads)
可以将操作系统看做一个超级大工厂, 每一个进程就是一个独立的生产流水线, 每一个线程就是生产流水线上的工人
如果一个进程中只有一个线程, 那么就是单线程
, 如果有多个就是 多线程
多进程模块了解
use std::process::Command;
fn convert_vec_u8_to_string(v: Vec<u8>) -> String {
String::from_utf8(v).expect("not utf8")
}
fn main() {
let out = Command::new("ls")
.arg("-l")
.arg("-a")
.output()
.expect("ls command failed to start");
let exec_res = if out.status.success() {
convert_vec_u8_to_string(out.stdout)
} else {
convert_vec_u8_to_string(out.stderr)
};
println!("{}", exec_res);
}
多线程
创建线程
use std::thread;
use std::time::Duration;
fn set_timeout(wait: u64) {
thread::sleep(Duration::from_millis(wait));
}
fn main() { // <-主线程
thread::spawn(|| { // <-子线程
for i in 0..10 {
println!("spawn {}:", i);
set_timeout(100);
}
});
for i in 0..5 {
println!("main {}:", i);
set_timeout(100);
}
}
这个代码由于主线程比子线程会先执行完, 子线程并不会阻塞主线程, 就会导致子线程还未执行完, 主线程就已经退出了
等待线程结束
thread::spawn 方法 返回一个 线程句柄 JoinHandle 用于等待线程结束
use std::thread;
use std::time::Duration;
fn set_timeout(wait: u64) {
thread::sleep(Duration::from_millis(wait));
}
fn main() {
let handle = thread::spawn(|| {
for i in 0..10 {
println!("spawn {}:", i);
set_timeout(100);
}
});
for i in 0..5 {
println!("main {}:", i);
set_timeout(100);
}
// join 方法会阻塞主线程, 直到子线程执行结束
handle.join().unwrap();
}
线程闭包与move关键字
use std::thread;
fn main() {
let items = vec![1, 3, 5];
// 此时编译器会报错: 闭包的生命周期可能比 items 变量的声明周期要长,
// 所以, 不能够在闭包中借用一个生命周期比闭包要短的变量
// closure may outlive the current function,
// but it borrows `items`, which is owned by the current function
// 由于 items 的所有权在主线程这里, 所以子线程(闭包)只是借用
// 但是: 主线程会比闭包先执行完, items也会随着主线程执行完而销毁
// 这时再去借用一个被销毁的变量, 那么就会报错
// 如果要解决这个问题:
// 只需要让 闭包获取到这个 items 变量的所有权, 即可解决, 这样的话
// 不管主线程什么时候执行完, 都不会影响 items 变量的销毁
let handle = thread::spawn(|| {
let mut i = 0;
items.iter().for_each(|item| {
println!("items[{}]: {}", i, item);
i += 1;
});
});
handle.join().unwrap();
}
use std::thread;
fn main() {
let items = vec![1, 3, 5];
// 1. 使用 move 关键字可以让变量的所有权移交给闭包
// 2. 当主线程执行完, items 变量不会随着销毁
// 3. 只有当子线程(闭包)执行完之后, items 才会随之销毁
let handle = thread::spawn(move || {
let mut i = 0;
items.iter().for_each(|item| {
println!("items[{}]: {}", i, item);
i += 1;
});
});
handle.join().unwrap();
}
消息传递共享线程的数据(推荐)
Do not communicate by sharing memory; instead, share memory by communicating.
这句话来自Go语言的文档, 意思是: 不要通过共享内存来通讯, 而是通过通讯来共享内存
这是一种确保安全(不会有数据竞争/脏数据等问题)的并发编程方式
Rust 中试下消息传递并发的工具是 通道
(channel), 标准库提供了实现
消息传递共享数据
use std::thread;
use std::sync::mpsc;
use std::time::Duration;
fn main() {
// 一个通道由2个部分组成: 发送端 和 接收端
let (sender, receiver) = mpsc::channel();
// 注意需要将 sender 的所有权移交给子线程
thread::spawn(move || {
// 子线程发送数据(发送端)
let data = String::from("hello");
// 延迟 1s 后发送
thread::sleep(Duration::from_secs(1));
sender.send(data).unwrap();
});
// 主线程接收数据(接收端)
// recv 方法会阻塞线程, 等到接收到发送端的数据以后再往后执行
let data = receiver.recv().unwrap();
println!("receive data: {}", data);
}
多次发送数据
上面代码可以看出, 接收端会阻塞线程, 那如果发送多次数据, 要如何操作呢?
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
let data_items = vec![
String::from("hello"),
String::from("rust"),
String::from("world"),
];
for item in data_items {
sender.send(item).unwrap();
// 每次发送都停顿1s
thread::sleep(Duration::from_secs(1));
}
});
// 可以手动的调用 recv 方法, 但是弊端就是不知道后续
// 是否还有数据, 应该调用几次 recv
// let r1 = receiver.recv().unwrap();
// println!("{}", r1);
// let r2 = receiver.recv().unwrap();
// println!("{}", r2);
// 这个 receiver 其实实现了迭代器特性, 所以
// 可以直接将 receiver 当作一个迭代器来使用
for recv_item in receiver {
println!("{}", recv_item);
}
}
通过克隆发送者来创建多个生产者
mpsc
其实是 mutiple producer, single consumer
的缩写
发送端(sender
) 实现了 Clone
特性, 所以可以通过 clone 方法来创建多个发送端
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
let data_items = vec![
String::from("hello"),
String::from("rust"),
String::from("world"),
];
for item in data_items {
sender.send(item).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let data_items = vec![
String::from("from sender2 data: hello"),
String::from("from sender2 data: rust"),
String::from("from sender2 data: world"),
];
for item in data_items {
sender2.send(item).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
// 会同时接收 sender 和 sender2 发送过来的数据
for recv_item in receiver {
println!("{}", recv_item);
}
}
共享内存共享线程的数据
为了避免数据竞争等问题, 在多个线程共享数据时, 可以考虑使用互斥器
互斥器一次只允许一个线程访问数据
- 在使用数据之前, 应该先获得(数据)锁
- 处理完被互斥器所保护的数据之后会自动解开锁, 然后其他线程才能获取这个数据的锁
什么是锁?
锁是一种机制, 用于确保在多线程编程时 同一时刻只有一个线程可以访问某个数据
什么是互斥器?
互斥器就是 锁
这种机制的具体实现, 它可以确保在多线程编程时 同一时刻只有一个线程可以访问
要想访问某个被互斥器保护的数据, 必须先获得这个数据的互斥器锁, 只有获得这个锁之后才能访问被保护的数据
use std::sync::Mutex;
fn main() {
let m = Mutex::new(11);
// 直接获取数据时不行的, 这个字段是私有的
// println!("m: {:?}", m.data);
// 直接解引用也是不行的
// println!("m: {:?}", *m);
// 必须先获取锁, 然后才能访问到被锁保护的数据
// 被保护的数据是一个 Result 枚举, 所以需要 unwrap
// 用互斥器锁就可以避免多线程的数据竞争问题
let protected_data = m.lock().unwrap();
println!("m_data: {:?}", protected_data);
}
多线程和多所有权
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(10);
let mut handles = vec![];
for _ in 0..10 {
// 编译器报错: value moved into closure here, in previous iteration of loop
// 意思是, 第一次循环的时候, counter 的所有权就被移交到第一个子线程了,
// 所以后续的无法获取到 counter 的所有权, 在 Rust 中默认是单所有权的,
// 所以, 第一个子线程获取到所有权之后, 别的子线程就无法获取到所有权了
// 1. 要解决这个问题就必须: 让一个数据有多个所有权
// 2. 此时你可能会想到用引用计数智能指针Rc<T>, 但引用计数只能用于单线程
// 3. 所以需要多线程版本的引用计数智能指针 Arc<T>, 这个智能指针可以看做
// Rc<T> 的多线程版本, 他们的用法基本一致, API设计的也很像
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
println!("thread-num: {}", *num);
});
handles.push(handle);
}
// 阻塞主线程, 等待所有子线程结束
for handle in handles {
handle.join().unwrap();
}
}
use std::sync::Mutex;
use std::thread;
use std::rc::Rc;
fn main() {
let counter = Rc::new(Mutex::new(10));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || {
// 编译器报错: `Rc<Mutex<i32>>` cannot be sent between threads safely
// 翻译为中文就是: Rc<Mutex> 无法安全的在不同的线程之间发送(数据)
// 虽然不能解决问题, 但是也可以看到错误, 避免后续遇到同样错误不知道如何解决
let mut data = counter.lock().unwrap();
*data += 1;
println!("thread-num: {}", *data);
});
handles.push(handle);
}
// 阻塞主线程, 等待所有子线程结束
for handle in handles {
handle.join().unwrap();
}
}
use std::sync::Mutex;
use std::thread;
use std::sync::Arc;
fn main() {
let counter = Arc::new(Mutex::new(10));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
// unbelievable!!! 代码终于通过编译了
// 既然Rust是内存安全的语言, 为什么还区分 Rc 和 Arc, 都使用 Arc 不就好了吗?
// 虽然 Arc 可以让数据在多线程的情况下有多所有权, 但是也需要做更多的事情
// 性能肯定是不如 Rc 的, 所以在单线程的情况下, 还是需要用性能更好的 Rc
let mut data = counter.lock().unwrap();
*data += 1;
println!("thread-num: {}", *data);
});
handles.push(handle);
}
// 阻塞主线程, 等待所有子线程结束
for handle in handles {
handle.join().unwrap();
}
}
Refcell 和 Mutex 与内部可变性
由上面的代码可以看出, counter
是一个由互斥器(数据锁)保护的数据, 但是我们发现 counter
是一个只读的互斥器, 但是却可以用 counter.lock().unwrap()
来获取被保护数据的可变引用(指针), 所以才能直接修改他的值
use std::sync::Mutex;
use std::cell::RefCell;
fn main() {
let counter = Mutex::new(10);
// 手动解引用并修改被保护数据的值, 那么这就说明
// counter.lock().unwrap() 返回的是一个指针(或者叫可变引用)
// 所以才可以这样操作: *指针 = 值
*counter.lock().unwrap() = 20;
println!("counter: {}", *counter.lock().unwrap());
// RefCell 不让编译器在编译时决定 x 是否是可变的
// 而是手动在运行时决定数据是否可变
let nums = RefCell::new(vec![1, 2, 3]);
nums.borrow_mut().push(4);
println!("nums: {:?}", nums.borrow());
}
循环引用 和 线程死锁
- 使用 Refcell + Rc 可能会导致循环引用
- 使用 Mutex + Arc 可能会导致线程死锁
循环引用(Reference Cycles): 两个Rc变量互相引用, 导致引用计数器无法清零, 导致内存泄漏
线程死锁(deadlock): 当一个操作需要锁住两个资源而两个线程各持一个锁, 这会造成们永远相互等待
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let a = Arc::new(Mutex::new(0));
let b = Arc::new(Mutex::new(0));
// 克隆 Arc 智能指针
let a_clone = Arc::clone(&a);
let b_clone = Arc::clone(&b);
let h1 = thread::spawn(move || {
// 获取 a 的锁
let a_data = a.lock().unwrap();
println!("h1-a: {}", *a_data);
// 模拟耗时的操作
thread::sleep(std::time::Duration::from_secs(1));
// 获取 b 的锁
let b_data = b.lock().unwrap();
println!("h1-b: {}", *b_data);
});
let h2 = thread::spawn(move || {
// 获取 b_clone 的锁
let b_clone_data = b_clone.lock().unwrap();
println!("h2-b: {}", *b_clone_data);
// 模拟耗时的操作
thread::sleep(std::time::Duration::from_secs(1));
// 获取 a_clone 的锁
let a_clone_data = a_clone.lock().unwrap();
println!("h2-a: {}", *a_clone_data);
});
// 1. 创建连个线程 h1, h2
// 2. h1 执行(有个耗费时间的操作), h2 不会等待 h1 执行完再执行, 他们是并行的
// 3. h1 执行时获取了 a 的锁, h2 执行时获取了 b_clone 的锁
// 4. 当 h1 耗时的操作执行完之后, 想要获取 b 的锁, 发现 b 的锁已经被h2线程获取了(b_clone), 就需要等着 b 的锁解开
// 5. 当 h2 耗时的操作执行完之后, 想要获取 a_clone 的锁, 发现 a_clone 的锁已经被h1线程获取了(a), 就需要等着 a_clone 的锁解开
// 6. h1 等 h2 解锁, h2 等 h1 解锁, 这就没完没了了, 就会导致程序卡住, 不会结束了
// 7. 要想解决这个问题: 必须要让一个线程让步(先执行完,自动解锁), 那么另外一个线程才能获得锁然后执行完
// 比如: 注释34,35行, 让 h2 直接执行完, 那么 h1 就可以顺利的获取 b 的锁, 然后也执行完
/*
主线程: 被 h1 h2 阻塞了, 需要两个线程都执行完才能结束
h1线程:
1. 获取 a 的锁
2. 执行耗时操作
3. 获取 b 的锁, 发现锁被h2线程获取了, 需等待解锁后才能获取
h2线程:
1. 获取 b_clone 的锁
2. 执行耗时操作
3. 获取 a_clone 的锁, 发现锁被h1线程获取了, 需等待解锁后才能获取
*/
h1.join().unwrap();
h2.join().unwrap();
}
虽然说 Rust 是号称内存安全的编程语言, 但这只是相对的, 内存是否安全这取决于使用的人, 而不是编程语言, 熟练使用 C/C++ 的人写的代码可能也是 "内存安全" 的, 只是说, 太过自由的语法 容易让人忽略掉错误, 而绑住手脚的编译器检查可以减少忽略错误的几率
Sync 和 Send 特性
这两个 trait 的作用:
注意这两个 trait 的特殊性:
不要手动的实现这两个 trait, Rust程序设计语言这本书中也说了: 手动实现 Send 和 Sync 是不安全的
标准库文档上也说: 当编译器确定适当时, 会自动实现此 trait