Загрузка изображений с помощью gevent

Моя задача - загрузить более 1 млн изображений из заданного списка URL-адресов. Каков рекомендуемый способ сделать это?

Прочитав Greenlet Vs. Потоки Я просмотрел gevent, но мне не удалось его надежно запустить. Я поиграл с тестовым набором из 100 URL-адресов, и иногда он завершается за 1,5 с, но иногда требуется более 30 с, что странно, поскольку время ожидания * на запрос составляет 0,1, поэтому оно никогда не должно занимать более 10 с.

* см. ниже в коде

Я также изучил grequests, но у них, похоже, есть проблемы с обработкой исключений.

Мои «требования» заключаются в том, что я могу

  • проверить ошибки, возникающие при загрузке (тайм-ауты, поврежденные изображения...),
  • следить за ходом обработки количества обработанных изображений и
  • быть как можно быстрее.
from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
import cStringIO
import gevent.hub
POOL_SIZE = 300


def download_image_wrapper(task):
    return download_image(task[0], task[1])

def download_image(image_url, download_path):
    raw_binary_request = requests.get(image_url, timeout=0.1).content
    image = Image.open(cStringIO.StringIO(raw_binary_request))
    image.save(download_path)

def download_images_gevent_spawn(list_of_image_urls, base_folder):
    download_paths = ['/'.join([base_folder, url.split('/')[-1]])
                      for url in list_of_image_urls]
    parameters = [[image_url, download_path] for image_url, download_path in
             zip(list_of_image_urls, download_paths)]
    tasks = [gevent.spawn(download_image_wrapper, parameter_tuple) for parameter_tuple in parameters]
    for task in tasks:
        try:
            task.get()
        except Exception:
            print 'x',
            continue
        print '.',

test_urls = # list of 100 urls

t1 = time()
download_images_gevent_spawn(test_urls, 'download_temp')
print time() - t1

person Framester    schedule 04.11.2015    source источник
comment
Обязательно ли использовать нити? Если вы можете использовать несколько процессов вместо этого, вы можете сделать это с помощью multiprocessing.Pool, и вы также можете найти это проще. Я использую pool.map(download_image, url_list) и pool.join(), чтобы сделать что-то подобное.   -  person foz    schedule 09.11.2015
comment
@foz, спасибо, но я также пробовал multiprocessing.Pool с похожими проблемами. Также мне сказали, что multiprocessing не подходит для таких задач: stackoverflow.com/a/27016937/380038   -  person Framester    schedule 09.11.2015
comment
Интересный! Я вижу, что многопроцессорность не так эффективна/масштабируема, но я не понимаю, почему она не должна работать со скромным размером пула (32, как у вас). Надеюсь, вы получите хороший ответ на этот вопрос, так как я думаю, что тоже кое-чему научусь!   -  person foz    schedule 09.11.2015
comment
Я хочу загрузить 12 млн изображений, поэтому я хочу сделать это максимально эффективно.   -  person Framester    schedule 09.11.2015
comment
Вы смотрели на trollius pypi.python.org/pypi/trollius?   -  person Padraic Cunningham    schedule 10.11.2015
comment
могу ли я порекомендовать опубликовать ваш код в codereview? не то, чтобы это не по теме здесь (это не так), но это был бы отличный вопрос и для этого сайта, и вы, вероятно, могли бы получить отличные ответы для повышения эффективности алгоритмов.   -  person Joseph Farah    schedule 18.11.2015
comment
Код не работает правильно в соответствии с OP, и поэтому не будет обсуждаться в Code Review.   -  person Phrancis    schedule 18.11.2015


Ответы (3)


Думаю, будет лучше придерживаться urllib2, на примере https://github.com/gevent/gevent/blob/master/examples/concurrent_download.py#L1

Попробуйте этот код, я полагаю, это то, о чем вы спрашиваете.

import gevent
from gevent import monkey

# patches stdlib (including socket and ssl modules) to cooperate with other greenlets
monkey.patch_all()

import sys

urls = sorted(chloya_files)

if sys.version_info[0] == 3:
    from urllib.request import urlopen
else:
    from urllib2 import urlopen


def download_file(url):
    data = urlopen(url).read()
    img_name = url.split('/')[-1]
    with open('c:/temp/img/'+img_name, 'wb') as f:
        f.write(data)
    return True


from time import time

t1 = time()
tasks = [gevent.spawn(download_file, url) for url in urls]
gevent.joinall(tasks, timeout = 12.0)
print "Sucessful: %s from %s" % (sum(1 if task.value else 0 for task in tasks), len(tasks))
print time() - t1
person Alex Yu    schedule 16.11.2015
comment
Спасибо, я попробовал этот код с urlopen(..., timeout=0.1), но для 1000 URL-адресов все равно потребовалось более 100 секунд, что указывает мне на то, что он не выполнял запросы параллельно. - person Framester; 16.11.2015
comment
Может это проблемы с сетью? В моем тесте на 139 файлов с какого-то чешского сайта ушло 10,1 секунды. У меня тоже были сомнения по поводу параллелизма, но теперь я думаю, что я был ограничен удаленным веб-сервером, а не gevent-urlib2 - person Alex Yu; 16.11.2015

