Feb 9 | Filed in Code, Django

One repetitive task I always seem to run into is handling large amounts of data in chunks. Typically I need to loop through every row of a table, but the result set is too large, or too slow, to load into memory all at once. After rereading my code a few too many times, I realized how lazy I had been on the first iteration and decided to DRY it up.

The result is a couple of useful classes for managing large QuerySets and cursors in Django.

First, let’s handle iterating over a large QuerySet. Say we have a million rows in our database and need to write each one to a file. In some situations this could be a locking query, which isn’t good. In others, it’s simply too slow and network-heavy to send all million rows at once. What we do instead, which is a little slower overall, is query for one chunk of data at a time and yield the rows from the current chunk.

class IterableQuerySet(object):
    """Allows iteration over a QuerySet breaking it off into smaller chunks."""
    def __init__(self, queryset, batch=10000):
        self.batch = batch
        self.queryset = queryset
 
    def __iter__(self):
        at = 0
 
        results = self.queryset[at:at+self.batch]
        while results:
            for result in results:
                yield result
            at += self.batch
            results = self.queryset[at:at+self.batch]
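
To see the chunking behaviour without a database, here is a minimal sketch that runs the same slice-and-yield loop over a plain list. The names `SliceLogger` and `iter_in_chunks` are made up for this illustration and are not part of Django, and the sketch is written for current Python rather than the Python 2 used in the rest of this post.

```python
class SliceLogger(object):
    """Hypothetical stand-in for a QuerySet: records every slice requested."""

    def __init__(self, rows):
        self.rows = list(rows)
        self.requests = []

    def __getitem__(self, k):
        # With a real QuerySet, each slice would be one LIMIT/OFFSET query.
        self.requests.append((k.start, k.stop))
        return self.rows[k]


def iter_in_chunks(sliceable, batch=10000):
    """Yield items one at a time while fetching them `batch` at a time."""
    at = 0
    results = sliceable[at:at + batch]
    while results:
        for result in results:
            yield result
        at += batch
        results = sliceable[at:at + batch]


source = SliceLogger(range(7))
items = list(iter_in_chunks(source, batch=3))
print(items)            # every row, in the original order
print(source.requests)  # one entry per chunk, plus the final empty one
```

Note that the loop only stops once it receives an empty chunk, so it always issues one more request than there are non-empty chunks. With a real QuerySet it is probably also worth giving the query a stable ordering (for example by primary key), since otherwise the database is free to return rows in a different order for each chunk.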

Another common task in similar applications was doing the same thing with raw cursors. Instead of writing the same class again for a cursor, we made a new class that lets a cursor act like a QuerySet. This one’s a bit more complex, but it also lets you use a Paginator class on it.

import re
 
CURSOR_UNION_REGEX = re.compile(r'^SELECT .*? FROM (.*)$', re.I)
 
class QuerySetCursor(object):
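    def __init__(self, connection, sql, params=None, model_class=None):
        self._result_cache = None
        self._offset = 0
        self._limit = None
        # Copy into a fresh list: avoids a shared mutable default argument and
        # lets _get_results() append the LIMIT values with list concatenation.
        self._params = list(params or [])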
        self._connection = connection
        self._sql = sql
        self._model_class = model_class
 
    def __getitem__(self, k):
        if not isinstance(k, (slice, int, long)):
            raise TypeError
        assert (not isinstance(k, slice) and (k >= 0)) \
            or (isinstance(k, slice) and (k.start is None or k.start >= 0) and (k.stop is None or k.stop >= 0)), \
            "Negative indexing is not supported."
        # Every lookup runs its own LIMIT query, so drop any cached rows.
        self._result_cache = None
        if isinstance(k, slice):
            if k.stop is None:
                raise ValueError("Open-ended slices are not supported.")
            # A missing start (e.g. cursor[:10]) means "from the first row".
            start = k.start or 0
            if k.stop <= start:
                return []
            self._offset, self._limit = start, k.stop - start
        else:
            self._offset, self._limit = k, 1
        try:
            results = self._get_results()
        finally:
            # Restore the full range so len() and iteration still cover
            # the whole query after a slice.
            self._offset, self._limit = 0, None
        return results if isinstance(k, slice) else results[0]
 
    def __len__(self):
        return len(self._get_data())
 
    def __iter__(self):
        return iter(self._get_data())
 
    def _get_data(self):
        if self._result_cache is None:
            self._result_cache = list(self._get_results())
        return self._result_cache
 
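    def _get_results(self):
        if self._limit is not None:
            # MySQL-style syntax: LIMIT <offset>, <row count>. The second value
            # is the number of rows to return, not the index of the last row.
            query = self._sql + ' LIMIT %s, %s'
            params = list(self._params) + [self._offset, self._limit]
        else:
            params = self._params
            query = self._sql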
 
        cursor = self._connection.cursor()
        try:
            cursor.execute(query, params)
            results = cursor.fetchall()
            if self._model_class:
                results = [self._model_class(*r) for r in results]
        finally:
            cursor.close()
        return results
 
    def count(self):
        statements = self._sql.split(' UNION ')
        prepared = []
        for statement in statements:
            end_of_stmt = CURSOR_UNION_REGEX.match(statement)
            if not end_of_stmt:
                raise Exception("Error getting SQL")
            prepared.append("SELECT COUNT(1) FROM %s" % (end_of_stmt.group(1),))
        query = 'SELECT (%s)' % (') + ('.join(prepared),)
        cursor = self._connection.cursor()
        try:
            cursor.execute(query, self._params)
            results = cursor.fetchone()[0]
        finally:
            cursor.close()
        return results
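
The rewriting that count() performs is easier to follow with a concrete query. The sketch below pulls the same string handling into a standalone function so the generated SQL can be inspected without a database. The function name `build_count_sql` and the table names are hypothetical, chosen only for this example, and the sketch is written for current Python.

```python
import re

CURSOR_UNION_REGEX = re.compile(r'^SELECT .*? FROM (.*)$', re.I)


def build_count_sql(sql):
    """Return the COUNT query that count() would build for `sql`."""
    prepared = []
    for statement in sql.split(' UNION '):
        match = CURSOR_UNION_REGEX.match(statement)
        if not match:
            raise ValueError("Could not find the FROM clause in: %r" % statement)
        # Keep everything after the first FROM and count those rows instead.
        prepared.append("SELECT COUNT(1) FROM %s" % match.group(1))
    # Add up the per-statement counts in a single outer SELECT.
    return 'SELECT (%s)' % ') + ('.join(prepared)


count_sql = build_count_sql(
    "SELECT id, name FROM app_author WHERE active = 1"
    " UNION "
    "SELECT id, name FROM app_editor"
)
print(count_sql)
```

Because this is plain string manipulation, it has limits worth keeping in mind: the counts of the individual statements are simply added together, so rows that a UNION would de-duplicate are counted more than once, and SQL that spans several lines or uses a lowercase `union` will not be split or matched as written.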

Want to combine the two classes?

for r in IterableQuerySet(QuerySetCursor(connection, sql, params, MyModelClass)):
    print r

I believe there’s also some optimization that can be done with cursor.fetchmany() to chunk the result set in memory as well, but I’ve yet to research how the cursor classes work internally.
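
As a rough illustration of the fetchmany() idea, here is a minimal sketch using the standard DB-API `cursor.fetchmany()` call against an in-memory SQLite database. The helper name `iter_cursor` and the `item` table are assumptions made up for this example, and the sketch is written for current Python. Whether this actually saves memory depends on the database driver, since some drivers buffer the whole result set on the client no matter how the rows are fetched.

```python
import sqlite3


def iter_cursor(cursor, batch=1000):
    """Yield rows from an already-executed DB-API cursor, `batch` at a time."""
    while True:
        rows = cursor.fetchmany(batch)
        if not rows:
            break
        for row in rows:
            yield row


# In-memory SQLite database used purely for the demonstration.
connection = sqlite3.connect(':memory:')
connection.execute('CREATE TABLE item (id INTEGER PRIMARY KEY, name TEXT)')
connection.executemany(
    'INSERT INTO item (name) VALUES (?)',
    [('item-%d' % i,) for i in range(25)],
)

cursor = connection.cursor()
cursor.execute('SELECT id, name FROM item ORDER BY id')
rows = list(iter_cursor(cursor, batch=10))
cursor.close()
connection.close()

print(len(rows))
```

Unlike the slicing approach, this runs a single query and walks its result in batches, so there is no repeated LIMIT/OFFSET work on the server.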

5 Responses to "Large SQL Result Sets in Django"

Dilandau (Feb 9th):

Very useful, thanks.

JasonC (Feb 9th):

Why not use the queryset’s iterator method, e.g. model.objects.all().iterator()? It works fine for me with a table containing 1.2 million rows of which I write ~600 thousand to a csv file.

pterk (Feb 10th):

Little typo in IterableQuerySet:

self.batch = batch

nitin (Feb 17th):

Hi,
I am facing a similar problem, which I have blogged about as well.
I think your code may help me, but I am unable to use it.
It would be helpful if you could give some sample usage too.

David (Feb 17th):

It’s nothing complicated, just a normal iterable wrapper.

for result in IterableQuerySet(qs): print result
