Actor的创建与终止
定义Actor
要定义自己的Actor,需要继承自AKKA框架提供的Actor trait。定义时,最基本的要求是实现receive方法。receive方法实则接受了一个偏函数(PartialFunction[Any, Unit])。在进行模式匹配时,Akka要求对所有进入的消息都要进行模式匹配,以处理传入的消息。这意味着模式匹配时应考虑默认情况。否则,Akka会发布akka.actor.UnhandledMessage(message, sender, recipeient)到ActorSystem的EventStream。
自定义Actor如下所示:
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging
class MyActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}
创建Actor
通过调用ActorSystem的actorOf()方法,可以创建一个Actor对象。创建时,需要传递Props对象,并指定Actor的名字。
import akka.actor.Props
val system = ActorSystem("mySystem")
val actor = system.actorOf(Props[MyActor], "myActor")
如果创建的Actor类需要传入构造函数参数,则使用Props对象的方式会有所变化:
class ActorWithArgs(arg: String) extends Actor
val system = ActorSystem("mySystem")
val actor = system.actorOf(Props(classOf[ActorWithArgs], "arg"), "actorWithArgs")
Akka推荐的一个实践是为自定义Actor建立伴生对象(companion object),提供创建Props对象的工厂方法。这有利于为Props的创建建立一个单一引用点。当Actor的创建发生变化时,可以只修改工厂方法:
object DemoActor {
def props(name: String): Props = Props(classOf[DemoActor], name)
}
class DemoActor extends Actor {
def receive = {
case x => //some behaviour
case _ => //unknown message
}
}
val demoActor = context.actorOf(DemoActor.props("demoActor"))
还可以通过ActorContext来创建Actor对象。在Actor trait中,定义了类型为ActorContext的隐式变量context:
trait Actor {
implicit val context: ActorContext = {
val contextStack = ActorCell.contextStack.get
if ((contextStack.isEmpty) || (contextStack.head eq null))
throw ActorInitializationException(
s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
"You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
val c = contextStack.head
ActorCell.contextStack.set(null :: contextStack)
c
}
}
这样,我们就可以在自定义Actor中通过context创建属于它的子Actor:
class FirstActor extends Actor {
val childActor = context.actorOf(Props[MyActor], name = "myChild")
}
注意,actorOf方法返回的是ActorRef的实例。ActorRef实例是可序列化的,可以支持序列化以及进程间网络间的传输。上面代码中name是可选的。实践中,Akka建议最好能为Actor命名,以便于记录日志识别是哪一个Actor。如果给定Name,则其值不能为空字符串,也不允许以$开头,但可以设置为URL编码字符,例如用%20来表示空格。如果在同一个父Actor下,name出现重复,会抛出akka.actor.InvalidActorNameException异常。
一旦创建了Actor,就会自动地以异步的方式启动。正因为如此,当我们在创建多个Actor时,actor对象的创建顺序并不以它们在程序中的顺序而定。例如,我创建了5个Actor,并通过重写preStart()方法,在启动Actor时打印信息:
class MyActor extends Actor {
override def preStart(): Unit = {
println(s"Start actor ${self}")
super.preStart()
}
}
val system = ActorSystem("actorSystem")
val actor1: ActorRef = createActor(system, "actor1")
val actor2: ActorRef = createActor(system, "actor2")
val actor3: ActorRef = createActor(system, "actor3")
val actor4: ActorRef = createActor(system, "actor4")
val actor5: ActorRef = createActor(system, "actor5")
def createActor(system: ActorSystem, actorName: String):ActorRef = {
system.actorOf(Props[MyActor], actorName)
}
得到的信息可能是这样:
Start actor Actor[akka://actorSystem/user/actor5#-863244079]
Start actor Actor[akka://actorSystem/user/actor3#856116309]
Start actor Actor[akka://actorSystem/user/actor2#599256314]
Start actor Actor[akka://actorSystem/user/actor4#732617862]
Start actor Actor[akka://actorSystem/user/actor1#1046565537]
Akka还提供了创建Actor的DSL形式:
import akka.actor.ActorDSL._
import akka.actor.ActorSystem
class MyActor {
implicit val system = ActorSystem("demo")
def createActor = {
actor(new Act {
become {
case "hello" => sender ! "hi"
}
})
}
}
这里使用了actor方法,它扮演的是system.actorOf或context.actorOf的角色。因此,需要隐式定义system,它就是隐式的ActorRefFactory。这里的become是对Actor的实时替换。例如:
class HotSwapActor extends Actor {
import context._
def angry: Receive = {
case "foo" ⇒ sender ! "I am already angry?"
case "bar" ⇒ become(happy)
}
def happy: Receive = {
case "bar" ⇒ sender ! "I am already happy :-)"
case "foo" ⇒ become(angry)
}
def receive = {
case "foo" ⇒ become(angry)
case "bar" ⇒ become(happy)
}
}
become 方法还有很多其它的用处,一个特别好的例子是用它来实现一个有限状态机 (FSM)。以下是另外一个使用become and unbecome的非常酷的小例子:
case object Swap
class Swapper extends Actor {
import context._
val log = Logging(system, this)
def receive = {
case Swap ⇒
log.info("Hi")
become {
case Swap ⇒
log.info("Ho")
unbecome() // 重置最近的 'become' (完全为了好玩)
}
}
}
object SwapperApp extends App {
val system = ActorSystem("SwapperSystem")
val swap = system.actorOf(Props[Swapper], name = "swapper")
swap ! Swap // logs Hi
swap ! Swap // logs Ho
swap ! Swap // logs Hi
swap ! Swap // logs Ho
swap ! Swap // logs Hi
swap ! Swap // logs Ho
}
DSL还提供了becomeStacked以及unbecome方法,如下代码所示:
def createSwitchActor = {
actor(new Act {
become {
case "info" => sender ! "A"
case "switch" =>
becomeStacked {
case "info" => sender ! "B"
case "switch" => unbecome()
}
case "lobotomize" => unbecome()
}
})
通过编写的测试来判断,becomeStacked似乎是将"switch"消息放入到stack,然而等待接收下一条消息,如果消息为"info",则转发“B”,如果仍然为"switch",则设置为空的行为。而unbecome()则是重置,实则就是从栈顶中取出PartialFunction[Any, Unit]做为actor的行为。createActor与createSwitchActor对应的测试如下所示:
import org.scalatest.{FlatSpecLike, ShouldMatchers}
import akka.testkit.{ImplicitSender, DefaultTimeout, TestKit}
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import scala.concurrent.duration._
class MyActorSpec extends TestKit(ActorSystem("MyActor",
ConfigFactory.parseString(MyActorSpec.config)))
with DefaultTimeout with ImplicitSender
with FlatSpecLike with ShouldMatchers {
val myActor = new MyActor
it should "send message" in {
val actorRef = myActor.createActor
within (100 millis) {
actorRef ! "hello"
expectMsg("hi")
}
}
it should "switch message" in {
val actorRef = myActor.createSwitchActor
within (100 millis) {
actorRef ! "info"
expectMsg("A")
actorRef ! "switch"
actorRef ! "info"
expectMsg("B")
actorRef ! "switch"
actorRef ! "switch"
expectNoMsg()
actorRef ! "lobotomize"
expectNoMsg()
}
}
}
object MyActorSpec {
val config = """
akka {
loglevel = "WARNING"
}"""
}
这里使用的测试是Akka提供的TestKit结合ScalaTest编写。对于createSwitchActor而言,可以看到在发送了switch和info之后,会走入sender ! “B"的分支。因此实际获得的消息为“B”。
终止Actor
AKKA提供了三种终止Actor的方式,分别为stop、PoisonPill以及Kill。
ActorRefFactory、ActorContext以及ActorSystem都提供了stop方法。例如在一个Actor中终止自己,则可以调用ActorContext的stop方法:
context.stop(self)
在实现语义上,PoisonPill与Kill都被定义为一种特殊消息,所有Actor都能接收到这两个类型的消息,并作出对应的处理。例如通过发送PoisonPill消息给自己:
self ! PoisonPill
虽然这三种方式都可以终止Actor,但细节存在细微区别,我们需谨慎对待。
当调用stop方法时,若Actor正在处理当前的消息,则stop方法会等待Actor处理完当前消息后才会终止。若Actor的mailbox中还有等待处理的消息,就会导致dead letter。PoisonPill消息就要温柔许多,它的身份就是一条消息,没有什么特权,只是具有强烈的毒害性。当发出该消息时,它会悄悄地进入Actor的mailbox,并不会干扰排列在它之前的消息,直到Actor开始处理它时,毒药发作,然后才一击致命。因此,若在该消息后仍有消息进入,那么对于一个已经被毒毙的Actor而言,只能是发送dead letter了。比较PoisonPill而言,Kill更加暴烈,如果说PoisonPill像是一位高明而冷静善于潜伏的死亡刺客,则Kill毋宁说一名惨烈桀骜,视死如归的敢死队员,抱着炸药就往Actor冲去,不管Actor是否还在处理消息,直接同归于尽,不留任何缓和余地。当然,这样暴烈的场景同样要受到一些限制。在实现机制上,发送Kill消息会导致Actor抛出ActorKilledException异常。因此,Kill Actor的行为还得取决于系统设置的Supervisor机制。默认情况下是停止该Actor,但如果mailbox被持久化了,一旦Actor被重新启动,除了当前导致失败的消息之外的消息都将重新拥有,Actor可以继续处理。注意Restart的机制,实际上重启后的Actor已经不是原来的Actor,而是它的一个克隆。
多数情况下,我们应优先选择PoisonPill方式。毕竟,当我们显式发送该消息时,即意味着明确需要终止Actor,但我们并不希望干扰Actor未完成的工作。至于在PoisonPill之后进入的消息,本来就可以视为是一种异外,放入dead letter也属题中应有之义。
当一个Actor被终止时,它的子Actor也会被终止,并在终止后调用postStop钩子方法。