Какое поведение OTP следует использовать для бесконечного повторения задач?

Я хочу многократно запускать одну и ту же последовательность операций снова и снова рядом с приложением Phoenix (без сбоя всего веб-приложения, если что-то тормозит в воркере, конечно) и действительно не знаю, следует ли мне использовать GenServer, Elixir's Задачи, Агент или что-то совсем другое, о чем я пока не думал.

Когда я запускаю свое приложение Phoenix, также должен запускаться рабочий процесс, который периодически извлекает некоторые значения последовательного соединения, транслирует их через канал Phoenix, собирает их до тех пор, пока не будет достигнуто @save_interval, а затем вычисляет медиану, транслирует эту медиану через другой канал. и записывает его в InfluxDB. Прямо сейчас у меня есть что-то (вроде работы) вроде этого:

def do_your_thing(serial_pid) do
  Stream.interval(@interval_live)
    |> get_new_values_from_serial(serial_pid)
    |> broadcast!("live-channel:#{@name}")
    |> Enum.take(div(@interval_save, @interval_live))
    |> calculate_medians()
    |> broadcast!("update-channel:#{@name}")
    |> write_to_database()

  do_your_thing(serial_pid) # repeat
end

Я только начинаю разбираться во всем этом OTP и надеюсь, что кто-то из вас поможет мне наткнуться на правильное направление.


person optikfluffel    schedule 01.06.2015    source источник


Ответы (1)


Вы должны использовать GenServer, который отправляет сообщения через x секунд (60 секунд в приведенном ниже примере):

defmodule MyApp.Worker do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    schedule_work()
    {:ok, []}
  end

  def handle_info(:work, state) do
    state = do_work(state)
    schedule_work()
    {:noreply, state}
  end

  defp do_work(state) do
    # Do your work here and return state
  end

  defp schedule_work do
    Process.send_after(self(), :work, 60_000)
  end
end
person José Valim    schedule 01.06.2015
comment
Почему бы не Task, который периодически выполняет свою работу в бесконечном цикле (возможно, с помощью Stream.interval или Stream.repeatedly)? Если это просто периодическое извлечение, которое пересылает извлеченные данные дальше в систему, это не обязательно должен быть GenServer, верно. Task по-прежнему совместим с OTP, и мне он кажется более простым для работы. - person sasajuric; 01.06.2015
comment
Причина в том, что Task не может получать системные сообщения. Мы хотим, чтобы Stream и друзья знали об этом, но этого нет в 1.0 и, возможно, будет только в 1.3. - person José Valim; 01.06.2015
comment
Также вы можете захотеть, чтобы ваши супервайзеры могли перезапускать рабочего, выполняющего периодическую работу, а не просто позволять бесконечному потоку завершать текущий процесс, если что-то пойдет не так. - person Letseatlunch; 01.06.2015
comment
Хосе: Да, это хорошая мысль. @Letseatlunch Я не думаю, что это проблема. Супервайзер может запустить/остановить/перезапустить задачу. Использование опции завершения работы :brutal_kill, вероятно, имеет смысл для таких работников, поскольку они, вероятно, не будут обрабатывать сообщение о завершении работы. - person sasajuric; 02.06.2015
comment
@JoséValim Есть ли лучший способ сделать это в Эликсире 1.3? - person knite; 22.07.2016
comment
Спасибо, Хосе! Я рад видеть ответ на этот вопрос из наилучшего возможного источника. - person DCameronMauch; 04.10.2016