Dispatcher

在AKKA中,actor之间都是通过传递消息来完成彼此之间的通信。当Actor的数量较多时,彼此之间的通信就需要协调,从而更好地改进整个系统的执行性能。

负责协调Actor之间通信的就是Dispatcher。它会运行在自己的线程上,分发来自各个actor mailbox的消息,将其分配到执行线程的堆上。

从本质上讲,AKKA的Dispatcher是基于Java Executor Framework来实现的,这一框架属于java.util.concurrent的一部分,主要用于异步任务的执行。Executor框架有两种不同的实现,分为ThreadPoolExecutor和ForkJoinPool。前者使用实现定义或配置好的线程池去执行任务;后者使用相同的线程池模型,但却使用了work-stealing模式,即池中的线程会去执行其他线程创建的任务。相比较而言,ForkJoinPool能够更加有效地利用池中的线程,使线程更少地处于空闲状态,因而更加高效。

AKKA提供了四种Dispatcher,我们可以根据项目的情况选择不同的Dispatcher。

  • Dispatcher:这是Akka的默认Dispatcher。对于这种Dispatcher,每个Actor都由自己的Mailbox支持;它可以被多个Actor所共享。Dispatcher可以由线程池或fork join pool支持。而且,它针对非阻塞代码进行了优化。

  • Pinned dispatcher:这种Dispatcher则为每个Actor提供了单独的专有的线程,这意味着该Dispatcher不能在Actor之间共享。因此,这种Dispatcher比较适合处理对外部资源操作(如IO操作)或耗时比较长的Actor。Pinned Dispatcher使用了线程池Executor,并针对阻塞操作进行了优化。例如在发起IO调用或访问数据库时,Actor会一直等待直到任务执行完毕。故而对于阻塞操作而言,Pinned Dispatcher要比默认的Dispatcher要好。但是在使用该Dispatcher时,需要合理考虑线程资源的问题,毕竟在多数情况下,Actor执行的任务完全可以共享线程资源。

  • Balancing dispatcher:它是基于事件的Dispatcher,它可以合理地协调Actor资源,若某个Actor上的任务较为繁忙,就可以将它的工作分发给闲置的Actor。但是,能够进行任务重新分配的前提是所有Actor都属于相同类型。对于这种Dispatcher,所有Actor只有唯一一个mailbox。它可以被相同类型的Actor所共享。该Dispatcher可以由Thread Pool或者Fork Join Pool支持。

  • Calling thread dispatcher:这个Dispatcher主要用于测试,它会将任务执行在当前线程上,而不会创建任何一个新的线程,也不提供确定的执行顺序。如果调用没有及时执行,则任务会入队到thead-local queue,等待前面调用结束再执行。对于这个Dispatcher,每个Actor都有自己的mailbox,它可以被多个Actor共享,为Calling Thread支持。

Dispatcher可以配置在application.conf文件中,例如配置为采用线程池Executor的默认Dispatcher:

writer-dispatcher {
  type = Dispatcher

  executor = "thread-pool-executor"

  thread-pool-executor {
    core-pool-size-min = 2
    core-pool-size-factor = 2.0
    core-pool-size-max = 10
  }
  throughput = 100
}

core-pool-size-min代表分配的最小线程数,core-pool-size-factor则为线程的乘积因子,通常应该根据CPU的核数进行设置。core-pool-size-max则为线程数的最大上限。

例如我定义了一个WriterActor,通过重写preStart()方法,打印当前线程名字。接下来,我根据设置的writer-dispatcher创建100个WriterActor的ActorRef,并放到一个List中,然后在其Parent Actor中通过遍历该List,发送消息,观察打印出来的线程名字:

trait Message
case class InsertCommand(recordCount:Int) extends Message
case class ControlCommand(message: String, startTime: Long) extends Message
case class StartCommand(actorCount: Int) extends Message
case class ExecutionResult(costTime: Long) extends Message

class WriterActor extends Actor {
  override def preStart(): Unit = {
    println(Thread.currentThread().getName)
  }

  def receive = ???
}

