Почему один поток Kafka блокирует запуск другого?

Я работаю с новым API-интерфейсом Kafka-scala-streams, который недавно был открыт Lightbend. И я пытаюсь запустить два потока. Но происходит то, что два из них не работают одновременно, и я не получаю желаемого результата.

package in.internity

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write

import scala.util.Try

/**
  * @author Shivansh <[email protected]>
  * @since 8/1/18
  */
object Boot extends App {
  implicit val formats: DefaultFormats.type = DefaultFormats
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  val streams1 = wordSplit("lines", "wordCount")
  val streams2 = readAndWriteJson("person", "personName")

  private def wordSplit(intopic: String, outTopic: String) = {
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
    data.to(outTopic, produced)

    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams
  }

  private def readAndWriteJson(intopic: String, outTopic: String) = {
    val builder = new StreamsBuilderS()
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues(value => {
      val person = Try(parse(value).extract[Person]).toOption
      println("1::", person)
      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
      println("2::", personNameAndEmail)
      write(personNameAndEmail)
    })
    data.to(outTopic, produced)

    val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
    streams
  }

  streams1.start()
  streams2.start()
  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams2.close(10, TimeUnit.SECONDS)
    streams1.close(10, TimeUnit.SECONDS)
  }))
}

case class Person(name: String, age: Int, email: String)

case class PersonNameAndEmail(name: String, email: String)

Когда я запускаю это и создаю сообщения по теме person, они не потребляются. Но когда я меняю порядок их запуска, т.е.

streams2.start()
streams1.start()

Это работает нормально. Итак, почему запуск одного потока блокирует другой. Разве мы не можем запускать несколько потоков одновременно.


person Shiv4nsh    schedule 09.01.2018    source источник


Ответы (1)


Заработало, похоже, я пытался дважды инициализировать поток разными методами (глупо с моей стороны: P)

Рабочий код:

package in.internity

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{StreamsConfig, _}
import org.json4s.DefaultFormats
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization.write

import scala.util.Try

/**
  * @author Shivansh <[email protected]>
  * @since 8/1/18
  */
object Boot extends App {
  implicit val formats: DefaultFormats.type = DefaultFormats
  val config: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)
    p
  }

  val builder = new StreamsBuilderS()

  private def wordSplit(intopic: String, outTopic: String) = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
    data.to(outTopic, produced)
  }

  private def readAndWriteJson(intopic: String, outTopic: String) = {
    val produced = Produced.`with`(Serdes.String(), Serdes.String())
    val textLines: KStreamS[String, String] = builder.stream(intopic)
    val data: KStreamS[String, String] = textLines.mapValues(value => {
      val person = Try(parse(value).extract[Person]).toOption
      println("1::", person)
      val personNameAndEmail = person.map(a => PersonNameAndEmail(a.name, a.email))
      println("2::", personNameAndEmail)
      write(personNameAndEmail)
    })
    data.to(outTopic, produced)
  }


  wordSplit("lines", "wordCount")
  readAndWriteJson("person", "personName")
  val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
  streams.start()
  streams

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    streams.close(10, TimeUnit.SECONDS)
  }))
}

case class Person(name: String, age: Int, email: String)
case class PersonNameAndEmail(name: String, email: String)
person Shiv4nsh    schedule 09.01.2018