トップNソートの検討

ID: 104
creation date: 2010/10/30 00:27
modification date: 2010/10/30 00:27
owner: mikio

上位N件をソートした状態で取り出すという、いわゆる「トップNソート」の効率的な実装について検討してみた。

背景

データベースに対して、ある順序でソートした時の最初の何件かが欲しいというクエリを投げることはよくあるだろう。SNSで言えば、誰かのコンテンツの最新10件を表示するとかいう場合だ。SQLだと "ORDER BY timestamp DESC LIMIT 10" とかいう感じ。同じような操作は全文検索システムのスコアリングでも定番である。俺もよく自分で実装するわけだが、その度に適当な試行錯誤をして時間がもったいないので、今回は入念に調べて決定版を出そうじゃないか。

全体をソートして上位を取り出せば目的は満たせるのだが、それだと無駄な計算が多い。100万件の中から上位10件だけ欲しい場合に、残りの99万9990件まで律儀にソートする必要はない。ということで、上位N件をソートして取り出すという「トップNソート」という操作を行うための専用のインターフェイスを考えるわけだ。

template <class TYPE, class LESS>
inline void topsort(TYPE* ary, size_t num, size_t top, LESS less);
template <class TYPE>
inline void topsort(TYPE* ary, size_t num, size_t top);

Nはレコード数のことを示すのが一般的なので、説明の便宜上、「トップN」のNは以後Mと呼ぶことにし、「トップNソート」でなく「トップソート」と呼ぶことにする。

ヒープソート

トップソートを実装するにあたり、まずはヒープを使うのが思い浮かぶ。ヒープソートは、以下の手順で構成される。

  1. 全要素を先頭からスキャンしつつ、配列の先頭部分に大きい順のヒープを構築する。
  2. そうしてできたヒープの一番上(最大値)を末尾と交換して、末尾をヒープから切り離す。
  3. 交換でヒープ構造を回復させる
  4. 手順2と手順3の操作を、サイズが1になるまで繰り返す。

手順1では、配列の先頭要素が、現在読み込んだ要素の最大値を示す。構築中のヒープのサイズがトップMよりも大きくなった時点で、現状の最大値よりも値が大きい要素は、決してトップMには残らないということがわかる。よって、配列の先頭要素の値よりも小さいものだけを取り込んでヒープを構築していけば、ヒープのサイズが常にMに抑えられる。ヒープの再構成はMに対してlogMなので、ソート全体の計算量は O(N * logM) ということになる。Mが十分に小さい場合には、ソート全体の計算量は O(N) になるのだ。これはクイックソートの O(N logN) よりも高速だということになる。

template <class TYPE, class LESS>
inline void topsort_heap(TYPE* ary, size_t nmemb, size_t top, LESS less) {
  if (nmemb < 2 || top < 1) return;
  if (top > nmemb) top = nmemb;
  size_t cur = 1;
  while (cur < top) {
    size_t cidx = cur;
    while (cidx > 0) {
      size_t pidx = (cidx - 1) / 2;
      if (!less(ary[pidx], ary[cidx])) break;
      std::swap(ary[cidx], ary[pidx]);
      cidx = pidx;
    }
    cur++;
  }
  while (cur < nmemb) {
    if (less(ary[cur], *ary)) {
      std::swap(ary[0], ary[cur]);
      size_t pidx = 0;
      size_t bot = top / 2;
      while (pidx < bot) {
        size_t cidx = pidx * 2 + 1;
        if (cidx < top - 1 && less(ary[cidx], ary[cidx+1])) cidx++;
        if (less(ary[cidx], ary[pidx])) break;
        std::swap(ary[pidx], ary[cidx]);
        pidx = cidx;
      }
    }
    cur++;
  }
  cur = top - 1;
  while (cur > 0){
    std::swap(ary[0], ary[cur]);
    size_t pidx = 0;
    size_t bot = cur / 2;
    while (pidx < bot){
      size_t cidx = pidx * 2 + 1;
      if (cidx < cur - 1 && less(ary[cidx], ary[cidx+1])) cidx++;
      if (less(ary[cidx], ary[pidx])) break;
      std::swap(ary[pidx], ary[cidx]);
      pidx = cidx;
    }
    cur--;
  }
}

template <class TYPE>
inline void topsort_heap(TYPE* ary, size_t nmemb, size_t top) {
  topsort_heap(ary, nmemb, top, std::less<TYPE>());
}

