QtConcurrent::map не дает никаких преимуществ

Я хочу манипулировать QVector с помощью функции QtConcurrent::map. Все, что делает мой пример программы, это увеличивает все значения в QVector на 1.

QVector<double> arr(10000000, 0);
QElapsedTimer timer;
qDebug() << QThreadPool::globalInstance()->maxThreadCount() << "Threads";

int end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
    std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
    std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
}
end = timer.elapsed();
qDebug() << end;
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
timer.start();
for(int i = 0; i < 100; ++i) {
    QFuture<void> qf = QtConcurrent::map(arr.begin(), arr.end(), [](double &x){ ++x; });
    qf.waitForFinished();
}
end = timer.elapsed();
qDebug() << end;

Однако программа выводит

4 Threads
905 // std::transform
886 // std::for_each
876 // QtConcurrent::map

так что в многопоточной версии почти нет выигрыша в скорости. Я проверил, что на самом деле работает 4 потока. Я использовал оптимизацию -O2. Подходит ли для этой ситуации более распространенный подход QThreadPool?

ИЗМЕНИТЬ:

Я попробовал другой метод, используя QtConcurrent::run(). Вот соответствующие части программного кода:

void add1(QVector<double>::iterator first, QVector<double>::iterator last) {
    for(; first != last; ++first) {
        *first += 1;
    }
}

/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
QFuture<void> qf[numThreads];
for(int j = 0; j < numThreads; ++j) {
    qf[j] = QtConcurrent::run(add1, arr.begin()+j*n/numThreads, arr.begin()+(j+1)*n/numThreads-1);
}
for(int j = 0; j < numThreads; ++j) {
    qf[j].waitForFinished();
}

Поэтому я вручную распределяю задачу по разным потокам. Но все же я почти не получаю прироста производительности:

181 ms // std::for_each
163 ms // QtConcurrent::run

Что еще здесь не так?


person NullAchtFuffZehn    schedule 24.05.2017    source источник
comment
Почему вы ожидаете ускорения? Вы ждете будущего в каждой итерации цикла.   -  person juanchopanza    schedule 24.05.2017
comment
Я не эксперт в этой области, но я ожидаю, что map() запустит 4 потока, что должно заставить эту строку кода закончиться быстрее, чем функции STL. Или я неправильно понял концепцию этой функции?   -  person NullAchtFuffZehn    schedule 24.05.2017


Ответы (1)


Похоже, что QtConcurrent имеет большие накладные расходы по сравнению с простым использованием примитивов многопоточности C++ и пулов потоков.

template<class T>
struct threaded_queue {
  using lock = std::unique_lock<std::mutex>;
  void push_back( T t ) {
    {
      lock l(m);
      data.push_back(std::move(t));
    }
    cv.notify_one();
  }
  boost::optional<T> pop_front() {
    lock l(m);
    cv.wait(l, [this]{ return abort || !data.empty(); } );
    if (abort) return {};
    auto r = std::move(data.back());
    data.pop_back();
    return std::move(r);
  }
  void terminate() {
    {
      lock l(m);
      abort = true;
      data.clear();
    }
    cv.notify_all();
  }
  ~threaded_queue()
  {
    terminate();
  }
private:
  std::mutex m;
  std::deque<T> data;
  std::condition_variable cv;
  bool abort = false;
};
struct thread_pool {
  thread_pool( std::size_t n = 1 ) { start_thread(n); }
  thread_pool( thread_pool&& ) = delete;
  thread_pool& operator=( thread_pool&& ) = delete;
  ~thread_pool() = default; // or `{ terminate(); }` if you want to abandon some tasks
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> queue_task( F task ) {
    std::packaged_task<R()> p(std::move(task));
    auto r = p.get_future();
    tasks.push_back( std::move(p) );
    return r;
  }
  template<class F, class R=std::result_of_t<F&()>>
  std::future<R> run_task( F task ) {
    if (threads_active() >= total_threads()) {
      start_thread();
    }
    return queue_task( std::move(task) );
  }
  void terminate() {
    tasks.terminate();
  }
  std::size_t threads_active() const {
    return active;
  }
  std::size_t total_threads() const {
    return threads.size();
  }
  void clear_threads() {
    terminate();
    threads.clear();
  }
  void start_thread( std::size_t n = 1 ) {
    while(n-->0) {
      threads.push_back(
        std::async( std::launch::async,
          [this]{
            while(auto task = tasks.pop_front()) {
              ++active;
              try{
                (*task)();
              } catch(...) {
                --active;
                throw;
              }
              --active;
            }
          }
        )
      );
    }
  }
private:
  std::vector<std::future<void>> threads;
  threaded_queue<std::packaged_task<void()>> tasks;
  std::atomic<std::size_t> active = {};
};

