Priority queues in Python

In the previous article I noted that Python’s heapq module is the only part of standard Python I could think of which deals with sorting, but which doesn’t give you full control over the sort order.

That means you need to take care when using a heapq as a priority queue.

For example, the A* search algorithm is a best first path finder. It maintains a priority queue of possible steps to take, ordered by an estimate of the total distance of the path routed through these steps. At each stage it pops the next step — the one with the shortest estimated total distance — from the queue, then updates based on the new position.

```import heapq

def a_star(start, heuristic, moves):
"""A* graph search

- start: initial state
- heuristic(s): estimates path length from state s to finish.
heuristic(s) == 0 => finished
- moves(s): neighbours of s

Returns the shortest path to the finish and its cost, on success.
None otherwise.
"""
costs = {start: 0}
prev = {start: None}
frontier = [(heuristic(start), start)]
heapq.heapify(frontier)

def path(s):
return [] if s is None else path(prev[s]) + [s]

while frontier:
priority, state = heapq.heappop(frontier)
if heuristic(state) == 0:
return costs[state], path(state)
for n in moves(state):
n_cost = costs[state] + 1
if n not in costs or n_cost < costs[n]:
costs[n] = n_cost
heapq.heappush(frontier, (n_cost + heuristic(n), n))
prev[n] = state

```

The code above is a Python A* implementation. For simplicity, we’ll assume the cost of each step is 1. It’s easy enough to adapt the function if this isn’t the case.

The priority queue here is named `frontier`, the collection of states we need to explore. The sequence `heapify`, `heappop`, `heappush` maintains the priority ordering. (The call to `heapify` isn’t even needed since a list with one element is already a heap.) So, each time we pop a state from the queue, we get the one with the lowest estimated cost. Then, based on the moves we can make from this new step, we update our internal records of costs and previous nodes.

Note that the items on the queue are `(cost, state)` pairs. The costs will be numbers, typically positive integers — the exact values depend on the heuristic function.

Exactly what gets used as `state` is up to the caller which supplies `start`, the initial state, and `moves`, which steps from a state to its neighbours.

However, if items on the queue are tied on `cost`, the heapq may need to compare `state` values. If the states have no defined ordering this results in a runtime error.

``````>>> class State: pass
...
>>> x1, x2 = State(), State()
>>> (1, x1) < (2, x2)
True
>>> (1, x1) < (1, x2)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: '<' not supported between instances of 'State' and 'State'
``````

We cannot supply a sort key to the `heapq` functions. Does this mean clients must ensure state objects — whatever they actually are — can be compared? As the code stands, yes, but the module documentation has advice on handling this situatation:

[…] store entries as 3-element list including the priority, an entry count, and the task. The entry count serves as a tie-breaker so that two tasks with the same priority are returned in the order they were added. And since no two entry counts are the same, the tuple comparison will never attempt to directly compare two tasks.

This strategy gives a more usable `a_star`.

```import heapq
import itertools

def a_star(start, heuristic, moves):
costs = {start: 0}
prev = {start: None}
seq = itertools.count().__next__
frontier = [(heuristic(start), seq(), start)]

def path(s):
return [] if s is None else path(prev[s]) + [s]

while frontier:
_, _, state = heapq.heappop(frontier)
if heuristic(state) == 0:
return costs[state], path(state)
for n in moves(state):
n_cost = costs[state] + 1
if n not in costs or n_cost < costs[n]:
costs[n] = n_cost
heapq.heappush(frontier, (n_cost + heuristic(n), seq(), n))
prev[n] = state

```