Самый быстрый способ вычислить минимальное евклидово расстояние между двумя матрицами, содержащими многомерные векторы

Я задал аналогичный вопрос в другом потоке, но затем я сосредоточился на том, как использовать OpenCV. Не сумев добиться того, чего я изначально хотел, я спрошу здесь именно то, что я хочу.

У меня есть две матрицы. Матрица a имеет размер 2782 x 128, а матрица b – 4000 x 128, оба значения без знака char. Значения хранятся в одном массиве. Для каждого вектора в a мне нужен индекс вектора в b с ближайшим евклидовым расстоянием.

Хорошо, теперь мой код для этого:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

И прилагаю файлы с образцами матриц.

матрица matrixb

Я использую windows.h только для расчета потребляемого времени, поэтому, если вы хотите протестировать код на другой платформе, отличной от Windows, просто измените заголовок windows.h и измените способ расчета потребляемого времени.

Этот код на моем компьютере составляет около 0,5 секунды. Проблема в том, что у меня есть другой код в Matlab, который делает то же самое за 0,05 секунды. В моих экспериментах я получаю несколько матриц, таких как матрица a, каждую секунду, поэтому 0,5 секунды — это слишком много.

Теперь код Matlab для расчета этого:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

В порядке. Код Matlab использует это (xa)^2 = x^2 + a^2 - 2ab.

Поэтому моей следующей попыткой было сделать то же самое. Я удалил свой собственный код, чтобы сделать те же вычисления, но это было примерно 1,2 секунды.

Затем я попытался использовать разные внешние библиотеки. Первой попыткой был Эйген:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

Этот собственный код стоит примерно 1,2 только за строку, в которой говорится: ab = a*b.transpose();

Аналогичный код с использованием opencv также использовался, и стоимость ab = a*b.transpose(); составлял 0,65 секунды.

Итак, очень раздражает, что Matlab может делать то же самое так быстро, а я не могу на C ++! Конечно, было бы здорово провести свой эксперимент, но я думаю, что отсутствие знаний меня действительно раздражает. Как мне добиться хотя бы такой же производительности, как в Matlab? Любое решение приветствуется. Я имею в виду любую внешнюю библиотеку (бесплатную, если это возможно), развертку циклов, шаблоны, инструкции SSE (я знаю, что они существуют), кеширование. Как я уже сказал, моя главная цель — расширить свои знания, чтобы иметь возможность кодировать подобные мысли с более высокой производительностью.

заранее спасибо

РЕДАКТИРОВАТЬ: больше кода, предложенного Дэвидом Хамменом. Я преобразовал массивы в int перед выполнением каких-либо вычислений. Вот код:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

Весь процесс теперь 0,6, а циклы литья в начале 0,001 секунды. Может я что-то не так сделал?

EDIT2: Что-нибудь об Эйгене? Когда я ищу внешние библиотеки, они всегда говорят об Eigen и их скорости. Я сделал что-то не так? Вот простой код с использованием Eigen, который показывает, что это не так быстро. Может быть, мне не хватает какой-то конфигурации или какого-то флага, или...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

Этот код составляет около 0,9 секунды.


