+stream
Byte Stream 101
by
omo
2008-07-22 20:14
様々なバイトストリーム
森田です。こんにちは。いつの間にか投稿数で ikumi にぶっちぎられていることに気がつきました。 不覚です。あんにゃろう。
さて、弊社はネットワークのミドルウェアを販売しています。 ネットワークのミドルウェアの仕事というのは、 簡単にいうとソケットからデータを読んでアプリケーションに渡すことです。 あとはその逆、アプリケーションから渡されたデータをソケットに書き込むこと。 他にも色々やっていますが、ねっこはこれです。
ソケットで読み書きするデータはバイト列です。 このバイト列にはランダムアクセスができません。FIFO という制限があります。 FIFO でアクセスするバイト列のことを「バイトストリーム」と呼ぶことにしましょう。 バイトストリームを読み書きするのは一見とても簡単に見えます。 実際それほど難しくもないのですが、一方で実現方法にはいくつかのバリエーションがあり、 その選択が使いやすさや性能に影響を与えます。
というわけで今日は、バイトストリームの抽象を眺めてみることにしましょう。 話の都合で読み書きするソケットはノンブロッキングということにしておきます。 読み書き両方を扱うのは冗長なので、とりあえず読む方だけを考えていきます。書くのはその逆です。
read()
最初はいちばん素朴な抽象から。 read() と呼んでおきます。 read() は以下のような signature をもつ API です。
class socket_t {
...
error_t read(byte_t* b, size_t len, size_t* got);
...
};
ソケットにバッファされているバイト列を, 最大で len バイトだけ b にコピーします。 実際にコピーした長さを出力変数 got に返します。 通信にエラーがあったときは、戻り値のエラーで返します。お好みで例外を投げるのもよいでしょう。
この方法の良いところは、まず馴染み深い点です。UNIX から Windows まで、 ファイルなどのバイトストリームはおおよそこのスタイルの API でアクセスします。 もう一つの利点は、値をコピーすることです。 ソケット内部のバッファについてユーザはあれこれ考える必要がありません。
コピーがおこるぶん、効率は少し下がります。 またデータを読み込むバッファはユーザが用意する必要があります。
async_read()
次はもう少し効率の良い方法を考えてみます。
class socket_t {
...
typedef int (*functor_t)(size_t got, error_t err, void* user);
void async_read(byte_t* b, size_t len, size_t* got, functor_t callback, void* user);
...
};
関数にコールバックを渡すようになりました。read() では戻り値や出力引数だったものが、 コールバックの引数に変わっています。
この方法の利点は、ソケット内のバッファからユーザのバッファ b へのコピーを省略することができることです。 ソケットは内部のバッファを使わず、ネットワークからのデータを直接 b に書き込むことができます。 (ただし常にユーザからバッファを受け取っているとは限らないため、 ネットワークからデータが届くタイミングによっては内部のバッファを使います。) Boost ASIO がこの抽象を使っています。
多くの TCP/IP 実装は、この種の API を持っていません。 組込みのようなバッファの扱いに細かい環境では、類似の(バッファを渡す) API が利用できることがあります。 またファイル I/O の API には、こうした非同期アクセスを提供しているものもあります。 (例: posix aio) ただしコールバック関数のかわりに、次に示すような完了チェックの API を使うのが一般的です。
poll_read()
コールバック関数は便利ですが、コールバックされるタイミングがコードからわかりづらいなどの欠点もあります。 かわりに完了チェックの API を用意することができます。
class socket_t {
...
void async_read(byte_t* b, size_t len, size_t* got);
error_t poll_read();
...
};
ソケットからの読み込みが完了するまで、 poll_read() 関数は「未完了」をあらわすエラーを返します。 完了したあとは、async_read() でコールバックに渡ったのと同じエラー値が戻ります。
バリエーションとして、終了状態をチェックするオブジェクトをつくることもできます。
class socket_t {
...
completion_t* async_read(byte_t* b, size_t len, size_t* got);
...
};
...
class completion_t {
...
error_t poll_read();
...
};
また UNIX の select() 関数のように、複数のソケットの状態をまとめてチェックすることもあります。
class context_t {
...
void poll_read(socket_t* socks, size_t len, size_t* got); // 読み込みが完了したソケットの配列を返す
...
};
状態をまとめてチェックしたいのは、大量にある接続ソケットのうち、 ある瞬間にデータを受信するものがごく一部という場面を想定しているからです。
完了チェックの API は, async_read() 専用のものではありません。通常の read() を使う場合でも、 完了チェックがあると便利なことはあります。(実際に select() はそう使われています。)
read_buffer()
ここまではユーザがバッファを用意してきました。
ただ、ユーザがバッファを用意しなくても、 ネットワークから届いたデータは一旦どこかにバッファリングされます。 そのバッファに直接アクセスできても良いはずです。
class socket_t {
...
void read_buffer(byte_t** b, size_t* len);
void pop(size_t len);
...
};
read_buffer() 関数を呼ぶと、 b にはソケット内のバッファを差すポインタが、len にはバッファ内のデータ長が与えられます。 バッファの中身を解釈したら、先頭から読み終わったバイト数を pop() 関数で通知します。 (バイトストリームが FIFO なのを思いだしてください。)
OS の提供するソケットはカーネル空間にバッファがあるため、 システムコールがこのような形をとることはありません。 ただしユーザ空間で動くミドルウェアやライブラリなら選択肢の一つになります。 弊社のミドルウェア VCE2 もこれに似た API を持っています。
並列処理を前提としたシステムでは、バッファアクセスがロックを兼ねることもあります。
class socket_t {
...
void lock_buffer(byte_t** b, size_t* len);
void unlock_buffer();
...
};
read_buffer() 方式の利点は、ユーザがバッファ管理やデータの読み込みをしなくて良いことです。 煩雑な処理を下位のレイヤに任せることができます。(実際にはバッファサイズの調整など、いくつか仕事は残りますが...)
read_buffer() 方式の欠点はこの逆で、ユーザが凝ったバッファ管理を実現できないことです。 凝ったバッファ管理とはどんなものでしょうか? 次の例でそれを説明します。
read_chunk()
read_buffer() はひとつ大きな仮定をしていました。 「バッファが一つの連続したメモリ領域である」という仮定です。 こう仮定できるとアプリケーションのコードはとても簡単になります。 たとえばバッファの中身がテキストだとして、中身をパースするのに atoi() のような標準 C API が使えます。 なにしろ連続したメモリ領域は C/C++ 言語の一級市民なのです。
しかし一方で、この仮定を保つにはそれなりのコストがかかります。 たとえばいま、1024 バイトのバッファに先頭から 756 バイトのデータが書き込まれているとしましょう。
[oooooooooooo....] # "o" がデータ, "." が空き領域
ユーザが先頭の 256 バイトを解釈します。
[xxxxoooooooo....] # "x" が解釈されたデータ
そのあと 512 バイトのデータがネットワークから到着しました。 このときソケットはまず既存のデータを先頭まで移動し、 バッファ末尾に到着分のすきまを作らなければなりません。
[oooooooo........]
こうしてようやく、到着したデータをバッファに読み込めるわけです。
[ooooooooOOOOOOOO] # "O" が到着したデータ
この移動(コピー)が、仮定を保つために支払うコストです。 これを避けるには連続領域の仮定を捨て、何らかのデータ構造を持ち込む必要があります。 そうしたデータ構造の一つが「チャンク」です。
一般に、チャンクは断片的なメモリ領域のキューです。 大ざっぱにいうとこんなかんじ。
class chunk_t {
typedef std::vector<byte_t> block_t;
...
std::queue< block_t* > m_data;
};
class socket_t {
...
chunk_t* read_chunk();
...
};
チャンクの持つメモリ領域の断片を順に繋いだものが、バッファの保持するデータです。 先の例で考えてみましょう。
ユーザが読んだところまでは同じです。
[xxxxoooooooo....]
次にデータが到着すると、ソケットはデータを移動するかわりにキューに要素を追加します。
[xxxxooooooooOOOO] # 既存のバイト列の末尾までデータを読み込み... [OOOO............] # 残りは新しいバイト列を追加して書き込む
ユーザが更に 756 バイトのデータを解釈すると, 一つめのバイト列は不要になます。 不要なバイト列は破棄されます。
[xxxxxxxxxxxxxxxx] # 不要になった. 破棄/回収される [OOOO............] #
チャンクや類似のアイデアは、高性能な専用サーバの実装に広く利用されています。 各種 OS に付属する TCP の実装にも似通ったデータ構造を持つものがあります。 readv() や writev() のように 非連続なメモリ領域を利用するためのシステムコールが用意されていることからも、 このアイデアの人気が伺えます。
一方で、こうした非連続なメモリ領域の扱いは面倒が多いため、 本当に複雑な仕組みが割に合うのかとよく考えるのは大切です。 アプリケーションのパーサが連続な領域やデータのコピーを 求めるものなら、こうした工夫は台無しかもしれません。 またチャンクで利用するメモリを確保/開放するコストが、 コピーの節約を帳消しにしてしまうこともあります。実装には工夫が求められます。
normalize()
低レイヤでがんばりたい時のためにチャンクを使っておき、 同時に上位レイヤで手抜きをする方法を用意しておくことができます。
class chunk_t {
...
void read_normalized(byte_t* b, size_t len, size_t* got);
void normalize();
...
};
read_normalized() は, チャンクの中身を連結して b に書き込みます。 これは read() のかわりに使うことができます。
normalize() はチャンクに含まれるメモリ領域を連結し、 一つのメモリ領域にまとめてチャンクに格納しなおします。 結果のバッファは read_buffer() を代替します。
先に述べたとおり、非連続なメモリ領域を仮想的につなげてみなすアイデアは プログラミングの世界で一般的なものです。仮想記憶はその代表格ですね。 SGI STL の Rope や AVM3 の String も 同様のテクニックを使っており、 これらにも normalize() や read_normalized() に相当する操作が用意されています。
とはいえ、これも繰り返しになりますが、 これらの操作を多用するようなら、最初から連続したメモリ領域を使っておく方が堅実です。 作りも単純になります。
move()
アプリケーションが階層構造をもつとき、チャンクは威力を発揮します。
たとえば、TCP のレイヤをあらわすソケットの上に、HTTP のサーバライブラリを作ろうとしてみましょう。 read() から read_buffer() までの 抽象では、 ソケット -> HTTP ライブラリ -> アプリケーション と二回のコピーが必要です。 HTTP のヘッダはともかく、本文(body)は HTTP ライブラリを素通りします。無駄なコピーです。
このときソケットと HTTP ライブラリが内部表現にチャンクを使っていると、このコピーを節約できます。
class chunk_t {
...
void move(chunk_t* other, size_t len);
...
};
move() 関数は, チャンク自身から other のインスタンスに len バイトのデータを移動します。 このとき move() の実装はメモリをコピーするかわりに、block_t のインスタンスを引き渡すことができます。 body が十分に大きい場合、この引き渡しはコピーより高速かもしれません。
Apache の内部で利用されている bucket brigate は、こうした目的のために設計されています。
Apache のように階層やフィルタを強調したアーキテクチャでは、チャンクの仕組みが威力を発揮する余地があります。 一方で、レイヤの段数が少なく、また各レイヤ間でいつもデータの書き換えがおこるようなケースだと、 このややこしさは割に合いません。HTTP も本文が圧縮されていたら台無しです。
私も複雑な仕組みには心惹かれますが、あとの保守を考えるとなかなか実装には至りません。 覚悟とバランス感覚が問われるところです。
まとめ
というわけで、バイトストリームを扱う抽象のバリエーションを眺めてみました。
- バッファをユーザが確保する方法
- 内部のバッファからコピーする read()
- ユーザのバッファに直接書き込める read_async()
- コールバックのかわりに poll_read()
- システム内部のバッファを公開する方法
- 連続したメモリ領域を仮定する read_buffer()
- 非連続なメモリを利用する read_chunk()
- 非連続を隠す normalize() と read_normalized()
- メモリブロックを引き渡す move()
バイト列を読むだけの簡単な仕事にも、色々なやりかたがある。 そう再発見していただけたでしょうか。