Как преобразовать поток данных с массивом json в поток данных отдельных элементов массива

У меня есть поток данных [ObjectNode], который я читал как десериализованный json из темы kafka. Один из элементов этого ObjectNode - это фактически массив событий. Этот массив имеет разную длину. Входящий поток json выглядит так:

{
    "eventType": "Impression",
    "deviceId": "359849094258487",
    "payload": {
        "vertical_name": "",
        "promo_layout_type": "aa",
        "Customer_Id": "1011851",
        "ecommerce": {
            "promoView": {
                "promotions": [{
                    "name": "/-category_icons_all",
                    "id": "300275",
                    "position": "slot_5_1",
                    "creative": "Central/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "300276",
                    "position": "slot_6_1",
                    "creative": "Lifestyle/Gift Card/00000001B890D1739913DDA956AB5C79775991EC"
                }, {
                    "name": "/-category_icons_all",
                    "id": "413002",
                    "position": "slot_7_1",
                    "creative": "Uber/Deals/00000001B890D1739913DDA956AB5C79775991EC"
                }]
            }
        }
    }
}

Я хочу иметь возможность взорвать массив promotions, чтобы каждый элемент внутри стал отдельным сообщением, которое можно было бы записать в тему приемника kafka. Предоставляет ли Flink функцию разнесения в DataStream и / или API таблиц?

Я попытался выполнить RichFlatMap в этом потоке, чтобы иметь возможность собирать отдельные строки, но это также просто возвращает мне DataStream [Seq [GenericRecord]], как показано ниже:

class PromoMapper(schema: Schema) extends RichFlatMapFunction[node.ObjectNode,Seq[GenericRecord]] {

  override def flatMap(value: ObjectNode, out: Collector[Seq[GenericRecord]]): Unit = {
    val promos = value.get("payload").get("ecommerce").get("promoView").get("promotions").asInstanceOf[Seq[node.ObjectNode]]

    val record = for{promo <- promos} yield {
      val processedRecord: GenericData.Record = new GenericData.Record(schema)
      promo.fieldNames().asScala.foreach(f => processedRecord.put(f,promo.get(f)))
      processedRecord
    }

    out.collect(record)
  }
}

Пожалуйста помоги.


person Faizan Ahmed    schedule 29.05.2019    source источник


Ответы (1)


Использование плоской карты - правильная идея (не знаю, почему вы возились с RichFlatMap, но это мелочь).

Похоже, вы должны вызывать out.collect(processedRecord) для каждого элемента внутри цикла for, а не один раз для Seq, созданного этим циклом.

person David Anderson    schedule 31.05.2019