struct my_timer_t {
    std::chrono::high_resolution_clock::time_point first;
    std::chrono::high_resolution_clock::duration duration;

    void start() {
        first = std::chrono::high_resolution_clock::now();
    }
    std::chrono::high_resolution_clock::duration finish() {
        return duration = std::chrono::high_resolution_clock::now()-first;
    }
    unsigned long long ms() const {
        return std::chrono::duration_cast<std::chrono::milliseconds>(duration).count();
    }
};
int main() {
    std::vector<double> arr(1000000, 0);
    my_timer_t timer;

    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    timer.start();
    for(int i = 0; i < 100; ++i) {
        std::transform(arr.begin(), arr.end(), arr.begin(), [](double x){ return ++x; });
    }
    timer.finish();
    auto time_transform = timer.ms();
    std::cout << time_transform << "<- std::transform (" << arr[rand()%arr.size()] << ")\n";
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    timer.start();
    for(int i = 0; i < 100; ++i) {
        std::for_each(arr.begin(), arr.end(), [](double &x){ ++x; });
    }
    timer.finish();
    auto time_for_each = timer.ms();
    std::cout << time_for_each << "<- std::for_each (" << arr[rand()%arr.size()] << ")\n";
    /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
    enum { num_threads = 8 };
    thread_pool pool(num_threads);
    timer.start();
    for(int i = 0; i < 100; ++i) {
        std::array< std::future<void>, num_threads > tasks;
        for (int t = 0; t < num_threads; ++t) {
            tasks[t] = pool.run_task([&,t]{
                std::for_each( arr.begin()+(arr.size()/num_threads)*t, arr.begin()+(arr.size()/num_threads)*(t+1), [](double& x){++x;} );
            });
        }
        // std::cout << "loop! -- " << pool.threads_active() << "/" << pool.total_threads() << std::endl;
        for (int t = 0; t < num_threads; ++t)
            tasks[t].wait();
    }
    timer.finish();
    auto time_pool = timer.ms();
    std::cout << time_pool << "<- thread_pool (" << arr[rand()%arr.size()] << ")\n";
}

Живой пример.

Это генерирует:

153<- std::transform (100)
131<- std::for_each (200)
82<- thread_pool (300)

значительное ускорение при использовании простого пула потоков C++11 для разделения задач на 8 направлений. (При разделении задач на 4 части было около 105).

Теперь я использовал тестовый набор в 10 раз меньше вашего, так как время ожидания онлайн-системы истекло, когда моей программе потребовалось так много времени для запуска.

При общении с вашей системой пула потоков возникнут накладные расходы, но мой наивный пул потоков не должен превосходить настоящую библиотеку, подобную этой.

Теперь серьезная проблема заключается в том, что вы можете быть привязаны к памяти ввода-вывода; больше потоков, обращающихся к байтам быстрее, не поможет, если вам всем придется ждать байтов.

person Yakk - Adam Nevraumont    schedule 24.05.2017
comment
Как вы проверили накладные расходы QtConcurrent? Обратите внимание, что вы сгруппировали операцию ++ в пакеты num_threads. Вы можете сделать это и с QtConcurrent. - person m7913d; 24.05.2017
comment
@ m7913d m7913d Это то, что должен делать QtConcurrent; запустить несколько подпотоков для обработки части задачи в зависимости от количества аппаратных потоков. Я просто сделал это вручную. Я получал значительное ускорение по сравнению с for_each. - person Yakk - Adam Nevraumont; 24.05.2017
comment
QtConcurrent распределяет каждую операцию по потоку (с учетом максимального количества одновременных потоков). Он их не группирует. Обратите внимание, что неправильно группировать операции правильно, если они могут занимать разное время. - person m7913d; 24.05.2017
comment
@ m7913d m7913d О, это отстой. - person Yakk - Adam Nevraumont; 24.05.2017
comment
Я ошибался, Qt реализует некоторую группировку, глядя на пользователя и накладные расходы. Он начинается с одной операции на пакет и удваивает ее каждый раз, когда накладные расходы становятся большими по сравнению со временем выполнения пользователем. Следовательно, перед использованием больших партий требуется много испытаний и, следовательно, много накладных расходов. - person m7913d; 24.05.2017
comment
Я не могу воспроизвести ваш результат, вероятно, из-за привязки к вводу-выводу, как вы предложили. @NullAchtFuffZehn У вас работает? - person m7913d; 26.05.2017