SSブログ

パイプ(pipe)でスレッド(thread)間非同期通信 [Raspberry Pi]

GigEドライバの進捗がはかばかしくなくて、最近このブログで純粋に技術的な記事が減ってる。別にどうということはないんだけど、僕自身としてはそれではこのブログの存在意義がないと思うのでもうちょっと技術的な話を増やしたい。

ということで今日はunixのパイプをスレッド間通信に使おう、という話....ちょっと長くなるけど....

pipeとは?

pipeとはunixのプロセス間通信のメカニズムのひとつである。これは非常に古くてたぶん最初に実装されてから半世紀は経っている。pipeはいわゆるunixのシステムコール(OSのAPIでCPUの特権レベルの切り替えが伴う)だけど、シェル上で直接表現することができて
% ps axuwww | grep usbd
なんてやるときのバーティカルバー「|」がpipeそのものである。シンプルで理解しやすくて便利なアイデアである。そのpipeがいつの間にかマルチスレッドに対応していた。ここではそのことを書く。



Raspbian上でCを使ってマルチスレッドのプログラムを書くとき、スレッド間の通信に何を使ったらいいのかよくわからなかった。ずっとOS Xでばかり書いていてすっぴんのunixだと何が簡単で効率がいいんだろう、と思った。特に、独立したプロセス間と違ってスレッド間では非同期(出す方も受ける方も相手を気にせず自分の勝手にする)にできないとスレッドに分けた意味がなくなるので、非同期通信が可能である、というのが重要になる。

スレッド間通信とは一旦離れて、unixに備わったプロセス間通信の手段、例えばpipeであり、signalであり、共有メモリであり、セマフォであり、といったものは基本的には非同期なものである。しかしpipeだけは非同期といってもちょっと雰囲気が違う。例えばさっきの
% ps axuwww | grep usbd
ではpsがpipeの入り口に文字列を書き込んで、grepはpipeの出口を読む。psが文字列を用意するまではgrepはただpipeの出口を読み込もうとする状態でブロック(プロセスにCPU時間は割り当てられていない)している。psがpipeに1行書き込むと、そのデータがgrepの覗き込んでいるpipeから出てきてブロックが解かれ、grep内部の処理が実行される。

ということで、psが自分の仕事をしている間に、grepは他のことをするというわけにはいかない。grepにとってそもそも他の仕事はなくて待つしかないので、この動作は正しいし、無駄なCPUの消費もなく、シンプルでわかりやすい。

pipeの実装もいたってシンプルなもので、OSのどこか(カーネル内である必要はない)に小さなリングバッファを用意して、pipeの入り口からくるデータをそのバッファに書き込み、pipeの出口へ書き出す。バッファが空だと出口からの読み込みをブロックし、バッファが一杯だと入り口への書き込みをブロックする。こんな簡単な仕組みがunixではプログラマが日に何十回となく使う強力な道具になった。

スレッドとの関係

でも、マルチスレッドの場合は違ってくる。マルチスレッドに書く、ということは複数のスレッドにそれぞれ別のことをさせるために書くわけで、ひとつのスレッドの処理が終わるまで別のスレッドが待つようなマルチスレッドプログラムはありえない。それはスレッドに分けないで、連続した一連の処理にしたほうがいい。

pipeのアイデアがunixに実装されたころはスレッドの概念は無かった。従ってスレッド間通信の必要性は存在せず、また完全に非同期なプロセス間通信には、セマフォと共有メモリを組み合わせて使ったりするのが普通だった。

たぶんpipeが実装されてから10年後ぐらい後にスレッドが提案された。そもそもはプロセスをforkするほどのこともない作業に高価な専用の仮想空間を割り当てるのはもったいない、ちょっとした作業だったら軽いプロセスにしようぜ、ということでメモリ空間を共有する擬似的なプロセスを作ろうというのが動機だった。それがイベントドリブンなプログラムに利用され、さらにマルチコアCPUが一般的になってそのコアを使い倒すという目的が意識されるようになって、スレッドは重要になっていった。

スレッドセーフ性

unixのシステムコールは昔はスレッドセーフではなかった。最初はスレッドがなかったので当然である。スレッドセーフ性は微妙で難しい問題で、大きなプログラムではデバグに苦しむ原因になりやすいが、もともとunixのシステムコールは処理単位が小さく、アトミックな操作にしやすかった。システムコールは徐々に書き換えられ、スレッドセーフなプログラムに使えるようになってきた(もちろんそれでスレッドセーフ性が保証されるわけではない)。

