Top Ten Percent
Recently I discussed two different approaches to finding the first N items, in order, from a collection of size S:
- full sort then slice
- heap-based partial sort
These two operations are respectively of complexity:
- O(S·logS)
- O(S·logN)
If we fix N and increase S, partial sort gets the job done most efficiently, while clearly conveying the intent of the code.
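As a rough illustration, the two approaches might be written as follows. The helper names first_n_by_full_sort and first_n_by_partial_sort are invented for this sketch, and the choice of a vector of int is an assumption made only to keep the example short.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: sort everything, then keep the first n items.
std::vector<int> first_n_by_full_sort(std::vector<int> v, std::size_t n)
{
    assert(n <= v.size());
    std::sort(v.begin(), v.end());  // O(S log S) comparisons
    v.resize(n);                    // slice off the first n
    return v;
}

// Hypothetical helper: heap-based partial sort, then keep the first n items.
std::vector<int> first_n_by_partial_sort(std::vector<int> v, std::size_t n)
{
    assert(n <= v.size());
    std::partial_sort(v.begin(), v.begin() + n, v.end());  // roughly S log N
    v.resize(n);
    return v;
}
```

Both helpers take the vector by value, so the caller's collection is left untouched. The partial sort version also states the intent directly: only the first n positions matter.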
What if we fix the ratio S/N? As an example, fixing S/N at 10 would correspond to finding the top 10% of the collection. In this case, a third approach will beat both full and partial sort as S increases.
Partial Sort for fixed S/N
First, let’s see what happens to the heap-based partial sort when we fix S/N at 10. The complexity, O(S·logN), becomes O(S·log(S/10)), which equals O(S·(logS-log10)), or simply O(S·logS).
Thus partial sort has the same complexity as full sort, but the lower constant multipliers in the full sort mean that it eventually outperforms partial sort.
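Written out step by step, with N = S/10 substituted into the partial sort cost, the simplification above looks like this (a worked restatement, not a new result):

```latex
\begin{aligned}
S \log N &= S \log\frac{S}{10} \\
         &= S \log S - S \log 10 \\
         &= S \log S \left(1 - \frac{\log 10}{\log S}\right)
\end{aligned}
```

The bracketed factor tends to 1 as S grows, which is why the term S·log10 drops out of the big-O expression. For S = 10 million, for instance, the factor is 1 - 1/7, or about 0.86.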
How then can we reliably beat full sort?
Partition then Sort
The idea is simple: partition the collection to place the first N elements at the front, then sort these elements in place. The partitioning algorithm is one of a family generally known as selection algorithms, implemented in C++ as std::nth_element. As usual, std::sort handles the subsequent sort on the reduced range.
In code:
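template <typename Iter>
void
nth_element_then_sort(Iter first, Iter nth, Iter last)
{
    // Move the smallest (nth - first) elements to the front, in any order...
    std::nth_element(first, nth, last);
    // ...then put just those elements in order.
    std::sort(first, nth);
}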
Here, Iter must be a random access iterator, and the caller must ensure the distance between nth and first is N. Note that this algorithm arranges the smallest N elements in order at the front of the collection. If we want the largest, or indeed any other ordering, we’ll need to supply an ordering predicate to nth_element and sort.
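A predicate-taking variant might look like the sketch below. The extra Compare parameter and the wrapper largest_n_to_front are additions made for illustration; they are not part of the test program at the end of this article.

```cpp
#include <algorithm>
#include <cassert>
#include <functional>
#include <vector>

// Same algorithm as before, with the ordering supplied by the caller.
template <typename Iter, typename Compare>
void nth_element_then_sort(Iter first, Iter nth, Iter last, Compare comp)
{
    std::nth_element(first, nth, last, comp);
    std::sort(first, nth, comp);
}

// Hypothetical wrapper: place the n largest values, in descending order,
// at the front of v.
void largest_n_to_front(std::vector<int> & v, std::vector<int>::size_type n)
{
    assert(n <= v.size());
    nth_element_then_sort(v.begin(), v.begin() + n, v.end(),
                          std::greater<int>());
}
```

The same predicate must be passed to both calls; mixing orderings would leave the front of the collection sorted by a different rule from the one used to select it.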
The standard nth_element algorithm uses a repeated partitioning scheme similar to the divide-and-conquer approach at the heart of quicksort. This time, though, after each division, the algorithm concentrates on just one side of what’s left. Hence we have iteration on a rapidly diminishing range, not recursion, and an operation which is linear on average, O(S). The subsequent sort is O(N·logN).
Thus the overall complexity is O(S) + O(N·logN). If we fix the ratio S/N, the O(N·logN) term eventually dominates; that is, the algorithm is dominated by the time taken by the final sort of the first N items.
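To make the repeated partitioning concrete, here is a minimal quickselect sketch. It is not how any particular standard library implements nth_element (real implementations choose pivots more carefully and guard against bad cases); the function name quickselect_sketch and the middle-element pivot are assumptions made for this example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Illustrative only: rearrange v so that v[n] holds the value it would hold
// if v were sorted, with nothing larger before it and nothing smaller after.
void quickselect_sketch(std::vector<int> & v, std::size_t n)
{
    assert(n < v.size());
    typedef std::vector<int>::iterator iter;
    iter first = v.begin();
    iter last = v.end();
    iter const nth = v.begin() + n;

    while (last - first > 1)
    {
        // Middle element as pivot: simple, but not robust.
        int const pivot = *(first + (last - first) / 2);

        // Three-way split of [first, last):
        // [first, lo) < pivot, [lo, hi) == pivot, [hi, last) > pivot.
        iter const lo = std::partition(first, last,
                                       [pivot](int x) { return x < pivot; });
        iter const hi = std::partition(lo, last,
                                       [pivot](int x) { return x == pivot; });

        if (nth < lo)
            last = lo;    // carry on with the smaller values only
        else if (nth >= hi)
            first = hi;   // carry on with the larger values only
        else
            return;       // nth lies in the run equal to the pivot
    }
}
```

Each pass through the loop discards one side of the split, so the working range shrinks without any recursion. With reasonable pivots the ranges shrink geometrically, which is where the linear average cost comes from.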
Using the example ratio of S/N = 10, O(N·logN) is O((S·logS)/10), which predicts that this partition-then-sort combination will run roughly 10 times faster than a full sort once S is large.
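The factor of 10 can be checked by comparing the two dominant sort terms directly. This estimate deliberately ignores the linear partitioning term and all constant factors, so it is a guide to the trend rather than a prediction of measured times:

```latex
\frac{S \log S}{N \log N}
  = \frac{S \log S}{\frac{S}{10} \log\frac{S}{10}}
  = \frac{10 \log S}{\log S - \log 10}
  \;\longrightarrow\; 10 \quad (S \to \infty)
```

For finite S the ratio of the sort terms sits a little above 10, since log(S/10) is smaller than logS; the cost of the partitioning step pulls the other way.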
Appended to this article is a short C++ program which times our three partial sorting variants on a collection of random numbers. It takes two numbers, S and N, as command-line arguments and reports clock times for full sort, partial sort and partition-then-sort. It cross-checks the results for consistency, raising an error if a problem is detected.
Performance
I compiled the program and ran it with S set to 0, 2 million, 4 million, 6 million …, and the ratio S/N fixed at 10.
#! /bin/bash
g++ -Wall -O3 partial_sort.cpp -o partial_sort
S=0
S_STEP=2000000
while true
do
    let "N = S / 10"
    ./partial_sort $S $N
    let "S += S_STEP"
done
The loop in this script never terminates. Just kill the process once you’ve seen enough.
The results confirm this Nth-element-then-sort algorithm performs better than both full and partial sort at generating the first 10% of a collection. Full sort pulls ahead of partial sort before we get to 20 million.
Test Platform
I ran the tests on a laptop using code compiled and optimised by GCC version 4.0.1.
Hardware Overview:
- Model Name: MacBook
- Model Identifier: MacBook1,1
- Processor Name: Intel Core Duo
- Processor Speed: 2 GHz
- Number Of Processors: 1
- Total Number Of Cores: 2
- L2 Cache (per processor): 2 MB
- Memory: 2 GB
- Bus Speed: 667 MHz
Here’s a table showing the actual times, in milliseconds. Note that on this platform the clock() resolution is 10 milliseconds, so I’ve multiplied the actual program output by 10.
S/million | Full Sort/ms | Partial/ms | Nth elem+sort/ms |
0 | 0 | 0 | 0 |
2 | 110 | 70 | 10 |
4 | 240 | 140 | 40 |
6 | 380 | 250 | 70 |
8 | 500 | 390 | 100 |
10 | 640 | 540 | 110 |
12 | 790 | 700 | 110 |
14 | 910 | 860 | 160 |
16 | 1050 | 1040 | 190 |
18 | 1190 | 1200 | 220 |
20 | 1340 | 1390 | 190 |
22 | 1470 | 1580 | 260 |
24 | 1610 | 1760 | 230 |
26 | 1760 | 1950 | 310 |
28 | 1890 | 2140 | 280 |
30 | 2040 | 2360 | 310 |
32 | 2190 | 2560 | 310 |
34 | 2340 | 2750 | 330 |
36 | 2480 | 2980 | 450 |
38 | 2630 | 3150 | 430 |
40 | 2780 | 3390 | 490 |
42 | 2910 | 3620 | 470 |
44 | 3080 | 3850 | 460 |
46 | 3210 | 4080 | 570 |
48 | 3360 | 4290 | 580 |
50 | 3500 | 4470 | 590 |
52 | 3660 | 4650 | 550 |
54 | 3800 | 4890 | 620 |
56 | 3960 | 5100 | 670 |
58 | 4110 | 5340 | 760 |
60 | 4260 | 5540 | 640 |
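The conversion from program output to the milliseconds in the table can be written more portably using CLOCKS_PER_SEC. The helper ticks_to_ms below is hypothetical, and the figure of 100 ticks per second used in the usage note is an assumption inferred from the 10 millisecond resolution quoted above.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical helper: convert a clock() tick count to milliseconds.
// ticks_per_sec defaults to this platform's CLOCKS_PER_SEC.
double ticks_to_ms(std::clock_t ticks, double ticks_per_sec = CLOCKS_PER_SEC)
{
    assert(ticks_per_sec > 0);
    return 1000.0 * static_cast<double>(ticks) / ticks_per_sec;
}
```

With 100 ticks per second, ticks_to_ms(11, 100.0) gives 110.0, matching the multiply-by-10 rule. On a platform with a different CLOCKS_PER_SEC, the default argument picks up the right scale.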
Test Code
/*
  This program runs three different partial sort algorithms head to head,
  writing timing information to standard out.

  Usage: partial_sort S N

  Here, S is the size of the collection to partially sort and N is
  the number of elements from this collection to sort. S and N must
  be positive with S >= N.

  The program outputs three figures, the clock times taken by
  full_sort, partial_sort, nth_element_then_sort respectively.

  Example:
  $ partial_sort 1000000 500000
  11 17 6
*/
#include <algorithm>
#include <functional>
#include <iostream>
#include <iterator>
#include <stdexcept>
#include <sstream>
#include <utility>
#include <vector>
#include <time.h>
#include <stdlib.h>
namespace {

typedef std::vector<long> numbers;
typedef numbers::size_type size_type;
typedef numbers::iterator iter;
typedef std::pair<size_type, size_type> number_pair;

// Generate a 31-bit random value.
// The argument is ignored; it is there so the function fits std::transform.
long random_value(numbers::value_type ignored)
{
    return random();
}

// Fill the input numbers with random values.
void random_fill(numbers & ns)
{
    srandomdev();
    std::transform(ns.begin(), ns.end(), ns.begin(), random_value);
}
// full_sort(), part_sort() and nth_element_then_sort()
// implement three variant algorithms for solving the same problem:
// sorting the input numbers vector in place so that the N
// smallest values are in order and at the front of the vector.
typedef void (* sort_fn)(iter first, iter nth, iter last);

void full_sort(iter first, iter nth, iter last)
{
    // Ignore nth, just sort the whole vector
    std::sort(first, last);
}

void part_sort(iter first, iter nth, iter last)
{
    std::partial_sort(first, nth, last);
}

void nth_element_then_sort(iter first, iter nth, iter last)
{
    std::nth_element(first, nth, last);
    std::sort(first, nth);
}
// Return the clock time taken by a sort function call.
clock_t clock_it(sort_fn fn, iter first, iter nth, iter last)
{
    clock_t const t_start = clock();
    fn(first, nth, last);
    return clock() - t_start;
}
// Confirms the numbers vectors share the same leading N elements.
// Throws an error if not.
void
check_equal_n(std::vector<numbers> const & nn, size_type N)
{
    typedef numbers::const_iterator const_iter;
    if (!nn.empty())
    {
        // Compare every other vector against the first one.
        const_iter const first = nn[0].begin();
        const_iter const nth = first + N;
        std::vector<numbers>::const_iterator ni = nn.begin() + 1;
        while (ni != nn.end() && std::equal(first, nth, ni->begin()))
        {
            ++ni;
        }
        if (ni != nn.end())
        {
            throw std::runtime_error("check_equal_n fails");
        }
    }
}
// "Clocks" time taken by the supplied sort functions,
// publishing results to the output stream.
void
clock_sorters(std::ostream & out, number_pair SN)
{
    typedef std::vector<numbers> numbers_vec;

    // Unpack S and N
    size_type const S = SN.first;
    size_type const N = SN.second;

    sort_fn sorters[] =
    {
        full_sort,
        part_sort,
        nth_element_then_sort
    };
    unsigned const n_sorters = sizeof(sorters)/sizeof(*sorters);
    std::ostream_iterator<clock_t> put_times(out, " ");

    // Each sorter works on its own copy of the same random data.
    numbers ns(S);
    random_fill(ns);
    numbers_vec nn(n_sorters, ns);

    sort_fn * sorter = sorters;
    for (numbers_vec::iterator ni = nn.begin(); ni != nn.end(); ++ni)
    {
        *put_times++ =
            clock_it(*sorter++, ni->begin(), ni->begin() + N, ni->end());
    }
    check_equal_n(nn, N);
    out << '\n';
}
// Processes the command line reading S, the size of the
// numbers array and N, the number of elements to sort.
// Throws a runtime_error on failure.
number_pair
read_cli(int argc, char * argv[])
{
    number_pair SN;
    std::stringstream buf;
    if (!(argc == 3 &&
          buf << argv[1] << ' ' << argv[2] &&
          buf >> SN.first >> SN.second &&
          SN.first >= SN.second))
    {
        throw std::runtime_error("Usage: please supply S and N, "
                                 "where S and N are positive, and S >= N");
    }
    return SN;
}
} // anonymous namespace
int main(int argc, char * argv[])
{
    int error = 0;
    try
    {
        clock_sorters(std::cout, read_cli(argc, argv));
    }
    catch (std::exception const & exc)
    {
        std::cout << "An error occurred: " << exc.what() << '\n';
        error = 1;
    }
    catch (...)
    {
        std::cout << "An error occurred.\n";
        error = 1;
    }
    return error;
}