Я работаю над клиентом Java кластера салата. Он настроен внутри топологии bolt
(Apache Strom). spout
читает данные из kafka
и передает их bolt
. Однако, когда я начинаю свою топологию, я получаю сообщение об ошибке ниже, и программа завершается. Я что-то упускаю? Чем это вызвано?
Трассировки стека
29502 [Тема-17-РекомендацияLettuceBolt-executor [2 2]] ОШИБКА o.a.s.util - Асинхронный цикл завершился! io.lettuce.core.RedisException: не удается получить начальные разделы кластера из начальных URI [RedisURI [host = '127.0.0.1', port = 7001]] в io.lettuce.core.cluster.RedisClusterClient.loadPartitions (RedisClusterClient.java:865 ) ~ [lettuce-core-5.1.7.RELEASE.jar :?] в io.lettuce.core.cluster.RedisClusterClient.initializePartitions (RedisClusterClient.java:819) ~ [lettuce-core-5.1.7.RELEASE.jar: ?] в io.lettuce.core.cluster.RedisClusterClient.connect (RedisClusterClient.java:345) ~ [lettuce-core-5.1.7.RELEASE.jar :?] в com.projectName.indexer.lettuce.LettuceClusterClientProvider.getConnection LettuceClusterClientProvider.java:72) ~ [классы / :?] в com.projectName.indexer.lettuce.LettuceCacheRepopulationHandler.openLettuceConnection (LettuceCacheRepopulationHandler.java:42) ~ [классы /: подготовить (РекомендацияLettuceBolt.java:35) ~ [классы / :?] в org.apache.storm.daemon.executor $ fn__8058 $ fn__8071.inv oke (executor.clj: 795) ~ [storm-core-1.0.2.jar: 1.0.2] в org.apache.storm.util $ async_loop $ fn__624.invoke (util.clj: 482) [storm-core- 1.0.2.jar: 1.0.2] в clojure.lang.AFn.run (AFn.java:22) [clojure-1.7.0.jar :?] в java.base / java.lang.Thread.run (Thread .java: 844) [?:?] Вызвано: io.lettuce.core.RedisConnectionException: невозможно установить соединение с кластером Redis в [RedisURI [host = '127.0.0.1', port = 7001]] в io.lettuce .core.cluster.topology.AsyncConnections.get (AsyncConnections.java:89) ~ [lettuce-core-5.1.7.RELEASE.jar :?] в io.lettuce.core.cluster.topology.ClusterTopologyRefresh.loadViews (ClusterTopologyRefresh. java: 73) ~ [lettuce-core-5.1.7.RELEASE.jar :?] в io.lettuce.core.cluster.RedisClusterClient.doLoadPartitions (RedisClusterClient.java:871) ~ [lettuce-core-5.1.7.RELEASE .jar :?] в io.lettuce.core.cluster.RedisClusterClient.loadPartitions (RedisClusterClient.java:844) ~ [lettuce-core-5.1.7.RELEASE.jar :?] ... еще 9
Входной код
private void init() {
redisUri = RedisURI.Builder
.redis(lettuceConfig.getLettuceClusterHost())
.withPort(lettuceConfig.getLettuceClusterPort())
.withTimeout(Duration.ofMillis(lettuceConfig.getLettuceClusterTimeout()))
.build();
}
public StatefulRedisClusterConnection getConnection() {
if (connection == null || !connection.isOpen()) {
redisClusterClient = RedisClusterClient.create(redisUri);
final ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder()
.enablePeriodicRefresh(Duration.ofMinutes(BoltConstants.Lettuce.PERIODIC_REFRESH_TIME_IN_MIN))
.enableAdaptiveRefreshTrigger()
.build();
final ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.autoReconnect(true)
.topologyRefreshOptions(topologyRefreshOptions)
.build();
redisClusterClient.setOptions(clusterClientOptions);
connection = redisClusterClient.connect(SnappyCompressor.wrap(new StringCodec()));
log.info("Connected to Redis client lettuce. lettuce connection is up and running.");
}
return connection;
}
Окружающая обстановка
compile 'io.lettuce:lettuce-core:5.1.7.RELEASE'