class ControlActor extends Actor {
  implicit val timeout = Timeout(5 minutes)

  def receive = {
    case msg: StartCommand =>
      val startTime = System.currentTimeMillis()
      val actors = createActors(msg.actorCount)
      val results = actors.map(actor => actor ? InsertCommand(100) mapTo manifest[Long])
      val aggregate = Future.sequence(results).map(results => ExecutionResult(results.sum))

      aggregate onComplete {
        case Success(_) =>
          val endTime = System.currentTimeMillis()
          val costTime = endTime - startTime
          println(s"It take total time ${costTime} milli seconds")
        case Failure(_) => println("It failed.")
      }
  }

  def createActors(count: Int): List[ActorRef] = {
    val props = Props(classOf[WriterActor]).withDispatcher("writer-dispatcher")
    (1 to count).map(i => {
      context.actorOf(props, s"writer_$i")
    }).toList
  }
}

object Master {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("DataInitializer")
    val actor = system.actorOf(Props[ControlActor], "controller")

    actor ! StartCommand(100)
  }
}

ControlActor发送的StartCommand消息中的参数即为要创建的WriterActor数量。执行的结果部分显示为:

DataInitializer-writer-dispatcher-15
DataInitializer-writer-dispatcher-14
DataInitializer-writer-dispatcher-13
DataInitializer-writer-dispatcher-8
DataInitializer-writer-dispatcher-8
DataInitializer-writer-dispatcher-10
DataInitializer-writer-dispatcher-6
DataInitializer-writer-dispatcher-7
DataInitializer-writer-dispatcher-11
DataInitializer-writer-dispatcher-11
DataInitializer-writer-dispatcher-14
DataInitializer-writer-dispatcher-8
DataInitializer-writer-dispatcher-12
DataInitializer-writer-dispatcher-13
DataInitializer-writer-dispatcher-15
DataInitializer-writer-dispatcher-10
DataInitializer-writer-dispatcher-6
DataInitializer-writer-dispatcher-7
DataInitializer-writer-dispatcher-11
DataInitializer-writer-dispatcher-9
DataInitializer-writer-dispatcher-14
DataInitializer-writer-dispatcher-8
DataInitializer-writer-dispatcher-12
DataInitializer-writer-dispatcher-12
It take total time 2952 milli seconds

通过判断打印出来的线程名,编号最小为6,最大为15,恰好只有10个线程。如果我们改变application.conf中Dispatcher的core-pool-size-max值,会看到线程数会随之发生变化。如果我们将Dispatcher的配置修改为PinnedDispatcher,再观察打印出来的消息,在创建了100个Actor的前提下,系统会创建100个线程,这是因为PinnedDispatcher会为每个Actor分配一个专有的线程。

ForkJoinPool Executor的配置属性与ThreadPool Executor类似,但属性的名字完全不同:

  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }

此外,在上面的配置中,type可以分别配置为Dispatcher、PinnedDispatcher和BalancingDispatcher。而throughput属性则代表线程在进入下一个Actor之前每次能够处理的消息数。

若要运用配置好的Dispatcher,可以在创建Actor时调用,例如:

val writerActor = context.actorOf(Props[WriterActor].withDispatcher("writer-dispatcher"), "WriterActor")

注意:在AKKA 2.3.x版本中,类型为BalancingDispatcher的Dispatcher已经被Disprecated,如果在配置文件中,仍然将Dispatcher的type配置为BalancingDispatcher,系统会提示:

BalancingDispatcher is deprecated, use a BalancingPool instead. During a migration period you can still use BalancingDispatcher by specifying the full class name: akka.dispatch.BalancingDispatcherConfigurator

可以根据提示信息的建议,将type设置为akka.dispatch.BalancingDispatcherConfigurator。或者使用BalancingPool Router,而非Dispatcher。

如何选择Dispatcher

选择Dispatcher时,我们要根据项目的具体情况来。例如我们要考虑Actor执行的任务是阻塞还是非阻塞,耗时长还是短,是否消耗了足够大的内存资源,Actor之间是否独立。