Недавно мы перенесли наш проект со Spark 2.2.0 cloudera2 на Spark 2.3.0 cloudera2 и заметили, что некоторые клиентские приемники работали, но теперь выходили из строя с исключениями. Для простоты я переписал крошечный кейс, чтобы помощники могли копировать и вставлять код для его тестирования.
package question
import java.io.PrintWriter
import java.net.Socket
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming._
class NCSink extends Sink {
def addBatch(batchId: Long, data: DataFrame): Unit = {
data.foreachPartition { iterator =>
val socket = new Socket("localhost", 7778)
val writer = new PrintWriter(socket.getOutputStream, true)
iterator.foreach(row => writer.println(row.getString(0)))
socket.close
}
}
}
class NCSinkProvider extends StreamSinkProvider {
def createSink(sc: SQLContext, params: Map[String, String], columns: Seq[String], mode: OutputMode): Sink = new NCSink()
}
object NCStreaming {
def main(args: Array[String]) = {
val spark = SparkSession.builder.getOrCreate
import spark.implicits._
spark.readStream.format("socket").option("host", "localhost").option("port", 7777).load.as[String].writeStream.format("question.NCSinkProvider").outputMode("append").option("checkpointLocation", "checkpoint").start.awaitTermination
}
}
Вышеупомянутая программа может быть запущена в Spark 2.2.0 (развернута пакетом cloudera2)
Отправлено
[johnlin@localhost ~]$ nc -lk 7777
good
better
best
never
let
it
rest
Получила
[johnlin@localhost ~]$ nc -lk 7778
good
better
never
it
let
rest
best
Однако в Spark 2.3.0 (развернутом пакетом cloudera2) возникает исключение Queries with streaming sources must be executed with writeStream.start()
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [value#6], true
=== Streaming Query ===
Identifier: [id = 072fce9e-0cc5-482b-a971-17102da37528, runId = 415272e9-2c2a-47de-947e-fbf64c8cc0da]
Current Committed Offsets: {TextSocketSource[host: localhost, port: 7777]: 12}
Current Available Offsets: {TextSocketSource[host: localhost, port: 7777]: 13}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
TextSocketSource[host: localhost, port: 7777]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
LogicalRDD [value#6], true
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:37)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:51)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:60)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:72)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3234)
at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2674)
at question.NCSink.addBatch(NCStreaming.scala:12)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
Я искал это исключение, чтобы узнать, что оно может быть связано с использованием неподдерживаемой операции, такой как множественные агрегации. Но я не знаю, как узнать это из плана запроса в сообщении об ошибке. Я знаю, что структурированная потоковая передача Spark 2.3 представила некоторые новые функции. Но я не могу понять, какие изменения в моем коде необходимы. Не могли бы вы помочь мне?
data.foreachPartition { iterator =>
наdata.queryExecution.toRdd.foreachPartition { iterator =>
, он будет работать как в Spark 2.2, так и в Spark 2.3. - person John Lin   schedule 20.04.2018