Java EventHubClient не получает события при прослушивании более двух разделов

Я пробую пример начала работы с Java на https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-java-java-getstarted в концентраторе Интернета вещей с 4 разделами.

Я адаптировал часть о получении событий, создав 1 экземпляр клиента и вызвав client.createReceiver для каждого раздела, как показано ниже:

public class App {

  private static String connStr = "...";

  public static void main(String[] args) throws IOException, EventHubException, ExecutionException, InterruptedException {

    EventHubClient client = createClient();
    String[] partitionIds = client.getRuntimeInformation()
            .thenApply(eventHubRuntimeInformation -> eventHubRuntimeInformation.getPartitionIds())
            .get();

    client.createReceiver(DEFAULT_CONSUMER_GROUP_NAME, "0", Instant.now())
            .thenAccept(receiverHandler("0"));
    client.createReceiver(DEFAULT_CONSUMER_GROUP_NAME, "1", Instant.now())
            .thenAccept(receiverHandler("0"));
    client.createReceiver(DEFAULT_CONSUMER_GROUP_NAME, "2", Instant.now())
            .thenAccept(receiverHandler("2"));
    client.createReceiver(DEFAULT_CONSUMER_GROUP_NAME, "3", Instant.now())
            .thenAccept(receiverHandler("3"));


    System.out.println("Press ENTER to exit.");
    System.in.read();
    try {
        client.closeSync();
        System.exit(0);
    } catch (Exception e) {
        System.exit(1);
    }

  }

  private static Consumer<PartitionReceiver> receiverHandler(String partitionId) {
    return receiver -> {
        System.out.println("** Created receiver on partition " + partitionId);
        try {
            while (true) {
                receiver.receive(10)
                        .thenAccept(receivedEvents -> {
                                    int batchSize = 0;
                                    if (receivedEvents != null) {
                                        System.out.println("Got some evenst");
                                        for (EventData receivedEvent : receivedEvents) {
                                            System.out.println(String.format("Offset: %s, SeqNo: %s, EnqueueTime: %s",
                                                    receivedEvent.getSystemProperties().getOffset(),
                                                    receivedEvent.getSystemProperties().getSequenceNumber(),
                                                    receivedEvent.getSystemProperties().getEnqueuedTime()));
                                            System.out.println(String.format("| Device ID: %s",
                                                    receivedEvent.getSystemProperties().get("iothub-connection-device-id")));
                                            System.out.println(String.format("| Message Payload: %s",
                                                    new String(receivedEvent.getBytes(), Charset.defaultCharset())));
                                            batchSize++;
                                        }
                                    }
                                    System.out.println(String.format("Partition: %s, ReceivedBatch Size: %s", partitionId, batchSize));
                                }
                        ).get();

            }
        } catch (Exception e) {
            System.out.println("Failed to receive messages: " + e.getMessage());
        }
    };
  }

  private static EventHubClient createClient() {
    EventHubClient client = null;
    try {
        client = EventHubClient.createFromConnectionStringSync(connStr);
    } catch (Exception e) {
        System.out.println("Failed to create client: " + e.getMessage());
        System.exit(1);
    }
    return client;
  }
}

События, отправляемые с имитируемого устройства, поступают в раздел 3. Проблема заключается в том, что события не принимаются в следующих ситуациях:

  1. устройство не отправляет события при запуске приемника событий
  2. устройство отправляет события при запуске приемника событий, но затем перезапускается. В этом случае приведенный выше код перестает получать события после перезагрузки устройства.

Вышеупомянутые проблемы не возникают, когда мы подключаемся только к разделу 3.
Вышеуказанные проблемы не возникают, когда мы подключаемся только к разделу 3 и 1 другому разделу.
Вышеуказанные проблемы возникают при подключении к разделу. 3 и 2 или 3 другие перегородки.

Любые подсказки?


person Jan Bols    schedule 28.11.2017    source источник


Ответы (1)


Вышеупомянутые проблемы действительно возникают, когда мы подключаемся к разделу 3 и 2 или 3 другим разделам.

Я использую ваш код с ошибкой «лямбда-выражения не поддерживаются в -source 1.5 java».

Затем я редактирую пример кода в руководство и разрешить одному клиенту подключаться к четырем получателям раздела (здесь четыре события устройств помещаются в три раздела, а не четыре, потому что все события устройства1 и устройства2 помещаются в раздел2 .):

public class App 
{
    private static String connStr = "Endpoint=sb://iothub-ns-ritatestio-265731-93e2a49f65.servicebus.windows.net/;EntityPath=ritatestiothub;SharedAccessKeyName=iothubowner;SharedAccessKey=z+QM62TftPlTfwS3CnN9348X2cmMkCaEFaC1IqYpiW8=";
    private static EventHubClient client = null;

    public static void main( String[] args ) throws IOException
    {       
        try {
          client = EventHubClient.createFromConnectionStringSync(connStr);
        } catch (Exception e) {
          System.out.println("Failed to create client: " + e.getMessage());
          System.exit(1);
        }

        // Create receivers for partitions 0 and 1.
        receiveMessages("0");
        receiveMessages("1");
        receiveMessages("2");
        receiveMessages("3");
        System.out.println("Press ENTER to exit.");

        System.in.read();
        try {
          client.closeSync();
          System.exit(0);
        } catch (Exception e) {
          System.exit(1);
        }
    }

    // Create a receiver on a partition.
private static void receiveMessages(final String partitionId) {
  try {
    // Create a receiver using the
    // default Event Hubs consumer group
    // that listens for messages from now on.
    client.createReceiver(EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, partitionId, Instant.now())
      .thenAccept(new Consumer<PartitionReceiver>() {
        public void accept(PartitionReceiver receiver) {
          System.out.println("** Created receiver on partition " + partitionId);
          try {
            while (true) {
              Iterable<EventData> receivedEvents = receiver.receive(100).get();

              int batchSize = 0;
              if (receivedEvents != null) {
                for (EventData receivedEvent : receivedEvents) {
                  batchSize++;
                  System.out.println(String.format("Partition: %s, ReceivedBatch Size: %s, Device ID: %s, SeqNo: %s", partitionId, batchSize,receivedEvent.getSystemProperties().get("iothub-connection-device-id"),receivedEvent.getSystemProperties().getSequenceNumber()));
                }
              }

            }
          } catch (Exception e) {
            System.out.println("Failed to receive messages: " + e.getMessage());
          }
        }
      });
    } catch (Exception e) {
      System.out.println("Failed to create receiver: " + e.getMessage());
  }

}
}

Я могу получить три получателя разделов, получающих события после перезапуска устройства. Вы можете проверить следующий снимок:

введите здесь описание изображения

Обновление: замените 4 устройства на 1 устройство для отправки. Тем не менее получатель может продолжать получать сообщения после перезагрузки устройства.

введите здесь описание изображения

введите здесь описание изображения

person Rita Han    schedule 29.11.2017
comment
В моем примере у меня есть только одно устройство, поэтому все события попадают в один и тот же раздел. Похоже, в вашем примере у вас есть 4 устройства, попадающие в 3 из 4 разделов. - person Jan Bols; 04.12.2017
comment
@JanBols Я тестирую отправку только 1 устройства, но до сих пор не воспроизвел вашу проблему. Пожалуйста, проверьте мое обновление. Вы можете использовать мой код, чтобы проверить, существует ли проблема. - person Rita Han; 05.12.2017