Python: как издеваться над темой кафки для модульных тестов?

У нас есть планировщик сообщений, который генерирует хэш-ключ из атрибутов сообщения, прежде чем поместить его в очередь тем Kafka с ключом.

Это сделано в целях дедупликации. Однако я не уверен, как я мог бы протестировать эту дедупликацию без фактической настройки локального кластера и проверки того, что он работает должным образом.

Поиск в Интернете инструментов для имитации очереди тем Kafka не помог, и я обеспокоен тем, что, возможно, думаю об этом неправильно.

В конечном счете, все, что используется для имитации очереди Kafka, должно вести себя так же, как локальный кластер, т. е. обеспечивать дедупликацию с помощью вставок ключа в очередь темы.

Есть ли такие инструменты?


person user1658296    schedule 31.10.2016    source источник


Ответы (3)


Если вам нужно проверить специфическую функцию Kafka или реализацию с специфической функцией Kafka, то единственный способ сделать это — использовать Kafka!

Есть ли у Kafka тесты логики дедупликации? Если да, то для снижения предполагаемых рисков неудачи вашей организации может быть достаточно комбинации следующих действий:

  • модульные тесты вашей хеш-логики (убедитесь, что один и тот же объект действительно генерирует один и тот же хэш)
  • Тесты дедупликации тем Kafka (внутренние для проекта Kafka)
  • предварительные дымовые тесты, проверяющие интеграцию вашего приложения с Kafka

Если в Kafka НЕТ каких-либо тестов для дедупликации темы или вы беспокоитесь о критических изменениях, важно иметь автоматизированные проверки специфичных для Kafka функций. Это можно сделать с помощью интеграционных тестов. В последнее время я добился больших успехов с конвейерами интеграционных тестов на основе Docker. После первоначальной работы по созданию образа докера Kafka (один из них, вероятно, уже доступен в сообществе), становится тривиальной настройка конвейеров интеграционных тестов. Конвейер может выглядеть так:

  • выполняются модульные тесты на основе приложений (хеш-логика)
  • как только они пройдут, ваш CI-сервер запустит Kafka
  • выполняются интеграционные тесты, проверяющие, что повторяющиеся записи отправляют только одно сообщение в тему.

Я думаю, что важно убедиться, что интеграционные тесты Kafka сведены к минимуму и включают ТОЛЬКО тесты, которые полностью полагаются на специфичные для Kafka функции. Даже при использовании docker-compose они могут быть на несколько порядков медленнее, чем модульные тесты, ~ 1 мс против 1 секунды? Еще одна вещь, которую следует учитывать, заключается в том, что накладные расходы на поддержку конвейера интеграции могут стоить риска веры в то, что Kakfa обеспечит дедупликацию темы, на которую она претендует.

person dm03514    schedule 31.10.2016

Вот пример автоматического тестирования в Python для функций, связанных с Kafka: https://github.com/up9inc/async-ms-demo/blob/main/grayscaler/tests.py

Он использует возможности Kafka Mock проекта http://mockintosh.io.

Отказ от ответственности: я связан с этим проектом.

person Andrey Pokhilko    schedule 21.06.2021

Чтобы имитировать модульные тесты Kafka под Python с тестовыми задачами SBT, я сделал, как показано ниже. Должен быть установлен Pyspark.

в build.sbt определить задачу, которая должна запускаться с тестами:

val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.")

val command = "python3 -m unittest app_test.py"
val workingDirectory = new File("./project/src/main/python")

testPythonTask := {
  val s: TaskStreams = streams.value
  s.log.info("Executing task testPython")
  Process(command,
    workingDirectory,
    // arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python
    "PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell"
      // collect all jar paths from project
      .format((fullClasspath in Runtime value)
      .map(_.data.getCanonicalPath)
        .filter(_.contains(".jar"))
        .mkString(",")),
    "PYSPARK_PYTHON" -> "python3") ! s.log
}

//attach custom test task to default test tasks
test in Test := {
  testPythonTask.value
  (test in Test).value
}

testOnly in Test := {
  testPythonTask.value
  (testOnly in Test).value
}

в тестовом примере Python (app_test.py):

import random
import unittest
from itertools import chain

from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.tests import PySparkStreamingTestCase

class KafkaStreamTests(PySparkStreamingTestCase):
    timeout = 20  # seconds
    duration = 1

    def setUp(self):
        super(KafkaStreamTests, self).setUp()

        kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
            .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils")
        self._kafkaTestUtils = kafkaTestUtilsClz.newInstance()
        self._kafkaTestUtils.setup()

    def tearDown(self):
        if self._kafkaTestUtils is not None:
            self._kafkaTestUtils.teardown()
            self._kafkaTestUtils = None

        super(KafkaStreamTests, self).tearDown()

    def _randomTopic(self):
        return "topic-%d" % random.randint(0, 10000)

    def _validateStreamResult(self, sendData, stream):
        result = {}
        for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]),
                                                   sum(sendData.values()))):
            result[i] = result.get(i, 0) + 1

        self.assertEqual(sendData, result)

    def test_kafka_stream(self):
        """Test the Python Kafka stream API."""
        topic = self._randomTopic()
        sendData = {"a": 3, "b": 5, "c": 10}

        self._kafkaTestUtils.createTopic(topic)
        self._kafkaTestUtils.sendMessages(topic, sendData)

        stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(),
                                         "test-streaming-consumer", {topic: 1},
                                         {"auto.offset.reset": "smallest"})
        self._validateStreamResult(sendData, stream)

Дополнительные примеры для Flume, Kinesis и других в pyspark.streaming.tests.

person Eugene Lopatkin    schedule 24.01.2018