Реализация декартова соединения в каскадировании

Я хотел бы знать, возможно ли, что мы можем сделать декартово соединение в каскаде. Если кто-нибудь может привести простой наглядный пример, чтобы понять декартово соединение в каскадировании?


person Baskar    schedule 07.04.2016    source источник
comment
Проверьте это сообщение: stackoverflow.com/questions/14681506/   -  person chinglun    schedule 08.04.2016


Ответы (1)


Используйте следующий SubAssembly для декартова соединения:

/**
 * Created by dhruv.pancholi on 16/01/17.
 */
public class CartesianJoin extends SubAssembly {

    public static class CommonFieldAddOperation extends BaseOperation implements Function, Serializable {

        public CommonFieldAddOperation(Fields outputFields) {
            super(outputFields);
        }

        @Override
        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            TupleEntry arguments = functionCall.getArguments();

            // Copying the same tuple from input
            Tuple tuple = new Tuple(arguments.getTuple());

            // Adding 1 for joining on this field
            tuple.add(1);

            functionCall.getOutputCollector().add(tuple);
        }
    }

    public CartesianJoin(Pipe leftPipe, Fields leftFields, Pipe rightPipe, Fields rightFields) {

        // Adding 1 at the end of each tuple for joining
        leftPipe = new Each(leftPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(leftFields, new Fields("cartesian_common"))), Fields.RESULTS);

        // Adding 1 at the end of each tuple for joining
        rightPipe = new Each(rightPipe, Fields.ALL, new CommonFieldAddOperation(Fields.merge(rightFields, new Fields("cartesian_common_"))), Fields.RESULTS);

        // Joining on the 1 which was added in both the pipes
        Pipe joinPipe = new CoGroup(leftPipe, new Fields("cartesian_common"), rightPipe, new Fields("cartesian_common_"), new InnerJoin());

        // Keeping only the original fields
        joinPipe = new Retain(joinPipe, Fields.merge(leftFields, rightFields));

        // Adding output pipe of the sub-assembly
        setTails(joinPipe);
    }

}

Используйте следующий фрагмент кода в основной функции или везде, где определен поток:

Pipe joinPipe = new CartesianJoin(leftPipe, new Fields("id", "name"), rightPipe, new Fields("id_", "name_"));

левая трубка

id  name
1   dhruv
3   arun

правая трубка

id_ name_
1   dhruv
2   gaj

присоединиться к каналу

id  name    id_ name_
3   arun    2   gaj
3   arun    1   dhruv
1   dhruv   2   gaj
1   dhruv   1   dhruv
person Dhruv Pancholi    schedule 16.01.2017