Эффективное перекрестное соединение с агрегацией и фильтром

Согласно заголовку, я хочу выполнить перекрестное соединение с таблицей, которая выполняет функцию агрегирования и фильтрует пару переменных в таблице.

У меня есть данные, похожие на следующие:

library(dplyr)
library(data.table)
library(sqldf)

sales <-  data.frame(salesx = c(3000, 2250,850,1800,1700,560,58,200,965,1525)
                     ,week = seq(from = 1, to = 10, by = 1)
                     ,uplift = c(0.04)
                     ,slope = c(100)
                     ,carryover = c(.35))
spend <- data.frame(spend = seq(from = 1, to = 50000, by = 1))

tempdata <- merge(spend,sales,all=TRUE)
tempdata$singledata <- as.numeric(1) 

И вот пример того, что я пытаюсь выполнить с помощью своего решения на основе sql:

newdata <- sqldf("select a.spend, a.week,
                 sum(case when b.week > a.week
                 then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
                 else 0.0 end) as calc3
                 from tempdata a, tempdata b  
                 where a.spend = b.spend 
                 group by a.spend,a.week")

Это дает желаемые результаты, но это немного медленно, особенно с моим реальным набором данных, состоящим из около 1 миллиона записей. Было бы здорово получить совет: а) как ускорить работу функции sqldf; или б) с использованием более эффективного подхода data.table / dplyr (я не могу разобраться в проблеме перекрестного соединения / агрегации / фильтрации trifecta).

Разъяснение решения для неравномерного соединения ниже:

У меня было несколько вопросов о решении для неэквивалентного соединения - вывод нормальный и очень быстрый. Чтобы понять, как работает код, я разбил его так:

breakdown <- setDT(tempdata)[tempdata, .(spend, uplift, slope,carryover,salesx,  singledata, week, i.week,x.week, i.salesx,x.salesx, x.spend, i.spend), on=.(spend, week > week)]

Исходя из разбивки, чтобы соответствовать первоначальному расчету, она должна быть:

x.salesx*(uplift*(1.0-exp(-(`^`(singledata,x.week-week)/slope))))/i.spend

Причина, по которой это неочевидно, заключается в том, что в примере, который я использовал, «силовая» часть уравнения на самом деле ничего не делала (всегда 1). Фактически используется вычисление (добавление переносимой переменной к данным):

SQL

b.salesx*(b.uplift*(1-exp(-(power((b.singledata*b.carryover),b.week-a.week)/b.slope))))/b.spend (sql)

Мое решение data.table

sum(salesx.y*(uplift.y*(1-exp(-((singledata.y*adstock.y)^(week.y-week.x)/slope.y))))/spend), by=list(spend, week.x)

Однако я не могу заставить это работать с решением неэквивалентного соединения при добавлении переменной «переходящего остатка», т.е.

x.salesx*(uplift*(1.0-exp(-(`^`((singledata*carryover),x.week-week)/slope))))/i.spend

person SlyGrogger    schedule 08.08.2017    source источник
comment
Ты загрузил data.table, а потом ничего не делаешь ??? (вообще лучше разбить процесс на этапы.)   -  person IRTFM    schedule 08.08.2017
comment
Возможно, вы сможете заставить существующий код работать быстрее, если добавите индексы.   -  person G. Grothendieck    schedule 09.08.2017
comment
Возможно, здесь может оказаться полезным неэкви-соединение data.table. Но, пожалуйста, опишите, какова ваша цель / намерение. Могут быть другие подходы, помимо существующего решения, которое вы просите улучшить.   -  person Uwe    schedule 30.08.2017


Ответы (2)


В версии 1.9.8 (от CRAN 25 ноября 2016 г.) были введены data.table неэквивалентные соединения, которые помогают избежать перекрестных соединений, потребляющих много памяти:

library(data.table)
newdata4 <- 
  # coerce to data.table
  setDT(tempdata)[
    # non-equi self-join
    tempdata, on = .(spend, week > week), 
    # compute result
    .(calc3 = sum(salesx*(uplift*(1.0-exp(-(`^`(singledata,week-i.week)/slope))))/i.spend)), 
    # grouped by join parameters
    by = .EACHI][
      # replace NA
      is.na(calc3), calc3 := 0.0][]

# check that results are equal
all.equal(newdata, as.data.frame(newdata4[order(spend, week)]))
[1] TRUE

Контрольный показатель

OP предоставил три разных решения, два sqldf варианта и один data.table подход с использованием перекрестного соединения. Они сравниваются с неэквивалентным соединением.

Код ниже

