Сбой перекрестного соединения Hive при присоединении к локальной карте

Есть ли прямой способ устранить следующую ошибку или в целом лучший способ использовать Hive для получения нужного мне соединения? Вывод в сохраненную таблицу не является обязательным требованием, поскольку я могу довольствоваться INSERT OVERWRITE LOCAL DIRECTORY в csv.

Я пытаюсь выполнить следующее перекрестное соединение. ipint — это таблица размером 9 ГБ, а geoiplite — 270 МБ.

CREATE TABLE iplatlong_sample AS
SELECT ipintegers.networkinteger, geoiplite.latitude, geoiplite.longitude
FROM geoiplite
CROSS JOIN ipintegers
WHERE ipintegers.networkinteger >= geoiplite.network_start_integer AND ipintegers.networkinteger <= geoiplite.network_last_integer;

Я использую CROSS JOIN для ipintegers вместо geoiplite, потому что я читал, что по правилу меньшая таблица должна быть слева, а большая справа.

Этапы Map и Reduce завершены на 100% в соответствии с HIVE, но затем

2015-08-01 04:45:36,947 Карта Stage-1 = 100%, уменьшение = 100%, совокупная загрузка ЦП 8767,09 сек.

MapReduce Общее совокупное время процессора: 0 дней 2 часа 26 минут 7 секунд 90 мс

Завершенная работа = job_201508010407_0001

Этап-8 выбирается решателем условий.

Журнал выполнения по адресу: /tmp/myuser/.log

01.08.2015, 04:45:38 Начинаю запускать локальную задачу для обработки присоединения к карте; максимальная память = 12221153280

Выполнение не выполнено со статусом выхода: 3

Получение информации об ошибке

Задача не удалась!

Идентификатор задачи: Этап-8

Журналы:

/tmp/myuser/hive.log

FAILED: ошибка выполнения, код возврата 3 из org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask

Запущено заданий MapReduce: Задание 0: Карта: 38 Уменьшить: 1 Совокупное использование ЦП: 8767,09 с
Чтение HDFS: 9438495086 Запись HDFS: 8575548486 УСПЕШНО

Конфигурация моего улья:

SET hive.mapred.local.mem=40960;
SET hive.exec.parallel=true;
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate = true;
SET hive.optimize.skewjoin = true;
SET mapred.compress.map.output=true;
SET hive.stats.autogather=false;

Я менял SET hive.auto.convert.join между true и false, но с тем же результатом.

Вот ошибки в журнале вывода из /tmp/myuser/hive.log

$ tail -12 -f tmp/mysyer/hive.log

2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Execution failed with exit status: 3
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Obtaining error information
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) -
Task failed!
Task ID:
  Stage-8

Logs:

2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - /tmp/myuser/hive.log
2015-08-01 07:30:46,087 ERROR mr.MapredLocalTask (MapredLocalTask.java:execute(268)) - Execution failed with exit status: 3
2015-08-01 07:30:46,094 ERROR ql.Driver (SessionState.java:printError(419)) - FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask

Я запускаю клиент куста на Мастере, экземпляре Google Cloud Platform типа n1-highmem-8 (8 ЦП, 52 ГБ), а рабочие — n1-highmem-4 (4 ЦП, 26 ГБ), но я подозреваю, что после MAP и REDUCE это локальное соединение (как подразумевается) происходит на Мастере. Несмотря на это, в bdutils я настроил JAVAOPTS для рабочих узлов (n1-highmem-4) на: n1-highmem-4

РЕДАКТИРОВАНИЕ РЕШЕНИЯ: решение состоит в том, чтобы организовать данные диапазона данных в дерево диапазонов.


person Brett Bonner    schedule 01.08.2015    source источник


Ответы (1)


Я не думаю, что можно выполнить такой перебор с перекрестным соединением - просто умножьте номера строк, это немного выходит из-под контроля. Вам нужны некоторые оптимизации, на которые я не думаю, что Hive еще способен.

Но действительно ли эта проблема может быть решена за O (N1 + N2) время, если вы отсортировали свои данные (что hive может сделать для вас) - вы просто просматриваете оба списка одновременно, на каждом шаге получая целое число ip, проверяя, есть ли какие-либо интервалы начинаются с этого целого числа, добавляются, удаляются те, которые закончились, выдаются совпадающие кортежи и так далее. Псевдокод:

intervals=[]
ipintegers = iterator(ipintegers_sorted_file)
intervals = iterator(intervals_sorted_on_start_file)
for x in ipintegers:
    intervals = [i for i in intervals if i.end >= x]

    while(intervals.current.start<=x):
        intervals.append(intervals.current)
        intervals.next()
    for i in intervals:
        output_match(i, x)

Теперь, если у вас есть внешний сценарий/функция UDF, которая знает, как читать меньшую таблицу и получает целые IP-адреса в качестве входных данных и выдает совпадающие кортежи в качестве выходных данных, вы можете использовать hive и SELECT TRANSFORM для потоковой передачи входных данных.

Или вы, вероятно, можете просто запустить этот алгоритм на локальной машине с двумя входными файлами, потому что это всего лишь O (N), и даже 9 ГБ данных вполне выполнимо.

person letitbee    schedule 01.08.2015
comment
Действительно, это будет мой следующий подход. У меня будут таблицы сортировки кустов по ipinteger и диапазону ip. Верхняя и нижняя границы диапазона должны быть уникальными. Таким образом, скрипт будет читать ipinteger, проверять верхнюю и нижнюю границы диапазона. Если false, следующий интервал. Если true, выдать и начать сравнение с последнего проверенного интервала. - person Brett Bonner; 02.08.2015
comment
ОК, я думаю, что моя реализация этого все еще O (n * m), и на ее завершение уйдут дни. Как бы вы сделали это быстрее? gist.github.com/bbopen/f94e407cef881085599f - person Brett Bonner; 03.08.2015
comment
Конечно, O(n*m) будет длиться вечно. Обновил мой ответ псевдокодом. - person letitbee; 03.08.2015
comment
Чем быстрее я закончил реализацию, тем лучше структурировать данные диапазона в дерево диапазонов. - person Brett Bonner; 04.08.2015