Send and Receive模式

Send and Receive模式既包含actor也包含future。在AKKA中,它采用了ask和pipe模式。

import akka.pattern.{ ask, pipe }

case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout = Timeout(5 seconds)

val f: Future[Result] =
  for {
    x <- ask(actorA, Request).mapTo[Int] // 直接调用
    s <- actorB ask Request mapTo manifest[String] // 隐式转换调用
    d <- actorC ? Request mapTo manifest[Double] // 通过符号名调用
  } yield Result(x, s, d)

f pipeTo actorD // .. 或 ..
pipe(f) to actorD

上面的例子展示了将 ask 与 future上的pipeTo模式一起使用,因为这是一种非常常用的组合。 请注意上面所有的调用都是完全非阻塞和异步的:ask 产生Future,三个Future通过for语法组合成一个新的Future,然后用 pipeTo 在future上安装一个onComplete处理器来完成将收集到的 Result 发送到其它actor的动作。

ask模式与tell模式之间的一个区别是前者需要返回结果,因而当Actor通过ask模式发送消息给接收方时, 接收方必须通过sender ! reply发送Response为返回的Future填充数据。返回的结果类型为Future[Any],因此需要根据需要将其转型为正确的类型。可以使用asInstanceOf[],但更好地做法是调用mapTo[]方法,它既是非阻塞的,也能够保证更加安全地进行转型。注意mapTo返回的其实仍然为Future对象,如果返回结果为String,则调用mapTo[String]方法,,返回Future[String]。若类型转换失败,就会抛出ClassCastException异常。

当sender发送消息时,如果sender处于不可用状态,例如sender已经被停止,或者超时,则reply的消息就会被发送到DeadLetterActorRef中。若要查看DeadLetter消息,可以定义一个Actor作为dead letter的侦听者,然后在ActorSystem中订阅该侦听者。例如:

class DeadLetterListener extends Actor {
  def receive = {
    case DeadLetter(msg, from, to) =>
      println(s"from $from to $to with message $msg.")
  }
}

val listener = system.actorOf(Props[DeadLetterListener], "listener")

system.eventStream.subscribe(listener, classOf[DeadLetter])

我们可以通过重写Actor的postStop()方法,打印Actor终止时的消息,例如:

  override def postStop(): Unit = {
    println(s"$self is stopped.")
    super.postStop
  }

我现在写了如下的代码片段,当处理量达到一定规模后,就看到了大量的dead letter信息。这段代码要做的工作是通过Actor并行地将指定数量的数据插入到数据库中。每个WriterActor执行完插入操作后,会将自己的耗时时间通过sender发送给ControlActor。然后在通过Future的sequence得到结果的List,并对其求和,从而获得所有Actor执行插入操作的总时间;然后将消息pipe给AggregateActor,让其打印耗时结果。

class WriterActor extends Actor {
  def receive = {
    case cmd: InsertCommand =>
      val startTime = System.currentTimeMillis()
      DataWriter.execute(cmd.recordCount)
      val elapsedTime = System.currentTimeMillis() - startTime
      sender ! elapsedTime
  }

  override def preStart(): Unit = {
    println(s"${System.currentTimeMillis()}: $self is started.")
    super.preStart()
  }

  override def postStop(): Unit = {
    println(s"${System.currentTimeMillis()}: $self is stopped.")
    super.postStop
  }
}

class ControlActor extends Actor {
  implicit val timeout = Timeout(5 seconds)
  val aggregateActor = context.actorOf(Props[AggregateActor], "aggregate")

  def receive = {
    case msg: StartCommand =>
      val actors = createActors(msg.actorCount)
      val results = actors.map(actor => actor ? InsertCommand(10000) mapTo manifest[Long])
      val aggregate = Future.sequence(results).map(results => ExecutionResult(results.sum))
      aggregate pipeTo aggregateActor
  }

  def createActors(count: Int): List[ActorRef] = {
    val dispatcher = Props(classOf[WriterActor])
    (1 to count).map(i => {
      context.actorOf(dispatcher, s"writer_$i")
    }).toList
  }
}

class AggregateActor extends Actor {
  def receive = {
    case result: ExecutionResult =>
      println(s"It take total time ${result.costTime} milli seconds")
      println(s"It take total time ${result.costTime / 1000} seconds")
      println(s"It take total time ${result.costTime / (1000 * 60)} minutes")
  }
}

//main
object Master {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("DataInitializer")
    val actor = system.actorOf(Props[ControlActor], "controller")
    val listener = system.actorOf(Props[DeadLetterListener], "listener")
    system.eventStream.subscribe(listener, classOf[DeadLetter])

    actor ! StartCommand(100)

    Thread.sleep(100000)
    system.shutdown()
  }
}

