Send and Receive模式
Send and Receive模式既包含actor也包含future。在AKKA中,它采用了ask和pipe模式。
import akka.pattern.{ ask, pipe }
case class Result(x: Int, s: String, d: Double)
case object Request
implicit val timeout = Timeout(5 seconds)
val f: Future[Result] =
for {
x <- ask(actorA, Request).mapTo[Int] // 直接调用
s <- actorB ask Request mapTo manifest[String] // 隐式转换调用
d <- actorC ? Request mapTo manifest[Double] // 通过符号名调用
} yield Result(x, s, d)
f pipeTo actorD // .. 或 ..
pipe(f) to actorD
上面的例子展示了将 ask 与 future上的pipeTo模式一起使用,因为这是一种非常常用的组合。 请注意上面所有的调用都是完全非阻塞和异步的:ask 产生Future,三个Future通过for语法组合成一个新的Future,然后用 pipeTo 在future上安装一个onComplete处理器来完成将收集到的 Result 发送到其它actor的动作。
ask模式与tell模式之间的一个区别是前者需要返回结果,因而当Actor通过ask模式发送消息给接收方时, 接收方必须通过sender ! reply发送Response为返回的Future填充数据。返回的结果类型为Future[Any],因此需要根据需要将其转型为正确的类型。可以使用asInstanceOf[],但更好地做法是调用mapTo[]方法,它既是非阻塞的,也能够保证更加安全地进行转型。注意mapTo返回的其实仍然为Future对象,如果返回结果为String,则调用mapTo[String]方法,,返回Future[String]。若类型转换失败,就会抛出ClassCastException异常。
当sender发送消息时,如果sender处于不可用状态,例如sender已经被停止,或者超时,则reply的消息就会被发送到DeadLetterActorRef中。若要查看DeadLetter消息,可以定义一个Actor作为dead letter的侦听者,然后在ActorSystem中订阅该侦听者。例如:
class DeadLetterListener extends Actor {
def receive = {
case DeadLetter(msg, from, to) =>
println(s"from $from to $to with message $msg.")
}
}
val listener = system.actorOf(Props[DeadLetterListener], "listener")
system.eventStream.subscribe(listener, classOf[DeadLetter])
我们可以通过重写Actor的postStop()方法,打印Actor终止时的消息,例如:
override def postStop(): Unit = {
println(s"$self is stopped.")
super.postStop
}
我现在写了如下的代码片段,当处理量达到一定规模后,就看到了大量的dead letter信息。这段代码要做的工作是通过Actor并行地将指定数量的数据插入到数据库中。每个WriterActor执行完插入操作后,会将自己的耗时时间通过sender发送给ControlActor。然后在通过Future的sequence得到结果的List,并对其求和,从而获得所有Actor执行插入操作的总时间;然后将消息pipe给AggregateActor,让其打印耗时结果。
class WriterActor extends Actor {
def receive = {
case cmd: InsertCommand =>
val startTime = System.currentTimeMillis()
DataWriter.execute(cmd.recordCount)
val elapsedTime = System.currentTimeMillis() - startTime
sender ! elapsedTime
}
override def preStart(): Unit = {
println(s"${System.currentTimeMillis()}: $self is started.")
super.preStart()
}
override def postStop(): Unit = {
println(s"${System.currentTimeMillis()}: $self is stopped.")
super.postStop
}
}
class ControlActor extends Actor {
implicit val timeout = Timeout(5 seconds)
val aggregateActor = context.actorOf(Props[AggregateActor], "aggregate")
def receive = {
case msg: StartCommand =>
val actors = createActors(msg.actorCount)
val results = actors.map(actor => actor ? InsertCommand(10000) mapTo manifest[Long])
val aggregate = Future.sequence(results).map(results => ExecutionResult(results.sum))
aggregate pipeTo aggregateActor
}
def createActors(count: Int): List[ActorRef] = {
val dispatcher = Props(classOf[WriterActor])
(1 to count).map(i => {
context.actorOf(dispatcher, s"writer_$i")
}).toList
}
}
class AggregateActor extends Actor {
def receive = {
case result: ExecutionResult =>
println(s"It take total time ${result.costTime} milli seconds")
println(s"It take total time ${result.costTime / 1000} seconds")
println(s"It take total time ${result.costTime / (1000 * 60)} minutes")
}
}
//main
object Master {
def main(args: Array[String]): Unit = {
val system = ActorSystem("DataInitializer")
val actor = system.actorOf(Props[ControlActor], "controller")
val listener = system.actorOf(Props[DeadLetterListener], "listener")
system.eventStream.subscribe(listener, classOf[DeadLetter])
actor ! StartCommand(100)
Thread.sleep(100000)
system.shutdown()
}
}
执行后,可能会输出这些的信息:
1430105019405: Actor[akka://DataInitializer/user/controller/writer_2#481424787] is started.
1430105029538: from Actor[akka://DataInitializer/user/controller/writer_2#481424787] to Actor[akka://DataInitializer/deadLetters] with message 10104.
1430105119424: Actor[akka://DataInitializer/user/controller/writer_2#481424787] is stopped.
显然,这里名为writer_2的Actor发送了一条dead letter。分析原因,是因为当我在ControlActor中创建WriterActor时,我给定的超时时间为5秒,同时采用了ask模式通过writerActor发送消息。而在WriterActor中,当插入操作执行完毕后,才会调用sender发送执行结果。由于设置的timeout已经超出(1430105029538 - 1430105019405 = 10122微秒 > 5秒),使得当前的sender不可用,从而产生了deadletter。由于sender发送消息不成功,导致Future没有结果,最后结果没有pipe到AggregateActor,无法打印最终的执行结果。
要解决这个问题,只有通过修改设置的timeout值,使其能够在规定的timeout时间内运行完毕。
AggregateActor获得的执行结果实际上是所有Actor执行任务的总耗时时间,但并不等于真正的总执行时间,因为这些Actor是并行执行任务的。要获得真实的执行时间,应该将aggregate future执行成功的时间作为结束时间。修改ControlActor为:
class ControlActor extends Actor {
implicit val timeout = Timeout(10 minutes)
val aggregateActor = context.actorOf(Props[AggregateActor], "aggregate")
def receive = {
case msg: StartCommand =>
val startTime = System.currentTimeMillis()
val actors = createActors(msg.actorCount)
val results = actors.map(actor => actor ? InsertCommand(10000) mapTo manifest[Long])
val aggregate = Future.sequence(results).map(results => ExecutionResult(results.sum))
aggregate onComplete {
case Success(_) =>
val endTime = System.currentTimeMillis()
val costTime = endTime - startTime
println(s"It take total time ${costTime} milli seconds")
println(s"It take total time ${costTime / 1000} seconds")
println(s"It take total time ${costTime / (1000 * 60)} minutes")
case Failure(_) => println("It failed.")
}
}
顾名思义,Future就是“未来”,这意味着它在接收到消息时,会承诺在未来某个时刻去执行。之所以如此,是因为在执行异步操作时,若操作存在多个,则可能操作之间会存在前后依赖关系。若留待“未来”去执行,就可以判断多个future之间的关系,如果没有依赖关系,就可以并行处理,否则就串行处理。例如如下代码:
val f = for {
a <- Future(10 / 2)
b <- Future(a + 1)
c <- Future(a - 1)
} yield b * c
此时的b、c由于依赖了a,因而并不能并行执行。
以ask模式发送消息时,由于需要获得返回值,因此它会创建一个内部actor来处理返回的Response。AKKA要求为这个内部actor指定一个超时期限(timeout),只要超过了这个期限,内部actor将被销毁以防止内存泄露。在前面那段代码中,我们定义了隐式的timeout值,设置超时期限为5秒。
Actor在处理相关事务时,可能会抛出异常。我们不能任异常自由地抛出,而是需要捕获该异常消息。正确地做法是在捕获异常时,发送一个Failure消息给发送方。
try {
val result = operation()
sender ! result
} catch {
case e: Exception =>
sender ! akka.actor.Status.Failure(e)
throw e
}
如果一个actor没有完成future,它会在超时时限到来时过期, 以 AskTimeoutException来结束。超时的时限是按下面的顺序和位置来获取的:
1.显式指定超时:
import akka.util.duration._
import akka.pattern.ask
val future = myActor.ask("hello")(5 seconds)
2.提供类型为 akka.util.Timeout的隐式参数,例如:
import akka.util.duration._
import akka.util.Timeout
import akka.pattern.ask
implicit val timeout = Timeout(5 seconds)
val future = myActor ? "hello"
Future的onComplete, onResult或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。
注意 在使用future回调如onComplete, onSuccess和onFailure时,在actor内部要小心避免捕捉该actor的引用,例如,不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引用同步bug和race condition, 因为回调会与此actor一同被并发调度。
接收超时
在接收消息时,如果在一段时间内没有收到第一条消息,可以使用超时机制。要检测这种超时,你必须设置receiveTimeout属性并声明一个处理ReceiveTimeout对象的匹配分支。
import akka.actor.ReceiveTimeout
import akka.util.duration._
class MyActor extends Actor {
context.setReceiveTimeout(30 milliseconds)
def receive = {
case "Hello" => //...
case ReceiveTimeout => throw new RuntimeException("received timeout")
}
}