Вопросы по теме 'spark-structured-streaming'

Как записать набор потоковых данных в Kafka?
Я пытаюсь немного обогатить данные тем. Поэтому читайте из Kafka обратно в Kafka, используя структурированную потоковую передачу Spark. val ds = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers)...
3181 просмотров

Как читать записи в формате JSON из Kafka с помощью структурированной потоковой передачи?
Я пытаюсь использовать подход структурированной потоковой передачи с использованием Spark- Потоковая передача на основе DataFrame / Dataset API для загрузки потока данных из Kafka. Я использую: Искра 2.10 Кафка 0,10 искра-sql-кафка-0-10...
9551 просмотров

Создание идемпотентного стока ES ForEachWriter со структурированной потоковой передачей в spark
У меня та же ситуация, что описана в Структурированная пересылка Spark из kafka - последнее сообщение обрабатывается снова после возобновления с контрольной точки . Когда я перезапускаю свое задание искры после сбоя, последнее сообщение снова...
409 просмотров

Как вывести записи из Kafka на консоль?
Я изучаю структурированную потоковую передачу, и мне не удалось отобразить вывод на консоль. import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types._...
904 просмотров

Как задать количество документов, обрабатываемых в пакете?
В Spark 2.2.0 контрольные точки работают немного иначе, чем в версиях. Существует папка коммитов, которая создается, и после завершения каждого пакета файл записывается в папку. Я сталкиваюсь со сценарием, в котором у меня около 10 тыс....
181 просмотров

Как обновить конфигурацию max.request.size потребителя Kafka при использовании структурированного потока Spark
Spark readStream для Kafka выдает следующие ошибки: org.apache.kafka.common.errors.RecordTooLargeException (сообщение составляет 1166569 байт при сериализации, что превышает максимальный размер запроса, который вы настроили с конфигурацией...
5105 просмотров

Изящно остановить запрос структурированной потоковой передачи
Я использую Spark 2.1 и пытаюсь аккуратно остановить потоковый запрос. StreamingQuery.stop() изящная остановка, потому что я не видел подробной информации об этом методе в документация : void stop() Останавливает выполнение этого...
5194 просмотров

Почему запуск потокового запроса приводит к ExitCodeException exitCode=-1073741515?
Я пытался привыкнуть к новой структурированной потоковой передаче, но она продолжает выдавать мне ошибку ниже, как только я запускаю запрос .writeStream . Любая идея, что может быть причиной этого? Самое близкое, что я мог найти, это постоянная...
7675 просмотров

Apache Spark Structured Streaming vs Apache Flink: в чем разница?
Мы обсудили следующие вопросы: В чем разница между Apache Spark и Apache Flink? [закрыто] Что означает «потоковая передача» в Apache Spark и Apache Flink ? В чем разница Между мини-пакетами и потоковой передачей в реальном времени на...
5107 просмотров

Как подсчитать элементы за временное окно?
Я пытаюсь использовать структурированную потоковую передачу Spark для подсчета количества элементов из Kafka для каждого временного окна с помощью кода ниже: import java.text.SimpleDateFormat import java.util.Date import...
815 просмотров

Обходной путь для объединения двух потоков в структурированной потоковой передаче в Spark 2.x
У меня есть поток конфигураций (нечасто меняющихся, но если будет обновление, то будет сообщение) и еще один поток необработанных точек данных. Насколько я понимаю, на данный момент spark не поддерживает присоединение к потоковым наборам данных или...
1563 просмотров

Как создать собственный источник потоковых данных?
У меня есть специальный ридер для Spark Streaming, который читает данные из WebSocket. Я собираюсь попробовать Spark Structured Streaming. Как создать источник потоковых данных в Spark Structured Streaming?
6419 просмотров

потоковая передача искры: чтение строки CSV из кафки, запись в паркет
Есть много онлайн-примеров чтения json из Kafka (для записи на паркет), но я не могу понять, как применить схему к строке CSV из kafka. Потоковые данные: customer_1945,cusaccid_995,27999941 customer_1459,cusaccid_1102,27999942 Схема:...
2636 просмотров

Структурированная потоковая передача Spark через облачное хранилище Google
Я использую несколько пакетных конвейеров Spark, которые потребляют данные Avro в облачном хранилище Google. Мне нужно обновить некоторые конвейеры, чтобы они работали в режиме реального времени, и мне интересно, может ли потоковая передача с...
762 просмотров

Структурированное потоковое чтение из темы Kafka
Я прочитал файл csv, преобразовал поле значения в байты и написал в тему Kafka, используя приложение производителя Kafka. Теперь я пытаюсь читать из темы Kafka, используя структурированную потоковую передачу, но не могу применить настраиваемую...
378 просмотров

эвристика водяного знака потоковой обработки
Насколько точны оценки водяных знаков при потоковой обработке в Apache Beam или Spark Streaming. Моим источником данных являются файлы из gcs/s3, но я использую время события, связанное с каждым событием, в качестве метки времени для оконной функции....
884 просмотров

Расширенные агрегации в структурированной потоковой передаче
Я уже задавал этот вопрос несколько месяцев назад, но я спрошу еще раз, чтобы быть уверенным на 100%, и опишу свою проблему здесь: У меня есть тема потоковой передачи, которую я собираю каждую минуту с помощью скользящего окна в Spark Structured...
395 просмотров

Как напрямую записать потоковый структурированный поток в Hive?
Я хочу добиться чего-то вроде этого: df.writeStream .saveAsTable("dbname.tablename") .format("parquet") .option("path", "/user/hive/warehouse/abc/") .option("checkpointLocation", "/checkpoint_path") .outputMode("append") .start() Я открыт...
511 просмотров

Spark Structured Streaming — Customer Sink работал в Spark 2.2.0, но получил исключение в Spark 2.3.0
Недавно мы перенесли наш проект со Spark 2.2.0 cloudera2 на Spark 2.3.0 cloudera2 и заметили, что некоторые клиентские приемники работали, но теперь выходили из строя с исключениями. Для простоты я переписал крошечный кейс, чтобы помощники могли...
945 просмотров

Структурированная потоковая передача с использованием PySpark и Kafka, Py4JJavaError: произошла ошибка при вызове o70.awaitTermination
Я пытался использовать Kafka с помощью Spark, а точнее PySpark и Structured Streaming. import os import time import time from ast import literal_eval from pyspark.sql.types import * from pyspark.sql.functions import from_json, col, struct,...
1144 просмотров