Настройка Apache Pig UDF и outputSchema

Я пытаюсь реализовать функцию UDF для обработки различных исходных/входных файлов. Входные файлы отличаются количеством столбцов. Мое намерение состоит в том, чтобы иметь общую функцию UDF. Каждый запуск скрипта pig обрабатывает один тип входного файла (одинаковое количество записей, разделенных символом «|».

Функция UDF должна считывать все входные записи, разделенные разделителем (|), и создавать один пакет с двумя кортежами на основе некоторых условий, например. ввод (1,2,3,4,5,6) вывод a) {(1,3), (2,4,5,6)} или b) {(2,3,4), (1,5) ,6)}

Я не могу расширить метод outputSchema для обработки создания кортежей разного размера. Невозможно передать дополнительный аргумент методу outputSchema. Невозможно использовать временную переменную, определенную как часть определения класса EvalFunc, поскольку ее значение обнуляется при каждом запуске.

Любой намек? Спасибо

ОБНОВИТЬ:

Я выполняю приведенную ниже команду с помощью GRUNT, предоставляется inputSchema, как вы можете видеть после «AS».

sourceData = foreach sourceData generate com.pig.Data('test.json', *) as (t:(s:(VIN: chararray,Birthdate: chararray), n:(name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray)));

Код UDF здесь...

public Schema outputSchema(Schema input) {

(строка 233) System.out.println("----------------------" + input.getFields().size());

Ошибка:

Pig Stack Trace
---------------
ERROR 1200: java.lang.NullPointerException

Failed to parse: java.lang.NullPointerException
        at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:201)
        at org.apache.pig.PigServer$Graph.validateQuery(PigServer.java:1707)
        at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1680)
        at org.apache.pig.PigServer.registerQuery(PigServer.java:623)
        at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:1082)
        at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:505)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
        at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
        at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:66)
        at org.apache.pig.Main.run(Main.java:565)
        at org.apache.pig.Main.main(Main.java:177)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
        at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:306)
        at org.apache.pig.newplan.logical.expression.UserFuncExpression.getFieldSchema(UserFuncExpression.java:244)
        at org.apache.pig.newplan.logical.optimizer.FieldSchemaResetter.execute(SchemaResetter.java:264)
        at org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor.visit(AllSameExpressionVisitor.java:143)
        at org.apache.pig.newplan.logical.expression.UserFuncExpression.accept(UserFuncExpression.java:113)
        at org.apache.pig.newplan.ReverseDependencyOrderWalker.walk(ReverseDependencyOrderWalker.java:70)
        at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visitAll(SchemaResetter.java:67)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:122)
        at org.apache.pig.newplan.logical.relational.LOGenerate.accept(LOGenerate.java:245)
        at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
        at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:114)
        at org.apache.pig.parser.LogicalPlanBuilder.buildForeachOp(LogicalPlanBuilder.java:1055)
        at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15896)
        at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)

        at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
        at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
        at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
        ... 16 more
Caused by: java.lang.NullPointerException
        at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:233)
        ... 34 more
================================================================================

ОБНОВЛЕНИЕ2:

хорошо, входная схема распространяется из предыдущей команды свиньи...

sourceData = загрузить 'test.csv', используя PigStorage(',') как (VIN: chararray, Дата рождения: chararray, имя: chararray, customerId: chararray, Пробег: chararray, Fuel_Consumption: chararray);

sourceData = foreach sourceData генерирует com.pig.Data'test_data_desc.json', *) as (t:(s:(VIN: chararray,Дата рождения: chararray), n:(имя: chararray,customerId: chararray,Пробег: chararray, Fuel_Consumment: chararray)));

Что бесполезно -( потому что невозможно распространять какие-либо дополнительные атрибуты или невозможно создать какую-либо другую более сложную логику внутри метода outputSchema ;-(


person heap    schedule 14.12.2015    source источник


Ответы (1)


В функции outputSchema вы можете получить доступ к входной схеме и использовать информацию о входной схеме для динамического создания выходной схемы на основе ввода (если ввод каким-то образом отражает ожидаемый вывод). Пример:

  public Schema outputSchema(Schema input) {
    Schema mySchema = new Schema();
    if (input.getFields().size() == 3) {
      mySchema.add(new Schema.FieldSchema("data1", DataType.DOUBLE));
      mySchema.add(new Schema.FieldSchema("data2", DataType.DOUBLE));
      mySchema.add(new Schema.FieldSchema("data3", DataType.DOUBLE));
    } else {
      mySchema.add(new Schema.FieldSchema("data", DataType.CHARARRAY));
    }
    return mySchema;
  }

Надеюсь, это поможет.

person kecso    schedule 16.12.2015