Функции массовой записи PyMongo с многопроцессорностью и генераторами

PyMongo поддерживает генераторы для пакетной обработки с помощью sDB.insert(iter_something(converted)). Функции массовой записи, которые выполняют операции записи в пакетах, чтобы уменьшить количество сетевых циклов и увеличить пропускную способность записи.

Следующий код, кажется, работает, но я не уверен, что PyMongo все еще может итерировать генератор вместе с многопроцессорной обработкой, пока он не даст 1000 документов или 16 МБ данных, а затем приостановит генератор, пока он вставляет пакет в MongoDB.

#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
from itertools import groupby
from pymongo import MongoClient
from multiprocessing import Process, JoinableQueue
import csv

# > use test
# switched to db test
# > db.createCollection("abc")
# { "ok" : 1 }
# > db.abc.find()


parts = [["Test", "A", "B01", 828288,  1,    7, 'C', 5],
    ["Test", "A", "B01", 828288,  1,    7, 'T', 6],
    ["Test", "A", "B01", 171878,  3,    7, 'C', 5],
    ["Test", "A", "B01", 171878,  3,    7, 'T', 6],
    ["Test", "A", "B01", 871963,  3,    9, 'A', 5],
    ["Test", "A", "B01", 871963,  3,    9, 'G', 6],
    ["Test", "A", "B01", 1932523, 1,   10, 'T', 4],
    ["Test", "A", "B01", 1932523, 1,   10, 'A', 5],
    ["Test", "A", "B01", 1932523, 1,   10, 'X', 6],
    ["Test", "A", "B01", 667214,  1,   14, 'T', 4],
    ["Test", "A", "B01", 667214,  1,   14, 'G', 5],
    ["Test", "A", "B01", 667214,  1,   14, 'G', 6]]


def iter_something(rows):
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
    chr_key_names = ['letter', 'no']
    for keys, group in groupby(rows, lambda row: row[:6]):
        result = dict(zip(key_names, keys))
        result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
        yield result

class Loading(Process):

    def __init__(self, task_queue):
        Process.__init__(self)
        self.task_queue = task_queue
        db = MongoClient().test
        self.sDB = db["abc"]

    def run(self):
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                self.sDB.insert(doc)

def main():
    num_cores = 2

    tasks = JoinableQueue()

    threads = [Loading(tasks) for i in range(num_cores)]

    for i, w in enumerate(threads):
        w.start()
        print('Thread ' + str(i+1) + ' has started!')

    converters = [str, str, str, int, int, int, str, int]
    with open("/home/mic/tmp/test.txt") as f:
        reader = csv.reader(f, skipinitialspace=True)
        converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)
        # sDB.insert(iter_something(converted))

        # Enqueue jobs
        for i in iter_something(converted):
            tasks.put(i)

    # Add None to kill each thread
    for i in range(num_cores):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()


if __name__ == '__main__':
    main()

person user977828    schedule 28.10.2014    source источник
comment
Будут ли db = MongoClient().test и self.sDB = db["abc"] в каждом потоке перезаписывать базу данных каждый раз?   -  person user977828    schedule 28.10.2014


Ответы (1)


В этом случае вы не используете преимущества пакетной вставки. Каждый вызов «self.sDB.insert(doc)» немедленно отправляет документ в MongoDB и ожидает ответа от сервера. Вы можете попробовать это:

def run(self):
    def gen():
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break

            else:
                yield doc

    try:
        self.sDB.insert(gen())
    except InvalidOperation as e:
        # Perhaps "Empty bulk write", this process received no documents.
        print(e)

Используйте mongosniff, чтобы убедиться, что вы отправляете на сервер большие пакеты, а не вставка одного документа за раз. В зависимости от количества документов и процессов некоторые процессы могут не получить документы. PyMongo выдает InvalidOperation, если вы пытаетесь вставить из пустого итератора, поэтому я «вставляю» с помощью «попробовать / кроме».

Кстати, вам не нужно вызывать createCollection с MongoDB: первая вставка в коллекцию создает ее автоматически. createCollection необходим только в том случае, если вам нужны специальные параметры, такие как ограниченная коллекция.

person A. Jesse Jiryu Davis    schedule 28.10.2014
comment
Спасибо, но теперь у меня есть InvalidOperation: cannot do an empty bulk write. Как-то поставить None в очередь задач не получается. - person user977828; 29.10.2014
comment
Я отредактирую свой ответ, чтобы справиться с исключением InvalidOperation. - person A. Jesse Jiryu Davis; 29.10.2014