ただし、Mが大きい場合には、単なるヒープソートと同じアルゴリズムになってしまう。トップの判定とか余計なことをしている分、普通のヒープソートよりむしろ遅い。そして、ヒープソートは係数が高い傾向にあるので、一般的に言ってクイックソートより遅い。コムソートやシェルソートにも負けることがある。追記:上記実装とほぼ同じ特性のものがstd::partial_sortという標準関数として存在する。

クイックソート

クイックソートでもトップソートを実現できそうだ。クイックソートは以下の手順で構成される。

  1. 配列のどれかの要素を末尾と入れ替え、その値を閾値として記憶する。
  2. 閾値より小さい要素群を先頭に、閾値より大きい要素群を末尾に集め、その中央に閾値を置く。
  3. 小さい要素群と大きい要素群のそれぞれに再帰的に手順1および手順2を適用する。

手順3において、大きい要素群の先頭がMよりも大きかった場合には、大きい要素群には再帰処理を適用しなくてもよいということに気づく。ということは、Mが十分に小さい場合には、ほとんどの場合に大きい要素群は無視されることになる。小さい要素群の計算量は相変わらず O(N logN) なのだが、これは半減し続ける数列で展開できる。すなわち、ソート全体の計算量は、N + N/2 + N/4 + N/8 + ... ということになり、それはNの2倍より小さいことが明らかなので、結局 O(N) とみなせる。当然、元来のクイックソートより早い。

template <class TYPE, class LESS>
inline void topsort_quick(TYPE* ary, size_t nmemb, size_t top, LESS less) {
  if (nmemb < 2 || top < 1) return;
  if (top > nmemb) top = nmemb;
  struct { size_t low; size_t high; } stack[48];
  stack[0].low = 0;
  stack[0].high = nmemb - 1;
  size_t sidx = 1;
  while (sidx > 0) {
    sidx--;
    size_t low = stack[sidx].low;
    size_t high = stack[sidx].high;
    if (low > top) continue;
    if (low + 10 > high) {
      while (++low <= high) {
        size_t lidx = low;
        while (lidx >= 1 && less(ary[lidx], ary[lidx-1])) {
          std::swap(ary[lidx], ary[lidx-1]);
          lidx--;
        }
      }
    } else {
      size_t lidx = low;
      size_t hidx = high;
      size_t midx = (high + low) / 2;
      if (less(ary[midx], ary[hidx])) {
        if (less(ary[hidx], ary[lidx])) {
          midx = hidx;
        } else  if (less(ary[midx], ary[lidx])) {
          midx = lidx;
        }
      } else {
        if (less(ary[lidx], ary[hidx])) {
          midx = hidx;
        } else if (less(ary[lidx], ary[midx])) {
          midx = lidx;
        }
      }
      if (midx != hidx) std::swap(ary[midx], ary[hidx]);
      TYPE pv = ary[hidx];
      while (true) {
        while (less(ary[lidx], pv)) {
          lidx++;
        }
        hidx--;
        while (lidx < hidx && less(pv, ary[hidx])) {
          hidx--;
        }
        if (lidx >= hidx) break;
        std::swap(ary[lidx], ary[hidx]);
      }
      std::swap(ary[lidx], ary[high]);
      if (lidx - low < high - lidx) {
        stack[sidx].low = lidx + 1;
        stack[sidx++].high = high;
        if (low < lidx) {
          stack[sidx].low = low;
          stack[sidx++].high = lidx - 1;
        }
      } else {
        stack[sidx].low = low;
        stack[sidx++].high = lidx - 1;
        if (lidx + 1 < high) {
          stack[sidx].low = lidx + 1;
          stack[sidx++].high = high;
        }
      }
    }
  }
}

template <class TYPE>
void topsort_quick(TYPE* ary, size_t nmemb, size_t top) {
  topsort_quick(ary, nmemb, top, std::less<TYPE>());
}

ただし、Mが大きい場合には、こちらも単なるクイックソートと同じになってしまう。判定部分が集約されるのでオーバーヘッドはそんなに大きくないが、普通のクイックソートよりもほんの少し遅いだろう。

どっちがいいのか

おそらく、Mがすっげー小さい場合には、ヒープソートの方が早い気がする。なぜなら、ヒープを構築していく際にほぼ全ての要素を無視することができるし、その判定はたった1回のif文でできるからだ。一方で、Mが1割とかを越えたなら、クイックソートの方が早いだろう。普通のソートとしての効率性は断然クイックソートに分があるからだ。

Mがある基準より小さい場合にはヒープソートを適用し、そうでない場合にはクイックソートを適用するってことにすれば、最強のトップソートが完成するはずだ。でも、その具体的な基準関数をどうやって決めればよいだろう。数学に強い人なら理論的に求められるのかもしれないけれど、俺の数学力は中学校で止まっているので、実験に逃げたいと思う。

