Я хочу понять API приема Kafka. Я включил пример кода, который работает.
- Почему в Kafka ConsumerStreamMap.get(topic) для одной темы есть список получателей KafkaStream‹>?
- Текущий процесс, кажется, перебирает KafkaStream‹> List, а затем перебирает сообщения. Но KafkaReceiver должен работать вечно, поэтому я ожидаю, что внутреннее время зациклится навсегда. Это делает List> излишним.
В нескольких примерах также используется ConsumerStreamMap.get(topic).get(0). Так правильно ли тогда писать продюсера?
Map<String, Integer> topicMap = new HashMap<String, Integer>(); // Define single thread for topic topicMap.put(topicName, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap); List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic); for (final KafkaStream<byte[], byte[]> stream : streamList) { ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); while (consumerIte.hasNext()) { counter++; String message = new String(consumerIte.next().message()); String id = topic.hashCode() + "-" + date.getTime() + "-" + counter; System.out.println(message); } }