## Top Ten Percent

Recently I discussed two different approaches to finding the first N items, in order, from a collection of size S:

- full sort then slice
- heap-based partial sort

These two operations are respectively of complexity:

- S·log(S)
- S·log(N)

If we fix N and increase S, partial sort gets the job done most efficiently, while clearly conveying the intent of the code.
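As a reminder of what the two approaches look like in code, here is a minimal sketch. The function names `first_n_full_sort` and `first_n_partial_sort` are made up for illustration, and working on a copied `std::vector<int>` is just a convenient assumption. The complexity comments repeat the figures quoted above; the standard does not require `std::partial_sort` to be heap-based, though typical implementations are.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Approach 1: sort the whole collection, then keep the first n elements.
std::vector<int> first_n_full_sort(std::vector<int> v, std::size_t n)
{
    assert(n <= v.size());
    std::sort(v.begin(), v.end());                          // S·log(S)
    v.resize(n);                                            // slice off the tail
    return v;
}

// Approach 2: partial sort, which only orders the first n positions.
std::vector<int> first_n_partial_sort(std::vector<int> v, std::size_t n)
{
    assert(n <= v.size());
    std::partial_sort(v.begin(), v.begin() + n, v.end());   // S·log(N)
    v.resize(n);
    return v;
}
```

Both functions return the same N values in the same order; only the work done on the remaining S - N elements differs.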

What if we fix the ratio S/N? As an example, fixing S/N at 10 would correspond to finding the top 10% of the collection. In this case, a third approach will beat both full and partial sort as S increases.

### Partial Sort for fixed S/N

First, let’s see what happens to the heap based partial sort when we fix S/N at 10. The complexity, O(S·logN), becomes O(S·log(S/10)) which is equivalent to O(S·(logS-log10)), or simply O(S·logS).

Thus partial sort has the same complexity as full sort; but the lower constant multipliers in the full sort mean that eventually it outperforms partial sort.

How then can we reliably beat full sort?

### Partition then Sort

The idea is simple: partition the collection to place the first N elements at the front, then sort these elements in place. The partitioning algorithm is one of a family generally known as selection algorithms, implemented in C++ as `std::nth_element`. As usual, `std::sort` handles the subsequent sort on the reduced range.

In code:

    #include <algorithm>

    template <typename Iter>
    void nth_element_then_sort(Iter first, Iter nth, Iter last)
    {
        std::nth_element(first, nth, last);  // smallest N now occupy [first, nth)
        std::sort(first, nth);               // put those N in order
    }

Here, `Iter` must be a random access iterator, and the caller must ensure the distance between `first` and `nth` is N. Note that this algorithm arranges the **smallest** N elements in order at the front of the collection. If we want the largest, or indeed any other ordering, we’ll need to supply an ordering predicate to `nth_element` and `sort`.
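A predicate-taking variant might look like the sketch below. The four-argument overload and the `largest_n` helper are illustrative names, not part of the original program. The one thing to get right is that the same predicate goes to both `std::nth_element` and `std::sort`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Same algorithm as before, with a caller-supplied ordering.
template <typename Iter, typename Compare>
void nth_element_then_sort(Iter first, Iter nth, Iter last, Compare comp)
{
    std::nth_element(first, nth, last, comp);
    std::sort(first, nth, comp);
}

// Hypothetical helper: returns the n largest values, largest first.
std::vector<int> largest_n(std::vector<int> v, std::size_t n)
{
    assert(n <= v.size());
    nth_element_then_sort(v.begin(), v.begin() + n, v.end(),
                          std::greater<int>());
    v.resize(n);
    return v;
}
```

With `std::greater<int>()` as the predicate, "first" means "largest", so the front of the collection ends up holding the top N values in descending order.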

The standard `nth_element` algorithm uses a repeated partitioning scheme similar to the divide-and-conquer approach at the heart of quicksort. This time, though, after each division, the algorithm concentrates on just one side of what’s left. Hence we have iteration on a rapidly diminishing range, not recursion, and an operation which is linear on average, O(S). The subsequent `sort` is O(N·logN).
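To make the "concentrate on one side" idea concrete, here is a simplified quickselect sketch. It is not the standard library's implementation, which is more refined. The name `quickselect_sketch`, the middle-element pivot and the three-way split are all choices made for this illustration, and with unlucky pivots this version can degrade to quadratic time.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Rearranges v so that v[n] holds the value it would hold if v were sorted,
// with nothing larger before it and nothing smaller after it.
void quickselect_sketch(std::vector<int> & v, std::size_t n)
{
    assert(n < v.size());
    std::size_t lo = 0;           // the range still of interest is [lo, hi)
    std::size_t hi = v.size();
    while (hi - lo > 1) {
        int const pivot = v[lo + (hi - lo) / 2];
        // Split [lo, hi) into (< pivot) (== pivot) (> pivot).
        auto const mid1 = std::partition(v.begin() + lo, v.begin() + hi,
                                         [pivot](int x) { return x < pivot; });
        auto const mid2 = std::partition(mid1, v.begin() + hi,
                                         [pivot](int x) { return x == pivot; });
        std::size_t const eq_begin = static_cast<std::size_t>(mid1 - v.begin());
        std::size_t const eq_end = static_cast<std::size_t>(mid2 - v.begin());
        if (n < eq_begin) {
            hi = eq_begin;        // carry on in the "less than" side only
        } else if (n >= eq_end) {
            lo = eq_end;          // carry on in the "greater than" side only
        } else {
            return;               // position n lies among the pivot's equals
        }
    }
}
```

Each pass discards the side that cannot contain position n, so the loop works on a shrinking range rather than recursing into both halves as quicksort would.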

Thus the overall complexity is O(S) + O(N·logN). If we fix the ratio S/N, the O(N·logN) term eventually dominates; that is, the algorithm is dominated by the time taken by the final sort of the first N items.

Using the example ratio of S/N = 10, N·logN is (S/10)·log(S/10), which for large S is close to (S·logS)/10. This predicts that the partition-then-sort combination will eventually run roughly 10 times more quickly than a full sort.
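To spell out the arithmetic behind that factor of ten, here is a worked sketch which counts only the final sort and ignores the linear partitioning term:

$$
N \log N = \frac{S}{10} \log \frac{S}{10} = \frac{S}{10} \left( \log S - \log 10 \right)
$$

$$
\frac{N \log N}{S \log S} = \frac{1}{10} \left( 1 - \frac{\log 10}{\log S} \right) \longrightarrow \frac{1}{10} \quad \text{as } S \to \infty
$$

The bracketed factor is below 1 but tends to 1 only logarithmically, while the O(S) partitioning cost ignored here works in the opposite direction, so the speed-up measured at practical sizes need not be exactly 10.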

Appended to this article is a short C++ program which times our three variant partial sorting algorithms on a collection of random numbers. It requires two numbers, S and N, as command line input, and generates clock times for full, partial and partition sorts. It cross-checks the results for consistency, raising an error if a problem is detected.

### Performance

I compiled the program and ran it with S set to 0, 2 million, 4 million, 6 million …, and the ratio S/N fixed at 10.

    #! /bin/bash
    g++ -Wall -O3 partial_sort.cpp -o partial_sort
    S=0
    S_STEP=2000000
    while true
    do
        let "N = S / 10"
        ./partial_sort $S $N
        let "S += S_STEP"
    done

The loop in this script never terminates. Just kill the process once you’ve seen enough.

The results confirm this Nth-element-then-sort algorithm performs better than both full and partial sort at generating the first 10% of a collection. Full sort pulls ahead of partial sort before we get to 20 million.

### Test Platform

I ran the tests on a laptop using code compiled and optimised by GCC version 4.0.1.

    Hardware Overview:

      Model Name: MacBook
      Model Identifier: MacBook1,1
      Processor Name: Intel Core Duo
      Processor Speed: 2 GHz
      Number Of Processors: 1
      Total Number Of Cores: 2
      L2 Cache (per processor): 2 MB
      Memory: 2 GB
      Bus Speed: 667 MHz

Here’s a table showing the actual times, in milliseconds. Note that on this platform the `clock()` resolution is 10 milliseconds, so I’ve multiplied the actual program output by 10.

| S/million | Full sort/ms | Partial sort/ms | Nth element + sort/ms |
|---|---|---|---|
| 0 | 0 | 0 | 0 |
| 2 | 110 | 70 | 10 |
| 4 | 240 | 140 | 40 |
| 6 | 380 | 250 | 70 |
| 8 | 500 | 390 | 100 |
| 10 | 640 | 540 | 110 |
| 12 | 790 | 700 | 110 |
| 14 | 910 | 860 | 160 |
| 16 | 1050 | 1040 | 190 |
| 18 | 1190 | 1200 | 220 |
| 20 | 1340 | 1390 | 190 |
| 22 | 1470 | 1580 | 260 |
| 24 | 1610 | 1760 | 230 |
| 26 | 1760 | 1950 | 310 |
| 28 | 1890 | 2140 | 280 |
| 30 | 2040 | 2360 | 310 |
| 32 | 2190 | 2560 | 310 |
| 34 | 2340 | 2750 | 330 |
| 36 | 2480 | 2980 | 450 |
| 38 | 2630 | 3150 | 430 |
| 40 | 2780 | 3390 | 490 |
| 42 | 2910 | 3620 | 470 |
| 44 | 3080 | 3850 | 460 |
| 46 | 3210 | 4080 | 570 |
| 48 | 3360 | 4290 | 580 |
| 50 | 3500 | 4470 | 590 |
| 52 | 3660 | 4650 | 550 |
| 54 | 3800 | 4890 | 620 |
| 56 | 3960 | 5100 | 670 |
| 58 | 4110 | 5340 | 760 |
| 60 | 4260 | 5540 | 640 |
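The conversion from raw `clock()` ticks to the milliseconds shown in the table can be sketched as follows. The helper name `ticks_to_ms` is hypothetical. The tick rate of 100 per second used in the usage note is an assumption inferred from the 10 millisecond resolution quoted above, not something the program reports.

```cpp
#include <cassert>
#include <ctime>

// Converts a clock() tick count to milliseconds. The tick rate defaults to
// CLOCKS_PER_SEC for the current platform, but can be overridden to
// reproduce figures recorded on a different one.
double ticks_to_ms(std::clock_t ticks,
                   std::clock_t ticks_per_sec = CLOCKS_PER_SEC)
{
    assert(ticks_per_sec > 0);
    return 1000.0 * static_cast<double>(ticks)
                  / static_cast<double>(ticks_per_sec);
}
```

Under that assumption, a reported figure of 11 ticks gives `ticks_to_ms(11, 100)`, which is 110 ms, matching the first non-zero full sort entry in the table.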

### Test Code

    /*
      This program runs three different partial sort algorithms head to
      head, writing timing information to standard out.

      Usage: partial_sort S N

      Here, S is the size of the collection to partially sort and N is
      the number of elements from this collection to sort. S and N must
      be positive with S >= N.

      The program outputs three figures, the clock times taken by
      full_sort, partial_sort, nth_element_then_sort respectively.

      Example:
      $ partial_sort 1000000 500000
      11 17 6
    */
    #include <algorithm>
    #include <functional>
    #include <iostream>
    #include <iterator>
    #include <stdexcept>
    #include <sstream>
    #include <utility>
    #include <vector>

    #include <time.h>
    #include <stdlib.h>

    namespace
    {
    typedef std::vector<long> numbers;
    typedef numbers::size_type size_type;
    typedef numbers::iterator iter;
    typedef std::pair<size_type, size_type> number_pair;

    // Generate a 31-bit random value.
    long random_value(numbers::value_type ignored)
    {
        return random();
    }

    // Fill the input numbers with random values.
    // Note: srandomdev() is a BSD/macOS extension; other platforms
    // need a different seeding call.
    void random_fill(numbers & ns)
    {
        srandomdev();
        std::transform(ns.begin(), ns.end(), ns.begin(), random_value);
    }

    // full_sort(), part_sort() and nth_element_then_sort()
    // implement three variant algorithms for solving the same problem:
    // sorting the input numbers vector in place so that the N
    // smallest values are in order and at the front of the vector.
    typedef void (* sort_fn)(iter first, iter nth, iter last);

    void full_sort(iter first, iter nth, iter last)
    {
        // Ignore nth, just sort the whole vector
        std::sort(first, last);
    }

    void part_sort(iter first, iter nth, iter last)
    {
        std::partial_sort(first, nth, last);
    }

    void nth_element_then_sort(iter first, iter nth, iter last)
    {
        std::nth_element(first, nth, last);
        std::sort(first, nth);
    }

    // Return the clock time taken by a sort function call.
    clock_t clock_it(sort_fn fn, iter first, iter nth, iter last)
    {
        clock_t const t_start = clock();
        fn(first, nth, last);
        return clock() - t_start;
    }

    // Confirms the numbers vectors share the same leading N elements.
    // Throws an error if not.
    void check_equal_n(std::vector<numbers> const & nn, size_type N)
    {
        typedef numbers::const_iterator const_iter;

        if (!nn.empty())
        {
            const_iter const first = nn[0].begin();
            const_iter const nth = first + N;
            std::vector<numbers>::const_iterator ni = nn.begin() + 1;
            while (ni != nn.end() && std::equal(first, nth, ni->begin()))
            {
                ++ni;
            }
            if (ni != nn.end())
            {
                throw std::runtime_error("check_equal_n fails");
            }
        }
    }

    // "Clocks" time taken by the supplied sort functions,
    // publishing results to the output stream.
    void clock_sorters(std::ostream & out, number_pair SN)
    {
        typedef std::vector<numbers> numbers_vec;

        // Unpack S and N
        size_type const S = SN.first;
        size_type const N = SN.second;

        sort_fn sorters[] = { full_sort, part_sort, nth_element_then_sort };
        unsigned const n_sorters = sizeof(sorters)/sizeof(*sorters);
        std::ostream_iterator<clock_t> put_times(out, " ");

        // Every sorter gets its own copy of the same random input.
        numbers ns(S);
        random_fill(ns);
        numbers_vec nn(n_sorters, ns);

        sort_fn * sorter = sorters;
        for (numbers_vec::iterator ni = nn.begin(); ni != nn.end(); ++ni)
        {
            *put_times++ = clock_it(*sorter++,
                                    ni->begin(), ni->begin() + N, ni->end());
        }
        check_equal_n(nn, N);
        out << '\n';
    }

    // Processes the command line reading S, the size of the
    // numbers array and N, the number of elements to sort.
    // Throws a runtime_error on failure.
    number_pair read_cli(int argc, char * argv[])
    {
        number_pair SN;
        std::stringstream buf;

        if (!(argc == 3 &&
              buf << argv[1] << ' ' << argv[2] &&
              buf >> SN.first >> SN.second &&
              SN.first >= SN.second))
        {
            throw std::runtime_error("Usage: please supply S and N, "
                                     "where S and N are positive, and S >= N");
        }
        return SN;
    }

    } // anonymous namespace

    int main(int argc, char * argv[])
    {
        int error = 0;

        try
        {
            clock_sorters(std::cout, read_cli(argc, argv));
        }
        catch (std::exception const & exc)
        {
            std::cout << "An error occurred: " << exc.what() << '\n';
            error = 1;
        }
        catch (...)
        {
            std::cout << "An error occurred.\n";
            error = 1;
        }
        return error;
    }