Priority queues in Python
In the previous article I noted that Python’s heapq module is the only part of standard Python I could think of which deals with sorting, but which doesn’t give you full control over the sort order.
That means you need to take care when using a heapq as a priority queue.
For example, the A* search algorithm is a best first path finder. It maintains a priority queue of possible steps to take, ordered by an estimate of the total distance of the path routed through these steps. At each stage it pops the next step — the one with the shortest estimated total distance — from the queue, then updates based on the new position.
import heapq def a_star(start, heuristic, moves): """A* graph search - start: initial state - heuristic(s): estimates path length from state s to finish. heuristic(s) == 0 => finished - moves(s): neighbours of s Returns the shortest path to the finish and its cost, on success. None otherwise. """ costs = {start: 0} prev = {start: None} frontier = [(heuristic(start), start)] heapq.heapify(frontier) def path(s): return [] if s is None else path(prev[s]) + [s] while frontier: priority, state = heapq.heappop(frontier) if heuristic(state) == 0: return costs[state], path(state) for n in moves(state): n_cost = costs[state] + 1 if n not in costs or n_cost < costs[n]: costs[n] = n_cost heapq.heappush(frontier, (n_cost + heuristic(n), n)) prev[n] = state
The code above is a Python A* implementation. For simplicity, we’ll assume the cost of each step is 1. It’s easy enough to adapt the function if this isn’t the case.
The priority queue here is named frontier
, the collection of states
we need to explore. The sequence heapify
, heappop
, heappush
maintains the priority ordering. (The call to heapify
isn’t even
needed since a list with one element is already a heap.) So, each
time we pop a state from the queue, we get the one with the lowest
estimated cost. Then, based on the moves we can make from this new
step, we update our internal records of costs and previous nodes.
Note that the items on the queue are (cost, state)
pairs. The costs
will be numbers, typically positive integers — the exact values depend
on the heuristic function.
Exactly what gets used as state
is up to the caller
which supplies start
, the initial state, and moves
, which steps
from a state to its neighbours.
However, if items on the queue are tied on cost
, the heapq may need to
compare state
values. If the states have no defined ordering
this results in a runtime error.
>>> class State: pass
...
>>> x1, x2 = State(), State()
>>> (1, x1) < (2, x2)
True
>>> (1, x1) < (1, x2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: '<' not supported between instances of 'State' and 'State'
We cannot supply a sort key to the heapq
functions. Does this mean
clients must ensure state objects — whatever they actually are — can be
compared? As the code stands, yes, but the module documentation has advice on handling this situatation:
[…] store entries as 3-element list including the priority, an entry count, and the task. The entry count serves as a tie-breaker so that two tasks with the same priority are returned in the order they were added. And since no two entry counts are the same, the tuple comparison will never attempt to directly compare two tasks.
This strategy gives a more usable a_star
.
import heapq import itertools def a_star(start, heuristic, moves): costs = {start: 0} prev = {start: None} seq = itertools.count().__next__ frontier = [(heuristic(start), seq(), start)] def path(s): return [] if s is None else path(prev[s]) + [s] while frontier: _, _, state = heapq.heappop(frontier) if heuristic(state) == 0: return costs[state], path(state) for n in moves(state): n_cost = costs[state] + 1 if n not in costs or n_cost < costs[n]: costs[n] = n_cost heapq.heappush(frontier, (n_cost + heuristic(n), seq(), n)) prev[n] = state