Merging sorted streams in Python

In a post on his Code Monk blog David Jones offers up some Python tidbits he’s only recently discovered and invites readers to share similar discoveries. I’d like to respond here by talking about a recipe from the Python Cookbook.

Problem

You have several sorted sequences (iterables) and need to iterate on the overall sorted sequence that results from “merging” these sequences.

For example, we could merge multiples of 2, 3 and 5.

```
>>> from itertools import count, imap, islice
>>> m2, m3, m5 = [imap(n.__mul__, count(1)) for n in (2, 3, 5)]
>>> m235 = merge(m2, m3, m5)
>>> list(islice(m235, 10))
[2, 3, 4, 5, 6, 6, 8, 9, 10, 10]
```

Here, 6 appears twice in the merged stream since it’s a multiple of 2 and also of 3, and similarly 10 makes a double appearance.

This example merges three infinite streams. If we were sure all our inputs were finite, we might well simply chain them together and sort the whole thing.

```
>>> from itertools import chain
>>> def merge(*seqs):
...     return sorted(chain(*seqs))
...
```

An algorithm which deals with a (potential) mix of finite and infinite sequences is a little more interesting. We might consider an approach which repeatedly peeks at the head element of each sequence, finds the smallest of these, then pops and yields it. The recipe in the Cookbook improves on this idea.
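For comparison, the naive peek-and-pop idea might be sketched as below. This is only an illustration, not the Cookbook recipe: the names `naive_merge` and `_DONE` are hypothetical, and the code is written for Python 3 (where `map` is lazy), unlike the Python 2 session above.

```python
from itertools import count, islice

_DONE = object()  # hypothetical sentinel marking an exhausted stream

def naive_merge(*seqs):
    """Merge sorted iterables by scanning every head on each step."""
    iterators = [iter(seq) for seq in seqs]
    # Peek at the head of each stream; empty streams are marked done at once.
    heads = [next(it, _DONE) for it in iterators]
    while True:
        live = [i for i, head in enumerate(heads) if head is not _DONE]
        if not live:
            return
        # Linear scan for the smallest head: O(k) work per item for k streams.
        i = min(live, key=lambda j: heads[j])
        yield heads[i]
        # Pop the yielded head by pulling the next item from the same stream.
        heads[i] = next(iterators[i], _DONE)

m2, m3, m5 = [map(n.__mul__, count(1)) for n in (2, 3, 5)]
print(list(islice(naive_merge(m2, m3, m5), 10)))
# [2, 3, 4, 5, 6, 6, 8, 9, 10, 10]
```

Every item yielded costs a scan over all the live streams, which is the part a priority queue can do better.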

Solution

The ingenious merge implementation shown below is credited to Sébastien Keim, Raymond Hettinger and Danny Yoo (the same Raymond Hettinger who appears on Code Monk to point out that zip can unzip).

The algorithm uses a priority queue as a staging area. This priority queue is initialised to hold a collection of pairs comprising the head value from each input stream and the tail of that stream. We then pop the first element from this queue, yielding its value, and queue the next (head, tail) pair from the stream it came from. At all times the priority queue contains a (head, tail) pair from each input stream.

This all works sweetly when the inputs are all infinite. The complication occurs when a finite sequence reaches its end.

```
import heapq

def merge(*subsequences):
    # prepare a priority queue whose items are pairs of the form
    # (current-value, iterator), one each per (non-empty) subsequence
    heap = []
    for subseq in subsequences:
        iterator = iter(subseq)
        for current_value in iterator:
            # subseq is not empty, therefore add this subseq's pair
            # (current-value, iterator) to the list
            heap.append((current_value, iterator))
            break
    # make the priority queue into a heap
    heapq.heapify(heap)
    while heap:
        # get and yield lowest current value (and corresponding iterator)
        current_value, iterator = heap[0]
        yield current_value
        for current_value in iterator:
            # subseq is not finished, therefore add this subseq's pair
            # (current-value, iterator) back into the priority queue
            heapq.heapreplace(heap, (current_value, iterator))
            break
        else:
            # subseq has been exhausted, therefore remove it from the queue
            heapq.heappop(heap)
```

I had to look twice at this code despite the copious comments. There’s nothing unusual about the two outer loops, a `for` loop and a `while` loop. The first sets up the priority queue, visiting each input to do so; the second actually generates the output results while items remain in this queue.

It’s the inner `for` loops which are less standard. The body of each will `break` as soon as a single item has been processed, so these are `for` loops which do something at most once. The exception is when the body never executes because the iterable is empty: in the first loop that simply means the iterable needn’t be queued, and in the second it means the `for`’s `else` clause executes, dequeuing the iterable.

You did remember that for loops, in Python, have an else clause, right?

I confess that if I ever knew that, I’d certainly forgotten it! The `else` clause executes if the loop is not broken out of, whether or not the iterable is empty[1]. There’s nothing very else-y about it! I’d be interested to learn of any other C-family languages with a similar construct.
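A small experiment makes the rule concrete. The function `first_even` below is a made-up example, not something from the Cookbook, and it is written for Python 3.

```python
def first_even(numbers):
    """Return the first even number, or None if the loop never breaks."""
    for n in numbers:
        if n % 2 == 0:
            result = n
            break
    else:
        # Reached only when the loop finished without hitting break,
        # which includes the case where numbers was empty.
        result = None
    return result

print(first_even([1, 3, 4, 5]))  # 4
print(first_even([1, 3, 5]))     # None
print(first_even([]))            # None
```

The last two calls both land in the `else` clause: one because the loop ran to completion, the other because it never ran at all.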

I’m a fan of the Python Cookbook because it teaches Python idioms by example. Here’s how it explains this one.

Note the idiom that we use to advance an iterator by one step, dealing with the possibility that the iterator is exhausted:

```
for current_value in iterator:
    # if we get here the iterator was not empty, current_value was
    # its first value, and the iterator has been advanced one step
    ...use pair (current_value, iterator)...
    # we break at once as we only wanted the first item of iterator
    break
else:
    # if we get here the break did not execute, so the iterator
    # was empty (exhausted)
    # deal with the case of iterator being exhausted...
```

I have to admit the code still looks odd to me but it’s just about the perfect construct for this particular use case, eliminating any mention of `iterator.next()` and `StopIteration`.
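For contrast, here is roughly what the same single step looks like with the call and the exception spelled out, next to a version using the idiom. Both helper names are hypothetical, and the code uses the Python 3 spelling `next(iterator)` where the Python 2 of this post would write `iterator.next()`.

```python
def advance(iterator):
    """Return (True, value) for the next item, or (False, None) if exhausted."""
    try:
        value = next(iterator)
    except StopIteration:
        return False, None
    return True, value

def advance_with_for(iterator):
    """The same step written with the for-break-else idiom."""
    for value in iterator:
        found = True
        break
    else:
        found, value = False, None
    return found, value

it = iter([10, 20])
print(advance(it))           # (True, 10)
print(advance_with_for(it))  # (True, 20)
print(advance(it))           # (False, None)
```

Both functions consume at most one item from the iterator, so they can be mixed freely on the same stream.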

Batteries Included

Python 2.6 includes merge as standard in the heapq module. Here’s the implementation. You’ll notice it doesn’t use the for-break-else idiom, hence the explicit exception catching. It also packs triples rather than pairs into the staging queue, presumably to guarantee a stable merge of the inputs.

```
def merge(*iterables):
    '''Merge multiple sorted inputs into a single sorted output.

    Similar to sorted(itertools.chain(*iterables)) but returns a generator,
    does not pull the data into memory all at once, and assumes that each of
    the input streams is already sorted (smallest to largest).

    >>> list(merge([1,3,5,7], [0,2,4,8], [5,10,15,20], [], [25]))
    [0, 1, 2, 3, 4, 5, 5, 7, 8, 10, 15, 20, 25]

    '''
    _heappop, _heapreplace, _StopIteration = heappop, heapreplace, StopIteration

    h = []
    h_append = h.append
    for itnum, it in enumerate(map(iter, iterables)):
        try:
            next = it.next
            h_append([next(), itnum, next])
        except _StopIteration:
            pass
    heapify(h)

    while 1:
        try:
            while 1:
                v, itnum, next = s = h[0]   # raises IndexError when h is empty
                yield v
                s[0] = next()               # raises StopIteration when exhausted
                _heapreplace(h, s)          # restore heap condition
        except _StopIteration:
            _heappop(h)                     # remove empty iterator
        except IndexError:
            return
```
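To try the standard version out, something like the following should do. It is a sketch written for Python 3 (an assumption on my part, since the listing above is the Python 2.6 source), where `map` is lazy and stands in for `imap`. The second half relies on `1 == 1.0` and `2 == 2.0` to make ties visible: the element types show which input each tied value came from.

```python
import heapq
from itertools import count, islice

# Multiples of 2, 3 and 5 as lazy, infinite streams.
m2, m3, m5 = [map(n.__mul__, count(1)) for n in (2, 3, 5)]
first_ten = list(islice(heapq.merge(m2, m3, m5), 10))
print(first_ten)  # [2, 3, 4, 5, 6, 6, 8, 9, 10, 10]

# Equal values from different inputs: ints come from the first input,
# floats from the second, so the output order shows how ties are broken.
tied = list(heapq.merge([1, 2], [1.0, 2.0]))
print(tied)  # [1, 1.0, 2, 2.0]
```

On each tie the value from the earlier input comes out first, which is consistent with the iterator number acting as a tie-breaker in the queued triples.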

What else?

Personally I’ve only really used `else` alongside `if`[2]. As shown here, it also pairs up with `for` and `while`. It also appears (optionally) towards the end of a `try` statement. From the documentation:

The try … except statement has an optional else clause, which, when present, must follow all except clauses. It is useful for code that must be executed if the try clause does not raise an exception. For example:

```
for arg in sys.argv[1:]:
    try:
        f = open(arg, 'r')
    except IOError:
        print 'cannot open', arg
    else:
        print arg, 'has', len(f.readlines()), 'lines'
        f.close()
```

[1] Coincidentally, the `for ... else` construct gets a mention today in Christopher Leary’s VaporWarning blog.