Существует простое решение с использованием gevent и Requests простых запросов.

Используйте Requests Session для постоянного соединения HTTP. Поскольку gevent делает Requests асинхронным, я думаю, что нет необходимости в timeout в HTTP-запросах.

По умолчанию requests.Session кэширует TCP-соединения (pool_connections) для 10 хостов и ограничивает 10 одновременных HTTP-запросов на кэшированные TCP-соединения (pool_maxsize). Конфигурация по умолчанию должна быть изменена в соответствии с потребностями путем явного создания адаптера http.

session = requests.Session()
http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
session.mount('http://', http_adapter)

Разбейте задачи как производитель-потребитель. Загрузка изображения — это задача производителя, а обработка изображения — задача потребителя.

Если библиотека обработки изображений PIL не является асинхронной, она может блокировать сопрограммы производителя. Если это так, потребительский пул может быть gevent.threadpool.ThreadPool. например

from gevent.threadpool import ThreadPool
consumer = ThreadPool(POOL_SIZE)  

Это обзор того, как это можно сделать. Я не тестировал код.

from gevent import monkey; monkey.patch_all()
from time import time
import requests
from PIL import Image
from io import BytesIO
import os
from urlparse import urlparse
from gevent.pool import Pool

def download(url):
    try:
        response = session.get(url)
    except Exception as e:
        print(e)
    else:
        if response.status_code == requests.codes.ok:
            file_name = urlparse(url).path.rsplit('/',1)[-1]
            return (response.content,file_name)
        response.raise_for_status()

def process(img):
    if img is None:
        return None
    img, name = img
    img = Image.open(BytesIO(img))
    path = os.path.join(base_folder, name)
    try:
        img.save(path)
    except Exception as e:
        print(e)
    else:
        return True

def run(urls):        
    consumer.map(process, producer.imap_unordered(download, urls))

if __name__ == '__main__':
        POOL_SIZE = 300
        producer = Pool(POOL_SIZE)
        consumer = Pool(POOL_SIZE)

        session = requests.Session()
        http_adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
        session.mount('http://', http_adapter)

        test_urls = # list of 100 urls
        base_folder = 'download_temp'
        t1 = time()
        run(test_urls)
        print time() - t1  
person Nizam Mohamed    schedule 17.11.2015
comment
Спасибо за ваше предложение. Я попробовал ваш код на своих URL-адресах, но для 1k URL-адресов требуется 200 секунд. Одной из проблем может быть то, что большинство из них указывают на один домен, но многие из них также указывают на разные домены. - person Framester; 17.11.2015
comment
Как вы думаете, сколько времени это должно занять? размер файла, пропускная способность клиента и загрузка сервера — все это влияет на тайминги. - person Nizam Mohamed; 17.11.2015
comment
Я обновил свой ответ, чтобы предложить использовать ThreadPool для потребителей. Если обработка изображений связана с процессором, вы должны использовать multiprocessing.Pool. - person Nizam Mohamed; 17.11.2015
comment
вы можете попробовать установить timeout в запросе get, чтобы сократить время. Но некоторые файлы могут не загружаться. - person Nizam Mohamed; 17.11.2015
comment
Спасибо за все ваши предложения. Я просто синхронизировал простой синхронный вызов, и для 1k изображений потребовалось 350 секунд. Я попробую ваш код пула потоков. - person Framester; 17.11.2015
comment
больше чем скорость - это целостность скачиваемых файлов. код должен предвидеть такие состояния ошибок, как ошибки DNS, время ожидания соединения и ошибки HTTP, иначе загруженные файлы будут бесполезны. просто смотреть на тайминги без проверки целостности файлов — попытка вены. - person Nizam Mohamed; 17.11.2015
comment
Это хороший момент, но я использую request для обработки проблем с подключением и последующего открытия изображений в PIL, который выдает исключения, если не получает действительное изображение. - person Framester; 17.11.2015

Предложу обратить внимание на Grablib http://grablib.org/

Это асинхронный парсер, основанный на pycurl и multicurl. Также он пытается автоматически решить сетевую ошибку (например, повторить попытку, если тайм-аут и т. д.).

Я верю, что модуль Grab:Spider решит ваши проблемы на 99%. http://docs.grablib.org/en/latest/index.html#spider-toc

person Ashot Ogoltsov    schedule 11.11.2015
comment
Спасибо. Можете ли вы пояснить, в чем отличие от Grablib, или почему у вас есть идея, почему он будет работать лучше, чем мой подход? - person Framester; 12.11.2015
comment
Упс, у вас есть прямые ссылки на изображения? Если да, извините, вы все еще можете использовать Grab или что-то еще, что у вас есть. Grablib идеально подходит для сканирования и анализа. Однако вы также можете использовать его для загрузки изображений, Grablib (в частности, модуль Grab:Spider) повторяет задачи, в которых сетевая ошибка была >400 и !=404. Количество повторов можно установить вручную. Он имеет журналирование и мониторинг процессов. - person Ashot Ogoltsov; 13.11.2015