Проблемы Flink HA Cluster JobManager

У меня есть установка с кластером flink 1.2, состоящим из 3 менеджеров заданий и 2 диспетчеров задач. Я запускаю кворум Zookeeper из JobManager1, получаю подтверждение, что Zookeeper запускается на двух других менеджерах заданий, затем я запускаю задание Flink на этом JobManager1.

Flink-conf.yaml одинаков на всех 5 виртуальных машинах, это означает, что jobmanager.rpc.address: везде указывает на JobManager1.

Если я выключу виртуальную машину, на которой запущен JobManager1, я ожидаю, что Zookeeper скажет, что один из оставшихся менеджеров заданий является лидером, и диспетчеры задач должны повторно подключиться к нему. Вместо этого я получаю в журналах диспетчеров задач много таких сообщений.

2017-03-14 14:13:21,827 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://[email protected]:43660/user/jobmanager (attempt 11, timeout: 30 seconds)
2017-03-14 14:13:21,836 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:43660] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:43660]] Caused by: [Connection refused: /1.2.3.4:43660]

Я изменил исходный IP-адрес на 1.2.3.4 для конфиденциальности и потому, что это всегда один и тот же IP-адрес (JobManager1).

Еще журналы:

2017-03-15 10:28:28,655 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Async calls on Source: Custom Source -> Flat Map (1/1)
2017-03-15 10:28:38,534 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Disassociated] 
2017-03-15 10:28:46,606 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:28:52,431 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:02,435 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:10,489 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://[email protected]:44779/user/jobmanager: Old JobManager lost its leadership.
2017-03-15 10:29:10,490 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Cancelling all computations and discarding all cached data.
2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,491 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://[email protected]:44779/user/jobmanager: Old JobManager lost its leadership.
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-15 10:29:10,512 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,515 INFO  org.apache.flink.runtime.taskmanager.Task                     - Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager akka://flink/user/taskmanager disconnects from JobManager akka.tcp://[email protected]:44779/user/jobmanager: Old JobManager lost its leadership.
    at org.apache.flink.runtime.taskmanager.TaskManager.handleJobManagerDisconnect(TaskManager.scala:1074)
    at org.apache.flink.runtime.taskmanager.TaskManager.org$apache$flink$runtime$taskmanager$TaskManager$$handleJobManagerLeaderAddress(TaskManager.scala:1426)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$handleMessage$1.applyOrElse(TaskManager.scala:286)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.taskmanager.TaskManager.aroundReceive(TaskManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-03-15 10:29:10,516 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,516 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Disassociating from JobManager
2017-03-15 10:29:10,525 INFO  org.apache.flink.runtime.blob.BlobCache                       - Shutting down BlobCache
2017-03-15 10:29:10,542 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:10,546 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Source: Custom Source -> Flat Map (1/1) (75fd495cc6acfd72fbe957e60e513223).
2017-03-15 10:29:10,548 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Flat Map (1/1) (dd555e0437867c3180a1ecaf0a9f4d04).
2017-03-15 10:29:10,551 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Flat Map (1/1)
2017-03-15 10:29:10,552 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Trying to register at JobManager akka.tcp://[email protected]:43893/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2017-03-15 10:29:10,567 INFO  org.apache.flink.core.fs.FileSystem                           - Ensuring all FileSystem streams are closed for Source: Custom Source -> Flat Map (1/1)
2017-03-15 10:29:10,632 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Successful registration at JobManager (akka.tcp://[email protected]:43893/user/jobmanager), starting network stack and library cache.
2017-03-15 10:29:10,633 INFO  org.apache.flink.runtime.taskmanager.TaskManager              - Determined BLOB server address to be /1.2.3.5:42830. Starting BLOB cache.
2017-03-15 10:29:10,633 INFO  org.apache.flink.runtime.blob.BlobCache                       - Created BLOB cache storage directory /tmp/blobStore-d97e08db-d2f1-4f00-a7d1-30c2f5823934
2017-03-15 10:29:15,551 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:20,571 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:25,582 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]
2017-03-15 10:29:30,592 WARN  akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://[email protected]:44779] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://[email protected]:44779]] Caused by: [Connection refused: /1.2.3.4:44779]

Кто-нибудь знает, почему диспетчеры задач не пытаются повторно подключиться к одному из оставшихся диспетчеров заданий (например, 1.2.3.5 выше)?

Спасибо!


person razvan    schedule 14.03.2017    source источник
comment
Вы следовали инструкциям по этой ссылке: ci. apache.org/projects/flink/flink-docs-release-1.3/setup/, чтобы запустить кластер в режиме высокой доступности?   -  person Dawid Wysakowicz    schedule 14.03.2017
comment
Привет, Давид, да, похоже, проблема в stackoverflow.com/questions/42793598/.   -  person razvan    schedule 14.03.2017
comment
На самом деле я даже создал пиар, фиксируя это;) Постараюсь выговориться, чтобы поскорее слить. Пока он не будет объединен, просто удалите ветвь else первого внутреннего if. Вы можете посмотреть PR здесь: github.com/apache/flink/pull/3506 < / а>   -  person Dawid Wysakowicz    schedule 14.03.2017
comment
В любом случае, не могли бы вы закрыть один из этих двух вопросов? Не будем загрязнять ТАК;)   -  person Dawid Wysakowicz    schedule 14.03.2017
comment
Я добавил изменение в сценарий оболочки, кластер запускается в режиме высокой доступности, но я получаю такое же поведение, как TaskManager продолжает подключаться к убитому JobManager вместо доступных.   -  person razvan    schedule 15.03.2017
comment
Я ответил в вопросе. Я думаю, что все работает нормально, и только журналы могут немного вводить в заблуждение. Попробуйте использовать кластер до и после убийства JM, и вы увидите, что все в порядке.   -  person Dawid Wysakowicz    schedule 16.03.2017
comment
Это может вводить в заблуждение, но, что более важно, работа не возобновляется, поэтому мне понадобилась HA.   -  person razvan    schedule 16.03.2017
comment
HA в данном случае означает доступность кластера. Чтобы задание было перезапущено после такого сбоя, вы можете добавить RestartStrategy: ci.apache.org/projects/flink/flink-docs-release-1.2/dev/   -  person Dawid Wysakowicz    schedule 16.03.2017
comment
У меня это в коде env.setRestartStrategy (RestartStrategies.fixedDelayRestart (50, // количество // попыток перезапуска // // попыток Time.of (10, TimeUnit.SECONDS) // задержка)); и если я убью одного из диспетчеров задач, он возобновится на другом доступном   -  person razvan    schedule 16.03.2017
comment
Не могли бы вы опубликовать какой-нибудь минимальный пример кода, который где-то не может перезапустить задание в этом сценарии? Конечно, без каких-либо конфиденциальных данных, поскольку я не могу воссоздать это поведение, с помощью простой работы, которую я запускаю, все кажется перезапущенным правильно. Мои журналы очень похожи на ваши.   -  person Dawid Wysakowicz    schedule 16.03.2017
comment
Конечно, проверьте Jira, пожалуйста   -  person razvan    schedule 16.03.2017
comment
Хорошо, поэтому, во-первых, предоставленный код не является работающим примером, но в любом случае попытался запустить его с помощью kafka, все настройки среды и задание перезапускаются плавно. Я не думаю, что без ПОЛНЫХ журналов от всех ваших TaskManager и JobManager сможет вам помочь.   -  person Dawid Wysakowicz    schedule 16.03.2017


Ответы (1)


Для всех, кто сталкивается с одной и той же проблемой, HA требует, чтобы вы предоставили расположение DFS, доступное со всех узлов. У меня был каталог контрольных точек состояния серверной части и каталог хранилища zookeeper, указывающий на каждой виртуальной машине на местоположение локальной файловой системы, и когда один из менеджеров заданий вышел из строя, новый лидер не мог возобновить выполняемые задания из-за отсутствия информации / недоступного местоположения.

Изменить: поскольку это было задано, файл, который я изменил (в случае Apache Flink 1.2 (https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html)) был

conf/flink-conf.yaml

Я установил

state.backend.fs.checkpointdir
high-availability.zookeeper.storageDir

к путям AWS S3, доступным как из диспетчеров задач, так и из диспетчеров заданий.

person razvan    schedule 24.03.2017
comment
Какие файлы вы редактировали, чтобы решить эту проблему? - person Luca Cappelletti; 26.01.2019
comment
conf / flink-conf.yaml (проверьте ci .apache.org / projects / flink / flink-docs-release-1.7 / ops /) - person razvan; 26.01.2019
comment
В частности, как вы приступили к изменению файла? Было бы действительно полезно, если бы вы могли подумать о том, чтобы дополнить свой текущий ответ этой дополнительной информацией. - person Luca Cappelletti; 26.01.2019