「プログラミングErlang」のRing Benchmark をAkkaでやってみた

「プログラミングErlang」のRing Benchmarkをやってみた

の続編です。今度は、Akkaでやってみました。

github.com/everpeace/ring-benchmark-in-akka

ErlangでプロセスといっていたのがAkkaではアクターになります。

ちょっとだけ違うのは、Erlangのプロセスは

Pid = spawn(somefunction)

ってやると、somefunctionが実行されると終わっちゃうけれど、Akkaのアクターは

class Hoge extends Actor{
   def receive ={
      case 'msg => println('msg)
   }
}

とやっても終わらずにメッセージを待ち続けるところ。あるメッセージを受け取ったときに終了したいなら、こんな風に自分にPoisonPillを飲ませてやればOK.

class Hoge extends Actor{
  def receive ={
    case 'msg => self ! PoisonPill
  }
}

では、普通のリングプロセスから見てみましょう

普通のリングプロセス

こんな感じです。

abstract class ProcessInRing extends Actor {
  import context._

  def receive = wait_for_link

  def wait_for_link: Receive = {
    case ('link, next: ActorRef) => become(listen_tokens(next))
  }

  def listen_tokens(next: ActorRef): Receive

  override def preStart() = println("[%s] starting..." format self)
  override def postStop() = println("[%s] exitting..." format self)
}

class Ordinal extends ProcessInRing {
  def listen_tokens(next: ActorRef): Receive = {
    case (i: Int, token) =>
      println("[%s] token \"%s\" received, forward to %s" format(self, (i, token), next))
      next forward(i, token)
  }
}

ルートプロセスとも共通な振る舞いだけ抜き出してProcessInRingにしました。

  • アクターが生成されたら、リング上のお隣さんのlinkをまつ
  • リンクされたらトークンを待つ(これがルートと普通ので違う)
  • ログ(生成と消滅のときにメッセージだす)

なので、Ordinalクラスのlisten_tokensは単純です。

あと、Erlangと変えたところは、ActorContextを活用して、リングの終了処理をAkkaに任せたところです。なので、listen_tokensの所に自分を終了させる部分は書かれていません。Ringプロセスとメインプロセスの節参照。

ルートプロセス

class Root(val M: Int) extends ProcessInRing {
  def listen_tokens(next: ActorRef): Receive = {
    case ('start, token) =>
      println("[%s] token \"%s\" is injected." format(self, token))
      println("[%s] token \"%s\" is forwarded to %s" format(self, (0, token),next))
      next forward(0, token)
    case (i: Int, token) if i == (M - 1) =>
      println("[%s] token \"%s\" received, the token reaches root %d time" format(self, (i, token), i + 1))
      println("[%s] report the end of benchmark to main" format (self))
      sender ! 'ended
    case (i: Int, token) =>
      println("[%s] token \"%s\" received, the token reaches root %d time" format(self, (i, token), i + 1))
      println("[%s] token \"%s\" is forwarded to %s" format(self, (i+1, token),next))
      next forward(i + 1, token)
  }
}

これも特に前と変わりませんね。メインから受け取ったときはリングへ流し込んで、リングの内側から受け取ったときは、もう一周流すか、メインに終了を報告するかです。

Ring プロセス

Erlangとはちょっと変えて、Ringプロセスというのを作ってみました。

AkkaではActorはcontextという名前でActorContextというのを持っています。そのコンテキスト中で生成したプロセスは自分が終了したときに、一緒に終了されます。それを今回は活用してみました。

主にRingの生成と消滅が責務です。消滅のほうはAkkaの機能(PoisonPillが来たら終了)なのでコードには反映されません。なので、Ringに送信されたメッセージはrootへ転送されるだけです。

Ringの生成手順自体はErlang版と大差ありません。

// Main --> Root --> Ordinal --> Ordinal --> ... --> Ordinal
//           +<----------------------------------------+
class Ring(val N: Int, val M: Int) extends Actor {
  require(N > 0 && M > 0)
  val root = init

  def receive = {
    case token => root forward token
  }

