limitusus’s diary

主に技術のことを書きます

Parallel::Fork::BossWorkerAsyncがいい感じ

先日 id:hirose31

  • 並列処理はParallel::ForkManagerとかParallel::Preforkが定番だけど、もうちょっと効率よくやりたいこともある
  • P::ForkManagerはタスクごとにforkするので負荷の分散は綺麗にできるが、タスクの数だけforkが発生して効率がよくない
  • P::Preforkだとfork回数は並列度分だけだが、タスク開始前にタスクの分配を完了させないといけない
  • Cでmultithreadならmaster-workerモデルでmasterにqueue持ってmutexでロックするような練習問題とかある
  • けどPerlでmultithreadやりたくないし、実現するとすればsocket経由でタスクを渡していく感じにしないといけないか
  • 誰かCPANに上げてるんじゃないかなー
  • なければ作るかー

的な話をしていました。

で、metacpanを漁っていたところ、それっぽいのを見付けました。Parallel::Fork::BossWorkerAsync

計算モデル

このモジュールは事前に1個のbossとk個のworkerプロセスをforkし、masterが都度タスクを生成して登録していきます。
workerはただただwork_handlerで与えられたsubを実行していき、計算結果をreturnします。
workerの計算結果が帰ってくるとmasterではresult_handlerで与えられたsubが呼び出されます。(たとえば結果を集めるオブジェクトに登録する)
最後にshutdownを呼び出すとbossと全workerが終了します。

プロセスツリーとしてはこうなります。

  └─perl,18371 bwa.pl # アプリケーション
      └─perl,18372 bwa.pl # boss
          ├─perl,18373 bwa.pl # worker
          ├─perl,18374 bwa.pl # worker
          ├─perl,18375 bwa.pl # worker
          ├─perl,18376 bwa.pl # worker
          └─perl,18377 bwa.pl # worker

bossプロセスは各workerとアプリケーションとタスク/結果のやりとりをするためにひたすらselect(2)を呼んでいるプロセスです。
workerプロセスはbossからタスクを受け取って処理したらbossに結果を返します。
アプリケーションプロセスはbossとだけ直接通信し、タスクを投げて結果を受け取ります。

利用例

大したものではないですが、podに書かれていないようなモデルでの利用例。

workerは単に1秒sleepし、自分のjob idを返します。
masterは返されたjob idを配列に追加していって最後にdumpします。
それだけ。

この例ではk=5としていて、最初に5プロセスが生まれた後はプロセスの交代は起こっていません。
タスク供給はちょっと不定期な感じにするために、17個ずつ供給し、終了次第次の17個を供給してます。
53 / 17 = 3あまり2
17個のタスクを5並列でやると4秒かかるので、
4 * 3 + 1 = 13秒で実行が完了します。

#!/usr/bin/env perl

use strict;
use warnings;

use Parallel::Fork::BossWorkerAsync;
use Data::Dumper qw(Dumper);
use Time::HiRes qw(gettimeofday);

my @Finished_Jobs = ();
my $PARALLELISM = 5;
# sleep time
my $NUM_WORKS = 53;
my $WORKS_AT = 17;

# この時点でworkerがforkされた
my $bw = Parallel::Fork::BossWorkerAsync->new(
    work_handler => \&work,
    result_handler => \&rhandler,
    worker_count => $PARALLELISM,
);

my $thrown_work = 0;
while($thrown_work < $NUM_WORKS) {
    my @tasks;
    for my $i ($thrown_work .. $thrown_work + $WORKS_AT - 1) {
        push @tasks, { id => $i, sleep => 1 };
        $thrown_work++;
        if ($thrown_work >= $NUM_WORKS) {
            last;
        }
    }
    # タスク集合はここでディスパッチされる
    $bw->add_work(@tasks);
    # 実行完了待ちタスク数が得られる。この関数はblockしない
    while($bw->pending) {
        # "return"されたオブジェクトをそのまま受け取る。以下のように単に呼び出すとblocking
        my $ref = $bw->get_result;
        # P::F::BossWorkerAsyncのレイヤでエラーがハンドルされた場合は ERROR というkeyにメッセージが入ってくる(タイムアウトを指定した場合など)
        if ($ref->{ERROR}) {
            print STDERR "ERR: ". $ref->{ERROR};
        } else {
            print $ref->{job} . "\n";
        }
    }
}

# workerプロセスを終了させる
$bw->shut_down;

print Dumper \@Finished_Jobs;

# worker側で各タスクに対してcallbackされる
sub work {
    my ($job) = @_;
    my $id = $job->{id};
    my $t0 = gettimeofday;
    print "$t0 start $id\n";
    sleep $job->{sleep};
    my $te = gettimeofday;
    print "$te end $id\n";
    return { job => $id };
}

# master側でタスク終了時にcallbackされる
sub rhandler {
    my ($result) = @_;
    my $job = $result->{job};
    push @Finished_Jobs, $job;
    return $result;
}

内部的にData::Dumperを使ったシリアライズをしているらしく、データの区切りに決め打ちのバイト列を指定する必要があります。そこはStorable::nfreeze/thawした方がいいんじゃないかな?と思ったりはします。
P::ForkManagerP::Preforkに比べると記述は増えますが、用途によっては便利なのでは?