dt_tempdata <- data.table(tempdata)
microbenchmark::microbenchmark(
  sqldf = {
    newdata <- sqldf("select a.spend, a.week,
                 sum(case when b.week > a.week
                     then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
                     else 0.0 end) as calc3
                     from tempdata a, tempdata b  
                     where a.spend = b.spend 
                     group by a.spend,a.week")
  },
  sqldf_idx = {
    newdata2 <- sqldf(c('create index newindex on tempdata(spend)',
                        'select a.spend, a.week,
                        sum(case when b.week > a.week
                        then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
                        else 0.0 end) as calc3
                        from main.tempdata a left join main.tempdata b  
                        on a.spend = b.spend 
                        group by a.spend,a.week'), dbname = tempfile())
  },
  dt_merge = { 
    newdata3 <- merge(dt_tempdata, dt_tempdata, by="spend", all=TRUE, allow.cartesian=TRUE)[
      week.y > week.x, 
      .(calc3 = sum(salesx.y*(uplift.y*(1-exp(-(singledata.y^(week.y-week.x)/slope.y)))))), 
      by=.(spend, week.x)]
  },
  dt_nonequi = {
    newdata4 <- dt_tempdata[
      dt_tempdata, on = .(spend, week > week), 
      .(calc3 = sum(salesx*(uplift*(1.0-exp(-(`^`(singledata,week-i.week)/slope))))/i.spend)), 
      by = .EACHI][is.na(calc3), calc3 := 0.0]
  },
  times = 3L
)

возвращает эти тайминги:

Unit: seconds
       expr       min        lq      mean    median        uq       max neval cld
      sqldf  9.456110 10.081704 10.647193 10.707299 11.242735 11.778171     3   b
  sqldf_idx 10.980590 11.477774 11.734239 11.974958 12.111064 12.247170     3   b
   dt_merge  3.037857  3.147274  3.192227  3.256692  3.269412  3.282131     3  a 
 dt_nonequi  1.768764  1.776581  1.792359  1.784397  1.804156  1.823916     3  a

Для данного размера проблемы неравномерное соединение является самым быстрым, почти в два раза быстрее, чем подход слияния / перекрестного соединения data.table, и в 6 раз быстрее, чем коды sqldf. Интересно, что создание индекса и / или использование временного файла в моей системе обходится довольно дорого.

Обратите внимание, что я оптимизировал решение OP data.table.

Наконец, все версии, кроме слияния / перекрестного соединения (я воздержался от исправления этой версии), возвращают один и тот же результат.

all.equal(newdata, newdata2) # TRUE
all.equal(newdata, as.data.frame(newdata3[order(spend, week.x)])) # FALSE (last week missing)
all.equal(newdata, as.data.frame(newdata4[order(spend, week)])) # TRUE

Большой размер проблемы

OP сообщил, что решение слияния / перекрестного соединения data.table исчерпывает память для его набора производственных данных из 1 M строк. Чтобы убедиться, что подход non-equi join потребляет меньше памяти, я протестировал его с размером задачи 5 M строк (nrow(tempdata)), что в десять раз больше, чем в предыдущих тестах. На моем компьютере с 8 ГБ памяти запуск без проблем завершился примерно за 18 секунд.

Unit: seconds
       expr      min       lq     mean   median       uq      max neval
 dt_nonequi 18.12387 18.12657 18.23454 18.12927 18.28987 18.45047     3
person Uwe    schedule 30.08.2017
comment
У меня было несколько вопросов по поводу этого решения - я добавил его в конец исходного вопроса, поскольку он слишком длинный для комментария. Спасибо! - person SlyGrogger; 05.09.2017

Наконец-то нашлось время еще раз разобраться в этом:

Мое оригинальное решение:

  system.time(newdata <- sqldf("select a.spend, a.week,
                   sum(case when b.week > a.week
                   then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
                   else 0.0 end) as calc3
                   from tempdata a, tempdata b  
                   where a.spend = b.spend 
                   group by a.spend,a.week"))

   user  system elapsed 
  11.99    3.77   16.11 

С индексом (хотя что-то мне подсказывает, что это не работает должным образом):

system.time(newdata2 <- sqldf(c('create index newindex on tempdata(spend)',
                                    'select a.spend, a.week,
                                    sum(case when b.week > a.week
                                    then b.salesx*(b.uplift*(1-exp(-(power(b.singledata,b.week-a.week)/b.slope))))/b.spend
                                    else 0.0 end) as calc3
                                    from main.tempdata a left join main.tempdata b  
                                    on a.spend = b.spend 
                                    group by a.spend,a.week'), dbname = tempfile()))

   user  system elapsed 
  12.73    2.93   15.76 

Решение Data.table (не возвращает 0 из оператора ifelse в sql):

    datatablefunc <- function(g){
    tempdata2 <- as.data.table(g)
    setkey(tempdata2, spend)
    tempdata3 <- merge(tempdata2, tempdata2, by="spend", all=TRUE, allow.cartesian=TRUE)
    tempdata4 <-  tempdata3[week.y > week.x, sum(salesx.y*(uplift.y*(1-exp(-(singledata.y^(week.y-week.x)/slope.y))))/spend), by=list(spend, week.x)] 
    return(tempdata4)
  }
  system.time(newdata3 <- datatablefunc(tempdata))

   user  system elapsed 
   2.36    0.25    2.62 

Прелесть решения на основе sql заключается в том, что, поскольку временный вывод хранится на сервере sql, а не в памяти, я не сталкиваюсь с надоедливыми проблемами «не могу выделить вектор», что происходит с решениями data.table / dplyr (когда Я добавляю больше данных) ... обратная сторона медали в том, что запускается немного дольше.

person SlyGrogger    schedule 30.08.2017