Actor的创建与终止

定义Actor

要定义自己的Actor,需要继承自AKKA框架提供的Actor trait。定义时,最基本的要求是实现receive方法。receive方法实则接受了一个偏函数(PartialFunction[Any, Unit])。在进行模式匹配时,Akka要求对所有进入的消息都要进行模式匹配,以处理传入的消息。这意味着模式匹配时应考虑默认情况。否则,Akka会发布akka.actor.UnhandledMessage(message, sender, recipeient)到ActorSystem的EventStream。

自定义Actor如下所示:

import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

class MyActor extends Actor {
    val log = Logging(context.system, this)
    def receive = {
        case "test" => log.info("received test")
        case _ => log.info("received unknown message")
    }
}

创建Actor

通过调用ActorSystem的actorOf()方法,可以创建一个Actor对象。创建时,需要传递Props对象,并指定Actor的名字。

import akka.actor.Props

val system = ActorSystem("mySystem")
val actor = system.actorOf(Props[MyActor], "myActor")

如果创建的Actor类需要传入构造函数参数,则使用Props对象的方式会有所变化:

class ActorWithArgs(arg: String) extends Actor

val system = ActorSystem("mySystem")
val actor = system.actorOf(Props(classOf[ActorWithArgs], "arg"), "actorWithArgs")

Akka推荐的一个实践是为自定义Actor建立伴生对象(companion object),提供创建Props对象的工厂方法。这有利于为Props的创建建立一个单一引用点。当Actor的创建发生变化时,可以只修改工厂方法:

object DemoActor {
    def props(name: String): Props = Props(classOf[DemoActor], name)
}

class DemoActor extends Actor {
    def receive = {
        case x => //some behaviour
        case _ => //unknown message
    }
}

val demoActor = context.actorOf(DemoActor.props("demoActor"))

还可以通过ActorContext来创建Actor对象。在Actor trait中,定义了类型为ActorContext的隐式变量context:

trait Actor {
implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }
}

这样,我们就可以在自定义Actor中通过context创建属于它的子Actor:

class FirstActor extends Actor {
    val childActor = context.actorOf(Props[MyActor], name = "myChild")
}

注意,actorOf方法返回的是ActorRef的实例。ActorRef实例是可序列化的,可以支持序列化以及进程间网络间的传输。上面代码中name是可选的。实践中,Akka建议最好能为Actor命名,以便于记录日志识别是哪一个Actor。如果给定Name,则其值不能为空字符串,也不允许以$开头,但可以设置为URL编码字符,例如用%20来表示空格。如果在同一个父Actor下,name出现重复,会抛出akka.actor.InvalidActorNameException异常。

一旦创建了Actor,就会自动地以异步的方式启动。正因为如此,当我们在创建多个Actor时,actor对象的创建顺序并不以它们在程序中的顺序而定。例如,我创建了5个Actor,并通过重写preStart()方法,在启动Actor时打印信息:

class MyActor extends Actor {
  override def preStart(): Unit = {
    println(s"Start actor ${self}")
    super.preStart()
  }
}

val system = ActorSystem("actorSystem")

val actor1: ActorRef = createActor(system, "actor1")
val actor2: ActorRef = createActor(system, "actor2")
val actor3: ActorRef = createActor(system, "actor3")
val actor4: ActorRef = createActor(system, "actor4")
val actor5: ActorRef = createActor(system, "actor5")

def createActor(system: ActorSystem, actorName: String):ActorRef = {
    system.actorOf(Props[MyActor], actorName)
}

得到的信息可能是这样:

