Spark Structured Streaming — Customer Sink работал в Spark 2.2.0, но получил исключение в Spark 2.3.0

Недавно мы перенесли наш проект со Spark 2.2.0 cloudera2 на Spark 2.3.0 cloudera2 и заметили, что некоторые клиентские приемники работали, но теперь выходили из строя с исключениями. Для простоты я переписал крошечный кейс, чтобы помощники могли копировать и вставлять код для его тестирования.

package question

import java.io.PrintWriter
import java.net.Socket
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming._

class NCSink extends Sink {
    def addBatch(batchId: Long, data: DataFrame): Unit = {
        data.foreachPartition { iterator =>
            val socket = new Socket("localhost", 7778)
            val writer = new PrintWriter(socket.getOutputStream, true)
            iterator.foreach(row => writer.println(row.getString(0)))
            socket.close
        }
    }
}

class NCSinkProvider extends StreamSinkProvider {
    def createSink(sc: SQLContext, params: Map[String, String], columns: Seq[String], mode: OutputMode): Sink = new NCSink()
}

object NCStreaming {
    def main(args: Array[String]) = {
        val spark = SparkSession.builder.getOrCreate
        import spark.implicits._
        spark.readStream.format("socket").option("host", "localhost").option("port", 7777).load.as[String].writeStream.format("question.NCSinkProvider").outputMode("append").option("checkpointLocation", "checkpoint").start.awaitTermination
    }
}

Вышеупомянутая программа может быть запущена в Spark 2.2.0 (развернута пакетом cloudera2)

Отправлено

[johnlin@localhost ~]$ nc -lk 7777
good
better
best
never
let
it
rest

Получила

[johnlin@localhost ~]$ nc -lk 7778
good
better
never
it
let
rest
best

Однако в Spark 2.3.0 (развернутом пакетом cloudera2) возникает исключение Queries with streaming sources must be executed with writeStream.start()

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [value#6], true

=== Streaming Query ===
Identifier: [id = 072fce9e-0cc5-482b-a971-17102da37528, runId = 415272e9-2c2a-47de-947e-fbf64c8cc0da]
Current Committed Offsets: {TextSocketSource[host: localhost, port: 7777]: 12}
Current Available Offsets: {TextSocketSource[host: localhost, port: 7777]: 13}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
TextSocketSource[host: localhost, port: 7777]
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [value#6], true

        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
        at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3234)
        at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2674)
        at question.NCSink.addBatch(NCStreaming.scala:12)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        ... 1 more

Я искал это исключение, чтобы узнать, что оно может быть связано с использованием неподдерживаемой операции, такой как множественные агрегации. Но я не знаю, как узнать это из плана запроса в сообщении об ошибке. Я знаю, что структурированная потоковая передача Spark 2.3 представила некоторые новые функции. Но я не могу понять, какие изменения в моем коде необходимы. Не могли бы вы помочь мне?


person John Lin    schedule 20.04.2018    source источник
comment
Я пытался пометить apache-spark-2.3, но не имел достаточных привилегий   -  person John Lin    schedule 20.04.2018
comment
Я копаюсь в исходном коде Spark и нахожу кусок кода. Если я изменю data.foreachPartition { iterator => на data.queryExecution.toRdd.foreachPartition { iterator =>, он будет работать как в Spark 2.2, так и в Spark 2.3.   -  person John Lin    schedule 20.04.2018


Ответы (2)


Я считаю, что две ветки будут созданы для логического плана для этого, но вы вызываете start во второй ветке, оставляя первую необработанной, поэтому, возможно, в Spark 2.3.0 они исправили ее для явной обработки. .

spark.readStream.format("socket").option("host", "localhost").option("port", 7777).load.as[String]
     .writeStream.format("question.NCSinkProvider").outputMode("append").option("checkpointLocation", "checkpoint")
     .start
     .awaitTermination

Это чисто мое предположение, и я могу ошибаться, но вы можете попробовать это?

object NCStreaming {
    def main(args: Array[String]) = {
        val spark = SparkSession.builder.getOrCreate
        import spark.implicits._
        val data = spark.readStream.format("socket").option("host", "localhost").option("port", 7777).load.as[String]
        val query = data.writeStream.format("question.NCSinkProvider").outputMode("append").option("checkpointLocation", "checkpoint").start()
        query.awaitTermination()
    }
}
person Sivaprasanna Sethuraman    schedule 20.04.2018
comment
Спасибо. Я видел аналогичный ответ и тестировал его, прежде чем опубликовать этот вопрос. Хорошо, я попробовал еще раз только что. Он выдал то же исключение. - person John Lin; 20.04.2018

Я копаюсь в исходном коде spark, чтобы изучить, что изменилось. Я нашел, если я изменю

    data.foreachPartition { iterator =>

в

    data.queryExecution.toRdd.foreachPartition { iterator =>

тогда он может работать правильно.

Более того, я понял, что, начиная с spark 2.0.0, я могу использовать ForeachWriter для такого рода потоковой передачи вывода.

package question

import java.io.PrintWriter
import java.net.Socket
import org.apache.spark.sql._

class NCWriter(host: String, port: Int) extends ForeachWriter[String] {
    var socket: Socket = _
    var writer: PrintWriter = _

    def open(partitionId: Long, version: Long): Boolean = {
        socket = new Socket(host, port)
        writer = new PrintWriter(socket.getOutputStream, true)
        true
    }

    def process(record: String): Unit = writer.println(record)

    def close(exception: Throwable): Unit = socket.close
}

object NCStreaming {
    def main(args: Array[String]) = {
        val spark = SparkSession.builder.getOrCreate
        import spark.implicits._
        spark.readStream.format("socket").option("host", "localhost").option("port", 7777).load.as[String].writeStream.foreach(new NCWriter("localhost", 7778)).outputMode("append").option("checkpointLocation", "checkpoint").start.awaitTermination
    }
}
person John Lin    schedule 23.04.2018