「プログラミングErlang」のRing Benchmarkをやってみた

最近「プログラミングErlang」を読んでいます。

プログラミングErlang

プログラミングErlang

今後、並行、分散プログラミングが主流になるにつれて、Erlangという言語のもつパワーやその背後に横たわっている並行、分散プログラミングについての考え方がとても重要だと思うからです。最近Scala界隈で良く耳にするAkkaフレームワークもこのErlangから影響を受けているようです。

で、この本の中にこんな問題があったのでやってみました。

リングのベンチマークを書いてみよう。N個のプロセスからなるリングを作り、1つのメッセージがリングをM回るようにして、合計でN*Mのメッセージが送信されるようにする。さまざまなNとMの値について所要時間を計ってみよう。

プロセスとモジュールの設計

各プロセスとモジュールはこんな感じにしました。

2つのモジュールで構成されていて、それぞれのモジュールはこんな感じの責務を持ちます。

  • ringbenchモジュール: メインプロセスを包含したモジュールです。このベンチマークの動作を制御します
  • ringモジュール: リングです。リングの生成/停止、リングの各プロセスの処理など。リング自体はルートプロセスと通常のリングプロセスから構成されます。

下記のソースコードgithub.com/everpeace/ring-benchmarkにあります。

まずは通常のリングプロセスから見ていきましょう。

通常のリングプロセス

リング中に存在するルートプロセス以外のプロセスはこのプロセスです。このプロセスは受け取ったトークンを隣のプロセスに中継するのが主な責務です。killというアトム*1ベンチマークが終わった後に自分を終了させるために使っています。

% Name は名前、 Nextはお隣のプロセスID
listen_tokens(Name, Next) ->
  receive
    % killを受け取ったら終了。
    kill ->
      forward_kill(Name, Next);
    % それ以外を受け取ったら次に転送してまた待ち状態に。
    Token ->
      forward_token(Name, Token, Next),
      listen_tokens(Name, Next)
  end.

forward_token(Name, Token, Next) ->
      io:format("[~p(~p)] token \"~p\" received. forwading to ~p~n", [Name, self(), Token, Next]),
      Next ! Token.

forward_kill(Name, Next)->
      io:format("[~p(~p)] exitting...~n", [Name, self()]),
      Next ! kill.

プロセスの生成は次のような感じにします。プロセス生成時にお隣さんを引数にとってしまうと、再帰的な生成がやりづらいので、プロセスをスタートさせると、リング中のお隣さんを指定してもらうのを待つ状態(wait_for_link関数)に入るようにします。その状態で{link, Next}というメッセージを受け取ると、Next(プロセスID)をお隣さんとして、トークンを待つ状態に入る(listen_tokens関数(上参照))という具合です。

% 通常のリングプロセスの生成
node_start(Name) ->
  spawn(fun() ->
          io:format("[~p(~p)] starting...~n", [Name, self()]),
          wait_for_link(Name)
        end).
% 次要素の指定を待つ
wait_for_link(Name) ->
  receive
    {link, Next} ->
      listen_tokens(Name, Next)  %上参照
  end.

ルートプロセス

メインプロセスとリングのインターフェースとなるプロセスです。リング中のプロセスとしても振る舞うので、通常のリングプロセスより少々複雑になります。

メインプロセスからのメッセージはタプルで受け取ることにして、最初の要素にメインプロセスのプロセスIDが入っているときに、メインプロセスからのメッセージだと解釈することにしました。

メインからのトークンを受け取ったら、タプルの第1要素に0周目であることを付け加えて、リング中に放流します。通常のプロセスは中継するだけにして、自分がそのメッセージを受け取ることで、リングを一周したことを検出しています。

最後にM周したことを検出したらメインプロセスにendedというメッセージを送っています。

ルートプロセスの生成は通常のリングプロセスと大体同じ感じなので省略。

