测试实例:map-reduce
Rust 使并行数据处理变得非常简单,避免了传统并行处理中常见的诸多问题。
标准库提供了开箱即用的优秀线程原语。这些原语结合 Rust 的所有权概念和别名规则,自动防止了数据竞争。
别名规则(一个可写引用异或多个可读引用)自动防止你操作对其他线程可见的状态。(在需要同步的情况下,可以使用 Mutex 或 Channel 等同步原语。)
在这个例子中,我们将计算一个数字块中所有数字的总和。我们通过将数字块分成小块并分配给不同的线程来完成这个任务。每个线程将计算其小块数字的总和,随后我们将汇总每个线程产生的中间结果。
注意,尽管我们在线程间传递引用,但 Rust 理解我们只是传递只读引用,因此不会发生不安全操作或数据竞争。此外,由于我们传递的引用具有 'static 生命周期,Rust 确保这些线程运行时数据不会被销毁。(当需要在线程间共享非 static 数据时,可以使用 Arc 等智能指针来保持数据存活并避免非 static 生命周期。)
use std::thread;
// 这是主线程
fn main() {
// 这是我们要处理的数据
// 我们将通过一个线程化的 map-reduce 算法计算所有数字的总和
// 每个由空格分隔的块将在不同的线程中处理
//
// TODO:试试插入空格会对输出有什么影响!
let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";
// 创建一个向量来存储我们将要生成的子线程。
let mut children = vec![];
/*************************************************************************
* "Map"阶段
*
* 将数据分割成多个段,并进行初步处理
************************************************************************/
// 将数据分割成多个段以进行单独计算
// 每个数据块都是指向实际数据的引用(&str)
let chunked_data = data.split_whitespace();
// 遍历数据段
// .enumerate() 为迭代的每个元素添加当前循环索引
// 生成的元组"(索引, 元素)"随后立即通过
// "解构赋值"被"解构"为两个变量:"i"和"data_segment"
//
for (i, data_segment) in chunked_data.enumerate() {
println!("数据段 {} 是\"{}\"", i, data_segment);
// 在单独的线程中处理每个数据段
//
// spawn() 返回新线程的句柄,
// 我们必须保留该句柄以访问返回值
//
// 'move || -> u32' 是一个闭包的语法,它:
// * 不接受参数('||')
// * 获取其捕获变量的所有权('move')
// * 返回一个无符号 32 位整数('-> u32')
//
// Rust 足够智能,能从闭包本身推断出 '-> u32',
// 所以我们可以省略它。
//
// TODO:尝试移除 'move' 并观察结果
children.push(thread::spawn(move || -> u32 {
// 计算此段的中间和:
let result = data_segment
// 遍历此段中的字符...
.chars()
// ...将文本字符转换为对应的数值...
.map(|c| c.to_digit(10).expect("应该是一个数字"))
// ...并对结果数字迭代器求和
.sum();
// println! 会锁定标准输出,因此不会出现文本交错
println!("已处理段 {},结果={}", i, result);
// 不需要使用 "return",因为 Rust 是一种"表达式语言",
// 每个代码块中最后求值的表达式会自动成为该块的返回值。
result
}));
}
/*************************************************************************
* "归约"阶段
*
* 收集中间结果,并将它们合并成最终结果
************************************************************************/
// 将每个线程的中间结果合并成一个最终总和。
//
// 我们使用 "turbofish" ::<> 为 sum() 提供类型提示。
//
// TODO:尝试不使用 turbofish,而是显式
// 指定 final_result 的类型
let final_result = children.into_iter().map(|c| c.join().unwrap()).sum::<u32>();
println!("最终求和结果:{}", final_result);
}
练习
让线程数量依赖于用户输入的数据并不明智。如果用户决定插入大量空格,我们真的想要创建 2,000 个线程吗?修改程序,使数据始终被分割成固定数量的块,这个数量应由程序开头定义的静态常量来确定。