Различия в пропускной способности при использовании сопрограмм и потоков

Несколько дней назад я задал вопрос о SO о помощи в разработке парадигмы для структурирования нескольких HTTP-запросов.

Вот сценарий. Я хотел бы иметь систему с несколькими производителями и несколькими потребителями. Мои производители сканируют и очищают несколько сайтов и добавляют найденные ссылки в очередь. Поскольку я буду сканировать несколько сайтов, я хотел бы иметь несколько производителей/краулеров.

Потребители/работники питаются этой очередью, делают запросы TCP/UDP к этим ссылкам и сохраняют результаты в моей базе данных Django. Я также хотел бы иметь несколько рабочих, поскольку каждый элемент очереди полностью независим друг от друга.

Люди предложили использовать для этого библиотеку сопрограмм, например Gevent или Eventlet. Никогда не работая с сопрограммами, я читал, что хотя парадигма программирования похожа на поточные парадигмы, только один поток активно выполняется, но когда происходят блокирующие вызовы, такие как вызовы ввода-вывода, стеки переключаются в памяти, а другой зеленый поток берет на себя управление до тех пор, пока не столкнется с блокирующим вызовом ввода-вывода. Надеюсь, я правильно понял? Вот код из одного из моих сообщений SO:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

Это хорошо работает, потому что вызовы sleep блокируют вызовы, а когда происходит событие sleep, другой зеленый поток берет на себя управление. Это намного быстрее, чем последовательное выполнение. Как видите, в моей программе нет кода, который намеренно передает выполнение одного потока другому потоку. Я не понимаю, как это вписывается в приведенный выше сценарий, поскольку я хотел бы, чтобы все потоки выполнялись одновременно.

Все работает нормально, но я чувствую, что пропускная способность, которой я добился с помощью Gevent/Eventlets, выше, чем у исходной последовательно работающей программы, но значительно ниже, чем та, которую можно было бы достичь с помощью реальной многопоточности.

Если бы я повторно реализовал свою программу, используя механизмы потоковой передачи, каждый из моих производителей и потребителей мог бы работать одновременно без необходимости менять местами стеки, как сопрограммы.

Следует ли это реализовать повторно с использованием потоков? Мой дизайн неправильный? Я не увидел реальных преимуществ использования сопрограмм.

Может быть, мои понятия немного мутны, но это то, что я усвоил. Любая помощь или разъяснение моей парадигмы и концепций были бы замечательными.

Спасибо


person Mridang Agarwalla    schedule 12.02.2012    source источник
comment
Почему бы не использовать несколько процессов?   -  person David Schwartz    schedule 12.02.2012
comment
Я не знаю плюсов и минусов многопоточности по сравнению с многопроцессорностью, поэтому я не знаю, хорошо это или нет.   -  person Mridang Agarwalla    schedule 12.02.2012
comment
в программах Python не существует такой вещи, как настоящая многопоточность (в любой момент времени выполняется только один фактический поток ОС) без использования расширений C (или тяжеловесных процессов ОС) из-за глобальной блокировки интерпретатора.   -  person    schedule 12.02.2012
comment
ваш производитель не дает контроля. Параллелизм невозможен, пока производители не закончат работу.   -  person jfs    schedule 12.02.2012


Ответы (3)


Как видите, в моей программе нет кода, который намеренно передает выполнение одного потока другому потоку. Я не понимаю, как это вписывается в приведенный выше сценарий, поскольку я хотел бы, чтобы все потоки выполнялись одновременно.

Существует один поток ОС, но несколько гринлетов. В вашем случае gevent.sleep() позволяет рабочим выполняться одновременно. Блокирующие вызовы ввода-вывода, такие как urllib2.urlopen(url).read(), делают то же самое, если вы используете urllib2 patched для работы с gevent (путем вызова gevent.monkey.patch_*()).

См. также A Curious Course on Coroutines and Concurrency, чтобы понять, как код может работать одновременно в однопоточная среда.

Чтобы сравнить различия в пропускной способности между gevent, threading, multiprocessing, вы можете написать код, совместимый со всеми подходами:

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue

Остальная часть скрипта одинакова для всех реализаций параллелизма:

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))

Не выполняйте код на уровне модуля, поместите его в main():

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__=="__main__":    
   main()
