Вопросы по теме 'apache-kafka-streams'

Выполнение асинхронного преобразования в потоке Kafka
Предположим, у меня есть две темы Kafka, A и B . Я пытаюсь разработать систему, которая извлекает записи из A , применяет преобразование к каждой записи, а затем публикует преобразованные записи в B . В этом случае преобразование включает вызов...
2597 просмотров

Объединение потоковых данных с помощью потоков Kafka
Я отправляю сообщения Кафке с таким кодом: Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1);...
1178 просмотров
schedule 02.04.2022

Идеальный способ дополнить KStream поисковыми данными
В моем потоке есть столбец под названием «категория», и у меня есть дополнительные статические метаданные для каждой «категории» в другом магазине, они обновляются раз в пару дней. Как правильно выполнять этот поиск? Есть два варианта с потоками...
4967 просмотров
schedule 16.03.2022

Динамическое подключение входного потока Kafka к нескольким выходным потокам
Есть ли в Kafka Streams встроенная функция, позволяющая динамически соединять один входной поток с несколькими выходными потоками? KStream.branch разрешает ветвление на основе предикатов истина / ложь, но это не совсем то, что я хочу. Я бы хотел,...
2765 просмотров
schedule 27.03.2023

org.apache.kafka.common.errors.RecordTooLargeException
Я выполняю агрегирование потоков kafka и записываю агрегированные записи в тему и получаю следующие ошибки. Я использую пользовательский json serde для вспомогательного класса агрегации. В некоторых блогах я нашел решение этой проблемы - увеличить...
1388 просмотров

Фиксация смещения Kafka-Streams - повторная обработка очень старых сообщений
Мы используем apache kafka-streams 0.10.2.0 в приложении. Мы используем топологию kafka-streams для передачи обработанных данных в следующую тему до конца обработки. Кроме того, мы используем контейнер AWS ECS для развертывания потребительского...
913 просмотров

Государственное хранилище Kafka Stream 0.10.2.0 получает исключение при сохранении значения
Я использую API процессора низкого уровня с хранилищем состояний, поэтому до 0.10.0.1 он работает нормально, но я обновил потоки kafka, но я получаю ошибку ниже, поэтому после этого я понял, что это связано с журналом изменений, и он смотрит на...
1553 просмотров
schedule 20.02.2022

KStream из одного кластера во многие
Мне нужно иметь один java-модуль в моем кластере Kafka, который получит сообщение, и в зависимости от двух полей сообщения он будет обогащен и опубликован в другом кластере kafka. Я изучаю KStream API и не могу найти способ отправить данные в другой...
114 просмотров
schedule 09.02.2023

Kafka Streams - несколько объединений и количество потоков в одном экземпляре
У меня есть вариант использования для нескольких объединений по двум темам, Допустим, у меня есть тема A (2 раздела) и тема B (2 раздела) и запущен один экземпляр приложения KafkaStreams. У меня есть вариант использования для поиска перерывов,...
427 просмотров
schedule 11.03.2022

Приложение для подсчета слов Kafka Streams
Я играю с API потоковой передачи kafka (версия Kakfa: 0.10.2.0), пытаясь заставить простой пример подсчета слов работать: суть приложения Wordcount . Я использую как производителя, так и консоль: ./kafka-console-producer.sh -topic input-topic...
1113 просмотров

Использование моего собственного драйвера Cassandra для записи результатов агрегации
Я пытаюсь создать простое приложение, которое записывает в Cassandra просмотры каждой веб-страницы на моем сайте. Я хочу писать каждые 5 минут накопительные просмотры страниц с начала логического часа. Мой код для этого выглядит примерно так:...
118 просмотров
schedule 01.04.2023

приложение kafka streams - игнорировать старые сообщения при перезапуске
Я имею дело с данными таймсерий для живого приложения. Так что старые данные не имеют значения. Я просто хочу обрабатывать данные, полученные после запуска потокового приложения, а не из ранее зафиксированного смещения. Как правильно игнорировать...
1480 просмотров
schedule 14.03.2022

KStream несовместимые типы
Я пробовал что-то с частью кодирования, используя строки, и придумал код ниже, пожалуйста, помогите с проблемами синтаксиса с кодом. KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> textlines =...
246 просмотров
schedule 24.01.2024

org.apache.kafka.common.errors.TimeoutException
У меня есть кластер kafka с двумя брокерами 1.0.0, и я запускаю приложение 1.0.0 kafka stream API для этой kafka. Я увеличил запрос производителя request.timeout.ms до 5 минут, чтобы исправить TimeoutException производителя. В настоящее время я...
7778 просмотров
schedule 30.04.2024

Агрегация таймсерий Kafka Streams
Я использую Kafka Streams для обработки данных временных рядов. Одним из вариантов использования является почасовое агрегирование данных для каждого датчика (идентификатор датчика - это ключ сообщения в теме test ). Я написал конвейер, который...
1094 просмотров
schedule 01.11.2022

Очистка государственного хранилища приложения
Я хочу очистить хранилище состояний конкретного экземпляра внутри приложения потока kafka. Например, предположим, что если я поддерживаю список 5 максимальных значений в состоянии, я хочу очищать его ежечасно. Есть ли способ сделать это - не...
554 просмотров
schedule 21.03.2022

Kafka Streams — Неизвестный магический байт на GenericAvroSerde
при попытке передать данные Avro с помощью Kafka Streams я столкнулся с этой ошибкой: Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by:...
3510 просмотров

Почему один поток Kafka блокирует запуск другого?
Я работаю с новым API-интерфейсом Kafka-scala-streams, который недавно был открыт Lightbend. И я пытаюсь запустить два потока. Но происходит то, что два из них не работают одновременно, и я не получаю желаемого результата. package in.internity...
451 просмотров

Модульное тестирование реализации одного процессора (java) в Kafka Streams?
Конкретная проблема, с которой приходится сталкиваться, заключается в имитации контекста, хранилищ состояний и оконных объектов, которые передаются в процесс функции. Похоже, все примеры, например, здесь и здесь модульные тесты на уровне...
424 просмотров
schedule 09.07.2023

Тестирование приложений KafkaStreams
Я установил простую агрегацию, усредняющую значения из нескольких потоков вместе, и пытаюсь ее протестировать. Я прожигал много времени и, кажется, не могу понять концепции прямо в моей голове. Мой поток ниже: // Combine multiple streams...
654 просмотров
schedule 25.05.2022