「プログラミングErlang」のRing Benchmark をAkkaでやってみた
「プログラミングErlang」のRing Benchmarkをやってみた
の続編です。今度は、Akkaでやってみました。
github.com/everpeace/ring-benchmark-in-akka
ErlangでプロセスといっていたのがAkkaではアクターになります。
ちょっとだけ違うのは、Erlangのプロセスは
Pid = spawn(somefunction)
ってやると、somefunctionが実行されると終わっちゃうけれど、Akkaのアクターは
class Hoge extends Actor{ def receive ={ case 'msg => println('msg) } }
とやっても終わらずにメッセージを待ち続けるところ。あるメッセージを受け取ったときに終了したいなら、こんな風に自分にPoisonPillを飲ませてやればOK.
class Hoge extends Actor{ def receive ={ case 'msg => self ! PoisonPill } }
では、普通のリングプロセスから見てみましょう
普通のリングプロセス
こんな感じです。
abstract class ProcessInRing extends Actor { import context._ def receive = wait_for_link def wait_for_link: Receive = { case ('link, next: ActorRef) => become(listen_tokens(next)) } def listen_tokens(next: ActorRef): Receive override def preStart() = println("[%s] starting..." format self) override def postStop() = println("[%s] exitting..." format self) } class Ordinal extends ProcessInRing { def listen_tokens(next: ActorRef): Receive = { case (i: Int, token) => println("[%s] token \"%s\" received, forward to %s" format(self, (i, token), next)) next forward(i, token) } }
ルートプロセスとも共通な振る舞いだけ抜き出してProcessInRingにしました。
- アクターが生成されたら、リング上のお隣さんのlinkをまつ
- リンクされたらトークンを待つ(これがルートと普通ので違う)
- ログ(生成と消滅のときにメッセージだす)
なので、Ordinalクラスのlisten_tokensは単純です。
あと、Erlangと変えたところは、ActorContextを活用して、リングの終了処理をAkkaに任せたところです。なので、listen_tokensの所に自分を終了させる部分は書かれていません。Ringプロセスとメインプロセスの節参照。
ルートプロセス
class Root(val M: Int) extends ProcessInRing { def listen_tokens(next: ActorRef): Receive = { case ('start, token) => println("[%s] token \"%s\" is injected." format(self, token)) println("[%s] token \"%s\" is forwarded to %s" format(self, (0, token),next)) next forward(0, token) case (i: Int, token) if i == (M - 1) => println("[%s] token \"%s\" received, the token reaches root %d time" format(self, (i, token), i + 1)) println("[%s] report the end of benchmark to main" format (self)) sender ! 'ended case (i: Int, token) => println("[%s] token \"%s\" received, the token reaches root %d time" format(self, (i, token), i + 1)) println("[%s] token \"%s\" is forwarded to %s" format(self, (i+1, token),next)) next forward(i + 1, token) } }
これも特に前と変わりませんね。メインから受け取ったときはリングへ流し込んで、リングの内側から受け取ったときは、もう一周流すか、メインに終了を報告するかです。
Ring プロセス
Erlangとはちょっと変えて、Ringプロセスというのを作ってみました。
AkkaではActorはcontextという名前でActorContextというのを持っています。そのコンテキスト中で生成したプロセスは自分が終了したときに、一緒に終了されます。それを今回は活用してみました。
主にRingの生成と消滅が責務です。消滅のほうはAkkaの機能(PoisonPillが来たら終了)なのでコードには反映されません。なので、Ringに送信されたメッセージはrootへ転送されるだけです。
Ringの生成手順自体はErlang版と大差ありません。
// Main --> Root --> Ordinal --> Ordinal --> ... --> Ordinal // +<----------------------------------------+ class Ring(val N: Int, val M: Int) extends Actor { require(N > 0 && M > 0) val root = init def receive = { case token => root forward token } private[this] def init: ActorRef = { val root = context.actorOf(Props(new Root(M)), name = "0") val first = create_ordinals(1, N - 1, root) root ! ('link, first) root } private[this] def create_ordinals(i: Int, num: Int, root: ActorRef): ActorRef = i match { case x if x == num => val ordinal = context.actorOf(Props[Ordinal], name = i.toString) ordinal !('link, root) ordinal case _ => val ordinal = context.actorOf(Props[Ordinal], name = i.toString) val next = create_ordinals(i + 1, num, root) ordinal !('link, next) ordinal } }
メインプロセス
クラス宣言とかmainとかは省略。
val system = ActorSystem("RingBenchmarkInAkka") val start = System.currentTimeMillis() // リングを作ります。 val ring = system.actorOf(Props(new Ring(n, m))) // askのときのタイムアウト implicit val timeout = Timeout(1000 seconds) // ベンチマークを開始して、終了を待つ (ring ? ('start, 'token)) onSuccess { case 'ended => println("[main] received the end of benchmark. killing the ring...") // graceful stop: // ringへPoisonPillを送って、ringの終了を待つ。 // ringのActorContextの中で作られたアクターたちも終了される。 val stopped: Future[Boolean] = gracefulStop(ring, timeout.duration)(system) Await.result(stopped, timeout.duration) // now system can be stopped safely. system.shutdown() val end = System.currentTimeMillis() println("[main] ring benchmark for %d processes and %d rounds = %d milliseconds" format(n, m, end - start)) }
とこんな感じです。
動かしてみる。
$ sbt > run 4 3 [info] Compiling 2 Scala sources to /Users/everpeace/Documents/github/ring-benchmark-in-akka/target/scala-2.9.2/classes... [info] Running org.everpeace.ringbench.RingBenchmarkInAkka 4 3 [Actor[akka://RingBenchmarkInAkka/user/$a/0]] starting... [Actor[akka://RingBenchmarkInAkka/user/$a/1]] starting... [Actor[akka://RingBenchmarkInAkka/user/$a/2]] starting... [Actor[akka://RingBenchmarkInAkka/user/$a/3]] starting... [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "'token" is injected. [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(0, 'token)" is forwarded to Actor[akka://RingBenchmarkInAkka/user/$a/1] [Actor[akka://RingBenchmarkInAkka/user/$a/1]] token "(0, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/2] [Actor[akka://RingBenchmarkInAkka/user/$a/2]] token "(0, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/3] [Actor[akka://RingBenchmarkInAkka/user/$a/3]] token "(0, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/0] [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(0, 'token)" received, the token reaches root 1 time [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(1, 'token)" is forwarded to Actor[akka://RingBenchmarkInAkka/user/$a/1] [Actor[akka://RingBenchmarkInAkka/user/$a/1]] token "(1, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/2] [Actor[akka://RingBenchmarkInAkka/user/$a/2]] token "(1, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/3] [Actor[akka://RingBenchmarkInAkka/user/$a/3]] token "(1, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/0] [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(1, 'token)" received, the token reaches root 2 time [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(2, 'token)" is forwarded to Actor[akka://RingBenchmarkInAkka/user/$a/1] [Actor[akka://RingBenchmarkInAkka/user/$a/1]] token "(2, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/2] [Actor[akka://RingBenchmarkInAkka/user/$a/2]] token "(2, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/3] [Actor[akka://RingBenchmarkInAkka/user/$a/3]] token "(2, 'token)" received, forward to Actor[akka://RingBenchmarkInAkka/user/$a/0] [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(2, 'token)" received, the token reaches root 3 time [Actor[akka://RingBenchmarkInAkka/user/$a/0]] report the end of benchmark to main [main] received the end of benchmark. killing the ring... [Actor[akka://RingBenchmarkInAkka/user/$a/0]] exitting... [Actor[akka://RingBenchmarkInAkka/user/$a/1]] exitting... [Actor[akka://RingBenchmarkInAkka/user/$a/2]] exitting... [Actor[akka://RingBenchmarkInAkka/user/$a/3]] exitting... [main] ring benchmark for 4 processes and 3 rounds = 46 milliseconds [success] Total time: 0 s, completed 2012/05/19 12:23:56
めでたしめでたし。