Start actor Actor[akka://actorSystem/user/actor5#-863244079]
Start actor Actor[akka://actorSystem/user/actor3#856116309]
Start actor Actor[akka://actorSystem/user/actor2#599256314]
Start actor Actor[akka://actorSystem/user/actor4#732617862]
Start actor Actor[akka://actorSystem/user/actor1#1046565537]

Akka还提供了创建Actor的DSL形式:

import akka.actor.ActorDSL._
import akka.actor.ActorSystem

class MyActor {
  implicit val system = ActorSystem("demo")

  def createActor = {
    actor(new Act {
      become {
        case "hello" => sender ! "hi"
      }
    })
  }
}

这里使用了actor方法,它扮演的是system.actorOf或context.actorOf的角色。因此,需要隐式定义system,它就是隐式的ActorRefFactory。这里的become是对Actor的实时替换。例如:

class HotSwapActor extends Actor {
  import context._
  def angry: Receive = {
    case "foo" ⇒ sender ! "I am already angry?"
    case "bar" ⇒ become(happy)
  }

  def happy: Receive = {
    case "bar" ⇒ sender ! "I am already happy :-)"
    case "foo" ⇒ become(angry)
  }

  def receive = {
    case "foo" ⇒ become(angry)
    case "bar" ⇒ become(happy)
  }
}

become 方法还有很多其它的用处,一个特别好的例子是用它来实现一个有限状态机 (FSM)。以下是另外一个使用become and unbecome的非常酷的小例子:

case object Swap
class Swapper extends Actor {
  import context._
  val log = Logging(system, this)

  def receive = {
    case Swap ⇒
      log.info("Hi")
      become {
        case Swap ⇒
          log.info("Ho")
          unbecome() // 重置最近的 'become' (完全为了好玩)
      }
  }
}

object SwapperApp extends App {
  val system = ActorSystem("SwapperSystem")
  val swap = system.actorOf(Props[Swapper], name = "swapper")
  swap ! Swap // logs Hi
  swap ! Swap // logs Ho
  swap ! Swap // logs Hi
  swap ! Swap // logs Ho
  swap ! Swap // logs Hi
  swap ! Swap // logs Ho
}

DSL还提供了becomeStacked以及unbecome方法,如下代码所示:

  def createSwitchActor = {
    actor(new Act {
      become {
        case "info" => sender ! "A"
        case "switch" =>
          becomeStacked {
            case "info" => sender ! "B"
            case "switch" => unbecome()
          }
        case "lobotomize" => unbecome()
      }
    })

通过编写的测试来判断,becomeStacked似乎是将"switch"消息放入到stack,然而等待接收下一条消息,如果消息为"info",则转发“B”,如果仍然为"switch",则设置为空的行为。而unbecome()则是重置,实则就是从栈顶中取出PartialFunction[Any, Unit]做为actor的行为。createActor与createSwitchActor对应的测试如下所示:

import org.scalatest.{FlatSpecLike, ShouldMatchers}
import akka.testkit.{ImplicitSender, DefaultTimeout, TestKit}
import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import scala.concurrent.duration._

class MyActorSpec extends TestKit(ActorSystem("MyActor",
ConfigFactory.parseString(MyActorSpec.config)))
with DefaultTimeout with ImplicitSender
with FlatSpecLike with ShouldMatchers {
  val myActor = new MyActor

  it should "send message" in {
    val actorRef = myActor.createActor

    within (100 millis) {
      actorRef ! "hello"
      expectMsg("hi")
    }
  }

  it should "switch message" in {
    val actorRef = myActor.createSwitchActor
    within (100 millis) {
      actorRef ! "info"
      expectMsg("A")

      actorRef ! "switch"
      actorRef ! "info"
      expectMsg("B")

      actorRef ! "switch"
      actorRef ! "switch"
      expectNoMsg()

      actorRef ! "lobotomize"
      expectNoMsg()
    }
  }
}

object MyActorSpec {
  val config = """
  akka {
    loglevel = "WARNING"
  }"""
}

这里使用的测试是Akka提供的TestKit结合ScalaTest编写。对于createSwitchActor而言,可以看到在发送了switch和info之后,会走入sender ! “B"的分支。因此实际获得的消息为“B”。

终止Actor

AKKA提供了三种终止Actor的方式,分别为stop、PoisonPill以及Kill。

ActorRefFactory、ActorContext以及ActorSystem都提供了stop方法。例如在一个Actor中终止自己,则可以调用ActorContext的stop方法:

context.stop(self)

在实现语义上,PoisonPill与Kill都被定义为一种特殊消息,所有Actor都能接收到这两个类型的消息,并作出对应的处理。例如通过发送PoisonPill消息给自己:

self ! PoisonPill

虽然这三种方式都可以终止Actor,但细节存在细微区别,我们需谨慎对待。

当调用stop方法时,若Actor正在处理当前的消息,则stop方法会等待Actor处理完当前消息后才会终止。若Actor的mailbox中还有等待处理的消息,就会导致dead letter。PoisonPill消息就要温柔许多,它的身份就是一条消息,没有什么特权,只是具有强烈的毒害性。当发出该消息时,它会悄悄地进入Actor的mailbox,并不会干扰排列在它之前的消息,直到Actor开始处理它时,毒药发作,然后才一击致命。因此,若在该消息后仍有消息进入,那么对于一个已经被毒毙的Actor而言,只能是发送dead letter了。比较PoisonPill而言,Kill更加暴烈,如果说PoisonPill像是一位高明而冷静善于潜伏的死亡刺客,则Kill毋宁说一名惨烈桀骜,视死如归的敢死队员,抱着炸药就往Actor冲去,不管Actor是否还在处理消息,直接同归于尽,不留任何缓和余地。当然,这样暴烈的场景同样要受到一些限制。在实现机制上,发送Kill消息会导致Actor抛出ActorKilledException异常。因此,Kill Actor的行为还得取决于系统设置的Supervisor机制。默认情况下是停止该Actor,但如果mailbox被持久化了,一旦Actor被重新启动,除了当前导致失败的消息之外的消息都将重新拥有,Actor可以继续处理。注意Restart的机制,实际上重启后的Actor已经不是原来的Actor,而是它的一个克隆。

多数情况下,我们应优先选择PoisonPill方式。毕竟,当我们显式发送该消息时,即意味着明确需要终止Actor,但我们并不希望干扰Actor未完成的工作。至于在PoisonPill之后进入的消息,本来就可以视为是一种异外,放入dead letter也属题中应有之义。

当一个Actor被终止时,它的子Actor也会被终止,并在终止后调用postStop钩子方法。