Как я могу отправить каскадное задание в удаленный кластер YARN из Java?

Я знаю, что могу отправить каскадное задание, упаковав его в JAR-файл, как подробно описано в руководстве пользователя каскадирования. Затем это задание будет запущено в моем кластере, если я отправлю его вручную с помощью команды hadoop jar CLI.

Однако в исходной версии Hadoop 1 Cascading можно было отправить задание в кластер, установив определенные свойства в файле Hadoop JobConf. Установка fs.defaultFS и mapred.job.tracker приводит к тому, что локальная библиотека Hadoop автоматически пытается отправить задание в Hadoop1 JobTracker. Однако установка этих свойств, похоже, не работает в более новой версии. Отправка в кластер CDH5 5.2.1 Hadoop с использованием каскадной версии 2.5.3 (в которой CDH5 указана как поддерживаемая платформа) приводит к исключению IPC при согласовании с сервером, как подробно описано ниже.

Я считаю, что эта комбинация платформ — Cascading 2.5.6, Hadoop 2, CDH 5, YARN и API MR1 для отправки — является поддерживаемой комбинацией на основе таблица совместимости (см. в разделе "Предыдущие выпуски"). И отправка задания с использованием hadoop jar отлично работает в этом же кластере. Порт 8031 ​​открыт между отправляющим хостом и ResourceManager. В журналах ResourceManager на стороне сервера обнаружена ошибка с таким же сообщением.

Я использую библиотеку cascading-hadoop2-mr1.

Exception in thread "main" cascading.flow.FlowException: unhandled exception
    at cascading.flow.BaseFlow.complete(BaseFlow.java:894)
    at WordCount.main(WordCount.java:91)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RpcServerException): Unknown rpc kind in rpc headerRPC_WRITABLE
    at org.apache.hadoop.ipc.Client.call(Client.java:1411)
    at org.apache.hadoop.ipc.Client.call(Client.java:1364)
    at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:231)
    at org.apache.hadoop.mapred.$Proxy11.getStagingAreaDir(Unknown Source)
    at org.apache.hadoop.mapred.JobClient.getStagingAreaDir(JobClient.java:1368)
    at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:102)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:982)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:976)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:976)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:950)
    at cascading.flow.hadoop.planner.HadoopFlowStepJob.internalNonBlockingStart(HadoopFlowStepJob.java:105)
    at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:196)
    at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:149)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:124)
    at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:43)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Ниже приведен демонстрационный код, который в основном идентичен образцу WordCount из руководства пользователя Cascading.

public class WordCount {

    public static void main(String[] args) {
        String inputPath = "/user/vagrant/wordcount/input";
        String outputPath = "/user/vagrant/wordcount/output";

        Scheme sourceScheme = new TextLine( new Fields( "line" ) );
        Tap source = new Hfs( sourceScheme, inputPath );

        Scheme sinkScheme = new TextDelimited( new Fields( "word", "count" ) );
        Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

        Pipe assembly = new Pipe( "wordcount" );


        String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
        Function function = new RegexGenerator( new Fields( "word" ), regex );
        assembly = new Each( assembly, new Fields( "line" ), function );


        assembly = new GroupBy( assembly, new Fields( "word" ) );

        Aggregator count = new Count( new Fields( "count" ) );
        assembly = new Every( assembly, count );

        Properties properties = AppProps.appProps()
            .setName( "word-count-application" )
            .setJarClass( WordCount.class )
            .buildProperties();

        properties.put("fs.defaultFS", "hdfs://192.168.30.101");
        properties.put("mapred.job.tracker", "192.168.30.101:8032");

        FlowConnector flowConnector = new HadoopFlowConnector( properties );
        Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

        flow.complete();
    }
}

Я также попытался установить кучу других свойств, чтобы попытаться заставить его работать:

  • mapreduce.jobtracker.address
  • mapreduce.framework.name
  • yarn.resourcemanager.address
  • yarn.resourcemanager.host
  • yarn.resourcemanager.hostname
  • yarn.resourcemanager.resourcetracker.address

Ничего из этого не сработало, они просто заставляют задание выполняться в локальном режиме (если также не установлено mapred.job.tracker).


person amoe    schedule 22.12.2014    source источник


Ответы (1)


Теперь я решил эту проблему. Это происходит из-за попытки использовать старые классы Hadoop, которые распространяет Cloudera, в частности JobClient. Это произойдет, если вы используете hadoop-core с предоставленной версией 2.5.0-mr1-cdh5.2.1 или зависимость hadoop-client с тем же номером версии. Хотя заявлено, что это версия MR1, и мы используем API MR1 для отправки, эта версия на самом деле поддерживает ТОЛЬКО отправку в Hadoop1 JobTracker и не поддерживает YARN.

Чтобы разрешить отправку в YARN, вы должны использовать зависимость hadoop-client с версией 2.5.0-cdh5.2.1, отличной от MR1, которая по-прежнему поддерживает отправку заданий MR1 в YARN.

person amoe    schedule 23.12.2014