Стратегия событий прослушивателя мутации с удаленным JanusGraph выдает исключение сериализации

Я хотел бы прослушивать мутации на удаленном JanusGraph, но я не могу найти правильную настройку, чтобы он работал.

Стек JanusGraph:
Образ докера JanusGraph ** 0.5.2 (который использует Apache TinkerPop Gremlin 3.4.6) с конфигурацией cql-es
Cassandra образ докера 3.11.6
Образ докера ElasticSearch 7.3.1
Раздел «Сериализаторы» в gremlin-server-cql-es.yaml обновлен следующей строкой :

- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry, org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerIoRegistryV3d0] }}
- { className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0, config: { serializeResultToString: true }}

Стек клиента Java: на основе pluradj / janusgraph-java-example
Java8
janusgraph-core 0.5.2
gremlin-driver 3.4.6
remote-objects.yaml выглядит как следует:

hosts: [127.0.0.1]
port: 8182
serializer: {
  className: org.apache.tinkerpop.gremlin.driver.ser.GryoMessageSerializerV3d0,
  config: {
    ioRegistries: [org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry, org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerIoRegistryV3d0]
  }
}

Полный код (без ConsoleMutationListener) выглядит так:

public static void main(String[] args) {

    MutationListener mutationListener = new ConsoleMutationListener("Test");
    EventStrategy eventStrategy = EventStrategy.build().addListener(mutationListener).create();

    try (GraphTraversalSource g = AnonymousTraversalSource.traversal()
        .withRemote("conf/remote-graph.properties")
        .withStrategies(eventStrategy)) {

        g.addV("person").property("name", "Test").next();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

ConsoleMutationListener является копией образца TinkerPop ConsoleMutationListener с измененным конструктором, принимающим имя графика вместо полного графика, поскольку toString() был единственным используемым методом в любом случае.

Трассировки стека:

io.netty.handler.codec.EncoderException: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: An error occurred during serialization of this request [RequestMessage{, requestId=9436b08c-7e31-4fc0-b480-40904055f491, op='bytecode', processor='traversal', args={gremlin=[[withStrategies(EventStrategy)], [addV(person), property(name, Test)]], aliases={g=g}}}] - it could not be sent to the server - Reason: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: java.lang.IllegalArgumentException: Class is not registered: org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy
Note: To register this class use: kryo.register(org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy.class);
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:708)
    at io.netty.channel.AbstractChannelHandlerContext.access$1700(AbstractChannelHandlerContext.java:56)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1102)
    at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1149)
    at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1073)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:510)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:518)
    at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: An error occurred during serialization of this request [RequestMessage{, requestId=9436b08c-7e31-4fc0-b480-40904055f491, op='bytecode', processor='traversal', args={gremlin=[[withStrategies(EventStrategy)], [addV(person), property(name, Test)]], aliases={g=g}}}] - it could not be sent to the server - Reason: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: java.lang.IllegalArgumentException: Class is not registered: org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy
Note: To register this class use: kryo.register(org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategy.class);
    at org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder.encode(WebSocketGremlinRequestEncoder.java:60)
    at org.apache.tinkerpop.gremlin.driver.handler.WebSocketGremlinRequestEncoder.encode(WebSocketGremlinRequestEncoder.java:38)
    at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89)
    ... 12 more

Если я удалю withStrategies(eventStrategy), вершина будет добавлена ​​к графику, и я также смогу запросить график в обычном режиме. Однако я не могу настроить GraphTraversalSource с EventStrategy.

Q1: я думаю, что сообщение с определенной стратегией событий не может быть сериализовано с GryoMessageSerializerV3d0, или же на стороне сервера должен быть каким-то образом зарегистрирован прослушиватель мутаций / стратегия событий, но я не могу найти никаких ссылок на как это сделать. Есть ли примеры такой конфигурации?

Q2: что я делаю не так? Можно ли вообще использовать EventStrategy TinkerPop с JanusGraph?

Q3: есть ли другой подход для прослушивания удаленных мутаций JanusGraph?

Изменение сериализатора на GraphSONMessageSerializerV3d0 дает:

java.util.concurrent.CompletionException: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: EventStrategy does can only be constructed with instance() or create(Configuration)
    at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
...
Caused by: org.apache.tinkerpop.gremlin.driver.exception.ResponseException: EventStrategy does can only be constructed with instance() or create(Configuration)

Изменение сериализатора на GraphBinaryMessageSerializerV1 дает:

java.util.concurrent.CompletionException: io.netty.handler.codec.DecoderException: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: The most significant bit should be set according to the format
    at java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:375)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
...
Caused by: io.netty.handler.codec.DecoderException: org.apache.tinkerpop.gremlin.driver.ser.SerializationException: The most significant bit should be set according to the format


person NejcT    schedule 28.07.2020    source источник


Ответы (1)


Q1: Я думаю, что сообщение с определенной стратегией событий не может быть сериализовано с помощью GryoMessageSerializerV3d0 или Mutation Listener / Event Strategy должны каким-то образом быть зарегистрированы на стороне сервера, но я не могу найти никаких ссылок о том, как это сделать.

Это правильно. EventStrategy не работает через удаленные соединения.

Q2: Что я делаю не так? Можно ли вообще использовать EventStrategy TinkerPop с JanusGraph?

Его можно использовать с JanusGraph, но только во встроенном режиме, поскольку реализации MutationListener не знают, как отправлять события обратно клиенту. Драйверу, вероятно, потребуются некоторые существенные изменения, чтобы ввести механизм поддержки этого, так что это нетривиальное изменение. Если бы это было выяснено, то все еще остались бы проблемы с сериализацией, которые нужно решить для пользователей, которые предоставляют пользовательский MutationListeners (хотя, возможно, это просто не разрешено).

Q3: Есть ли другой подход для прослушивания удаленных мутаций JanusGraph?

Ключевое слово здесь - удаленный, и я не думаю, что в настоящее время существует что-либо, позволяющее это сделать. Вам нужно будет построить что-то свое. Один из способов - настроить g с EventStrategy на сервере, а затем добавить MutationListener, который отправлял бы эти события в отдельную очередь, которую вы могли бы использовать удаленно. Вы также можете рассмотреть возможность использования шины JanusGraph и разработать аналогичную схему.

person stephen mallette    schedule 29.07.2020
comment
Спасибо, @stephen mallette. Вы подтвердили мои подозрения, поскольку все документы / примеры с EventStrategy были довольно простыми и без удаленного графика. Итак, если я правильно понимаю, мне действительно нужно было бы расширить код на стороне сервера с помощью настраиваемого прослушивателя мутаций, который будет отправлять события (скажем,) Kafka? Я посмотрю на JanusGraph Bus, но на первый взгляд кажется, что я столкнусь с той же проблемой. Поскольку Trigger Log работает с идентифицированными транзакциями, и в случае удаленного подключения у вас действительно нет контроля над транзакциями. - person NejcT; 29.07.2020
comment
да, я думал, что вы запустите свой собственный MutationListener и поместите его на сервер, и он будет писать что-то вроде Kafka. - person stephen mallette; 29.07.2020