ちなみに、unixのシステムコールの処理単位の小ささの例として、fork()、exec()を上げることができる。unixでは新しいプロセスを生成する方法はfork(プロセスのコピーを作る)してexec(プロセスを別のプログラムの実行に書き換える)する以外に方法はない。その昔、他のOSではそれをいっぺんに行うシステムコールがあった(たいていspawnと呼ばれた)。しかし新たに作られたプロセスの属性を変更するためにはspawnに多くの引数が必要になる。unixでは属性の変更はfork()を呼んでからexec()するまでにその作業をすればいい。一見不便なようで、でもそのおかげでかえってシンプルになるというunixの思想の典型である。これは半世紀経った今でも見習うべき美しいアイデアである。

話を戻して、今ではpipeもある条件を満たせばアトミックな操作とみなすことができて、スレッドセーフに使うことができる。つまり複数のスレッドから別々に読んだり書いたりしても問題は起きない。 ただし、pipeのデフォルトの動作では最初に書いたようにバッファが空のときにreadしたり、いっぱいのときにwriteするとブロックする。もちろん、この動作は変更することができる。

ファイル固有の操作の方法

ちょっと話はそれるけど、古いunixの思想では外にあるものを全てファイルとみなす。ハードディスク上のファイルはあたりまえとして、キーボードやマウスディスプレイといったユーザインターフェイス用のデバイスや、ネットワークやプリンタ、あるいはいろいろなハードウェアインターフェイスで繋がったデバイスも全てファイルだった。ファイルはopenしてreadしたりwriteして終わればcloseすることができる。何でもそうなので同じプログラムを別のデバイス用に使うことができる。それもシェルから直接繋ぎ変えの操作ができる。

ところが、やはりどうしても具体的なデバイスごとに違う操作をしたい、あるいはする必要がある、ということもある。そこでunixは一般的な操作以外のことをするために特別のシステムコール
int fcntl(int fd, int cmd, ...);
int ioctl(int fd, unsigned long request, ...);  
というのがある。ファイルに固有の設定をしたいときに使うことになる。controlの省略がcntlなのかctlなのか一貫していないのが古いunixの雰囲気を残している。

ブロックしないpipeの設定

pipeにはこのfcntlを使ってO_NONBLOCKというフラグをセットすることができる。O_NONBLOCKフラグは空のpipeを読んだりいっぱいのpipeに書こうとすると失敗する。具体的には
#include <unistd.h>
#include <fcntl.h>
....
    int pipeFd[2];
    pipe(pipeFd);
    fcntl(pipeFd[0], F_SETFL, O_NONBLOCK);
....
で設定できる(エラーチェックは無視している)。このように設定して
    char buf[256];
    ssize_t rlen = read(pipeFd[0], buf, 256);
で読んだときpipeが空ならreadは-1を返してerrorにEAGAINが設定される(Resource temporarily unavailable, try again)。このエラーを無視することでブロックしない読み込みが可能になる。ポーリングしながらデータがないときは他のことをしていればいい。

ブロックしないpipeの具体的なコード例

例えばこうやって二つのpipeを作ってひとつスレッドを走らせる
pthread_t           thread;
int                 pipeToSub[2];
int                 pipeToMain[2];

void    *subLoop(void *arg);