listen_tokens(Name, Main, Next, M) ->
  receive
    % メインからのメッセージ
    % メインからkillを受け取った時(リング停止)
    {Main, kill} ->
      Next ! kill,
      listen_tokens(Name, Main, Next, M);
    % メインからトークンを受け取った時(トークンM周開始)
    {Main, Token} ->
       io:format("[~p(~p)] token \"~p\" is injected from main. the token starts rounding in the ring.~n", [Name, self(), Token]),
       Next ! {0, Token},     %  0周目
       listen_tokens(Name, Main, Next, M);
    % リング中を流れるメッセージ
    %  M周完了
    {I, Token} when I =:= M-1 ->
      io:format("[~p(~p)] token \"~p\" reaches root ~p times.~n", [Name, self(), Token, M]),
      io:format("[~p(~p)] report the finish of benchmark to main(~p)~n", [Name, self(), Main]),
      Main ! ended,    %  メインにendedを送信
      listen_tokens(Name, Main, Next, M);
    % I 周完了
    {I, Token} ->
      io:format("[~p(~p)] token \"~p\" reaches root ~p times.~n", [Name, self(), Token, I+1]),
      Next ! {I+1, Token},   %  I+1周目
      listen_tokens(Name, Main, Next, M);
    % 終了
    kill ->
      io:format("[~p(~p)] exitting... \n", [Name, self()])
  end.
リングの生成

ルートプロセスと通常のリングプロセスを生成して、サイズNのリングを生成します。最初にルートプロセスを一個生成し、その後にN-1個のリングプロセスを生成しています。

init(Main, N, M) ->
  io:format("[main(~p)] constructing ~p nodes ring...~n", [self(), N]),
  Root = root_start(0, Main, M),         % ルートを生成
  First = create_nodes(1, N-1, Root), % ルート以外のリングプロセスを再帰的に生成(最初のリングプロセスIDが返る)
  Root ! {link, First},                           % ルートの次要素は最初のプロセス
  Root.                                              % ルートのプロセスIDを返す

%リングサイズが1の時
create_nodes(_, 0, Root)-> Root;  
% 最後のノードはルートが次要素
create_nodes(Num, Num, Root)->
  Node = node_start(Num),
  Node ! {link, Root},
  Node;
% 途中のノードは次のプロセスを次要素として指定
create_nodes(I, Num, Root)->
  Node = node_start(I),
  Next = create_nodes(I+1, Num, Root),
  Node ! {link, Next},
  Node.
メインプロセス

上記のリングと対話をしながらベンチマークを行うのがメインプロセスです。下記を行います。

  1. リングの生成
  2. リングへのトークンの注入
  3. リングからトークンM周完了通知受信
  4. 上記の処理時間の計測

mainプロセスのコードはこんな感じです。

start(N, M) ->
  statistics(runtime),
  statistics(wall_clock),
  Main = self(),
  Root = ring:init(Main, N, M),  % リング生成、ルートプロセスのプロセスIdが返ってくる。
  io:format("[main(~p)] injecting the token \"~p\".~n", [self(), M]),
  Root ! {Main, token},              % リングのルートプロセスにトークンを送る(ベンチマーク開始)
  receive                                  % リングからの終了通知待ち
    ended -> void
  end,
  Root ! {Main, kill},                  % リングを停止
  {_, Time1} = statistics(runtime),
  {_, Time2} = statistics(wall_clock),
  U1 = Time1,
  U2 = Time2,
  io:format("[main(~p)] ring benchmark for ~p processes and ~p tokens = ~p (~p) milliseconds\n", [self(), N, M, U1, U2]).

動作させてみる。

erlangシェルで動作させてみましょう。N=3,M=2です。

-(~/Documents/github/ring-benchmark)-
(git:master) $ erl
Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:4:4] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.9.1  (abort with ^G)
1> ringbench:start(3,2).
[main(<0.31.0>)] constructing 3 nodes ring...
[main(<0.31.0>)] injecting the token "2".
[0(<0.33.0>)] starting...
[1(<0.34.0>)] starting...
[2(<0.35.0>)] starting...
[0(<0.33.0>)] token "token" is injected from main. the token starts rounding in the ring.
[1(<0.34.0>)] token "{0,token}" received. forwading to <0.35.0>
[2(<0.35.0>)] token "{0,token}" received. forwading to <0.33.0>
[0(<0.33.0>)] token "token" reaches root 1 times.
[1(<0.34.0>)] token "{1,token}" received. forwading to <0.35.0>
[2(<0.35.0>)] token "{1,token}" received. forwading to <0.33.0>
[0(<0.33.0>)] token "token" reaches root 2 times.
[0(<0.33.0>)] report the finish of benchmark to main(<0.31.0>)
[main(<0.31.0>)] ring benchmark for 3 processes and 2 tokens = 10 (3) milliseconds
[1(<0.34.0>)] exitting...
[2(<0.35.0>)] exitting...
[0(<0.33.0>)] exitting... 
ok

うまく行ってますね。この記事で使ったコードはgithub.com/everpeace/ring-benchmarkにあります。Makefileが作ってあるので、make runだけでもしよかったら試してみてください。

*1:シンボルみたいなやつ