Вопросы по теме 'spark-structured-streaming'
Как записать набор потоковых данных в Kafka?
Я пытаюсь немного обогатить данные тем. Поэтому читайте из Kafka обратно в Kafka, используя структурированную потоковую передачу Spark.
val ds = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)...
3181 просмотров
schedule
02.11.2022
Как читать записи в формате JSON из Kafka с помощью структурированной потоковой передачи?
Я пытаюсь использовать подход структурированной потоковой передачи с использованием Spark- Потоковая передача на основе DataFrame / Dataset API для загрузки потока данных из Kafka.
Я использую:
Искра 2.10
Кафка 0,10
искра-sql-кафка-0-10...
9551 просмотров
schedule
15.08.2023
Создание идемпотентного стока ES ForEachWriter со структурированной потоковой передачей в spark
У меня та же ситуация, что описана в Структурированная пересылка Spark из kafka - последнее сообщение обрабатывается снова после возобновления с контрольной точки . Когда я перезапускаю свое задание искры после сбоя, последнее сообщение снова...
409 просмотров
schedule
21.03.2023
Как вывести записи из Kafka на консоль?
Я изучаю структурированную потоковую передачу, и мне не удалось отобразить вывод на консоль.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types._...
904 просмотров
schedule
28.08.2022
Как задать количество документов, обрабатываемых в пакете?
В Spark 2.2.0 контрольные точки работают немного иначе, чем в версиях. Существует папка коммитов, которая создается, и после завершения каждого пакета файл записывается в папку.
Я сталкиваюсь со сценарием, в котором у меня около 10 тыс....
181 просмотров
schedule
06.12.2022
Как обновить конфигурацию max.request.size потребителя Kafka при использовании структурированного потока Spark
Spark readStream для Kafka выдает следующие ошибки:
org.apache.kafka.common.errors.RecordTooLargeException (сообщение составляет 1166569 байт при сериализации, что превышает максимальный размер запроса, который вы настроили с конфигурацией...
5105 просмотров
schedule
04.06.2022
Изящно остановить запрос структурированной потоковой передачи
Я использую Spark 2.1 и пытаюсь аккуратно остановить потоковый запрос.
StreamingQuery.stop() изящная остановка, потому что я не видел подробной информации об этом методе в документация :
void stop() Останавливает выполнение этого...
5194 просмотров
schedule
21.04.2022
Почему запуск потокового запроса приводит к ExitCodeException exitCode=-1073741515?
Я пытался привыкнуть к новой структурированной потоковой передаче, но она продолжает выдавать мне ошибку ниже, как только я запускаю запрос .writeStream .
Любая идея, что может быть причиной этого? Самое близкое, что я мог найти, это постоянная...
7675 просмотров
schedule
12.06.2024
Apache Spark Structured Streaming vs Apache Flink: в чем разница?
Мы обсудили следующие вопросы:
В чем разница между Apache Spark и Apache Flink? [закрыто]
Что означает «потоковая передача» в Apache Spark и Apache Flink ?
В чем разница Между мини-пакетами и потоковой передачей в реальном времени на...
5107 просмотров
schedule
03.03.2024
Как подсчитать элементы за временное окно?
Я пытаюсь использовать структурированную потоковую передачу Spark для подсчета количества элементов из Kafka для каждого временного окна с помощью кода ниже:
import java.text.SimpleDateFormat
import java.util.Date
import...
815 просмотров
schedule
12.06.2024
Обходной путь для объединения двух потоков в структурированной потоковой передаче в Spark 2.x
У меня есть поток конфигураций (нечасто меняющихся, но если будет обновление, то будет сообщение) и еще один поток необработанных точек данных.
Насколько я понимаю, на данный момент spark не поддерживает присоединение к потоковым наборам данных или...
1563 просмотров
schedule
17.04.2024
Как создать собственный источник потоковых данных?
У меня есть специальный ридер для Spark Streaming, который читает данные из WebSocket. Я собираюсь попробовать Spark Structured Streaming.
Как создать источник потоковых данных в Spark Structured Streaming?
6419 просмотров
schedule
08.05.2024
потоковая передача искры: чтение строки CSV из кафки, запись в паркет
Есть много онлайн-примеров чтения json из Kafka (для записи на паркет), но я не могу понять, как применить схему к строке CSV из kafka.
Потоковые данные:
customer_1945,cusaccid_995,27999941
customer_1459,cusaccid_1102,27999942
Схема:...
2636 просмотров
schedule
02.09.2022
Структурированная потоковая передача Spark через облачное хранилище Google
Я использую несколько пакетных конвейеров Spark, которые потребляют данные Avro в облачном хранилище Google. Мне нужно обновить некоторые конвейеры, чтобы они работали в режиме реального времени, и мне интересно, может ли потоковая передача с...
762 просмотров
schedule
26.07.2023
Структурированное потоковое чтение из темы Kafka
Я прочитал файл csv, преобразовал поле значения в байты и написал в тему Kafka, используя приложение производителя Kafka. Теперь я пытаюсь читать из темы Kafka, используя структурированную потоковую передачу, но не могу применить настраиваемую...
378 просмотров
schedule
12.05.2022
эвристика водяного знака потоковой обработки
Насколько точны оценки водяных знаков при потоковой обработке в Apache Beam или Spark Streaming. Моим источником данных являются файлы из gcs/s3, но я использую время события, связанное с каждым событием, в качестве метки времени для оконной функции....
884 просмотров
schedule
14.06.2023
Расширенные агрегации в структурированной потоковой передаче
Я уже задавал этот вопрос несколько месяцев назад, но я спрошу еще раз, чтобы быть уверенным на 100%, и опишу свою проблему здесь:
У меня есть тема потоковой передачи, которую я собираю каждую минуту с помощью скользящего окна в Spark Structured...
395 просмотров
schedule
26.05.2024
Как напрямую записать потоковый структурированный поток в Hive?
Я хочу добиться чего-то вроде этого:
df.writeStream
.saveAsTable("dbname.tablename")
.format("parquet")
.option("path", "/user/hive/warehouse/abc/")
.option("checkpointLocation", "/checkpoint_path")
.outputMode("append")
.start()
Я открыт...
511 просмотров
schedule
25.03.2023
Spark Structured Streaming — Customer Sink работал в Spark 2.2.0, но получил исключение в Spark 2.3.0
Недавно мы перенесли наш проект со Spark 2.2.0 cloudera2 на Spark 2.3.0 cloudera2 и заметили, что некоторые клиентские приемники работали, но теперь выходили из строя с исключениями. Для простоты я переписал крошечный кейс, чтобы помощники могли...
945 просмотров
schedule
09.12.2022
Структурированная потоковая передача с использованием PySpark и Kafka, Py4JJavaError: произошла ошибка при вызове o70.awaitTermination
Я пытался использовать Kafka с помощью Spark, а точнее PySpark и Structured Streaming.
import os
import time
import time
from ast import literal_eval
from pyspark.sql.types import *
from pyspark.sql.functions import from_json, col, struct,...
1144 просмотров
schedule
02.09.2022