Как исправить ошибку «Компонент: [x] подписывается на несуществующий компонент [y]» в топологии Apache Storm Trident

Я только что реализовал функцию Trident DRPC для обработки входящих сообщений, и я пытаюсь сохранить количество обработанных кортежей на заключительном этапе топологии как состояние Trident. Вот моя топология:

topology.newDRPCStream("portfolio")
    .map(parseMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "portfolioPayload"))
    .filter(new FilterNull())
    .flatMap(splitMapFunction,
        new Fields("portfolioUrn", "portfolioSourceSystem", "portfolioRegion",
            "portfolioTimestamp", "strategyCode"))
    .parallelismHint(1)
    .shuffle()
    .each(new Fields("strategyCode"), findMongoTradesFunction,
        new Fields("uitid", "id", "sourceSystem", "sourceTransactionTime", "publicationTime",
            "tradeVersion", "urn", "riskViewFrom", "riskViewTo", "authorized"))
    .parallelismHint(10)
    .shuffle()
    .filter(tradeFilterFunction)
    .parallelismHint(150)
    .groupBy(new Fields("uitid"))
    .aggregate(
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo", "uitid"), reduceAggregateFunction,
        new Fields("portfolioUrn", "portfolioTimestamp", "strategyCode", "id", "sourceSystem",
            "sourceTransactionTime", "publicationTime", "tradeVersion", "urn", "riskViewFrom",
            "riskViewTo"))
    .parallelismHint(200)
    .groupBy(new Fields("portfolioUrn"))
    .persistentAggregate(stateFactory, new Count(), new Fields("count"));

Пока я пытаюсь отправить эту топологию в Storm, у меня возникает эта ошибка:

Exception in thread "main" java.lang.RuntimeException: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:273)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:387)
at org.apache.storm.StormSubmitter.submitTopology(StormSubmitter.java:159)
at com.citi.tm.portfolio.tps.PortfolioLauncher.main(PortfolioLauncher.java:34)
Caused by: InvalidTopologyException(msg:Component: [b-4] subscribes from non-existent component [$mastercoord-bg0])
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8070)
at org.apache.storm.generated.Nimbus$submitTopology_result$submitTopology_resultStandardScheme.read(Nimbus.java:8047)
at org.apache.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:7981)
at org.apache.storm.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
at org.apache.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:306)
at org.apache.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:290)
at org.apache.storm.StormSubmitter.submitTopologyInDistributeMode(StormSubmitter.java:326)
at org.apache.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:260)
... 3 more

Я могу успешно отправить топологию, если удалю последние 2 функции из этой топологии, а именно:

.groupBy(new Fields("portfolioUrn"))
.persistentAggregate(stateFactory, new Count(), new Fields("count"));

После того, как я запустил свою функцию агрегации (aggregate()), я хотел бы сгруппировать кортежи с полем «portfolioUrn» и сохранить счет в mongoDB. Я не понимаю, почему последний раздел groupBy().persistentAggregate() вызывает эту ошибку. Не могли бы вы помочь найти причину?


person gox9    schedule 12.06.2019    source источник


Ответы (1)


После некоторых исследований я нашел эту страницу, которая кажется аналогичный случай у меня. Натан Марц заявляет, что топологии DRPC не поддерживают сохранение разделов (по состоянию на 2013 год), и я считаю, что это то же самое и для моего случая. Я думаю (не полностью проверено), что топологии Storm 1.2.1 DRPC могут вообще не поддерживать сохранение состояния.

person gox9    schedule 02.09.2019