Каскадная фильтрация плохих записей в файле

Я использую пользовательские функции для проверки DQ в Cascading, где я устанавливаю индикатор, на основе которого я, наконец, отфильтрую записи в нужные каналы.

Я написал для него две функции. В приведенном ниже коде поле «A» — это строка, для которой необходимо выполнить проверку на нулевое значение, а поле «B» — это код, для которого необходимо выполнить десятичную проверку. Индикатор Ind устанавливается на основе результата проверки качества, передается и устанавливается внутри функций IndicatorNull/IndicatorDecimal.

Но я столкнулся с ошибкой в ​​​​этом коде. Я не могу передать поля «A»/«Ind» и поля «B»/«Ind» в первый и второй фильтр одной и той же трубы.

Я что-то упустил здесь? Пожалуйста, дайте мне знать, как с этим можно справиться. Спасибо!

Ниже часть кода -

       Scheme inscheme = new TextDelimited(new Fields("A","B","Ind"),",");

        Tap sourceTap = new Hfs(inscheme, infile);
        Tap sinkTap = new Hfs(inscheme, outfile);

        Pipe BooleanPipe = new Pipe ("BooleanPipe");

        Fields findreturnNull = new Fields( "A","Ind" );
        Fields findreturnDecimal = new Fields("B", "Ind" );

        BooleanPipe = new Each( BooleanPipe, findreturnNull, new    
IndicatorNull(findreturnNull), Fields.RESULTS );
        BooleanPipe = new Each( BooleanPipe, findreturnDecimal, new IndicatorDecimal(findreturnDecimal), Fields.RESULTS );

Ниже приведена ошибка, которую я получаю -

Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [[BooleanPipe][first.Boolean.main(Boolean.java:48)] unable to resolve argument selector: [{2}:'B', 'Ind'], with incoming: [{2}:'A', 'Ind']]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:577)
    at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:286)
    at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:450)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:426)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:275)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:220)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:202)
    at first.Boolean.main(Boolean.java:53)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

person user2732748    schedule 16.07.2014    source источник


Ответы (1)


Проблема с параметром Fields.RESULTS.

Если посмотреть на поток:

+----------+-----------------+-----------------+------------------------------------------+
| Command  | Incoming Fields | Outgoing Fields | Reason                                   |
+----------+-----------------+-----------------+------------------------------------------+
|  Input   | "A", "B", "Ind" | "A", "B", "Ind" | Input, TextDelimited                     |
+----------+-----------------+-----------------+------------------------------------------+
| 1st Each | "A", "B", "Ind" |    "A", "Ind"   | Fields.RESULTS will push only Results    | 
|          |                 |                 | fields. Rest will be discarded.          |
+----------+-----------------+-----------------+------------------------------------------+
| 2nd Each |    "A", "Ind"   |      ERROR      | IndicatorDecimal() is looking from Field |
|          |                 |                 | "B" and it does not exists in Pipe.      |
+----------+-----------------+-----------------+------------------------------------------+

Поскольку у вас одинаковые поля ввода и вывода, решение будет Fields.REPLACE.

Ссылка: Наборы полей

person Ambrish    schedule 05.09.2014