person min.yong.yoon    schedule 26.09.2012    source источник
comment
Вы скомпилировали все тесты в режиме релиза?   -  person Denis Ermolin    schedule 26.09.2012
comment
Вас может раздражать, что Matlab превосходит ваш код, но я, активно использующий Matlab, нахожу это очень удовлетворительным. У меня не так много конкретных советов для вас, но ключом к повышению производительности этого типа кода часто является оптимальное (или, по крайней мере, очень хорошее) использование иерархии памяти на современных процессорах. Еще один фактор, который следует учитывать, заключается в том, что большая часть основных функций Matlab теперь многопоточная для выполнения на многоядерных процессорах, мне не ясно, является ли какой-либо ваш собственный код многопоточным; это, вероятно, будет иметь некоторое влияние на производительность.   -  person High Performance Mark    schedule 26.09.2012
comment
Я пока не знаю, как помочь вам сделать ваш код C/C++ быстрее (ваш код больше похож на C, чем на C++. Доказательства: malloc), но я вижу, как вы можете сделать свой код Matlab быстрее: устраните sqrt. Даны два неотрицательных числа a и b, sqrt(a)›sqrt(b) ⇔ a›b.   -  person David Hammen    schedule 26.09.2012
comment
Денис Ермолин, да, В режиме отладки около 2,5 секунд. High Performance Mark, вы правы, когда работа с Matlab доставляет удовольствие, но теперь мне нужно сделать реальную реализацию кода прототипа Matlab. Дэвид Хэммен, я знаю это. Если вы видите код C++, я избегал sqrt. Я также пытался избежать умножения * умножения, используя расстояние + = абс (умножение). Результат? Хуже. Около 0,8 секунды.   -  person min.yong.yoon    schedule 26.09.2012
comment
Да, malloc — пережиток прошлого, лол. Но это нормально для производительности, верно?   -  person min.yong.yoon    schedule 26.09.2012
comment
@min.yong.yoon — Даже если бы замена distance += multiply*multiply на `distance+=abs(multiply)` была бы быстрее, вам, вероятно, не следует этого делать. Что вы делаете с этим изменением, так это меняете метрику с евклидовой нормы на норму такси. Если вам нужна евклидова норма, это то, что вам нужно вычислить.   -  person David Hammen    schedule 26.09.2012
comment
Да. Просто пробую разные вещи...   -  person min.yong.yoon    schedule 26.09.2012
comment
Скорее всего, Matlab использует быстрый алгоритм умножения матриц, такой как алгоритм Штрассена, который может быть значительным преимуществом для больших матриц.   -  person Vaughn Cato    schedule 26.09.2012
comment
Вот связанный с этим вопрос: stackoverflow.com/questions/6058139/   -  person Vaughn Cato    schedule 26.09.2012
comment
Спасибо, я видел эту ссылку, но там нет подсказки по оптимизации для C++.   -  person min.yong.yoon    schedule 26.09.2012
comment
Я думаю, у вас проблемы с использованием Eigen. Я попробовал небольшой тест. Я публикую ответ в связанном с ним вопрос   -  person remi    schedule 30.09.2012
comment
Я сравнил ваш собственный код (до кода MatrixXd d = ..., он занимает 0,62 с с использованием матрицы int, 0,81 с использованием двойной матрицы (MatrixXd) и 0,59 с использованием MatrixXf. Возможно, все еще дольше, чем matlabe, но собственный код однопоточный.   -  person remi    schedule 30.09.2012


Ответы (3)


Как вы заметили, в вашем коде преобладает матричное произведение, представляющее примерно 2,8e9 арифметических операций. Yopu говорит, что Matlab (или, скорее, высокооптимизированный MKL) вычисляет его примерно за 0,05 с. Это представляет собой скорость 57 GFLOPS, показывающую, что он использует не только векторизацию, но и многопоточность. С Eigen вы можете включить многопоточность, компилируя с включенным OpenMP (-fopenmp с gcc). На моем 5-летнем компьютере (2,66 ГГц Core2) с использованием чисел с плавающей запятой и 4 потоков ваш продукт занимает около 0,053 с, а без OpenMP - 0,16 с, поэтому должно быть что-то не так с вашими флагами компиляции. Подводя итог, чтобы получить лучшее от Eigen:

  • скомпилировать в 64-битном режиме
  • использовать поплавки (удвоения в два раза медленнее из-за векторизации)
  • включить OpenMP
  • если у вашего процессора есть гиперпоточность, то либо отключите ее, либо определите в переменной окружения OMP_NUM_THREADS количество физических ядер (это очень важно, иначе производительность будет очень плохой!)
  • если у вас есть другая работающая задача, было бы неплохо уменьшить OMP_NUM_THREADS до nb_cores-1
  • используйте самый последний компилятор, который вы можете, GCC, clang и ICC лучше всего, MSVC обычно медленнее.
person ggael    schedule 22.08.2013

Одна вещь, которая определенно мешает вам в вашем коде C++, заключается в том, что он имеет множество преобразований char в int. Под нагрузкой я имею в виду до 2 * 2782 * 4000 * 128 преобразований char в int. Эти преобразования char в int медленные, очень медленные.

Вы можете уменьшить это число до (2782+4000)*128 таких преобразований, выделив пару массивов int, один 2782*128, а другой 4000*128, чтобы содержать преобразованное в целое содержимое ваших массивов char* a и char* b. Работайте с этими массивами int*, а не с массивами char*.

Другая проблема может заключаться в том, что вы используете int вместо long. Я не работаю с окнами, поэтому это может быть неприменимо. На машинах, на которых я работаю, int составляет 32 бита, а long теперь 64 бита. 32 бита более чем достаточно, потому что 255*255*128 ‹ 256*256*128 = 223.

Очевидно, что проблема не в этом.

Что поразительно, так это то, что рассматриваемый код не вычисляет тот огромный массив 2728 на 4000, который создает код Matlab. Что еще более поразительно, так это то, что Matlab, скорее всего, делает это с числами типа double, а не с целыми числами, и он по-прежнему превосходит код C/C++.

Одна большая проблема - кеш. Этот массив 4000 * 128 слишком велик для кеша уровня 1, и вы выполняете итерацию по этому большому массиву 2782 раза. Ваш код слишком много ожидает в памяти. Чтобы преодолеть эту проблему, работайте с меньшими фрагментами массива b, чтобы ваш код работал с кешем уровня 1 как можно дольше.

Еще одна проблема — оптимизация if (distance>min_distance) break;. Я подозреваю, что это на самом деле дезоптимизация. Наличие if тестов внутри самого внутреннего цикла часто является плохой идеей. Взорвите этот внутренний продукт как можно быстрее. Если не считать напрасных вычислений, избавление от этого теста не принесет вреда. Иногда лучше произвести кажущиеся ненужными вычисления, если это может удалить ветвь в самом внутреннем цикле. Это один из таких случаев. Возможно, вы сможете решить свою проблему, просто исключив этот тест. Попробуйте сделать это.

Возвращаясь к проблеме кеша, вам нужно избавиться от этой ветки, чтобы вы могли разделить операции над матрицей a и b на более мелкие куски, куски не более 256 строк за раз. Именно столько строк из 128 беззнаковых символов помещается в один из двух кэшей L1 современных чипов Intel. Поскольку 250 делит 4000, попробуйте логически разбить эту матрицу b на 16 фрагментов. Вы вполне можете захотеть сформировать этот большой массив внутренних продуктов 2872 на 4000, но делать это небольшими порциями. Вы можете добавить это if (distance>min_distance) break; обратно, но делайте это на уровне фрагментов, а не на уровне байт за байтом.

Вы должны быть в состоянии победить Matlab, потому что он почти наверняка работает с двойными значениями, но вы можете работать с беззнаковыми символами и целыми числами.

person David Hammen    schedule 26.09.2012
comment
Спасибо, Дэвид, но кастинг занимает около 0,001 секунды. Я также пытался поместить массивы a и b в массивы int*, и производительность была хуже. - person min.yong.yoon; 26.09.2012
comment
Вы бросаете, несмотря ни на что, и эти забросы медленные. Эта строка multiply = *dataPtr++-*vocPtr++; включает два приведения типа unsigned char к типу int. Результаты *dataPtr и *vocPtr преобразуются в целые числа, а затем вычитаются. Вы сделали что-то не так, если сделали слепки раньше времени и ухудшили показатели. - person David Hammen; 26.09.2012
comment
Хорошо, я добавил новый ответ, выполняя предварительное приведение в массивы. Изменить: я не могу добавить новый ответ, я редактирую исходный пост. Думаю будет слишком много... - person min.yong.yoon; 26.09.2012
comment
Да, я также использовал int. Новый код использует int. Производительность такая же. Моя интуиция подсказывает, что это проблема кэширования, я делаю слишком много обращений к памяти. Я должен избегать некоторых обращений к памяти, но я не могу этого сделать. - person min.yong.yoon; 26.09.2012
comment
Преобразования char в int не очень медленные. Возможно, это одна дополнительная инструкция, но лучшая плотность данных в кэше более чем компенсирует преобразование, требуя меньшего количества обращений к памяти. - person Ben Voigt; 26.09.2012
comment
@BenVoigt да, я также сделал цикл в начале функции, чтобы массивы имели тип int. Как я уже говорил, это занимает больше времени, примерно на 0,1 секунды больше, в то время как циклы литья составляют 0,001 секунды, так что это бесполезно... - person min.yong.yoon; 26.09.2012
comment
@min.yong.yoon — Преобразование unsigned char в int, очевидно, не проблема. Я обновил свой ответ. - person David Hammen; 26.09.2012
comment
@DavidHammen - Спасибо, чувак. Сначала я отключил прерывание if (distance›min_distance); Тогда время потребления составляет 1,1 секунды. Хорошо, затем я делаю цикл, который говорит for (int j=0; j‹b_size; ++j) to for (int j=0; j‹256; ++j) . В этой ситуации время потребления составляет 0,0675795 с, то есть примерно 4000/16. Поэтому я не вижу преимуществ в этом. Я знаю, что после этого я должен выполнять те же операции, пока не достигну 4000, но если только одно время из 16 не улучшает скорость ... Я также изменил цикл для матрицы a на цикл только 256. Время: 0,00627586 с. это в 10 раз быстрее, но я зацикливаюсь в 10 раз меньше. - person min.yong.yoon; 27.09.2012

Умножение матриц обычно использует наихудший возможный шаблон доступа к кэшу для одной из двух матриц, и решение состоит в том, чтобы транспонировать одну из матриц и использовать специальный алгоритм умножения, который работает с данными, хранящимися таким образом.

Ваша матрица уже хранится в транспонированном виде. Перенося его в нормальный порядок, а затем используя умножение обычной матрицы, вы абсолютно убиваете производительность.

Напишите свой собственный цикл умножения матрицы, который инвертирует порядок индексов во вторую матрицу (что приводит к ее транспонированию, фактически ничего не перемещая и не нарушая поведение кэша). И передайте вашему компилятору любые параметры, которые у него есть для включения автоматической векторизации.

person Ben Voigt    schedule 26.09.2012
comment
Я думаю, что не понимаю вашу точку зрения. В моей собственной реализации минимального расстояния (первый код) я ничего не транспонирую. В случае кода Eigen мне нужно его транспонировать. - person min.yong.yoon; 26.09.2012
comment
@min.yong.yoon: Ну, вы видели код, сгенерированный компилятором для этого цикла умножения? Используются ли инструкции SSE2? - person Ben Voigt; 26.09.2012
comment
Я попытался активировать инструкции SSE2 в настройках проекта Visul Studio 10, но время потребления такое же, и код, сгенерированный для цикла, также одинаков. - person min.yong.yoon; 26.09.2012
comment
@min.yong.yoon: Я думаю, что компилятор Microsoft немного отстал от времени с автоматической векторизацией. Он поддерживает инструкции SSE, но вы должны использовать их самостоятельно с помощью встроенных функций. Самая последняя версия, входящая в состав Visual Studio 2012, окончательно изменила это. Вы можете протестировать код с помощью G++ или Intel C++, так как оба они могут автоматически векторизоваться. - person Ben Voigt; 26.09.2012
comment
Обновлять. Мой собственный цикл не улучшает скорость с SSE/SSE2, но способ Eigen быстрее. С SSE2, использующим матрицы int, 0,74 секунды, но если я использую матрицы с плавающей запятой, 0,49 секунды. Это улучшение, но оно похоже на мой собственный цикл. Есть еще идеи по этому поводу? - person min.yong.yoon; 27.09.2012
comment
@min.yong.yoon: Большинство SIMD-инструкций для ускорения целочисленной математики — это SSE3 и SSE4, поэтому SSE2 может не ускорить ее. - person Ben Voigt; 27.09.2012