Разделять блокировки/семафоры gevent между потоками ThreadPool?

Есть ли способ сделать блокировку, которую можно использовать как между гринлетами, так и потоками ThreadPool?

В частности, мое приложение в основном основано на gevent, но некоторые части должны выполняться в «настоящем» потоке… но это вызывает проблемы с обработчиками logging, поскольку они используют семафор вокруг некоторых операций, что приводит к исключению, похожему на:

  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/logging/__init__.py", line 1300, in callHandlers
    hdlr.handle(record)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/logging/__init__.py", line 742, in handle
    self.acquire()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/logging/__init__.py", line 693, in acquire
    self.lock.acquire()
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 128, in acquire
    rc = self.__block.acquire(blocking)
  File "_semaphore.pyx", line 112, in gevent._semaphore.Semaphore.acquire (gevent/gevent._semaphore.c:2984)
  File "…/gevent/hub.py", line 331, in switch
    return greenlet.switch(self)
LoopExit: This operation would block forever

Я подозреваю, что это происходит, когда поток A удерживает блокировку, а поток B пытается ее получить. Заметив, что блокировка уже удерживается, поток B пытается hub.switch()… но поскольку в потоке B hub есть только один гринлет, возникает исключение "блокировать навсегда". поднятый.

Так! Можно ли что-нибудь с этим сделать? Или я застрял?


person David Wolever    schedule 01.03.2014    source источник
comment
Я столкнулся с очень похожей проблемой, github.com/numba/numba/issues/1556 . Было бы неплохо увидеть решение для этого, действительно.   -  person Michael    schedule 01.12.2015


Ответы (1)


Я не могу быть уверен, что этот фрагмент кода считается пулом. Но проверьте это.

Вся суть gevent в том, что он асинхронный. Например, если вам нужно запросить 100 html-страниц (без gevent). Вы делаете первый запрос на первую страницу, и ваш интерпретатор Python зависает до тех пор, пока ответ не будет готов. Таким образом, gevent позволяет заморозить вывод первого запроса и перейти ко второму, что означает не терять время. Таким образом, мы можем легко исправить все здесь. Но если вам нужно записать результаты запросов в базу данных (например, Couchdb, Couchdb имеет ревизии, а значит, документы должны быть изменены синхронно). Здесь мы можем использовать семафор.

Давайте напишем чертов код (вот синхронный пример):

import os
import requests
import time

start = time.time()
path = os.path.dirname(os.path.abspath(__file__))

test_sites = [
    'https://vimeo.com/',
    'https://stackoverflow.com/questions/22108576/share-gevent-locks-semaphores-between-threadpool-threads',
    'http://www.gevent.org/gevent.monkey.html#gevent.monkey.patch_all',
    'https://www.facebook.com/',
    'https://twitter.com/',
    'https://www.youtube.com/',
    'https://zaxid.net/',
    'https://24tv.ua/',
    'https://zik.ua/',
    'https://github.com/'
]

# request each site and  write request status into file
def process_each_page(html_page):
    # all requests are executed synchronously
    response = requests.get(html_page)
    with open(path + '/results_no_sema.txt', 'a') as results_file:
        results_file.write(str(response.status_code) + ' ' +html_page +'\n')

for page in test_sites:
    process_each_page(page)

print(time.time() - start)

Вот аналоговый код с участием gevent:

from gevent import monkey
monkey.patch_all()

import gevent
import os
import requests

from gevent.lock import Semaphore
import time

start = time.time()

path = os.path.dirname(os.path.abspath(__file__))
gevent_lock = Semaphore()

test_sites = [
    'https://vimeo.com/',
    'https://stackoverflow.com/questions/22108576/share-gevent-locks-semaphores-between-threadpool-threads',
    'http://www.gevent.org/gevent.monkey.html#gevent.monkey.patch_all',
    'https://www.facebook.com/',
    'https://twitter.com/',
    'https://www.youtube.com/',
    'https://zaxid.net/',
    'https://24tv.ua/',
    'https://zik.ua/',
    'https://github.com/'
]

# request each site and  write request status into file
def process_each_page(html_page):
    # here we dont need lock
    response = requests.get(html_page)

    gevent_lock.acquire()
    with open(path + '/results.txt', 'a') as results_file:
        results_file.write(str(response.status_code) + ' ' +html_page +'\n')
    gevent_lock.release()


gevent_greenlets = [gevent.spawn(process_each_page, page) for page in test_sites]
gevent.joinall(gevent_greenlets)
print(time.time() - start)

Теперь давайте обнаружим выходные файлы. Это из синхронных результатов.

введите здесь описание изображения

И это из сценария, в котором участвовал gevent. введите здесь описание изображения

Как видите, при использовании gevent ответы приходили не по порядку. Итак, чей ответ пришел первым, тот и был записан в файл первым. Основная часть показывает, сколько времени мы сэкономили при использовании gevent.

введите здесь описание изображения

NOTA-BENE: в приведенном выше примере нам не нужно блокировать запись (добавление) в файл. Но для CouchDB это необходимо. Таким образом, когда вы используете Semaphore с CouchDB с документами get-save, вы не получаете конфликтов документов!

person Taras Vaskiv    schedule 19.01.2018