Векторизовать точечный продукт со скользящим окном

У меня есть два больших вектора (одинаковой длины), для которых я вычисляю точечный продукт скользящего окна:

import numpy as np

a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66])

out = np.array(
    [[a[0]*b[0]+a[1]*b[1]+a[2]*b[2]],
     [a[1]*b[1]+a[2]*b[2]+a[3]*b[3]],
     [a[2]*b[2]+a[3]*b[3]+a[4]*b[4]],
     [a[3]*b[3]+a[4]*b[4]+a[5]*b[5]],
    ])

[[154]
 [319]
 [550]
 [847]]

Конечно, я могу вызвать функцию скалярного произведения, но если длина окна / вектора велика, она не так эффективна, как следующий код:

window = 3
result = np.empty([4,1])
result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
for i in range(3):
    result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]

[[154]
 [319]
 [550]
 [847]]

Здесь мы используем тот факт, что скалярное произведение i+1th аналогично скалярному произведению ith. То есть,

result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]

Как я могу преобразовать мой цикл for в векторизованную функцию, чтобы вычисления могли использовать информацию из шага ith, чтобы уменьшить вычислительную избыточность при минимизации необходимого объема памяти.

ОБНОВЛЕНИЕ

Мне действительно понадобились:

import numpy as np

a = np.array([1, 2, 3, 4, 5, 6])
b = np.array([11, 22, 33, 44, 55, 66, 77, 88])

out = np.array(
    [a[0]*b[0]+a[1]*b[1]+a[2]*b[2]+a[3]*b[3]]+a[4]*b[4]]+a[5]*b[5],
     a[0]*b[1]+a[1]*b[2]+a[2]*b[3]+a[3]*b[4]]+a[4]*b[5]]+a[5]*b[6],
     a[0]*b[2]+a[1]*b[3]+a[2]*b[4]+a[3]*b[5]]+a[4]*b[6]]+a[5]*b[7],
    ])

[1001
 1232
 1463]

Таким образом, a будет перемещаться по b, и будут вычислены скалярные произведения.


person slaw    schedule 21.03.2017    source источник


Ответы (3)


Вы можете использовать частичные суммы для сложности O (n):

ps = np.r_[0, np.cumsum(a*b)]
ps[3:]-ps[:-3]
# array([154, 319, 550, 847])

Или вариант, который ближе к исходному циклу for и позволяет избежать очень больших частичных сумм:

k = 3
d = a*b
d[k:] -= d[:-k].copy()
np.cumsum(d)[k-1:]
# array([154, 319, 550, 847])

Обновить в соответствии с обновленным Q.

Теперь это действительно свертка, поэтому решение @Divakar более или менее применимо. Только вы бы сворачивали a[::-1] и b напрямую. Если скорость является проблемой, вы можете попробовать заменить np.convolve на scipy.signal.fftconvolve, что в зависимости от размеров ваших операндов может быть значительно быстрее. Однако для очень маленьких операндов или операндов совершенно разной длины вы можете даже потерять некоторую скорость, поэтому обязательно попробуйте оба метода:

np.convolve(b, a[::-1], 'valid')
scipy.signal.fftconvolve(b, a[::-1], 'valid')
person Paul Panzer    schedule 21.03.2017
comment
Ага. Это можно было бы представить как теоретический материал, который может быть _1 _ :) - person Divakar; 21.03.2017
comment
Спасибо! Это похоже на правильный ответ. Я не знаком с np.r_, но полагаю, что это лучше, чем сначала создать экземпляр пустого массива x = np.empty(len(a)+1), заполнить массив np.cumsum(a*b, out=x[1:]), а затем установить первый элемент в ноль с помощью x[0] = 0. Я прочитал документы на np.r, и он, очевидно, дает правильный результат, но не могли бы вы предоставить немного больше информации, поскольку я уверен, что это будет ново и для других. - person slaw; 21.03.2017
comment
@slaw нет, ваш подход к предварительному распределению лучше с точки зрения производительности, я просто был ленив. Не забудьте также взглянуть на только что добавленный вариант, потому что, если ваши векторы имеют длину 10 ^ 8, может быть важно контролировать размеры частичных сумм. - person Paul Panzer; 21.03.2017
comment
@PaulPanzer спасибо за то, что научил меня таким хорошо продуманным ответом! - person slaw; 21.03.2017
comment
@slaw Спасибо за добрые слова. Я только что исправил небольшую ошибку во втором решении. Назначение на месте d[k:] -= d[:-k] было небезопасным. Так что нужно было добавить .copy(). Не забудьте включить его, если вы используете этот код. - person Paul Panzer; 21.03.2017
comment
@PaulPanzer .copy() отметил. Спасибо! По-прежнему удивлен, что скалярное произведение может быть вычислено в O(n) таким образом, поскольку я бы никогда не заметил его с помощью cumsum. Вы знали об этом раньше? Возможно, другим будет полезна ссылка на соответствующие ресурсы. - person slaw; 21.03.2017
comment
@slaw cumsum по сути является дискретным первообразным. Скользящие суммы соответствуют определенным интегралам. Все, что я делаю, - это дискретная версия вычисления определенного интеграла как разности первообразной, вычисленной в конечных точках интервала интегрирования. - person Paul Panzer; 21.03.2017
comment
Позвольте нам продолжить это обсуждение в чате. - person slaw; 22.03.2017

