Puma спит важный поток при загрузке приложения rails

Я запускаю Rails 3 с Ruby 2.3.3 на puma с postgresql. У меня есть файл initializer/twitter.rb, который запускает поток при загрузке с потоковым API для твиттера. Когда я использую rails server для запуска своего приложения, потоковая передача в Твиттере работает, и я могу получить доступ к своему веб-сайту, как обычно. (Если я не помещаю потоковую передачу в другой поток, потоковая передача работает, но я не могу просматривать свое приложение в браузере, поскольку поток блокируется потоком твиттера). Но когда я использую puma -C config/puma.rb для запуска своего приложения, я получаю следующее сообщение о том, что мой поток был найден при запуске и переведен в спящий режим. Как я могу указать Puma разрешить мне запускать этот поток в фоновом режиме при запуске?

инициализатор/twitter.rb

### START TWITTER THREAD ### if production

if Rails.env.production?
  puts 'Starting Twitter Stream...'
  Thread.start {
    twitter_stream.user do |object|
      case object
        when Twitter::Tweet
          handle_tweet(object)
        when Twitter::DirectMessage
          handle_direct_message(object)
        when Twitter::Streaming::Event
          puts "Received Event: #{object.to_yaml}"
        when Twitter::Streaming::FriendList
          puts "Received FriendList: #{object.to_yaml}"
        when Twitter::Streaming::DeletedTweet
          puts "Deleted Tweet: #{object.to_yaml}"
        when Twitter::Streaming::StallWarning
          puts "Stall Warning: #{object.to_yaml}"
        else
          puts "It's something else: #{object.to_yaml}"
      end
    end
  }
end

конфиг/puma.rb

workers Integer(ENV['WEB_CONCURRENCY'] || 2)
threads_count = Integer(ENV['RAILS_MAX_THREADS'] || 5)
threads threads_count, threads_count

preload_app!

rackup      DefaultRackup
port        ENV['PORT']     || 3000
environment ENV['RACK_ENV'] || 'development'


on_worker_boot do
  # Valid on Rails up to 4.1 the initializer method of setting `pool` size
  ActiveSupport.on_load(:active_record) do
    config = ActiveRecord::Base.configurations[Rails.env] ||
    Rails.application.config.database_configuration[Rails.env]
    config['pool'] = ENV['RAILS_MAX_THREADS'] || 5
    ActiveRecord::Base.establish_connection(config)
  end
end

Сообщение при запуске

2017-04-19T23:52:47.076636+00:00 app[web.1]: Connecting to database specified by DATABASE_URL
2017-04-19T23:52:47.115595+00:00 app[web.1]: Starting Twitter Stream...
2017-04-19T23:52:47.229203+00:00 app[web.1]: Received FriendList: --- !ruby/array:Twitter::Streaming::FriendList []
2017-04-19T23:52:47.865735+00:00 app[web.1]: [4] * Listening on tcp://0.0.0.0:13734
2017-04-19T23:52:47.865830+00:00 app[web.1]: [4] ! WARNING: Detected 1 Thread(s) started in app boot:
2017-04-19T23:52:47.865870+00:00 app[web.1]: [4] ! #<Thread:0x007f4df8bf6240@/app/config/initializers/twitter.rb:135 sleep> - /app/vendor/ruby-2.3.3/lib/ruby/2.3.0/openssl/buffering.rb:125:in `sysread'
2017-04-19T23:52:47.875056+00:00 app[web.1]: [4] - Worker 0 (pid: 7) booted, phase: 0
2017-04-19T23:52:47.865919+00:00 app[web.1]: [4] Use Ctrl-C to stop
2017-04-19T23:52:47.882759+00:00 app[web.1]: [4] - Worker 1 (pid: 11) booted, phase: 0
2017-04-19T23:52:48.148831+00:00 heroku[web.1]: State changed from starting to up

Заранее спасибо за помощь. Я просмотрел несколько других сообщений, в которых упоминается WARNING: Detected 1 Thread(s) started in app boot, но в ответах говорится игнорировать предупреждение, если тема не важна. В моем случае тред очень важен и мне нужно, чтобы этот тред не спал.


person Todd Sutter    schedule 20.04.2017    source источник
comment
Обновление: я узнал, что такое спящий поток, и это не проблема. Однако я не знаю, с чего начать свою ветку в Твиттере. Должен ли он оставаться в инициализаторе? Должен ли я использовать отдельный процесс (фоновое приложение)? Должен ли я запускать твиттер-поток на каждом работнике Puma?   -  person Todd Sutter    schedule 20.04.2017