执行后,可能会输出这些的信息:

1430105019405: Actor[akka://DataInitializer/user/controller/writer_2#481424787] is started.
1430105029538: from Actor[akka://DataInitializer/user/controller/writer_2#481424787] to Actor[akka://DataInitializer/deadLetters] with message 10104.
1430105119424: Actor[akka://DataInitializer/user/controller/writer_2#481424787] is stopped.

显然,这里名为writer_2的Actor发送了一条dead letter。分析原因,是因为当我在ControlActor中创建WriterActor时,我给定的超时时间为5秒,同时采用了ask模式通过writerActor发送消息。而在WriterActor中,当插入操作执行完毕后,才会调用sender发送执行结果。由于设置的timeout已经超出(1430105029538 - 1430105019405 = 10122微秒 > 5秒),使得当前的sender不可用,从而产生了deadletter。由于sender发送消息不成功,导致Future没有结果,最后结果没有pipe到AggregateActor,无法打印最终的执行结果。

要解决这个问题,只有通过修改设置的timeout值,使其能够在规定的timeout时间内运行完毕。

AggregateActor获得的执行结果实际上是所有Actor执行任务的总耗时时间,但并不等于真正的总执行时间,因为这些Actor是并行执行任务的。要获得真实的执行时间,应该将aggregate future执行成功的时间作为结束时间。修改ControlActor为:

class ControlActor extends Actor {
  implicit val timeout = Timeout(10 minutes)
  val aggregateActor = context.actorOf(Props[AggregateActor], "aggregate")

  def receive = {
    case msg: StartCommand =>
      val startTime = System.currentTimeMillis()
      val actors = createActors(msg.actorCount)
      val results = actors.map(actor => actor ? InsertCommand(10000) mapTo manifest[Long])
      val aggregate = Future.sequence(results).map(results => ExecutionResult(results.sum))

      aggregate onComplete {
        case Success(_) =>
          val endTime = System.currentTimeMillis()
          val costTime = endTime - startTime
          println(s"It take total time ${costTime} milli seconds")
          println(s"It take total time ${costTime / 1000} seconds")
          println(s"It take total time ${costTime / (1000 * 60)} minutes")
        case Failure(_) => println("It failed.")
      }
  }

顾名思义,Future就是“未来”,这意味着它在接收到消息时,会承诺在未来某个时刻去执行。之所以如此,是因为在执行异步操作时,若操作存在多个,则可能操作之间会存在前后依赖关系。若留待“未来”去执行,就可以判断多个future之间的关系,如果没有依赖关系,就可以并行处理,否则就串行处理。例如如下代码:

val f = for {
    a <- Future(10 / 2)
    b <- Future(a + 1)
    c <- Future(a - 1)
} yield b * c

此时的b、c由于依赖了a,因而并不能并行执行。

以ask模式发送消息时,由于需要获得返回值,因此它会创建一个内部actor来处理返回的Response。AKKA要求为这个内部actor指定一个超时期限(timeout),只要超过了这个期限,内部actor将被销毁以防止内存泄露。在前面那段代码中,我们定义了隐式的timeout值,设置超时期限为5秒。

Actor在处理相关事务时,可能会抛出异常。我们不能任异常自由地抛出,而是需要捕获该异常消息。正确地做法是在捕获异常时,发送一个Failure消息给发送方。

try {
  val result = operation()
  sender ! result
} catch {
  case e: Exception =>
    sender ! akka.actor.Status.Failure(e)
    throw e
}

如果一个actor没有完成future,它会在超时时限到来时过期, 以 AskTimeoutException来结束。超时的时限是按下面的顺序和位置来获取的:

1.显式指定超时:

import akka.util.duration._
import akka.pattern.ask
val future = myActor.ask("hello")(5 seconds)

2.提供类型为 akka.util.Timeout的隐式参数,例如:

import akka.util.duration._
import akka.util.Timeout
import akka.pattern.ask
implicit val timeout = Timeout(5 seconds)
val future = myActor ? "hello"

Future的onComplete, onResult或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。

注意 在使用future回调如onComplete, onSuccess和onFailure时,在actor内部要小心避免捕捉该actor的引用,例如,不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引用同步bug和race condition, 因为回调会与此actor一同被并发调度。

接收超时

在接收消息时,如果在一段时间内没有收到第一条消息,可以使用超时机制。要检测这种超时,你必须设置receiveTimeout属性并声明一个处理ReceiveTimeout对象的匹配分支。

import akka.actor.ReceiveTimeout
import akka.util.duration._
class MyActor extends Actor {
  context.setReceiveTimeout(30 milliseconds)
  def receive = {
    case "Hello"        => //...
    case ReceiveTimeout => throw new RuntimeException("received timeout")
  }
}