Clojure: группировка слишком медленная (файл с 13 миллионами строк)

Ситуация

У меня есть CSV-файл на 13 миллионов строк, на котором я хочу выполнить логистическую регрессию (инкантер) для каждой группы. Мой файл такой (значения просто образец)

ID Max Probability
1  1   0.5 
1  5   0.6
1  10  0.99
2  1   0.1
2  7   0.95

Итак, я сначала прочитал его с помощью csv-reader, все в порядке.

У меня тогда что-то вроде этого:

( {"Id" "1", "Max" 1, "Probability" 0.5} {"Id" "1", "Max" 5, "Probability" 0.6} etc.

Я хочу сгруппировать эти значения по идентификатору. Если я правильно помню, существует около 1,2 миллиона идентификаторов. (Я сделал это на Python с пандами, и это очень быстро)

Это моя функция для чтения и форматирования файла (она отлично работает с небольшими наборами данных):

  (defn read-file
  []
    (let [path (:path-file @config)
          content-csv (take-csv path \,)]
      (->> (group-by :Id content-csv)
           (map (fn [[k v]]
                [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
           (into {}))))

Я хочу, наконец, иметь что-то подобное для выполнения логистической регрессии (я гибко отношусь к этому, мне не нужны векторы для :x и :y , последовательности в порядке)

{"1" {:x [1 5 10] :y [0.5 0.6 0.99]} "2" {:x [1 7] :y [0.1 0.95]} etc.

Проблема

У меня проблемы с группировкой. Я попробовал это отдельно на выходе из CSV, и это длится вечно, когда он не умирает из-за памяти Java Heap Space. Я думал, что проблема связана с моей картой, но это group-by.

Я думал об использовании сокращения или сокращения-kv, но я не знаю, как использовать эти функции для таких целей.

Меня не волнует порядок ":x" и ":y" (поскольку они одинаковы между собой, я имею в виду, что x и y имеют одинаковый индекс ... не проблема, потому что они находятся на одном и том же строка) ну и ИДы на конечный результат и я читал, что сгруппировать по порядку. Может быть, это то, что дорого для операции?

Я даю вам образцы данных, если кто-то сталкивался с этим:

(def sample '({"Id" "1" "Max" 1 "Probability" 0.5} {"Id" "1" "Max" 5 "Probability" 0.6} {"Id" "1" "Max" 10 "Probability" 0.99} {"Id" "2" "Max" 1 "Probability" 0.1} {"Id" "2" "Max" 7 "Probability" 0.95}))

Другие варианты

У меня есть другие идеи, но я не уверен, что они совместимы с Clojure.

  • В Python из-за характера функции и из-за того, что файл уже упорядочен, вместо использования группировки я написал в фрейме данных начальный и конечный индексы для каждой группы, так что мне просто нужно было выбрать непосредственно вложенную вкладку данных.

  • Я также могу загрузить список идентификаторов вместо того, чтобы вычислять его из Clojure. Нравиться

    (def ids '("1" "2" и т.д.

Итак, возможно, можно начать с:

{"1" {:x [] :y []} "2" {:x [] :y []} etc.

из предыдущей последовательности, а затем сопоставьте большой файл с каждым идентификатором.

Я не знаю, действительно ли это эффективнее.

У меня есть все остальные функции для логистической регрессии, только этой части мне не хватает! Спасибо !

ИЗМЕНИТЬ

Спасибо за ответы, наконец-то у меня есть это решение.

В моем файле project.clj

 :jvm-opts ["-Xmx13g"])

Код :

(defn data-group->map [group]
  {(:Id (first group))
   {:x (map :Max group)
    :y (map :Probability group)}})


(defn prob-cumsum [data]
  (cag/fmap
    (fn [x]
      (assoc x :y (reductions + (x :y))))
  data))


(defn process-data-splitter [data]
  (->> (partition-by :Id data)
       (map data-group->map)
       (into {})
       (prob-cumsum)))

Я завернул весь свой код, и он работает. Сплит занимает около 5 минут, но мне не нужна мега-скорость. Использование памяти может доходить до всей памяти для чтения файлов, а затем меньше для сигмоида.


person Joseph Yourine    schedule 01.02.2016    source источник
comment
Является ли кардинальность идентификаторов высокой или низкой? Идентификаторы в CSV упорядочены? Если это так, вы можете выполнить группировку во время чтения CSV за один проход.   -  person Daniel Compton    schedule 01.02.2016
comment
Здравствуйте, спасибо за ответ. У меня около 1,2-1,3 миллиона идентификаторов (в 10 раз меньше реальных данных). Файл упорядочен так же, как в моем примере, то есть: первый уровень = ID, второй уровень = Max (вероятность и максимум упорядочены одинаково, потому что они связаны растущей кривой). Так что, возможно, ваша идея хороша, но я все еще не знаю, как это сделать. Является ли петля хорошей идеей? Я думаю, что это не дает преимуществ многопроцессорности. Я попробую что-нибудь со слиянием, сначала переформатировав данные.   -  person Joseph Yourine    schedule 01.02.2016


Ответы (1)


если ваш файл отсортирован по идентификатору, вы можете использовать partition-by вместо group-by.

тогда ваш код будет выглядеть так:

(defn data-group->map [group]
  [(:Id (first group))
   {:x (mapv :Max group)
    :y (mapv :Probability group)}])

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (->> content-csv
         (partition-by :Id)
         (map data-group->map)
         (into {}))))

это должно ускорить его. Тогда вы, вероятно, можете сделать это быстрее, используя преобразователи.

(defn read-file []
  (let [path (:path-file @config)
        content-csv (take-csv path \,)]
    (into {} (comp (partition-by :Id)
                   (map data-group->map))
          content-csv)))

давайте проведем несколько тестов:

сначала сгенерируйте огромные данные, подобные вашим:

(def huge-data
  (doall (mapcat #(repeat 
                     1000000
                     {:Id % :Max 1 :Probability 10})
           (range 10))))

у нас есть набор данных из десяти миллионов элементов, с миллионами {:Id 0 :Max 1 :Probability 10}, миллионами {:Id 1 :Max 1 :Probability 10} и так далее.

теперь функции для тестирования:

(defn process-data-group-by [data]
  (->> (group-by :Id data)
       (map (fn [[k v]]
              [k {:x (mapv :Max v) :y (mapv :Probability v)}]))
       (into {})))

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)
       (into {})))

(defn process-data-transducer [data]
  (into {} (comp (partition-by :Id) (map data-group->map)) data))

а теперь время тестов:

(do (time (dorun (process-data-group-by huge-data)))
    (time (dorun (process-data-partition-by huge-data)))
    (time (dorun (process-data-transducer huge-data))))

"Elapsed time: 3377.167645 msecs"
"Elapsed time: 3707.03448 msecs"
"Elapsed time: 1462.955152 msecs"

Обратите внимание, что partition-by создает ленивую последовательность, в то время как группировка должна реализовывать всю коллекцию. Поэтому, если вам нужна группа данных по группам, а не вся карта, вы можете удалить (into {}) и получить доступ к каждой из них быстрее:

(defn process-data-partition-by [data]
  (->> data
       (partition-by :Id)
       (map data-group->map)))

чек об оплате:

user> (time (def processed-data (process-data-partition-by huge-data)))
"Elapsed time: 0.06079 msecs"
#'user/processed-data
user> (time (let [f (first processed-data)]))
"Elapsed time: 302.200571 msecs"
nil
user> (time (let [f (second processed-data)]))
"Elapsed time: 500.597153 msecs"
nil
user> (time (let [f (last processed-data)]))
"Elapsed time: 2924.588625 msecs"
nil
user.core> (time (let [f (last processed-data)]))
"Elapsed time: 0.037646 msecs"
nil
person leetwinski    schedule 01.02.2016
comment
Здравствуйте, спасибо за ответ. Я попробовал ваше решение с вашими примерами данных, и оно намного быстрее. С моим CSV это очень медленно. Так что, возможно, причина в чтении файла с slurp. Я не знаю, как это решить, но кажется, что группировка не является настоящей проблемой (даже если я узнал лучшее решение из вашего поста). Но проблема в том, что у меня проблема с Java Heap Space при использовании def, странно, потому что у меня 16 Go ram. - person Joseph Yourine; 01.02.2016
comment
Привет! Как вы загружаете и анализируете свой файл css? не могли бы вы обновить свой вопрос? - person leetwinski; 01.02.2016
comment
Проблема с пространством кучи java, вероятно, может быть решена с помощью настройки jvm, установив значение Xmx. stackoverflow.com/questions/14763079/ . Но настоящая проблема может быть связана с тем, что вы сохраняете все загруженные данные (даже ненужные). - person leetwinski; 01.02.2016
comment
Ага, буду разбираться. Я не очень вижу, что там не ленивого, потому что при отдельном применении операции кажутся быстрыми (даже csv-ридер возвращает ленивую последовательность), но когда дело доходит до их оборачивания, где-то возникает проблема. Странно, потому что я работал с очень большими данными из Google Cloud и у меня не было проблем. - person Joseph Yourine; 01.02.2016
comment
Наконец-то сработало, спасибо. Я немного подкорректировал ваш код для своего использования и увеличил пространство кучи java! Отредактировал мой пост. - person Joseph Yourine; 02.02.2016