## Priority queues in Python

In the previous article I noted that Python’s heapq module is the only part of standard Python I could think of which deals with sorting, but which doesn’t give you full control over the sort order.

That means you need to take care when using a heapq as a priority queue.

For example, the A* search
algorithm is a
**best first** path finder. It maintains a priority queue of possible
steps to take, ordered by an estimate of the total distance of the
path routed through these steps. At each stage it pops the next step
— the one with the shortest estimated total distance — from the queue,
then updates based on the new position.

import heapq def a_star(start, heuristic, moves): """A* graph search - start: initial state - heuristic(s): estimates path length from state s to finish. heuristic(s) == 0 => finished - moves(s): neighbours of s Returns the shortest path to the finish and its cost, on success. None otherwise. """ costs = {start: 0} prev = {start: None} frontier = [(heuristic(start), start)] heapq.heapify(frontier) def path(s): return [] if s is None else path(prev[s]) + [s] while frontier: priority, state = heapq.heappop(frontier) if heuristic(state) == 0: return costs[state], path(state) for n in moves(state): n_cost = costs[state] + 1 if n not in costs or n_cost < costs[n]: costs[n] = n_cost heapq.heappush(frontier, (n_cost + heuristic(n), n)) prev[n] = state

The code above is a Python A* implementation. For simplicity, we’ll assume the cost of each step is 1. It’s easy enough to adapt the function if this isn’t the case.

The priority queue here is named `frontier`

, the collection of states
we need to explore. The sequence `heapify`

, `heappop`

, `heappush`

maintains the priority ordering. (The call to `heapify`

isn’t even
needed since a list with one element is already a heap.) So, each
time we pop a state from the queue, we get the one with the lowest
estimated cost. Then, based on the moves we can make from this new
step, we update our internal records of costs and previous nodes.

Note that the items on the queue are `(cost, state)`

pairs. The costs
will be numbers, typically positive integers — the exact values depend
on the heuristic function.

Exactly what gets used as `state`

is up to the caller
which supplies `start`

, the initial state, and `moves`

, which steps
from a state to its neighbours.

However, if items on the queue are tied on `cost`

, the heapq may need to
compare `state`

values. If the states have no defined ordering
this results in a runtime error.

```
>>> class State: pass
...
>>> x1, x2 = State(), State()
>>> (1, x1) < (2, x2)
True
>>> (1, x1) < (1, x2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: '<' not supported between instances of 'State' and 'State'
```

We cannot supply a sort key to the `heapq`

functions. Does this mean
clients must ensure state objects — whatever they actually are — can be
compared? As the code stands, yes, but the module documentation has advice on handling this situatation:

[…] store entries as 3-element list including the priority, an entry count, and the task. The entry count serves as a tie-breaker so that two tasks with the same priority are returned in the order they were added. And since no two entry counts are the same, the tuple comparison will never attempt to directly compare two tasks.

This strategy gives a more usable `a_star`

.

import heapq import itertools def a_star(start, heuristic, moves): costs = {start: 0} prev = {start: None} seq = itertools.count().__next__ frontier = [(heuristic(start), seq(), start)] def path(s): return [] if s is None else path(prev[s]) + [s] while frontier: _, _, state = heapq.heappop(frontier) if heuristic(state) == 0: return costs[state], path(state) for n in moves(state): n_cost = costs[state] + 1 if n not in costs or n_cost < costs[n]: costs[n] = n_cost heapq.heappush(frontier, (n_cost + heuristic(n), seq(), n)) prev[n] = state