Ответы (2)


Из вашего кода я думаю, что у вас есть более серьезная проблема, чем спящий поток ... что, я думаю, может быть вызвано тем фактом, что некоторые вещи неправильно названы, а другие просто не часто учитываются при использовании веб-фреймворка.

В мире серверов «воркеры» относятся к forked процессам, которые выполняют задачи, связанные с сервером, часто принимая новые соединения и обрабатывая веб-запросы.

НО - fork не дублирует темы! - новый процесс (рабочий) начинается только с одного единственного потока, который является копией потока, вызвавшего fork.

Это связано с тем, что процессы не используют общую память (обычно). Любые глобальные данные, которые у вас есть в процессе, являются частными для этого процесса (т. Е. Если вы сохраняете подключенные клиенты веб-сокетов в массиве, этот массив различен для каждого «работника»).

Ничего не поделаешь, это часть того, как спроектированы ОС и fork.

Таким образом, предупреждение нельзя обойти — это указание на конструктивный недостаток приложения (!).

Например, в вашем текущем дизайне (при условии, что поток не находится в спящем режиме) метод handle_tweet будет вызываться только для исходного серверного процесса и не будет вызываться для какого-либо рабочего процесса.

Если вы используете pub/sub, вам нужно только одно соединение twitter_stream для всего приложения (независимо от того, сколько серверов или рабочих есть у вашего приложения) — возможно, процесс twitter_stream (или фоновое приложение) будет лучше, чем поток.

Но если вы реализуете handle_tweet специфическим для процесса способом, т. е. отправляя сообщение каждому подключенному клиенту, сохраненному в массиве, вам необходимо убедиться, что каждый «работник» инициирует поток twitter_stream (!).

Когда я писал Iodine (отличный от Puma сервер), я обрабатывал эти варианты использования с помощью Iodine.runметод, который откладывает выполнение задач на потом. «Сохраненная» задача должна выполняться только после инициализации рабочих процессов и запуска цикла обработки событий, поэтому она выполняется в каждом процессе (что позволяет запускать новый поток в каждом процессе).

i.e.

Iodine.run do
   Thread.start do
    twitter_stream.user do |object|
    # ...
    end
   end
end

Я предполагаю, что у Puma есть похожее решение. Насколько я понимаю из документации Puma Clustered-Mode, добавление следующего блока в ваш config/puma.rb может помочь:

# config/puma.rb
on_worker_boot do
  Thread.start do
   twitter_stream.user do |object|
   # ...
   end
  end
end

Удачи!


EDIT: относительно комментария о twitter_stream с использованием ActiveRecord

Из комментариев я понял, что обратные вызовы twitter_stream хранят данные в базе данных, а также обрабатывают события или уведомления «push».

Хотя эти две проблемы связаны, они очень отличаются друг от друга.

Например, обратные вызовы twitter_stream должны сохранять данные в базе данных только один раз. Даже если ваше приложение разрастется до миллиарда пользователей, вам нужно будет сохранить данные в базе данных один раз.

Это означает, что обратные вызовы twitter_stream должны иметь свой собственный выделенный процесс, который запускается только один раз, возможно, отдельно от основного приложения.

Сначала, пока вы ограничиваете свое приложение одним (работает только один сервер/приложение), вы можете использовать fork вместе со сценарием initializer/twitter.rb... т.е.:

### START TWITTER PROCESS ### if production
if Rails.env.production?
  puts 'Starting Twitter Stream...'
  Process.fork do
    twitter_stream.user do |object|
      # ...
    end
  end
end

С другой стороны, уведомления должны быть адресованы конкретному пользователю по определенному соединению, принадлежащему определенному процессу.

Следовательно, уведомления должны быть отдельной задачей от twitter_stream обновления базы данных, и они должны выполняться в фоновом режиме каждого процесса, используя on_worker_boot (или Iodine.run), описанный выше.

Для этого вы можете on_worker_boot запустить фоновый поток, который будет прослушивать службу публикации/подписки, такую ​​как Redis, в то время как twitter_stream обратные вызовы «публикуют» обновления для службы публикации/подписки.

Это позволит каждому процессу просмотреть обновление и проверить, принадлежит ли какое-либо из соединений, которыми он «владеет», клиенту, который должен быть уведомлен об обновлении.

