Вопросы по теме 'apache-kafka-streams'
Выполнение асинхронного преобразования в потоке Kafka
Предположим, у меня есть две темы Kafka, A и B . Я пытаюсь разработать систему, которая извлекает записи из A , применяет преобразование к каждой записи, а затем публикует преобразованные записи в B . В этом случае преобразование включает вызов...
2597 просмотров
schedule
04.04.2022
Объединение потоковых данных с помощью потоков Kafka
Я отправляю сообщения Кафке с таким кодом:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);...
1178 просмотров
schedule
02.04.2022
Идеальный способ дополнить KStream поисковыми данными
В моем потоке есть столбец под названием «категория», и у меня есть дополнительные статические метаданные для каждой «категории» в другом магазине, они обновляются раз в пару дней. Как правильно выполнять этот поиск? Есть два варианта с потоками...
4967 просмотров
schedule
16.03.2022
Динамическое подключение входного потока Kafka к нескольким выходным потокам
Есть ли в Kafka Streams встроенная функция, позволяющая динамически соединять один входной поток с несколькими выходными потоками? KStream.branch разрешает ветвление на основе предикатов истина / ложь, но это не совсем то, что я хочу. Я бы хотел,...
2765 просмотров
schedule
27.03.2023
org.apache.kafka.common.errors.RecordTooLargeException
Я выполняю агрегирование потоков kafka и записываю агрегированные записи в тему и получаю следующие ошибки. Я использую пользовательский json serde для вспомогательного класса агрегации. В некоторых блогах я нашел решение этой проблемы - увеличить...
1388 просмотров
schedule
04.05.2022
Фиксация смещения Kafka-Streams - повторная обработка очень старых сообщений
Мы используем apache kafka-streams 0.10.2.0 в приложении. Мы используем топологию kafka-streams для передачи обработанных данных в следующую тему до конца обработки.
Кроме того, мы используем контейнер AWS ECS для развертывания потребительского...
913 просмотров
schedule
12.03.2022
Государственное хранилище Kafka Stream 0.10.2.0 получает исключение при сохранении значения
Я использую API процессора низкого уровня с хранилищем состояний, поэтому до 0.10.0.1 он работает нормально, но я обновил потоки kafka, но я получаю ошибку ниже, поэтому после этого я понял, что это связано с журналом изменений, и он смотрит на...
1553 просмотров
schedule
20.02.2022
KStream из одного кластера во многие
Мне нужно иметь один java-модуль в моем кластере Kafka, который получит сообщение, и в зависимости от двух полей сообщения он будет обогащен и опубликован в другом кластере kafka. Я изучаю KStream API и не могу найти способ отправить данные в другой...
114 просмотров
schedule
09.02.2023
Kafka Streams - несколько объединений и количество потоков в одном экземпляре
У меня есть вариант использования для нескольких объединений по двум темам,
Допустим, у меня есть тема A (2 раздела) и тема B (2 раздела) и запущен один экземпляр приложения KafkaStreams.
У меня есть вариант использования для поиска перерывов,...
427 просмотров
schedule
11.03.2022
Приложение для подсчета слов Kafka Streams
Я играю с API потоковой передачи kafka (версия Kakfa: 0.10.2.0), пытаясь заставить простой пример подсчета слов работать: суть приложения Wordcount . Я использую как производителя, так и консоль:
./kafka-console-producer.sh -topic input-topic...
1113 просмотров
schedule
26.02.2024
Использование моего собственного драйвера Cassandra для записи результатов агрегации
Я пытаюсь создать простое приложение, которое записывает в Cassandra просмотры каждой веб-страницы на моем сайте. Я хочу писать каждые 5 минут накопительные просмотры страниц с начала логического часа.
Мой код для этого выглядит примерно так:...
118 просмотров
schedule
01.04.2023
приложение kafka streams - игнорировать старые сообщения при перезапуске
Я имею дело с данными таймсерий для живого приложения. Так что старые данные не имеют значения. Я просто хочу обрабатывать данные, полученные после запуска потокового приложения, а не из ранее зафиксированного смещения. Как правильно игнорировать...
1480 просмотров
schedule
14.03.2022
KStream несовместимые типы
Я пробовал что-то с частью кодирования, используя строки, и придумал код ниже, пожалуйста, помогите с проблемами синтаксиса с кодом.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines =...
246 просмотров
schedule
24.01.2024
org.apache.kafka.common.errors.TimeoutException
У меня есть кластер kafka с двумя брокерами 1.0.0, и я запускаю приложение 1.0.0 kafka stream API для этой kafka. Я увеличил запрос производителя request.timeout.ms до 5 минут, чтобы исправить TimeoutException производителя.
В настоящее время я...
7778 просмотров
schedule
30.04.2024
Агрегация таймсерий Kafka Streams
Я использую Kafka Streams для обработки данных временных рядов. Одним из вариантов использования является почасовое агрегирование данных для каждого датчика (идентификатор датчика - это ключ сообщения в теме test ).
Я написал конвейер, который...
1094 просмотров
schedule
01.11.2022
Очистка государственного хранилища приложения
Я хочу очистить хранилище состояний конкретного экземпляра внутри приложения потока kafka. Например, предположим, что если я поддерживаю список 5 максимальных значений в состоянии, я хочу очищать его ежечасно. Есть ли способ сделать это - не...
554 просмотров
schedule
21.03.2022
Kafka Streams — Неизвестный магический байт на GenericAvroSerde
при попытке передать данные Avro с помощью Kafka Streams я столкнулся с этой ошибкой:
Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by:...
3510 просмотров
schedule
01.08.2022
Почему один поток Kafka блокирует запуск другого?
Я работаю с новым API-интерфейсом Kafka-scala-streams, который недавно был открыт Lightbend. И я пытаюсь запустить два потока. Но происходит то, что два из них не работают одновременно, и я не получаю желаемого результата.
package in.internity...
451 просмотров
schedule
17.10.2022
Модульное тестирование реализации одного процессора (java) в Kafka Streams?
Конкретная проблема, с которой приходится сталкиваться, заключается в имитации контекста, хранилищ состояний и оконных объектов, которые передаются в процесс функции.
Похоже, все примеры, например, здесь и здесь модульные тесты на уровне...
424 просмотров
schedule
09.07.2023
Тестирование приложений KafkaStreams
Я установил простую агрегацию, усредняющую значения из нескольких потоков вместе, и пытаюсь ее протестировать. Я прожигал много времени и, кажется, не могу понять концепции прямо в моей голове. Мой поток ниже:
// Combine multiple streams...
654 просмотров
schedule
25.05.2022