ソート対象の配列の要素数Nを1,2,4,8,16,...と増やしていく実験を行う。その個々の試行の中では、Nまでの範囲で、抽出するトップ要素数Mを、1,2,4,8,16,...と増やしていく。ソート対象の配列の値はランダムな整数値とする。実際のプログラムをアップしておく。それらの試行の時間を図った。

結果としては、予想通り、Mが小さいとヒープソートが有利であることがわかった。結果の全データをアップしておく。要約すると以下のようになる。

要素数N クイックソートが勝つ最初のM そのMの比率
64 32 0.5
128 32 0.25
256 32 0.125
512 64 0.125
1024 64 0.0625
2048 256 0.125
4096 256 0.0625
8192 512 0.0625
16384 1024 0.0625
32768 1024 0.0312
65536 2048 0.0312
131072 4096 0.0312
262144 8192 0.0312
524288 16384 0.0312
1048576 16384 0.0156
2097152 65536 0.0312
4194304 65536 0.0156
8388608 131072 0.0156
16777216 262144 0.0156
33554432 524288 0.0156
67108864 524288 0.0078

閾値のM/Nの比率を見るとどんどん減っていくことに気づく。要素数が少ない時はヒープソートが有利なシーンが多いのだが、要素数が多くなってくるほどにクイックソートが有利になってくるということだ。Nの何パーセントとかいう単純な関数で閾値を決めることはできない。

で、数字をボーッと見てると、何となく平方根のカーブに近い気がしてきたので、それっぽく冪乗の関数を試してみたところ、どうも0.7乗くらいで近似できるようだ。

要素数N クイックソートが勝つ最初のM N**0.7
64 32 18
128 32 29
256 32 48
512 64 78
1024 64 127
2048 256 207
4096 256 337
8192 512 548
16384 1024 891
32768 1024 1448
65536 2048 2352
131072 4096 3821
262144 8192 6208
524288 16384 10085
1048576 16384 16383
2097152 65536 26615
4194304 65536 43237
8388608 131072 70239
16777216 262144 114104
33554432 524288 185363
67108864 524288 301124

うん。なんかいい感じ。たまたまこのテストでそれっぽい曲線を描いているだけなので、0.7乗ってのが妥当なのかどうかわからないけど、今のところこれが最善の解である。

まとめ

トップNソートには普通のソートアルゴリズムをそのまま使うのではなく、専用のハックを加えた方が効率がはるかによくなる。そして、具体的な実装としては、ヒープソートの改造版と、クイックソートの改造版を、閾値によって切り替えて使うのがよさそうだ。そして、その閾値は、レコード数の0.7乗で近似するとそれっぽいという実験を行った。

boostとかでそれっぽいライブラリがあるんじゃないかなーという気がしつつも、カッとなって自分で実装してしまった。そんでもって、ちゃんとした解析をする能力がないので、最適な実装が何なのかわからない。まあ、最適でなくても、いい感じに動けば別にいいんだけども。

7 comments
dankogai : Mが小さい場合、すでにソート済みのTop N 配列にInsertion Sortで要素を加える方が速い気がするのですが、検討されたのでしょうか? (2010/10/30 03:51)
hanabusa : トップNではなく中央値の選択ですがR.セジウィックのアルゴリズム本でもクイックソートの分割を応用した選択を解説があります (2010/10/30 04:39)
mikio : dankogai: なるほど。思いつきませんでした。たしかに、Mが10とかの場合(つまりinsersion sortがheap sortよりも早いサイズの場合)には全体としてもinsersion sortの方がよさそうですね。ちょっと試してみます。 (2010/10/30 11:13)
mikio : hanabusa: 選択アルゴリズムであれば、最悪計算量がO(N)なのがいくつかあるみたいですね。 (2010/10/30 11:14)
whym : boostではないですが、std::partial_sortも同じことを実現していますね。実装方法はヒープソートのようです http://www.sgi.com/tech/stl/partial_sort.html (2010/10/30 12:14)
mikio : whym: 基本的にはstd::partial_sortでよいと思うですが、Mが大きい場合の遅さをどうするかですね。 (2010/10/30 12:29)
mikio : ちなみにlibstdc++においてもstd::partial_sortはヒープソートらしく、性能は上述のtopsort_heapとほぼ同じでした。 (2010/10/30 12:45)
riddle for guest comment authorization:
Where is the capital city of Japan? ...