Подход №1

Используйте np.convolve для поэлементного умножения между двумя входы и с ядром всех единиц и size=3 -

np.convolve(a*b,np.ones(3),'valid')

Подход №2

Поскольку мы просто суммируем элементы в окне, мы также можем использовать _ 4_, вот так -

from scipy.ndimage.filters import uniform_filter1d as unif1d

def uniform_filter(a,W):
    hW = (W-1)//2
    return W*unif1d(a.astype(float),size=W, mode='constant')[hW:-hW]

out = uniform_filter(a*b,W=3)

Бенчмаркинг

Петлевой подход -

def loopy_approach(a,b):
    window = 3
    N = a.size-window+1

    result = np.empty([N,1])
    result[0] = a[0]*b[0]+a[1]*b[1]+a[2]*b[2]
    for i in range(N-1):
        result[i+1] = result[i]-a[i]*b[i]+a[i+window]*b[i+window]
    return result

Сроки и проверка -

In [147]: a = np.random.randint(0,100,(1000))
     ...: b = np.random.randint(0,100,(1000))
     ...: 

In [148]: out0 = loopy_approach(a,b).ravel()
     ...: out1 = np.convolve(a*b,np.ones(3),'valid')
     ...: out2 = uniform_filter(a*b,W=3)
     ...: 

In [149]: np.allclose(out0,out1)
Out[149]: True

In [150]: np.allclose(out0,out2)
Out[150]: True

In [151]: %timeit loopy_approach(a,b)
     ...: %timeit np.convolve(a*b,np.ones(3),'valid')
     ...: %timeit uniform_filter(a*b,W=3)
     ...: 
100 loops, best of 3: 2.27 ms per loop
100000 loops, best of 3: 7 µs per loop
100000 loops, best of 3: 10.2 µs per loop
person Divakar    schedule 21.03.2017
comment
Да, я знаю, что могу это сделать, но БПФ на самом деле O(nlogn), в то время как решение, приведенное выше, на самом деле O(n). Отсюда и просьба векторизовать его вместо цикла for. Опять же, и векторы, и размер окна будут очень длинными, поэтому метод свертки будет примерно на порядок хуже. - person slaw; 21.03.2017
comment
@slaw Я думаю, что вы необязательно вносите сюда вычислительную сложность. Эти свертки реализованы в C / fortran AFAIK и, как таковые, будут не хуже любого векторизованного метода в NumPy / Scipy и т. Д. - person Divakar; 21.03.2017
comment
@Divakar, что такое y во втором подходе, пожалуйста? Кроме того, я думаю, поскольку OP ожидает, что результат будет 2D, ваш первый подход должен быть np.convolve(a*b,np.ones(3),'valid')[:, np.newaxis] - person kmario23; 21.03.2017
comment
@ kmario23 Добавление новой оси здесь тривиально, не думаю, что OP будет возражать против этого. Тем не менее, спасибо, что указали на это! - person Divakar; 21.03.2017
comment
@slaw Добавлены тесты времени выполнения, если они могут убедить вас переосмыслить свои мысли о сложности. - person Divakar; 21.03.2017
comment
@Divakar Я пытаюсь воспроизвести некоторые опубликованные работы, в которых доказана временная сложность, о которой я упоминал выше. Я уже реализовал этот метод с помощью convolve и исследую более быструю альтернативу в своем исходном посте. Длина массива составляет порядка 100 миллионов точек данных с размером окна 400–2000. Таким образом, временная сложность должна иметь значение, и я хотел бы сравнить их. Опять же, у меня уже есть код, который использует convolve, но я хотел бы векторизовать алгоритм O(n), если это возможно для сравнения. Обратите внимание, что в моем случае скалярное произведение фактически находится в двойном вложенном цикле for. - person slaw; 21.03.2017

Еще один подход с использованием шагов:

In [12]: from numpy.lib.stride_tricks import as_strided
In [13]: def using_strides(a, b, w=3):
              shape = a.shape[:-1] + (a.shape[-1] - w + 1, w)
              strides = a.strides + (a.strides[-1],)
              res = np.sum((as_strided(a, shape=shape, strides=strides) * \ 
                            as_strided(b, shape=shape, strides=strides)), axis=1)
              return res[:, np.newaxis]


In [14]: using_strides(a, b, 3)
Out[14]: 
array([[154],
       [319],
       [550],
       [847]])
person kmario23    schedule 21.03.2017
comment
Да, использование шагов - это нормально, но для более длинных векторов (100 миллионов элементов) и больших окон это становится более дорогим из-за количества избыточных вычислений. Временная сложность для версии без избыточности O(n) - person slaw; 21.03.2017