Parallel::Fork::BossWorkerAsyncがいい感じ
先日 id:hirose31 と
- 並列処理はParallel::ForkManagerとかParallel::Preforkが定番だけど、もうちょっと効率よくやりたいこともある
- P::ForkManagerはタスクごとにforkするので負荷の分散は綺麗にできるが、タスクの数だけforkが発生して効率がよくない
- P::Preforkだとfork回数は並列度分だけだが、タスク開始前にタスクの分配を完了させないといけない
- Cでmultithreadならmaster-workerモデルでmasterにqueue持ってmutexでロックするような練習問題とかある
- けどPerlでmultithreadやりたくないし、実現するとすればsocket経由でタスクを渡していく感じにしないといけないか
- 誰かCPANに上げてるんじゃないかなー
- なければ作るかー
的な話をしていました。
で、metacpanを漁っていたところ、それっぽいのを見付けました。Parallel::Fork::BossWorkerAsync。
計算モデル
このモジュールは事前に1個のbossとk個のworkerプロセスをforkし、masterが都度タスクを生成して登録していきます。
workerはただただwork_handlerで与えられたsubを実行していき、計算結果をreturnします。
workerの計算結果が帰ってくるとmasterではresult_handlerで与えられたsubが呼び出されます。(たとえば結果を集めるオブジェクトに登録する)
最後にshutdownを呼び出すとbossと全workerが終了します。
プロセスツリーとしてはこうなります。
└─perl,18371 bwa.pl # アプリケーション └─perl,18372 bwa.pl # boss ├─perl,18373 bwa.pl # worker ├─perl,18374 bwa.pl # worker ├─perl,18375 bwa.pl # worker ├─perl,18376 bwa.pl # worker └─perl,18377 bwa.pl # worker
bossプロセスは各workerとアプリケーションとタスク/結果のやりとりをするためにひたすらselect(2)を呼んでいるプロセスです。
workerプロセスはbossからタスクを受け取って処理したらbossに結果を返します。
アプリケーションプロセスはbossとだけ直接通信し、タスクを投げて結果を受け取ります。
利用例
大したものではないですが、podに書かれていないようなモデルでの利用例。
workerは単に1秒sleepし、自分のjob idを返します。
masterは返されたjob idを配列に追加していって最後にdumpします。
それだけ。
この例ではk=5としていて、最初に5プロセスが生まれた後はプロセスの交代は起こっていません。
タスク供給はちょっと不定期な感じにするために、17個ずつ供給し、終了次第次の17個を供給してます。
53 / 17 = 3あまり2
17個のタスクを5並列でやると4秒かかるので、
4 * 3 + 1 = 13秒で実行が完了します。
#!/usr/bin/env perl use strict; use warnings; use Parallel::Fork::BossWorkerAsync; use Data::Dumper qw(Dumper); use Time::HiRes qw(gettimeofday); my @Finished_Jobs = (); my $PARALLELISM = 5; # sleep time my $NUM_WORKS = 53; my $WORKS_AT = 17; # この時点でworkerがforkされた my $bw = Parallel::Fork::BossWorkerAsync->new( work_handler => \&work, result_handler => \&rhandler, worker_count => $PARALLELISM, ); my $thrown_work = 0; while($thrown_work < $NUM_WORKS) { my @tasks; for my $i ($thrown_work .. $thrown_work + $WORKS_AT - 1) { push @tasks, { id => $i, sleep => 1 }; $thrown_work++; if ($thrown_work >= $NUM_WORKS) { last; } } # タスク集合はここでディスパッチされる $bw->add_work(@tasks); # 実行完了待ちタスク数が得られる。この関数はblockしない while($bw->pending) { # "return"されたオブジェクトをそのまま受け取る。以下のように単に呼び出すとblocking my $ref = $bw->get_result; # P::F::BossWorkerAsyncのレイヤでエラーがハンドルされた場合は ERROR というkeyにメッセージが入ってくる(タイムアウトを指定した場合など) if ($ref->{ERROR}) { print STDERR "ERR: ". $ref->{ERROR}; } else { print $ref->{job} . "\n"; } } } # workerプロセスを終了させる $bw->shut_down; print Dumper \@Finished_Jobs; # worker側で各タスクに対してcallbackされる sub work { my ($job) = @_; my $id = $job->{id}; my $t0 = gettimeofday; print "$t0 start $id\n"; sleep $job->{sleep}; my $te = gettimeofday; print "$te end $id\n"; return { job => $id }; } # master側でタスク終了時にcallbackされる sub rhandler { my ($result) = @_; my $job = $result->{job}; push @Finished_Jobs, $job; return $result; }
内部的にData::Dumperを使ったシリアライズをしているらしく、データの区切りに決め打ちのバイト列を指定する必要があります。そこはStorable::nfreeze/thawした方がいいんじゃないかな?と思ったりはします。
P::ForkManagerやP::Preforkに比べると記述は増えますが、用途によっては便利なのでは?