Пользовательская функция агрегатора до Spark 1.5

Я новичок в Spark, и мне интересно, как сделать что-то, что довольно просто сделать с помощью Cascading framework.

Предположим, у меня есть следующий набор данных:

<date> <cpt_id> <mesure_type> <value>
20160603093021556 cpt1 idx1 11
20160603093021556 cpt1 idx2 22
20160603093021556 cpt1 idx3 33
20160603093021556 cpt1 idx4 44
20160603113021556 cpt2 idx1 09
20160603113021556 cpt2 idx2 45
20160603113021556 cpt2 idx3 66
20160603193021556 cpt1 idx1 13
20160603193021556 cpt1 idx2 25
20160603193021556 cpt1 idx3 33
20160603193021556 cpt1 idx4 44

и я хочу агрегировать это, чтобы получить следующие результаты (своего рода денормализация):

<date> <cpt_id> <idx1> <idx2> <idx3> <idx4>
20160603093021556 cpt1 11 22 33 44
20160603113021556 cpt2 09 45 66 null
20160603193021556 cpt1 13 25 33 44

При каскадировании я бы использовал GroupBy (с датой и идентификатором cpt в качестве ключей группировки) и буфер Every для создания денормализованных кортежей.

Со Spark кажется, что потребуется функция определяемого пользователем агрегатора, но она доступна только начиная с Spark 1.5 (и 1.3.1 доступна в моем кластере Yarn).

Я не понимаю, как сделать такой процесс с API 1.3.1.

Спасибо за вашу помощь и предложение


person rgirodon    schedule 03.06.2016    source источник


Ответы (1)


Мне удалось сделать это с помощью следующего процесса:

1- Группировать строки по составному ключу (дата, cpt_id) В итоге получаю набор данных JavaPairRDD>

2- Примените преобразование карты к этому набору данных, выполнив «денормализацию» в функции, переданной в качестве аргумента для карты.

Вот мой код:

@Test
public void testCustomAggregator2() {

    DataFrame df = sqlContext.load("src/test/resources/index.json", "json").select("date_mesure", "compteur_id", "type_mesure", "value");

    JavaRDD<Row> rows = df.javaRDD();

    JavaPairRDD<IndexKey, Iterable<Row>> groupedIndex = rows.groupBy(new Function<Row, IndexKey>() {
        @Override
        public IndexKey call(Row row) throws Exception {
            return new IndexKey(row.getString(0), row.getString(1));
        }
    });

    JavaRDD<Row> computedRows = groupedIndex.map(new Function<Tuple2<IndexKey, Iterable<Row>>, Row>() {

        @Override
        public Row call(Tuple2<IndexKey, Iterable<Row>> indexKeyIterableTuple2) throws Exception {

            Row result = null;

            IndexKey key = indexKeyIterableTuple2._1;

            Iterable<Row> rowsForKey = indexKeyIterableTuple2._2;

            String idx1 = null;

            String idx2 = null;

            String idx3 = null;

            for (Row rowForKey : rowsForKey) {

                String typeMesure = rowForKey.getString(2);

                String value = rowForKey.getString(3);

                switch(typeMesure) {

                    case "idx1" :
                        idx1 = value;
                        break;

                    case "idx2" :
                        idx2 = value;
                        break;

                    case "idx3" :
                        idx3 = value;
                        break;

                    default :
                        break;
                }
            }

            result = RowFactory.create(key.getDateMesure(),
                                        key.getCompteurId(),
                                        idx1,
                                        idx2,
                                        idx3);

            return result;
        }
    });

    List<Row> resultRows = computedRows.collect();

    boolean found = false;

    for (Row resultRow : resultRows) {

        String dateMesure = resultRow.getString(0);

        String compteurId = resultRow.getString(1);

        if ("20160603093021556".equals(dateMesure)
                && "cpt1".equals(compteurId)) {

            found = true;

            String idx1 = resultRow.getString(2);
            String idx2 = resultRow.getString(3);
            String idx3 = resultRow.getString(4);

            Assert.assertEquals("11", idx1);
            Assert.assertEquals("22", idx2);
            Assert.assertEquals("33", idx3);
        }
    }

    if (!found) {

        Assert.fail("Ligne d'index non trouvée");
    }
}

Надеюсь, это поможет, и если кто-то увидит что-то не так в коде, пожалуйста, дайте мне знать.

Как я уже сказал, я новичок в Spark и с нетерпением жду саморазвития.

person rgirodon    schedule 03.06.2016