JanusGraph, кластер Spark не может подключиться к Cassandra

Я пытаюсь запустить задание Spark в кластере, который создает JanusGraph.

У меня есть экземпляр сервера JanusGraph, Cassandra, ES, работающего на одной машине, только вычисление Spark происходит в кластере. (В принципе, я сделал janusgraph.sh start на машине

Моя конфигурация выглядит следующим образом (x - это IP-адрес машины, на которой я запускаю указанные выше экземпляры):

def getGraph(): JanusGraph = {
    val config = JanusGraphFactory.build()
    config.set("storage.backend", "cassandrathrift")
    config.set("storage.cassandrathrift.keyspace", "jgex")
    config.set("storage.hostname", "x")
    config.set("index.jgex.backend", "elasticsearch")
    config.set("index.jgex.index-name", "jgex")
    config.set("jgex.hostname", "x")
    config.open()
  }

Но когда я делаю spark-submit толстую банку на кластере, я получаю следующее:

    java.lang.IllegalArgumentException: Could not instantiate implementation: org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager
    at org.janusgraph.util.system.ConfigurationUtil.instantiate(ConfigurationUtil.java:69)
    at org.janusgraph.diskstorage.Backend.getImplementationClass(Backend.java:477)
    at org.janusgraph.diskstorage.Backend.getStorageManager(Backend.java:409)
    at org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.<init>(GraphDatabaseConfiguration.java:1376)
    at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:164)
    at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:133)
    at org.janusgraph.core.JanusGraphFactory.open(JanusGraphFactory.java:123)
    at org.janusgraph.core.JanusGraphFactory$Builder.open(JanusGraphFactory.java:264)
    at janus_create$.getGraph(janus_create.scala:66)
    at janus_create$.makePropertiesandIndexes(janus_create.scala:830)
    at janus_create$.main(janus_create.scala:921)
    at janus_create.main(janus_create.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:627)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.janusgraph.util.system.ConfigurationUtil.instantiate(ConfigurationUtil.java:58)
    ... 16 more
Caused by: org.janusgraph.diskstorage.TemporaryBackendException: Temporary failure in storage backend
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager.getCassandraPartitioner(CassandraThriftStoreManager.java:219)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager.<init>(CassandraThriftStoreManager.java:198)
    ... 21 more
Caused by: org.apache.thrift.transport.TTransportException: java.net.ConnectException: Connection refused (Connection refused)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:187)
    at org.apache.thrift.transport.TFramedTransport.open(TFramedTransport.java:81)
    at org.janusgraph.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.makeRawConnection(CTConnectionFactory.java:110)
    at org.janusgraph.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.makeObject(CTConnectionFactory.java:74)
    at org.janusgraph.diskstorage.cassandra.thrift.thriftpool.CTConnectionFactory.makeObject(CTConnectionFactory.java:43)
    at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1179)
    at org.janusgraph.diskstorage.cassandra.thrift.CassandraThriftStoreManager.getCassandraPartitioner(CassandraThriftStoreManager.java:216)
    ... 22 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at org.apache.thrift.transport.TSocket.open(TSocket.java:182)
    ... 28 more

Я пробовал переключаться между cassandra и cassandrathrift, но оба не работали. Кроме того, где мне указать, где работает мой гремлин. Это актуально?


person J.Doe    schedule 12.04.2018    source источник
comment
Используете ли вы предварительно упакованный дистрибутив JanusGraph для одного узла Cassandra или используете автономный узел Cassandra? И чтобы уточнить, у вас есть кластер Spark, работающий на разных машинах, чем Cassandra / Elasticsearch / JanusGraph?   -  person Jason Plurad    schedule 12.04.2018
comment
Привет, извините за поздний ответ. Я понял, что происходит, cassandra и ES не отвечали на удаленные запросы. Я запускал предварительно упакованный дистрибутив JanusGraph, и да, у меня есть кластер Spark, работающий на разных машинах. Требуется ли для удаленной настройки ES / Cassandra особая настройка? Кроме того, как запустить несколько экземпляров сервера Janusgraph? Спасибо.   -  person J.Doe    schedule 20.04.2018
comment
Я пробовал исправить много вещей, думаю, скудная документация, связанная с этим, делает вещи немного сложнее, чем должно быть. Сервер Janusgraph - это, по сути, сервер gremlin, и я могу внести изменения в конфигурацию, чтобы сервер gremlin указывал на правильные экземпляры cassandra и ES. Но как мне настроить мою искровую задачу, чтобы она указывала на правильный сервер gremlin / janusgraph? Заранее спасибо.   -  person J.Doe    schedule 23.04.2018


Ответы (1)


Предварительно упакованный дистрибутив предполагает наличие одного узла localhost для каждого сервера Cassandra, Elasticsearch и Gremlin. Обратите внимание на java.net.ConnectException: Connection refused внизу трассировки стека. Если у вас есть Spark, работающий в удаленном кластере, вам необходимо убедиться, что серверы доступны по адресу, отличному от localhost.

  • Сначала остановите распространение с помощью bin/janusgraph.sh stop
  • Обновите listen_address и rpc_address в $JANUSGRAPH_HOME/conf/cassandra/cassandra.yaml, используя IP-адрес машины (документы Cassandra)

  • Добавьте network.host в $JANUSGRAPH_HOME/elasticsearch/config/elasticsearch.yml, используя IP-адрес машины (Документы Elasticsearch)

  • Обновите host в $JANUSGRAPH_HOME/conf/gremlin-server/gremlin-server.yaml, используя IP-адрес машины (JanusGraph docs)

  • Предполагая, что вы используете конфигурацию сервера Gremlin по умолчанию gremlin-server.yaml, вам необходимо обновить файл свойств в $JANUSGRAPH_HOME/conf/gremlin-server/janusgraph-cassandra-es-server.properties, используя IP-адрес машины. Обновите storage.hostname и index.search.hostname, используя IP-адрес устройства, соответствующий настройкам сервера, указанным выше.

Обновление: похоже, что есть ошибки в свойствах подключения к графу, которые могут способствовать возникновению проблемы:

  • config.set("storage.cassandrathrift.keyspace", "jgex") - должно быть "storage.cassandra.keyspace"
  • config.set("jgex.hostname", "x") - должно быть "index.jgex.hostname"
person Jason Plurad    schedule 23.04.2018
comment
Спасибо, я сделал все, что вы упомянули, и теперь выдает аналогичную ошибку только для ElasticSearch. Кроме того, я хотел спросить, насколько мне известно, искровые задания должны взаимодействовать с сервером Gremlin, а они, в свою очередь, должны взаимодействовать с серверной частью / серверной частью индексации. Но в моем искровом коде мне нужно указать местоположение кассандры и ES. Итак, как искра узнает, где мой гремлин? Нужно ли мне указывать это в моем искровом задании? - person J.Doe; 24.04.2018
comment
См. Обновленный ответ. Ваш код выше подключается напрямую к серверной части хранилища и серверной части индексации, что является допустимым подходом. Если вы хотите подключиться через сервер Gremlin, вам нужно будет использовать Gremlin Driver подход. - person Jason Plurad; 24.04.2018
comment
Спасибо, отлично сработало! Кроме того, если я хотел добавить свойство Array [Double], как мне определить его в Scala? mgmt.makePropertyKey("prop1").dataType(classOf[java.util.Arrays]).make() не работает. - person J.Doe; 07.05.2018