Merging sorted streams in Python

2008-10-01, Comments

Problem

Python Cookbook cover

In a post on his Code Monk blog David Jones offers up some Python tidbits he’s only recently discovered and invites readers to share similar discoveries. I’d like to respond here by talking about a recipe from the Python Cookbook.

Problem

You have several sorted sequences (iterables) and need to iterate on the overall sorted sequence that results from “merging” these sequences.

For example, we could merge multiples of 2, 3 and 5.

>>> from itertools import count, imap, islice
>>> m2, m3, m5 = [imap(n.__mul__, count(1)) for n in (2, 3, 5)]
>>> m235 = merge(m2, m3, m5)
>>> list(islice(m235, 10))
[2, 3, 4, 5, 6, 6, 8, 9, 10, 10]

Here, 6 appears twice in the merged stream since it’s a multiple of 2 and also of 3, and similarly 10 makes a double appearance.

This example merges three infinite streams. If we were sure all our inputs were finite, we might well simply chain them together and sort the whole thing.

>>> from itertools import chain
>>> def merge(*seqs):
>>>     return sorted(chain(*seqs))

An algorithm which deals with a (potential) mix of finite and infinite sequences is a little more interesting. We might consider an approach which repeatedly peeks at the head element of each sequence, finds the smallest of these, then pops and yields it. The recipe in the Cookbook improves on this idea.

Solution

The ingenious merge implementation shown below is credited to Sébastien Keim, Raymond Hettinger and Danny Yoo (the same Raymond Hettinger who appears on Code Monk to point out that zip can unzip).

The algorithm uses a priority queue as a staging area. This priority queue is initialised to hold a collection of pairs comprising the head value from each input stream and the tail of that stream. We then pop the first element from this queue, yielding its value, and queue the next (head, tail) pair from the stream it came from. At all times the priority queue contains a (head, tail) pair from each input stream.

This all works sweetly when the inputs are all infinite. The complication occurs when a finite sequence reaches its end.

import heapq

def merge(*subsequences):
    # prepare a priority queue whose items are pairs of the form
    # (current-value, iterator), one each per (non-empty) subsequence
    heap = [  ]
    for subseq in subsequences:
        iterator = iter(subseq)
        for current_value in iterator:
            # subseq is not empty, therefore add this subseq's pair
            # (current-value, iterator) to the list
            heap.append((current_value, iterator))
            break
    # make the priority queue into a heap
    heapq.heapify(heap)
    while heap:
        # get and yield lowest current value (and corresponding iterator)
        current_value, iterator = heap[0]
        yield current_value
        for current_value in iterator:
            # subseq is not finished, therefore add this subseq's pair
            # (current-value, iterator) back into the priority queue
            heapq.heapreplace(heap, (current_value, iterator))
            break
        else:
            # subseq has been exhausted, therefore remove it from the queue
            heapq.heappop(heap)

I had to look twice at this code despite the copious comments. There’s nothing unusual about the two outer loops, a for loop and a while loop. The first sets up the priority queue, visiting each input to do so; the second actually generates the output results while items remain in this queue.

It’s the inner for loops which are less standard: execution of the body of each will break as soon as a single item has been processed, except when the body is never executed because the iterable is empty, which, in the first case means that iterable needn’t be queued, and in the second case means the for’s else clause executes, dequeuing the iterable. These are for loops which actually do something at most once.

You did remember that for loops, in Python, have an else clause, right?

I confess that if I ever knew that, I’d certainly forgotten it! The else clause executes if the loop is not broken out of, whether or not the iterable is empty1. There’s nothing very else-y about it! I’d be interested to learn of any other C-family languages with a similar construct?

I’m a fan of the Python Cookbook because it teaches Python idioms by example. Here’s how it explains this one.

Note the idiom that we use to advance an iterator by one step, dealing with the possibility that the iterator is exhausted:

for current_value in iterator:
    # if we get here the iterator was not empty, current_value was
    # its first value, and the iterator has been advanced one step
    ...use pair (current_value, iterator)...
    # we break at once as we only wanted the first item of iterator
    break
else:
    # if we get here the break did not execute, so the iterator
    # was empty (exhausted)
    # deal with the case of iterator being exhausted...

I have to admit the code still looks odd to me but it’s just about the perfect construct for this particular use case, eliminating any mention of iterator.next() and StopIteration.

Batteries Included

Batteries included

Python 2.6 includes merge as standard in the heapq module. Here’s the implementation. You’ll notice it doesn’t use the for-break-else idiom, hence the explicit exception catching. It also packs triples rather than pairs into the staging queue, presumably to guarantee a stable merge of the inputs.

def merge(*iterables):
    '''Merge multiple sorted inputs into a single sorted output.

    Similar to sorted(itertools.chain(*iterables)) but returns a generator,
    does not pull the data into memory all at once, and assumes that each of
    the input streams is already sorted (smallest to largest).

    >>> list(merge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
    [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]

    '''
    _heappop, _heapreplace, _StopIteration = heappop, heapreplace, StopIteration

    h = []
    h_append = h.append
    for itnum, it in enumerate(map(iter, iterables)):
        try:
            next = it.next
            h_append([next(), itnum, next])
        except _StopIteration:
            pass
    heapify(h)

    while 1:
        try:
            while 1:
                v, itnum, next = s = h[0]   # raises IndexError when h is empty
                yield v
                s[0] = next()               # raises StopIteration when exhausted
                _heapreplace(h, s)          # restore heap condition
        except _StopIteration:
            _heappop(h)                     # remove empty iterator
        except IndexError:
            return

What else?

Personally I’ve only really used else alongside if2. As shown here, it also pairs up with for and while. It also also appears (optionally) towards the end of a try statement. From the documentation:

The try … except statement has an optional else clause, which, when present, must follow all except clauses. It is useful for code that must be executed if the try clause does not raise an exception. For example:

    for arg in sys.argv[1:]:
        try:
            f = open(arg, 'r')
        except IOError:
            print 'cannot open', arg
        else:
            print arg, 'has', len(f.readlines()), 'lines'
            f.close()

The use of the else clause is better than adding additional code to the try clause because it avoids accidentally catching an exception that wasn’t raised by the code being protected by the try … except statement.


1 Coincidentally, the for ... else construct gets a mention today in Christopher Leary’s VaporWarning blog.

The for-else statement looks a little strange when you first encounter it, but I’ve come to love it.

More breaking news … Python 2.6 officially released about 4 hours after I posted this article. I’ve tweaked a couple of links accordingly, and will tweak a couple more once the 2.6 documentation appears directly under http://docs.python.org.

2 If and else appear together in traditional if statements and the newer conditional expressions. The latter render one of the Cookbook recipes obsolete.