QueryParallelizer の利用

QueryParallelizer の利用

QueryParallelizer を使うには、次のような実装を行うことになる:
  1. QueryWork の実装
  2. QueryParallelizer の作成と起動
  3. QueryWork の登録
  4. 結果の受け取り

コードベースで順番に説明していく。

QueryWork の実装

ユーザーは、QueryWork クラスを継承したクラスを作り、そこに DB 処理を記述する。

例えば次のようになる:
  class MyWork: public QueryWork
  {
    int32_t result_value;

    virtual void work(Connection conn, void* pData) {
      Statement stmt = conn->createStatement();
      ResultSet rs   = stmt->executeQuery("select * from tbl");
      rs->next();
      rs->getInt32(1, &result_value);
    }
  }
select を行い、結果を result_value メンバ変数にセットしている。

この並列フレームワークでは、同一の Connection に対して同時に実行されることはない。 同期を気にせずに DBI の操作を記述することができる。

QueryParallelizer の起動

アプリケーションでは、QueryParallelizer オブジェクトを一つ用意し、 そこに Work オブジェクトをどんどん登録していく形になる。

最初に Connection を作り、スレッドを動かす指示をする。 これには start() 関数を使う:
      bool start(size_t n,
                 Driver d,
                 const char* host,
                 const char* port,
                 const char* db,
                 const char* user,
                 const char* pass);
size_t n
n は作る接続の数、および同時に実行されるワーカースレッド数である。
Parallelizer 自身が使うスレッドがあるので、実際には n+1 個のスレッドが稼働する。
Driver d
Driver インスタンスはアプリケーションで用意する。それをこの引数で与える。
host,port,db,user,pass
Driver の connect() と同じである。
start() 関数内でこれらのパラメータを用いて Driver#connect() を呼び出し、Connection を n 個生成する。
典型的には次のようになる:
  Driver d = GetPgsqlDriver();
  QueryParallelizer par = new QueryParallelizer();
  par.start(5, d, ...);

QueryParallelizer の終了

終了には、stop() と join() 関数を使う。

stop() は終了を指示する。即座に停止するわけではなく、
  • Work の新規登録を不許可。
  • 既に登録されている Work は通常通りに実行。
  • 全ての Work が完了したらスレッドを終了し、Connection も解放する。

join() は完了を待つ。 join() 前に stop() を呼んでおくこと。

Work の登録

Work の登録には QueryParallelizer の addWork() を使う。 いくつかの登録方法があり、それぞれ結果の受け取り方が異なる。

結果を受け取らない方式

一つは結果が不要な場合であり、関数シグネチャは:
     bool addWork(QueryWork* w, void* data, bool autodelete);
である。

data は Work の work() 関数で受け取るパラメータである。

autodelete は、このワークオブジェクトをフレームワークが delete するかどうかを指示する真偽値である。真ならば work 呼び出し後に delete される。

コールバック方式

一つはコールバック関数を登録する方法で、関数シグネチャは:
     bool addWork(QueryWork* w, void* data, t_callback cb, void* cbdata);
である。

data は Work の work() 関数で受け取るパラメータである。

cb は Work 完了時に呼ばれるコールバック関数であり、定義は:
     typedef void(*t_callback)(QueryWork* w, void* cbdata, bool* autodelete);
となっている。addWork の cbdata がそのままコールバック関数に渡される。

コールバック関数にはもう一つ、autodelete がある。 これは out 引数で、フレームワークが Work オブジェクトを delete して欲しい場合に真にセットする。

コールバック方式のワーク登録は次のようなコードになる:
   void MyCallback(QoeryWork* _w, void* cbdata) {
     MyWork* w = static_cast< MyWork* >(_w);
     printf("result: %d\n", w->result_value);
     delete w;
     *cbdata = false;
   }
   int main() {
     par.addWork(new MyWork(), NULL, &MyCallback, NULL);
   }

結果をポーリングで受け取る方式

もうひとつ、Future< ResourceWork* > を返すワーク登録関数を提供している:
     typedef nine::concurrent::Future&lt; ResourceWork* > FutureResourceWork;
     FutureResourceWork addWork(ResourceWork* w, void* data);

FutureResourceWork は、ResourceWork* を受け渡しする Future クラスである。使い方は Futureクラスの説明を参照のこと。

FutureResourceWork の場合のコードは次のようになるだろう:
   FutureResultWork f = par.addWork(new MyWork(), NULL);
   ResourceWork* _w = f->get();
   MyWork* w = static_cast< MyWork* >(_w);
   printf("result: %d\n", w->result_value);
   f->release();