Вызов `Arc::new()` внутри потока Rayon приводит к ошибкам компилятора мусора

У меня есть функция, которая генерирует хэши из разных входных данных. Один такой хэш я хочу сохранить как Arc<[u8; 16]>, чтобы я мог делиться им между потоками и структурами. Раньше я сохранял его как Vec<u8>, передавал по каналам, а затем локально преобразовывал в [u8; 16]. Очевидно неэффективно, поэтому Arc<[u8; 16]>. Но когда я сделал преобразование, я получил такие ошибки:

error[E0277]: `std::sync::mpsc::Sender<Header<'a, Body<'a>>>` cannot be shared between threads safely
   --> src/lib.rs:182:13
    |
182 |             spawn(move || {
    |             ^^^^^ `std::sync::mpsc::Sender<Header<'a, Body<'a>>>` cannot be shared between threads safely
    |
    = help: within `CreatorPipeline<Header<'a, Body<'a>>, std::sync::Arc<[u8; 16]>, (std::sync::Arc<[u8; 16]>, std::path::PathBuf, u64), (std::vec::Vec<u8>, u64, std::vec::Vec<u8>), Block, Body<'a>>`, the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender<Header<'a, Body<'a>>>`
    = note: required because it appears within the type `(std::sync::mpsc::Sender<Header<'a, Body<'a>>>, std::sync::mpsc::Receiver<Header<'a, Body<'a>>>)`
    = note: required because it appears within the type `CreatorPipeline<Header<'a, Body<'a>>, std::sync::Arc<[u8; 16]>, (std::sync::Arc<[u8; 16]>, std::path::PathBuf, u64), (std::vec::Vec<u8>, u64, std::vec::Vec<u8>), Block, Body<'a>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `&CreatorPipeline<Header<'a, Body<'a>>, std::sync::Arc<[u8; 16]>, (std::sync::Arc<[u8; 16]>, std::path::PathBuf, u64), (std::vec::Vec<u8>, u64, std::vec::Vec<u8>), Block, Body<'a>>`
    = note: required because it appears within the type `[closure@src/lib.rs:182:19: 232:14 file:std::path::PathBuf, self:&CreatorPipeline<Header<'a, Body<'a>>, std::sync::Arc<[u8; 16]>, (std::sync::Arc<[u8; 16]>, std::path::PathBuf, u64), (std::vec::Vec<u8>, u64, std::vec::Vec<u8>), Block, Body<'a>>, tx_main:std::sync::mpsc::Sender<std::sync::Arc<[u8; 16]>>, tx_input:std::sync::mpsc::Sender<(std::sync::Arc<[u8; 16]>, std::path::PathBuf, u64)>, tx_fd:std::sync::mpsc::Sender<(std::vec::Vec<u8>, u64, std::vec::Vec<u8>)>]`
    = note: required by `rayon_core::spawn::spawn`

Таких ошибок еще 11. Все некоторые вариации Foo cannot be shared between threads safely. Вот суть со всеми ошибками.

Вот фрагмент моего кода:

...

// File creation pipeline
pub struct CreatorPipeline<A, B, C, D, E, F> {
    magic: Arc<[u8; 8]>,
    rec_set_id: Arc<RwLock<Option<&'static [u8; 16]>>>,
    writes: (Sender<A>, Receiver<A>),           // A: Header<Body>
    main: (Sender<B>, Receiver<B>),             // B: Arc<[u8; 16]>
    input: (Sender<C>, Receiver<C>),            // C: (Arc<[u8; 16]>, File)
    body: (Sender<F>, Receiver<F>),             // F: Body
    file_description: (Sender<D>, Receiver<D>), // D: (Arc<[u8; 16]>, u64, Vec<u8>)
    recovery: (Sender<E>, Receiver<E>),         // E: Block
}



// Creation pipeline methods
impl<'a>
    CreatorPipeline<
        Header<Body<'a>>,          // writes Packet
        Arc<[u8; 16]>,                 // main file_id
        (Arc<[u8; 16]>, PathBuf, u64), // input (file_id, file, length)
        (Vec<u8>, u64, Vec<u8>),       // file_description (name, length, hash_16k)
        Block,                         // recovery Block
        Body<'a>,                      // packet Body
    >
{
    ...

    // First Stage: Create file ids and partial bodies for FileDescription. Send
    // file ids, partial bodies and file readers to the correct channels.
    fn create_file_id(&self, files: Vec<PathBuf>) -> Result<(), ExitFailure> {
        let (tx_main, _) = &self.main; // sender for create_main()
        let (tx_input, _) = &self.input; // sender for create_input()
        let (tx_fd, _) = &self.file_description; // sender for create_fd()

        for file in files {
            let tx_main = tx_main.clone();
            let tx_input = tx_input.clone();
            let tx_fd = tx_fd.clone();

            // Spawn thread
            spawn(move || {
                let mut reader = File::open(&file)
                    .with_context(|_| format!("Could not open file {}", file.display()))
                    .unwrap();

                // Get filename from path
                let name = file
                    .file_stem()
                    .unwrap()
                    .to_string_lossy()
                    .into_owned()
                    .into_bytes();

                let length = {
                    let metadata = metadata(&file).unwrap();
                    metadata.len()
                };

                // Hash first 16k of the file
                let hash_16k = {
                    let mut hasher_16k = Md5::new();
                    let mut buffer = [0; 16384];
                    reader.read(&mut buffer).unwrap();
                    for byte in buffer.iter() {
                        hasher_16k.input([byte.clone()]);
                    }

                    let result = hasher_16k.result();
                    let hash_16k = result.as_slice().to_owned();
                    hash_16k
                };

                // Generate File ID
                let file_id = {
                    let mut hasher_file_id = Md5::new();
                    hasher_file_id.input(&hash_16k);
                    hasher_file_id.input(&length.to_le_bytes());
                    hasher_file_id.input(&name);
                    let file_id = hasher_file_id.result().to_vec();
                    let file_id = self.convert_to_byte_array(file_id);
                    Arc::new(file_id) // Problem line
                };

                // Partial FileDescription (name, length, hash_16k)
                let partial_body = (name, length, hash_16k);

                // sender for channels
                tx_main.send(Arc::clone(&file_id)).unwrap();
                tx_input.send((Arc::clone(&file_id), file, length)).unwrap();
                tx_fd.send(partial_body).unwrap();
            });
        }

        Ok(())
    }

    ...
}

Вот полный исходный код.
Документация Arc
Районная документация

Изменить: забыл упомянуть, что я использую стабильную версию 2018 года.


person Ironlenny    schedule 29.04.2019    source источник
comment
Может быть, это из-за жизни? Вы можете передать только 'static времени жизни потокам. И какой смысл иметь Arc<&'a [u8; 8]>? Почему не Arc<[u8; 8]>?   -  person chpio    schedule 29.04.2019
comment
Vec<u8> реализует Send чтобы его можно было поместить в Arc и разделить между потоками. array не могут, потому что они не реализуют Send.   -  person Jmb    schedule 29.04.2019
comment
Хорошо, как UUID получает < i>Отправить и синхронизировать автоматически? Это просто оболочка над [u8; 16].   -  person chpio    schedule 29.04.2019
comment
@chpio Хороший вопрос. На самом деле, похоже, что u8 массивы реализуют Send документы не упоминайте об этом…   -  person Jmb    schedule 29.04.2019
comment
Может быть, это не отображается в документах из-за того, что черты Send и Sync являются автоматическими, поэтому они не отображаются для универсальных типов?   -  person chpio    schedule 29.04.2019
comment
@chpio Но Vec является универсальным типом, а Send показывает здесь. Он также отображается для примитивных типов. Может быть, проблема в том, что array является общим общим и примитивным?   -  person Jmb    schedule 29.04.2019
comment
Я рад, что я не единственный, кто в тупике.   -  person Ironlenny    schedule 29.04.2019
comment
@chpio Оглядываясь назад, мне не нужна Arc‹&'a [u8; 8]›. Я все еще изучаю Rust.   -  person Ironlenny    schedule 29.04.2019
comment
@Ironlenny ах, да, извините за это, я думал, что это вызывает проблему   -  person chpio    schedule 29.04.2019


Ответы (1)


Мне помог сервер Rust Discord. Выяснилось, что let file_id = self.convert_to_byte_array(file_id); была проблемной строкой. Что имеет смысл, я пытаюсь переместить self в несколько потоков, что, очевидно, не работает. Я просто хочу, чтобы ошибки компилятора были более полезными.

person Ironlenny    schedule 29.04.2019