person Myst    schedule 20.04.2017
comment
Спасибо за помощь. Так как у меня два воркера, когда я помещаю свой поток в блок on_worker_boot, твиттер-поток запускается дважды. Все хорошо? Будет ли в этом случае все дублироваться? Метод handle_tweet следует вызывать только один раз, так как он сохраняет информацию в Active Record для каждого твита и потенциально отправляет прямое сообщение обратно пользователю. Если мне нужно использовать совершенно другой процесс (фоновое приложение) для потока в Твиттере, есть ли у вас какие-либо указания о том, как начать с этого? Спасибо всем за помощь и извините за мое невежество в этом вопросе. - person Todd Sutter; 20.04.2017
comment
Привет @ToddSutter, спасибо за комментарий. Я обновил свой ответ, чтобы отразить тот факт, что обратные вызовы twitter_stream хранят данные в базе данных. - person Myst; 20.04.2017
comment
Дополнительная информация чрезвычайно полезна. Мой твиттер-поток в основном представляет собой интерфейс базы данных, поэтому я попытаюсь запустить отдельный процесс, такой как описанный выше Process.fork. Я не знал, как использовать или даже что такое отдельный процесс, поэтому я рад, что обратился! Я проверю это в ближайшее время и вернусь к вам. - person Todd Sutter; 21.04.2017

Как я читаю ваш вопрос, это не похоже на проблему. спящий поток отличается от мертвого потока. Сон просто означает, что поток ожидает бездействия, не потребляя никакого процессора. Если все остальное подключено правильно, то, как только API-интерфейс Twitter обнаружит событие, он должен разбудить поток, запустить любой определенный вами обработчик, а затем снова вернуться в спящий режим. Сон — это не «работа в фоновом режиме», а «ожидание того, что что-то произойдет (например, кто-то твитнет @me.), чтобы я мог работать в фоновом режиме».

Быстрый пример, чтобы продемонстрировать это:

2.4.0 :001 > t = Thread.new { TCPServer.new(1234).accept ; puts "Got a connection! Dying..." }
 => #<Thread:0x007fa3941fed90@(irb):1 sleep> 
2.4.0 :002 > t
 => #<Thread:0x007fa3941fed90@(irb):1 sleep> 
2.4.0 :003 > t
 => #<Thread:0x007fa3941fed90@(irb):1 sleep> 
2.4.0 :004 > TCPSocket.new 'localhost', 1234
 => #<TCPSocket:fd 35> 
2.4.0 :005 > Got a connection! Dying...
t
 => #<Thread:0x007fa3941fed90@(irb):1 dead> 

Сон просто означает «ожидание действия».


Puma — это сервер на основе потоков, и он очень внимательно относится к раскрутке потоков в процессе загрузки, поэтому предупреждение о потоке, запущенном при загрузке приложения.

Как бы то ни было, довольно странно иметь поток, прослушивающий обновления от такого API на веб-сервере. Может быть, вам следует подумать о том, чтобы рабочий процесс обрабатывал события твиттера, используя что-то вроде Resque? Или, может быть, для вашего варианта использования подходит ActionCable?

person Glyoko    schedule 20.04.2017
comment
Итак, не могли бы вы предложить мне запустить другой сервер, который заботится только об этом API Twitter, отдельно от моего веб-сервера? Я очень новичок во всем этом (если это еще не очевидно). Или эти модули, Resque и ActionCable, являются плагинами для моего веб-сервера? - person Todd Sutter; 20.04.2017
comment
Это действительно зависит от того, что именно делают ваши обратные вызовы. Сообщают ли они что-то обратно пользователю? Сохраняют ли записи в БД? Они делают что-то совсем другое? Также важно, сколько времени вы готовы инвестировать в свое приложение. Наличие если не второго сервера, то, по крайней мере, отдельного процесса, обрабатывающего эти запросы, является более стандартным и строгим способом ведения дел, поскольку эти обратные вызовы не используются для ответа на немедленный запрос. С точки зрения архитектуры это немного больше работы, но это предотвращает подобные вопросы, разделяя проблемы сервера и рабочего процесса. - person Glyoko; 20.04.2017
comment
Примечание: Puma не является сервером на основе потоков, а сервером на основе реактора. Puma использует цикл обработки событий и ограниченный пул потоков вместо поточной серверной конструкции, предусматривающей поток для каждого соединения. Puma также поддерживает кластерный режим (с использованием процессов для параллелизма). - person Myst; 20.04.2017
comment
Спасибо за ответы! Очень признателен! Мне вообще не нужно отвечать пользователю, это скорее взаимодействие с базой данных. Похоже, мне нужно запустить отдельный процесс для обработки потока твиттера. - person Todd Sutter; 21.04.2017