Kafka, распределенная платформа потоковой передачи данных, использует ZooKeeper. ZooKeeper отвечает за управление узлами Kafka в кластере.

Давайте посмотрим, как настроить Kafka на вашем компьютере. Прежде чем мы сможем что-то делать с Kafka, нам нужно настроить ZooKeeper.

Я использую ноутбук с Linux. Таким образом, приведенные здесь команды должны нормально работать с большинством систем Linux.

Установите и запустите Apache ZooKeeper

Загрузите сжатый файл Apache ZooKeeper (.tgz) с официальных зеркал Apache. После загрузки файла проверьте подпись файла, чтобы убедиться в его подлинности. Для этого выполните следующую команду в терминале Linux:

gpg –print-md SHA1 имя_загруженного_файла

Эта команда сгенерирует хэш SHA1 для загруженного файла. Сверьте сгенерированный хэш с файлом хэша, доступным на официальном зеркале apache.

Как только хеш будет проверен, мы готовы распаковать файл и использовать его. Чтобы распаковать файл .tgz, выполните следующую команду в терминале Linux:

tar xvzf имя_загруженного_файла

После распаковки файла создайте файл с именем zoo.cfg в каталоге conf в несжатом каталоге со следующими строками.

tickTime=2000
dataDir=/var/zookeeper
clientPort=2181

Этот файл является файлом конфигурации ZooKeeper. Затем cd в созданный каталог, затем cd в каталог bin и запустите скрипт zkServer.sh.

cd zookeeper-3.4.13/bin/
./zkServer.sh start

Итак, мы запустили сервер ZooKeeper.

Установите и запустите Apache Kafka

Загрузите сжатый файл Apache Kafka (.tgz) с зеркал Apache аналогично тому, что мы сделали для ZooKeeper. Проверьте хэш подписи загруженного файла и распакуйте файл, используя те же команды, которые мы использовали для ZooKeeper.

В терминале Linux cd перейдите в несжатый каталог apache kafka.

cd кафка_2.12–2.1.1

Запустите следующую команду, чтобы запустить сервер Kafka:

bin/kafka-server-start.sh config/server.properties

Следующий шаг — создать тему на сервере Kafka. Чтобы начать тему простым способом с одним фактором репликации и одним разделом:

bin/kafka-topics.sh — создать — zookeeper localhost:2181 — коэффициент репликации 1 — разделы 1 — тестирование темы

Начать продюсер Kafka

Сообщение исходит от производителя Kafka. Чтобы запустить производителя, выполните следующую команду в терминале Linux:

bin/kafka-console-producer.sh — список брокеров localhost:9092 — тема тестирования

Эта команда запустит приглашение, в котором вы можете что-то ввести. Мы будем использовать это позже, чтобы отправить сообщение узлу kafka.

Запустите потребителя Kafka

Запустите потребителя Kafka, используя приведенную ниже команду, с вашего терминала Linux, возможно, в терминале, отличном от того, где работает ваш производитель kafka.

bin/kafka-console-consumer.sh — bootstrap-server localhost:9092 — тестирование темы — с самого начала

Как только потребитель запустится, введите что-нибудь в терминал, где работает производитель. Это отразится на потребительском терминале.

Kafka с Spring Boot

Теперь, когда мы знаем, как запустить производителя и потребителя через терминал, давайте перейдем к созданию производителя и потребителя в приложении Spring Boot. Мы собираемся написать приложение Spring Boot на Java и с помощью Maven.

Чтобы создать базовое приложение Spring Boot, перейдите на https://start.spring.io/, выберите соответствующие параметры и зависимости и просто нажмите кнопку Создать проект. Это даст zip-файл с базовым приложением Spring Boot. Сейчас нам понадобится зависимость spring-kafka.

Kafka Producer в Spring Boot

Нам нужно установить ряд свойств для нашего bean-компонента KafkaProducer. Имя и значение свойств производителя приведены ниже:

  • bootstrap.servers — список пар хост/порт для использования для установления начального подключения к кластеру Kafka.
  • acks — количество подтверждений, которое производитель требует от лидера, прежде чем считать запрос выполненным.
  • повторные попытки — установка значения больше нуля приведет к тому, что клиент повторно отправит любую запись, отправка которой завершилась неудачно, с потенциально временной ошибкой.
  • batch.size — производитель попытается объединить записи в меньшее количество запросов всякий раз, когда несколько записей отправляются в один и тот же раздел.
  • linger.ms — производитель объединяет все записи, поступающие между передачами запроса, в один пакетный запрос.
  • buffer.memory — общее количество байтов памяти, которое производитель может использовать для буферизации записей, ожидающих отправки на сервер.
  • key.serializer — класс сериализатора для ключа, который реализует интерфейс org.apache.kafka.common.serialization.Serializer.
  • value.serializer — класс сериализатора для значения, который реализует интерфейс org.apache.kafka.common.serialization.Serializer.

С указанными выше свойствами мы можем создать bean-компонент KafkaProducer и отправлять сообщения, как показано ниже.

Kafka Consumer в Spring Boot

Чтобы создать потребительский компонент Kafka, нам нужно установить следующие свойства.

  • bootstrap.servers — список пар хост/порт для использования для установления начального подключения к кластеру Kafka.
  • group.id — уникальная строка, идентифицирующая группу потребителей, к которой принадлежит этот потребитель.
  • enable.auto.commit — если true, смещение потребителя будет периодически фиксироваться в фоновом режиме.
  • auto.commit.interval.ms — частота в миллисекундах, с которой смещения потребителя автоматически фиксируются в Kafka, если для enable.auto.commit установлено значение true.
  • max.poll.records — максимальное количество записей, возвращаемых одним вызовом poll().
  • key.serializer — класс сериализатора для ключа, который реализует интерфейс org.apache.kafka.common.serialization.Serializer.
  • value.serializer — класс сериализатора для значения, который реализует интерфейс org.apache.kafka.common.serialization.Serializer.

С установленными выше свойствами мы можем создать bean-компонент KafkaConsumer, который мы можем использовать для прослушивания (опроса) кластера Kafka, как показано ниже.

Полная версия вышеупомянутого приложения доступна по адресу https://github.com/TheLoneKing/kafka-demo.