int main(int argc, const char * argv[])
{
	int	err;
    err = pipe(pipeToSub);
    err = pipe(pipeToMain);
    err = fcntl(pipeToSub[0], F_SETFL, O_NONBLOCK);
    err = fcntl(pipeToSub[1], F_SETFL, O_NONBLOCK);
    err = fcntl(pipeToMain[0], F_SETFL, O_NONBLOCK);
    err = fcntl(pipeToMain[1], F_SETFL, O_NONBLOCK);

    err = pthread_create(&thread, NULL, subLoop, NULL);
この後、メインスレッド側では
    char    buf[256];
    int	i;
    for (i = 0 ; i <= 50 ; i ++) {
        ssize_t len = read(pipeToMain[0], buf, 256);
        if (len > 0) {
            buf[len] = '\0';
            printf("main read [%s]\n", buf);
        }
        if (i % 10 == 0) {
            char    wbuf[256];
            sprintf(wbuf, "counter = %d", i);
            printf("main write [%s]\n", wbuf);
            ssize_t wlen = write(pipeToSub[1], wbuf, strlen(wbuf));
        }
        //	some chores on main thread
        usleep(100 * 1000);
    }
という風に0.1秒ごとに一方のpipeにデータが来ていないか確認しながら、1秒ごとにpipeのひとつに文字列を書き込む。サブスレッド側では
void    *subLoop(void *arg)
{
    while (1) {
        char    buf[256];
        ssize_t len = read(pipeToSub[0], buf, 256);
        if (len > 0) {
            buf[len] = '\0';
            char returnBuf[256];
            sprintf(returnBuf, "subloop read {%s}", buf);
            write(pipeToMain[1], returnBuf, strlen(returnBuf));
        }
        else if (len < 0)
            ;//perror("");
        //	other chores on sub thread
        usleep(100 * 1000);
    }
同じように0.1秒ごとにメインスレッドから文字列が来ていないか確認して、来ていたらそれを返す。これを実行すると
main write [counter = 0]
main read [subloop read {counter = 0}]
main write [counter = 10]
main read [subloop read {counter = 10}]
main write [counter = 20]
main read [subloop read {counter = 20}]
main write [counter = 30]
main read [subloop read {counter = 30}]
main write [counter = 40]
main read [subloop read {counter = 40}]
main write [counter = 50]
main read [subloop read {counter = 50}]
となって、期待通りに動く。しかもこのコードはOS XとRaspbianの両方で修正なしに動く。これは僕にとって非常にありがたい。デバグをEclipseでRaspbianへリモート接続してやらなくてもOS XのXcodeの上でできる。

pipeによる非同期スレッド間通信の利点

では、そもそもpipeを非同期スレッド間通信に使うと何がいいのか。それは
  1. ロックを使わなくてもいい
  2. 共有されたデータの更新状態が自明になる
の2点である。

ロックに関して、たとえば両方のスレッドで同じメモリ領域をアクセスすることで通信するとき、排他制御する必要がある。ひとつのフラグだけを共有するのであればアトミック操作を使えば自動的に排他制御できる。が、一連のメモリ領域でその中で一貫性が必要な場合、たとえば、文字列とその長さを保持しているような場合、文字列そのものの変更と長さデータの変更が同時にされないといけない。文字列を書き換えてから長さデータをそれに従って書き換えたとき、その間に他のスレッドが文字列にアクセスすると一貫性が保たれない。そこでロック(POSIXではpthreadのmutex_lockなど、OS XではNSLockとか)を使って排他制御する必要がある。

ちなみに、strcpy()などの文字列操作関数はシステムコールではなくて実行時ライブラリの関数なのでアトミックではない。したがってstrcpy実行の途中で他のスレッドが割り込む可能性がある。文字列の長さを別の変数で保持しないとしてもロックは必要になる。

pipeを使えば、pipeの両端で専用のデータのコピーがそれぞれあることになるので、排他制御は必要なくなる。もちろん書き込みによってリングバッファへのコピーが発生するのでロックよりもオーバーヘッドは大きい。しかし多重ループの深いところでロックを使うとパフォーマンスに影響することがあるが、pipeではその心配がなくなる。ときどきあることだけど、ループの一番深いところでデータの書き換えが発生し、そこでロックを使った場合、その一番深いループの中でロックが解放される期間が少ない、要するに他にすることがないと、ロックの空いている時間がほとんどなくなって、他のスレッドからのアクセスが実質的にシリアルになってしまう(常にどちらかのスレッドしか動いていない)、ということが発生する。pipeだとロックを使わずに済み、その心配がなくなる。

また普通のスレッド間通信では、データが更新されたか、更新されたデータを読んだか、などといった他のスレッドで発生した動作を知らせるための手段が必要になる。共用しているメモリ領域の一部にフラグを設けたりするのが普通であるが、pipeを使えば不要になる。

pipeによるスレッド間通信が有利になる例

例として、Raspberry PiではよくあるA/Dなどのデータを読み書きするループを専用のサブスレッドに置くような場合を考える。サブスレッドではループの一番底ではA/Dコンバータの変換を指示して、変換が終わればメインスレッドに通知してデータを渡すというようなもので、サブスレッドではそれだけに専念し、メインスレッドでは定期的にデータを読み込むというような場合である。

普通のやり方

実装にはいろいろなやり方はあるだろうけど、例えば共通して使うメモリ領域へのポインタをメインとサブのスレッドで保持するとしよう(これが普通のやり方かどうか疑問かもしれないけど)。そこにはA/Dのデータとフラグの領域を確保する。サブスレッドで新しいデータができたら、まず共有領域全体が他からアクセスできないようにロックしてデータを書き込み、データが更新されたというフラグをセットしてそのあとアンロックする。

メインスレッド側では定期的にフラグをチェックする。データが更新されていればデータを読みだしてフラグをクリアする。メイン側でもロック→フラグチェック→更新なら読み出し→アンロックとする必要がある。フラグチェックだけならアトミックにできてロックの必要はないように思えるけど、もし更新されていればそのあとデータを読み出すことになり、結局ロックが必要になる。

ところで、メインスレッド側ではユーザインターフェイスの操作もあるので、常に定期的にできるとは限らない。さっきの最も簡単な場合では、サブスレッドの更新間隔よりメインスレッドでのチェックの頻度が少ないと、データが捨てられることになる。これはイマイチなので、共通のメモリ領域には複数のデータが確保できるようにして、データの個数を表す領域も作ることにする。サブスレッドでは新しいデータを書き込むときに、フラグが(メインスレッドによって)クリアされていなければ、データを追加する。

メインスレッドではすべてのデータが読み出せたときにだけフラグをクリアする。データ領域がオーバーフローしない限りは、これでデータを取りこぼすことはなくなる。

非同期pipeによる方法

これを非同期pipeを使って実装することを考える。サブスレッド側では新しいデータができたら適当なヘッダをつけてpipeに書き込む(ヘッダはA/Dのデータに構造がある場合、どこが頭かを知らせるためのもので、2バイトのデータひとつだけならただpipeに突っ込むだけでも構わない)。

メインスレッド側では定期的にpipeを読み出す。読み出しがEAGAINで失敗したらそれはまだデータが更新されていない、ということを表している。読み出せることがわかったら、pipeからすべてのデータを読み出してpipeをカラにする。これだけでいい。

pipeではロック・アンロックは必要ないし、データの更新状態を表すフラグも必要ない。データがいくつあるかを示す値も必要ない。pipeはキューでもあるのでリングバッファがオーバオフローしない限りはデータを取りこぼすことはない。

pipeを使う方がずっとシンプルになるということがわかる。ロック・アンロックを使わないのでデッドロックの心配もなく精神衛生上非常に良い。

非同期pipeの不利な点

もちろんなんでも非同期pipeでやればいいというものでもない。普通はpipeのリングバッファはそれほど大きくないので、どちらかといえば頻繁ではあるけど、小さなデータに向いていて、動画データをpipeで送るというのはふさわしくない。

また、読み手が複数ある場合、工夫が必要になる。極端な例として、一つのデータをたくさんのスレッドで読み書きするような場合は、ロックを実装する方がずっと簡単になるのは明らかである。

非同期pipeにまつわるその他いろいろ

プロセス間通信としての非同期pipe

非同期pipeはスレッド間ではなくもちろんプロセス間でも使える。使う場面は少ないけれど、socketをブロックせずに使うような場面で、ローカルなマシン上でしか使わないことがはっきりしている場合、socketの代わりに使うことができる。socketも便利だけど、pipeの方がずっと簡単に使うことができる。しかし、汎用性という面ではsocketの方に軍配があがる。pipeが実装されてから、やはり10年ぐらい後にsocketは整備されたので、ずっと洗練されているということであろう。

古い非同期入出力の考え方

実は、昔から(pipeが実装される前から)ブロックしないO_NDELAYというフラグがあった。これはO_NONBLOCKと動作は実質的に同じである。ただし、O_NDELAYフラグは極端に遅いデバイス、例えばターミナル(キーボードなど)を待ってブロックしていたら何も仕事ができなくなるのを避けるためのものだった。互換性の問題もあり、マルチスレッド対応用のO_NONBLOCKというフラグが作られたようである。unixの実装によってはO_NDELAYでも動作は同じになる場合もあるらしい。

signalの利用とその危険性

また、読めるデータが来たときにsignalを送る、という動作もfcntlで設定することができる。signal handlerを定義して補足できるようにすれば、他の仕事に専念することもできる。

しかし、signal handlerを使ってコールバック関数を呼ぶような形にはしないほうがいい。signal handlerはレイテンシを低く抑えるために、ソフトウェア割り込みを使って実装されていることが多い。このためsignal handlerの内部では何でもできるわけではない。極端な場合、他のシステムコールの実行中にhandlerが呼び出されている場合もある。汎用のコールバックにするとコールバック関数の中で何をするかわからないので、危険である。実は僕は若い頃signal handlerで痛い目にあった。

従ってsignalを使ってもhandlerの中ではフラグを操作することぐらいしかできないので、ポーリングは必要になってしまう。どうせポーリングするなら、signalを使わなくてもいい、ということになる。

アトミックなpipeの条件

ところで、pipeが「ある条件を満たせば」アトミックとみなせる、と書いた。その条件は何かと言うとpipeのリングバッファのサイズが関係する。バッファサイズを超えるデータの読み書きはいっぺんにはできない。もちろんreadやwriteが返す完了バイト数を常にチェックして、完全に終わることを確認するようなコードでないといけないけど、複数回に分割されたときは当然アトミックではなくなる。ひとつのpipeに複数のスレッドから書き込んだとき、混ざってしまう可能性がある。

POSIX標準ではPIPE_BUFという定数が定義されている。OS Xではsys/syslimits.hに
#define	PIPE_BUF		  512	/* max bytes for atomic pipe writes */
とあって、Raspbianではlimits.hに
#define PIPE_BUF        4096	/* # bytes in atomic write to a pipe */
となっている。POSIXではこれ以下のサイズであればpipeはアトミックな動作を保証することになっている。この値は実際のリングバッファのサイズに比べるとずっと小さい(OS XもRaspbianも実際のバッファの大きさは65536バイトある)が、バッファの残りサイズがPIPE_BUFより小さいときにはO_NONBLOCKがクリアされているときはブロックし、O_NONBLOCKがセットされているときは失敗してすぐ返ってくる。

つまりpipeにO_NONBLOCKをセットしてデータのサイズをPIPE_BUF以下で使えばスレッド間通信のインフラとして使える。pipeはもともとオーバーヘッドは低い(実際の実装がどうなってるか知らないんだけど、リングバッファをランタイムからユーザプロセス空間に確保することもできるので、カーネルのリソースを消費しない実装もできるはず)ので効率がいいし、アトミックな動作が保証できるのでセマフォを専用に準備する必要もない。これを使えばずっと後になってPOSIX標準になったメッセージキュー(mqueue.hに定義されている)は使う必要はなくなる。実はメッセージキューはPOSIXなのにOS Xでは使えないので、ありがたい。

まとめ

ということでRaspbianでマルチスレッドを書いたときのスレッド間通信の小さなインフラを書いた。ポーリングが前提のコードなので汎用性はあまりないけど、ロックを使わなくてもすむので、シンプルで案外使いやすくなった。僕にとっては悪くない。まあ、ここでコードを公開するほどのまとまりもないのでお話だけで。使いたい人は多分自分で書けるだろう。

そう考えるとOS XのRunLoopは便利だと思ってしまう。あれも実は、中身はポーリングのループなんだけど、プログラマがそれを意識しなくてもいいし、安全なコールバックも書けるし、スレッド間通信も(実際にはスレッド間通信ではないのも含めて)いろいろ用意されているし、入力とみなすインターフェイスの追加削除ができて汎用性が高いし、しかもレイテンシは低くてCPU時間の消費も低い。しかし残念ながらメモリは大量に喰うし、メインスレッドは特別扱いしないといけないし、なんといってもOS X(とiOS)でしか動かない。

Raspberry PiにiOSをポート、なんてことは起きないかなあ。そうなるとめちゃ便利なんだけどなあ。もちろん絶対ないだろうけど。
nice!(0)  コメント(0)  トラックバック(0) 

nice! 0

コメント 0

コメントを書く

お名前:
URL:
コメント:
画像認証:
下の画像に表示されている文字を入力してください。

トラックバック 0

この広告は前回の更新から一定期間経過したブログに表示されています。更新すると自動で解除されます。