我试图通过使用线程来加速大向量上的昂贵计算。我的函数使用一个向量,计算一个新值的向量(它不聚合,但必须保留输入顺序),然​​后返回它。但是,我正在努力弄清楚如何生成线程,为每个线程分配向量切片,​​然后收集并合并结果。

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res

}

fn main() {
    // choose an odd number of elements
    let orig = (1..14).collect::<Vec<i32>>();
    let mut result: Vec<Vec<i32>> = vec!();
    let mut flat: Vec<i32> = Vec::with_capacity(orig.len());
    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        result.push(
            chunk.iter().map(|&digit|
                f(digit)).collect()
            );
    };
    // flatten result vector
    for subvec in result.iter() {
        for elem in subvec.iter() {
            flat.push(elem.to_owned());
        }
    }
    println!("Flattened result: {:?}", flat);
}

线程计算应该在for chunk…// flatten …之间进行,但我找不到许多简单的示例,它们产生了x个线程,按顺序分配了块,然后将新计算出的向量从线程中返回到容器中,以便可以将其展平。我是否必须将orig.chunks()包装在Arc中,并手动在循环中抓取每个块?我是否必须将f传递到每个线程中?我是否必须使用B树来确保输入和输出顺序匹配?我可以只使用 simple_parallel 吗?

最佳答案

好吧,这是不稳定 thread::scoped() 的理想应用程序:

#![feature(scoped)]
use std::thread::{self, JoinGuard};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();

    let mut guards: Vec<JoinGuard<Vec<i32>>> = vec!();

    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let g = thread::scoped(move || chunk.iter().cloned().map(f).collect());
        guards.push(g);
    };

    // collect the results
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().into_iter());
    }

    println!("Flattened result: {:?}", result);
}

它是不稳定的,因为它具有固有的缺陷(您可以找到更多的here),因此不太可能以这种形式稳定下来。据我所知,simple_parallel只是这种方法的扩展-它隐藏了对JoinGuards的摆弄,并且还可以在稳定的Rust中使用(我相信可能带有unsafe ty)。但是,不建议将其作为一般用途,如其文档所建议的那样。

当然,您可以使用thread::spawn(),但是随后您将需要克隆每个块,以便可以将其移动到每个线程中:

use std::thread::{self, JoinHandle};

// tunable
const NUMTHREADS: i32 = 4;

fn f(val: i32) -> i32 {
    // expensive computation
    let res = val + 1;
    res
}

fn main() {
    // choose an odd number of elements
    let orig: Vec<i32> = (1..14).collect();

    let mut guards: Vec<JoinHandle<Vec<i32>>> = vec!();

    // split into slices
    for chunk in orig.chunks(orig.len() / NUMTHREADS as usize) {
        let chunk = chunk.to_owned();
        let g = thread::spawn(move || chunk.into_iter().map(f).collect());
        guards.push(g);
    };

    // collect the results
    let mut result: Vec<i32> = Vec::with_capacity(orig.len());
    for g in guards {
        result.extend(g.join().unwrap().into_iter());
    }

    println!("Flattened result: {:?}", result);
}

关于multithreading - 消耗不重叠的矢量块,并合并结果,我们在Stack Overflow上找到一个类似的问题:https://stackoverflow.com/questions/31121250/

10-13 00:08