  private[this] def init: ActorRef = {
    val root = context.actorOf(Props(new Root(M)), name = "0")
    val first = create_ordinals(1, N - 1, root)
    root ! ('link, first)
    root
  }

  private[this] def create_ordinals(i: Int, num: Int, root: ActorRef): ActorRef = i match {
    case x if x == num =>
      val ordinal = context.actorOf(Props[Ordinal], name = i.toString)
      ordinal !('link, root)
      ordinal
    case _ =>
      val ordinal = context.actorOf(Props[Ordinal], name = i.toString)
      val next = create_ordinals(i + 1, num, root)
      ordinal !('link, next)
      ordinal
  }
}

メインプロセス

クラス宣言とかmainとかは省略。

val system = ActorSystem("RingBenchmarkInAkka")

    val start = System.currentTimeMillis()

    // リングを作ります。
    val ring = system.actorOf(Props(new Ring(n, m)))

    // askのときのタイムアウト
    implicit val timeout = Timeout(1000 seconds)

    // ベンチマークを開始して、終了を待つ
    (ring ? ('start, 'token)) onSuccess {
      case 'ended =>
        println("[main] received the end of benchmark. killing the ring...")

        // graceful stop:
        // ringへPoisonPillを送って、ringの終了を待つ。
        // ringのActorContextの中で作られたアクターたちも終了される。
        val stopped: Future[Boolean] = gracefulStop(ring, timeout.duration)(system)
        Await.result(stopped, timeout.duration)

        // now system can be stopped safely.
        system.shutdown()
        val end = System.currentTimeMillis()
        println("[main] ring benchmark for %d processes and %d rounds = %d milliseconds" format(n, m, end - start))
    }

とこんな感じです。

動かしてみる。

    $ sbt
    > run 4 3
    [info] Compiling 2 Scala sources to /Users/everpeace/Documents/github/ring-benchmark-in-akka/target/scala-2.9.2/classes...
    [info] Running org.everpeace.ringbench.RingBenchmarkInAkka 4 3
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] starting...
    [Actor[akka://RingBenchmarkInAkka/user/$a/1]] starting...
    [Actor[akka://RingBenchmarkInAkka/user/$a/2]] starting...
    [Actor[akka://RingBenchmarkInAkka/user/$a/3]] starting...
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "'token" is injected.
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(0, 'token)" is forwarded to Actor[akka://RingBenchmarkInAkka/user/$a/1]
    [Actor[akka://RingBenchmarkInAkka/user/$a/1]] token "(0, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/2]
    [Actor[akka://RingBenchmarkInAkka/user/$a/2]] token "(0, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/3]
    [Actor[akka://RingBenchmarkInAkka/user/$a/3]] token "(0, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/0]
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(0, 'token)" received,  the token reaches root 1 time
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(1, 'token)" is forwarded to Actor[akka://RingBenchmarkInAkka/user/$a/1]
    [Actor[akka://RingBenchmarkInAkka/user/$a/1]] token "(1, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/2]
    [Actor[akka://RingBenchmarkInAkka/user/$a/2]] token "(1, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/3]
    [Actor[akka://RingBenchmarkInAkka/user/$a/3]] token "(1, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/0]
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(1, 'token)" received,  the token reaches root 2 time
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(2, 'token)" is forwarded to Actor[akka://RingBenchmarkInAkka/user/$a/1]
    [Actor[akka://RingBenchmarkInAkka/user/$a/1]] token "(2, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/2]
    [Actor[akka://RingBenchmarkInAkka/user/$a/2]] token "(2, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/3]
    [Actor[akka://RingBenchmarkInAkka/user/$a/3]] token "(2, 'token)" received,  forward to Actor[akka://RingBenchmarkInAkka/user/$a/0]
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] token "(2, 'token)" received,  the token reaches root 3 time
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] report the end of benchmark to main
    [main] received the end of benchmark. killing the ring...
    [Actor[akka://RingBenchmarkInAkka/user/$a/0]] exitting...
    [Actor[akka://RingBenchmarkInAkka/user/$a/1]] exitting...
    [Actor[akka://RingBenchmarkInAkka/user/$a/2]] exitting...
    [Actor[akka://RingBenchmarkInAkka/user/$a/3]] exitting...
    [main] ring benchmark for 4 processes and 3 rounds = 46 milliseconds
    [success] Total time: 0 s,  completed 2012/05/19 12:23:56

めでたしめでたし。