lundi 25 mai 2015

multiprocessing.Pool.imap_unordered with fixed queue size or buffer?

I am reading data from large CSV files, processing it, and loading it into a SQLite database. Profiling suggests 80% of my time is spent on I/O and 20% is processing input to prepare it for DB insertion. I sped up the processing step with multiprocessing.Pool so that the I/O code is never waiting for the next record. But, this caused serious memory problems because the I/O step could not keep up with the workers.

The following toy example illustrates my problem:

#!/usr/bin/env python  # 3.4.3
import time
from multiprocessing import Pool

def records(num=100):
    """Simulate generator getting data from large CSV files."""
    for i in range(num):
        print('Reading record {0}'.format(i))
        time.sleep(0.05)  # getting raw data is fast
        yield i

def process(rec):
    """Simulate processing of raw text into dicts."""
    print('Processing {0}'.format(rec))
    time.sleep(0.1)  # processing takes a little time
    return rec

def writer(records):
    """Simulate saving data to SQLite database."""
    for r in records:
        time.sleep(0.3)  # writing takes the longest
        print('Wrote {0}'.format(r))

if __name__ == "__main__":
    data = records(100)
    with Pool(2) as pool:
        writer(pool.imap_unordered(process, data, chunksize=5))

This code results in a backlog of records that eventually consumes all memory because I cannot persist the data to disk fast enough. Run the code and you'll notice that Pool.imap_unordered will consume all the data when writer is at the 15th record or so. Now imagine the processing step is producing dictionaries from hundreds of millions of rows and you can see why I run out of memory. Amdahl's Law in action perhaps.

What is the fix for this? I think I need some sort of buffer for Pool.imap_unordered that says "once there are x records that need insertion, stop and wait until there are less than x before making more." I should be able to get some speed improvement from preparing the next record while the last one is being saved.

I tried using NuMap from the papy module (which I modified to work with Python 3) to do exactly this, but it wasn't faster. In fact, it was worse than running the program sequentially; NuMap uses two threads plus multiple processes.

Bulk import features of SQLite are probably not suited to my task because the data need substantial processing and normalization.

I have about 85G of compressed text to process. I'm open to other database technologies, but picked SQLite for ease of use and because this is a write-once read-many job in which only 3 or 4 people will use the resulting database after everything is loaded.

Aucun commentaire:

Enregistrer un commentaire