Почему не работает Hadoop mapReduce с python, но скрипты работают в командной строке?

Я пытаюсь реализовать простой пример уменьшения карты Hadoop с использованием Cloudera 5.5.0. Шаги отображения и уменьшения должны быть реализованы с использованием Python 2.6.6.

Проблема:

  • Если сценарии выполняются в командной строке unix, они работают отлично и выдают ожидаемый результат.

кошка join2*.txt | ./join3_mapper.py | сортировать | ./join3_reducer.py

  • Но выполнение скриптов как задачи Hadoop ужасно терпит неудачу:

hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -input /user/cloudera/inputTV/join2_gen*.txt -output /user/cloudera/output_tv -mapper /home/cloudera/join3_mapper.py -reducer / home/cloudera/join3_reducer.py -numReduceTasks 1

16/01/06 12:32:32 INFO mapreduce.Job: Task Id : attempt_1452069211060_0026_r_000000_0, Status : FAILED Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1 at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325) at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538) at org.apache.hadoop.streaming.PipeReducer.close(PipeReducer.java:134) at org.apache.hadoop.io.IOUtils.cleanup(IOUtils.java:244) at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:459) at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:392) at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

  • Mapper работает, если команда hadoop выполняется с -numReduceTasks 0, задание hadoop выполняет только шаг карты, завершается успешно, а выходной каталог содержит файлы результатов из шага карты.

  • Я предполагаю, что тогда должно быть что-то не так с шагом уменьшения?

  • Журналы stderr в Hue не показывают ничего важного:

Время загрузки журнала: среда, 06 января, 12:33:10 -08:00 2016 Длина журнала: 222 log4j:WARN Не удалось найти приложения для регистратора (org.apache.hadoop.ipc.Server). log4j:WARN Пожалуйста, правильно инициализируйте систему log4j. log4j:WARN См. http://logging.apache.org/log4j/1.2/faq.html#noconfig для получения дополнительной информации.

Код скриптов: 1-й файл: join3_mapper.py

#!/usr/bin/env python

import sys

for line in sys.stdin:
   line       = line.strip()   #strip out carriage return
   tuple2  = line.split(",")   #split line, into key and value, returns a list

   if len(tuple2) == 2:
      key = tuple2[0]
      value = tuple2[1]
      if value == 'ABC':
         print('%s\t%s' % (key, value) )
      elif value.isdigit():
         print('%s\t%s' % (key, value) ) 

2-й файл: join3_reducer.py

#!/usr/bin/env python
import sys

last_key      = None              #initialize these variables
running_total = 0
abcFound =False;
this_key      = None

# -----------------------------------
# Loop the file
#  --------------------------------
for input_line in sys.stdin:
    input_line = input_line.strip()

    # --------------------------------
    # Get Next Key value pair, splitting at tab
    # --------------------------------
    tuple2 = input_line.split("\t") 

    this_key = tuple2[0]    
    value = tuple2[1]
    if value.isdigit():
        value = int(value) 

    # ---------------------------------
    # Key Check part
    #    if this current key is same 
    #          as the last one Consolidate
    #    otherwise  Emit
    # ---------------------------------
    if last_key == this_key:     
        if value == 'ABC':  # filter for only ABC in TV shows
            abcFound=True;
        else:
            if isinstance(value, (int,long) ): 
                running_total += value   

    else:
        if last_key:         #if this key is different from last key, and the previous 
                             #   (ie last) key is not empy,
                             #   then output 
                             #   the previous <key running-count>
           if abcFound:
              print('%s\t%s' % (last_key, running_total) )
              abcFound=False;

        running_total = value    #reset values
        last_key = this_key

if last_key == this_key:
    print('%s\t%s' % (last_key, running_total) )

Я пробовал разные способы объявления входного файла для команды hadoop, без разницы, без успеха.

Что я делаю неправильно ? Советы, идеи очень ценятся, спасибо


person Marco P.    schedule 06.01.2016    source источник
comment
Разве вам не нужен инструментарий, чтобы иметь возможность запускать jar-файл из командной строки?   -  person Matt Cremeens    schedule 07.01.2016
comment
Кроме того, разве файлы jar не предназначены для программ Java?   -  person Matt Cremeens    schedule 07.01.2016
comment
Я не запускаю jar-файл сам, я выполняю команду hadoop и говорю Hadoop выполнить объявленный jar-файл. Остальные параметры, следующие за путем к библиотеке, являются параметрами, связанными с файлом hadoop-streaming.jar и связанными с выполняемым действием MapReduce. да, файлы jar - это java-программы   -  person Marco P.    schedule 07.01.2016


Ответы (2)


Какой удачный удар, бороться с этим в течение нескольких дней и знать, что он работает:

Поскольку локальное (unix) выполнение

cat join2_gen*.txt | ./join2_mapper.py | sort | ./join2_reducer.py

работал нормально, у меня была идея использовать 1 объединенный входной файл вместо предоставленных 6 входных файлов, поэтому:

cat join2_gen*.txt >> mergedinputFile.txt

hdfs dfs -put mergedInputFile.txt /user/cloudera/input

затем снова выполнить ту же самую команду hadoop, направив ввод в файл mergedInputFile во входной папке --> идеальный результат, никаких проблем, никаких исключений.

Для меня возникает вопрос:

  • Почему он работает с одним объединенным входным файлом, но теперь с предоставлением 6 файлов меньшего размера ?? Без понятия (пока)
person Marco P.    schedule 06.01.2016
comment
У меня такая же проблема! Он отлично работает в последовательном режиме. - person Amir Pournasserian; 18.02.2016

Попробуйте поместить все входные текстовые файлы в один каталог, а затем передать каталог в качестве входных данных. Таким образом, вам не придется объединять все ваши входные файлы.

person alpha    schedule 04.02.2017