MPI_Gather Вектор объектов

Я пишу программу mpich для параллельной сортировки. Мне нужно использовать интерфейс mpi_gather, но он не поддерживает передачу вектора объектов. Поэтому я использую boost_serialization.

Реализация

Я использую boost_serialization для сериализации вектора.

std::string serial_str;
boost::iostreams::back_insert_device<std::string> inserter(serial_str);
boost::iostreams::stream<boost::iostreams::back_insert_device<std::string>> s(inserter);
boost::archive::binary_oarchive send_ar(s);
//samples is the vector<object>
send_ar << samples;
s.flush();
int len = serial_str.size();

Затем я использую mpi_gather, чтобы отправить все serial_str в корневой процесс (data_recv).

char *data_recv = NULL;
if(myid == 0){
data_recv = (char*)malloc(sizeof(char) * (len_all+1));
    data_recv[len_all] = '\0';
}
MPI_Gather((void*)serial_str.data(), len, MPI_BYTE, data_recv, len, MPI_BYTE, 0, MPI_COMM_WORLD);

Наконец, я десериализую данные в data_recv.

boost::iostreams::basic_array_source<char> device(data_recv,len_all);
    boost::iostreams::stream<boost::iostreams::basic_array_source<char>> s(device);
    boost::archive::binary_iarchive recv_ar(s);

std::vector<mdata> recv_vec;    
    recv_ar >> recv_vec;

Моя реализация основана на Как отправить заданный объект в MPI_Send< /а>

Проблема

Я не могу правильно десериализовать данные в data_recv. Я напечатал data_recv, затем обнаружил, что данные в data_recv неправильно отформатированы после mpi_gather. Второй архив покрывает первый. (выделен жирным шрифтом)

сериализация::архив

XYLVXE-M x 000000000000000000000000002595 DDDDDFFFFCCCCBBBB11113333322222DDDDDDD3333888888888882222 Сериализация :: ARCHIVE 0000000000000023D0AFFFFEAIS :: ARCHIVE 0000000000000023DD0ABFFFFERIAGIS

Xylvxe-M x 000000000000000000000000002595 DDDDDFFFFCCCCBBBB11113333322222DDDDDD333388888888882222O@F &! O, T.B x 0000000000000000000000000023D0877777777777777777777777.aff4777777777777777777777777777777777777777777777.aff4777777777777777777777777777777777777778.aff47777777777777777777777788888888888888222o@f &! O, oee

Правильный формат должен быть: (без перекрытия, чтобы я мог десериализовать)

сериализация::архив

Xylvxe-M x 000000000000000000000000002595 DDDDDFFFFCCCCBBBB11113333322222DDDDDD333388888888882222O@F &! O, T.B x 0000000000000000000000000023D0877777777777777777777777.aff4777777777777777777777777777777777777777777777.aff4777777777777777777777777777777777777778.aff47777777777777777777777788888888888888222o@f &! O, oee

сериализация::архив

Xylvxe-M x 000000000000000000000000002595 DDDDDFFFFCCCCBBBB11113333322222DDDDDD333388888888882222O@F &! O, T.B x 0000000000000000000000000023D0877777777777777777777777.aff4777777777777777777777777777777777777777777777.aff4777777777777777777777777777777777777778.aff47777777777777777777777788888888888888222o@f &! O, oee

Вопрос

Почему это случилось? Это потому, что mpi_gather несовместим с объектом С++? Если бы кто-то мог помочь мне, это решило бы мою большую проблему. Благодарю вас!

код

//processor rank, and total number of processors
int myid, world_size;

//for timing used by root processor
double startwtime = 0.0, endwtime;

//init MPI World
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
//get the processor name
char processor_name[MPI_MAX_PROCESSOR_NAME];
int name_len;
MPI_Get_processor_name(processor_name,&name_len);

//read local data
std::vector<mdata> mdatas; 
string data_path = "/home/jiang/mpi_data";
readAsciiData(data_path, mdatas);
cout <<"rank: "<<myid <<" mdata_vector.size(): "<<mdatas.size()<<endl;

//local sort according ASCII order 
std::sort(mdatas.begin(), mdatas.end());

//regular sample
std::vector<mdata> samples;
for(int i=0; i<mdatas.size(); i=i+mdatas.size()/world_size){
samples.push_back(mdatas[i]);
}

//gather the regular samples
//passing data in byte stream by using boost serialization

std::string serial_str;
boost::iostreams::back_insert_device<std::string> inserter(serial_str);
boost::iostreams::stream<boost::iostreams::back_insert_device<std::string>> s(inserter);
boost::archive::binary_oarchive send_ar(s);

send_ar << samples;
s.flush();
int len = serial_str.size();
//int len = s.str().size();
int *pivot_lens = NULL;
if(myid == 0){
pivot_lens = (int*)malloc(sizeof(int) * world_size);
}

cout  <<serial_str <<endl;
//first, gathering the lens and calculate the sum
cout << "rank " << myid << " on "<< processor_name << " is sending len: "<< len << endl;
MPI_Gather(&len, 1, MPI_INT, pivot_lens, 1, MPI_INT, 0, MPI_COMM_WORLD);
//calculate the sum of lens
int len_all = 0;
if(myid == 0){
for(int i=0;i<world_size;i++){
       len_all = len_all + pivot_lens[i];
       //cout << pivot_lens[i] << endl;     
}
    cout << "len_all:" << len_all << endl;
    free(pivot_lens);   
}
//then, gathering string of bytes from all the processes
char *data_recv = NULL;
if(myid == 0){
data_recv = (char*)malloc(sizeof(char) * (len_all+1));
    data_recv[len_all] = '\0';

}


MPI_Gather((void*)serial_str.data(), len, MPI_BYTE, data_recv, len, MPI_BYTE, 0, MPI_COMM_WORLD);

// cout << serial_str <<endl;
if(myid == 0){
//deconstructe from byte of string to vector<mdata>
boost::iostreams::basic_array_source<char> device(data_recv,len_all);
    boost::iostreams::stream<boost::iostreams::basic_array_source<char>> s(device);
    boost::archive::binary_iarchive recv_ar(s);

std::vector<mdata> recv_vec;    
    recv_ar >> recv_vec;
int count =0;
for(int i=0;i<len_all;i++){
    cout<<data_recv[i];
    count ++;
}
cout <<endl <<count ;
cout <<endl;
//cout << "rank " << myid << " gets the samples: " << recv_vec.size()<<endl; 
iterateForTest(myid, recv_vec);

free(data_recv);
}

MPI_Finalize();
return 0;

person JZH    schedule 04.07.2018    source источник
comment
Пожалуйста, поделитесь своим кодом.   -  person Federico Navarrete    schedule 04.07.2018
comment
Я разместил свой код. Проблема в том, что данные в буфере recv_data неверны, поэтому я не могу десериализовать поток байтов.   -  person JZH    schedule 04.07.2018
comment
Вам нужно переместить его на свой вопрос, нажмите «Изменить» и добавьте больше деталей.   -  person Federico Navarrete    schedule 04.07.2018
comment
Я загрузил более подробную информацию, спасибо за ваше терпение.   -  person JZH    schedule 04.07.2018