Не удалось объединить два файла с одним ключом через каскадирование

Давайте посмотрим, что у нас есть. Первый файл [Класс интерфейса]:

list arrayList
list linkedList

Второй файл [количество класса 1]:

arrayList 120
linkedList 4

Я хотел бы присоединиться к этим двум файлам по ключу [класс] и получить количество для каждого интерфейса:

list arraylist 120
list linkedlist 4

Код:

public class Main
{
public static void main( String[] args )
{
String docPath = args[ 0 ];
String wcPath = args[ 1 ];
String doc2Path = args[ 2 ];

Properties properties = new Properties();
AppProps.setApplicationJarClass( properties, Main.class );
AppProps.setApplicationName( properties, "Part 1" );
AppProps.addApplicationTag( properties, "lets:do:it" );
AppProps.addApplicationTag( properties, "technology:Cascading" );
FlowConnector flowConnector = new Hadoop2MR1FlowConnector( properties );

// create source and sink taps
Tap wcTap = new Hfs(new TextDelimited(true, ","), wcPath);


    Fields classInterfaceFiles = new Fields("interface", "class");
    Tap classInterfaceTap = new Hfs(new TextDelimited(classInterfaceFiles, true, ","), docPath);

    Fields classAmountFields = new Fields("class1", "amount");
    Tap classAmountFileTap = new Hfs(new TextDelimited(classAmountFields, true, ","), doc2Path);

    Tap outTap = new MultiSinkTap(); // just saying, create your own tap
    Pipe classInterfaceFilePipe = new Pipe("classInterfaceFilePipe");

    Pipe classIAmountFilePipe = new Pipe("classIAmountFilePipe");

    Fields groupFields = new Fields("class");
    Fields groupFields1 = new Fields("class1"); // fields used as joining keys
    Pipe outPipe = new CoGroup(classInterfaceFilePipe, groupFields, classIAmountFilePipe, groupFields1, new InnerJoin());


 // build flow definition
    FlowDef flowDef = FlowDef.flowDef().setName("myFlow")
            .addSource(classInterfaceFilePipe, classInterfaceTap)
            .addSource(classIAmountFilePipe, classAmountFileTap)
            .addTailSink(outPipe, wcTap);
         //   .addTailSink( outPipe, wcTap );


// write a DOT file and run the flow
Flow wcFlow = flowConnector.connect( flowDef );
wcFlow.writeDOT( "dot/wc.dot" );
wcFlow.complete();
}

}

[это один шаг большой задачи]


person user3650408    schedule 22.06.2016    source источник


Ответы (1)


Это происходит потому, что у вас есть одно и то же поле в двух каналах, которые соединены вместе, то есть «класс». Вероятно, вы можете переименовать их в «class_interface» и «class_amount». Вам также придется внести изменения в groupFields, которые вы использовали в канале CoGroup.

person Amit    schedule 22.06.2016
comment
с вашим предложением и некоторым кодом улучшения работает хорошо. Поэтому я дал обновленный код, и в вопросе есть рабочий код. - person user3650408; 22.06.2016