person jfs    schedule 12.02.2012
comment
Привет, Себастьян, я просмотрел свой код и увидел, что мои производители и потребители работают одновременно. Когда операция блокировки происходит в одном из моих гринлетов, она передает управление другим гринлетам. Я добавил отсутствующий вызов monkey_patch, чтобы модуль сокета тоже не блокировался, но я не могу получить достаточную нагрузку на свой процессор. У обычного ПК достаточно мощности, чтобы иметь больше одновременных подключений и больше зеленых точек, но мне не хватает скорости. Я очень потерян и сбит с толку, почему он не использует больше процессора и работает быстрее. Не могли бы вы помочь мне понять, пожалуйста? Я очень потерян. Спасибо. - person Mridang Agarwalla; 13.02.2012
comment
@Mridang Agarwalla: я прокомментировал код, который вы разместили в своем вопросе. producers не работают в нем одновременно. - person jfs; 13.02.2012
comment
@Mridang Agarwalla: если ваша проблема связана с вводом-выводом (диск, сеть), то не имеет значения, насколько быстр ваш процессор, например, если вы можете записывать на диск только со скоростью 50 МБ / с, тогда не имеет значения, что ваш процессор может обрабатывать 1 ГБ/с. Также ваша программа может потреблять другие конечные ресурсы, такие как количество открытых файлов. Если вы используете gevent, убедитесь, что все блокирующие вызовы отмечены зеленым, т. е. они не блокируются, например, ваш драйвер базы данных может быть несовместим с gevent. - person jfs; 13.02.2012
comment
Привет, Себастьян! Я попробовал простой фрагмент, используя Gevent с 10 000 зеленых элементов, который не имел сетевого или дискового ввода-вывода, но имел sleep вызовы и использовал много ресурсов ЦП. В этом примере я пришел к тому же предположению, что есть и другие незеленые блокирующие вызовы, т. е. единственное соединение с базой данных Django и отсутствие пула соединений. Есть ли способ узнать, какие части моего кода не зеленые? Один совет, который я получил, был от людей из SO, которые рекомендовали мне использовать что-то вроде pgPool для пула соединений в базе данных Postgres. - person Mridang Agarwalla; 13.02.2012
comment
@Mridang Agarwalla: 1. Пул соединений и одно соединение с БД, выделенное зеленым цветом, являются отдельными проблемами, но вы можете получить и то, и другое с помощью одного инструмента, если вам это нужно (я не знаю, что предпочтительнее для django+ гевент+постгрес). 2. Если запрос вызывает не зеленый блокирующий вызов, то он блокирует весь интерпретатор, и никакие другие запросы не могут быть обработаны до тех пор, пока он не вернется (другими словами, ваше приложение будет работать медленно, если вызов занимает значительное время). - person jfs; 13.02.2012
comment
@ J.F.Sebastian любопытно узнать, к какому выводу вы пришли. Я сейчас на том же месте, что и ты - person vumaasha; 13.09.2016
comment
@vumaasha: Вы имеете в виду, как бы я ответил на вопрос в заголовке? (мой ответ больше о конкретном коде и неправильных представлениях в теле вопроса). Если бы я попытался ответить на заголовок: единственный тест, который имеет значение, — это ваш код на вашем оборудовании. На протяжении многих лет были ответы в пользу обеих сторон. Хотя чем больше одновременных подключений вам нужно, тем более вероятно, что потоки ОС не являются ответом. Чтобы получить максимальную пропускную способность: найти узкое место, устранить его, повторить. Иногда достаточно приобрести более качественный кабель, иногда нужно переделывать весь проект. Вопрос слишком широкий. - person jfs; 13.09.2016

gevent отлично подходит, когда у вас очень много (зеленых) тем. Я протестировал его с тысячами, и он работал очень хорошо. вы должны убедиться, что все библиотеки, которые вы используете как для очистки, так и для сохранения в БД, становятся зелеными. afaik, если они используют сокет python, инъекция gevent должна работать. Однако расширения, написанные на C (например, mysqldb), будут блокироваться, и вместо этого вам нужно будет использовать зеленые эквиваленты.

если вы используете gevent, вы можете в основном покончить с очередями, создать новый (зеленый) поток для каждой задачи, код для потока будет таким же простым, как db.save(web.get(address)). gevent позаботится о вытеснении, когда какая-либо библиотека в db или веб-блоках. он будет работать до тех пор, пока ваши задачи помещаются в память.

person Dima Tisnek    schedule 12.02.2012

В этом случае ваша проблема связана не со скоростью программы (т.е. с выбором gevent или threading), а с пропускной способностью сетевого ввода-вывода. Это (должно быть) узкое место, определяющее скорость работы программы.

Gevent — это хороший способ убедиться, что является узким местом, а не архитектура вашей программы.

Это тот процесс, который вам нужен:

import gevent
from gevent.queue import Queue, JoinableQueue
from gevent.monkey import patch_all


patch_all()  # Patch urllib2, etc


def worker(work_queue, output_queue):
    for work_unit in work_queue:
        finished = do_work(work_unit)
        output_queue.put(finished)
        work_queue.task_done()


def producer(input_queue, work_queue):
    for url in input_queue:
        url_list = crawl(url)
        for work in url_list:
            work_queue.put(work)
        input_queue.task_done()


def do_work(work):
    gevent.sleep(0)  # Actually proces link here
    return work


def crawl(url):
    gevent.sleep(0)
    return list(url)  # Actually process url here

input = JoinableQueue()
work = JoinableQueue()
output = Queue()

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]


list_of_urls = ['foo', 'bar']

for url in list_of_urls:
    input.put(url)

# Wait for input to finish processing
input.join()
print 'finished producing'
# Wait for workers to finish processing work
work.join()
print 'finished working'

# We now have output!
print 'output:'
for message in output:
    print message
# Or if you'd like, you could use the output as it comes!

Вам не нужно ждать завершения очереди ввода и обработки, я только что продемонстрировал это здесь.

person Ivo    schedule 12.02.2012