Actor System

Actor System是进入AKKA世界中的一个入口,也可以看做是Actor的系统工厂或管理者,掌控者Actor的生命周期,包括创建、停止Actor,当然也可以关闭整个ActorSystem。

actor system

图中表示的是一个Actor System,它显示了在这个Actor System中最重要实体之间的关系。

Actor Reference

Actor System创建出来的实则是Actor的引用,类型为ActorRef,也可以将其视为Actor的代理,主要目的是发送消息给它表示的Actor。Actor可以通过访问self字段来得到自身的引用;若要访问发送消息的Actor的引用,则访问sender字段。之所以引入ActorRef,一方面便于高效地传递Actor,同时还引入代理模式,使得本地Actor与远程Actor能够更好地统一,二者的区别仅仅体现在Actor Path。所以在AKKA中,Actor之间永远都不可能直接通信,而是他们的代理ActorRef建立联系。

Actor Path

可以认为Actor Path是通过字符串对Actor层级关系进行组合用以标识唯一Actor的一种方式。我们在创建Actor Path时,不用创建Actor;但如果没有创建对应的Actor,则不能创建Actor Reference。还可以创建一个Actor,再终止它,然后再以相同的Actor Path再创建一个新的Actor。新创建的Actor是Actor的新化身(Incarnation),但与旧的Actor并不是同一个。对于这个新化身而言,持有旧Actor的Actor Reference并不是有效的。消息发送给旧的Actor Reference,但不会被传递给新化身,即使它们具有相同的路径。

Actor Path包含协议、位置和actor的层级。如下是一些Actor Path的实例:

//purely local
"akka://my-sys/user/service-a/worker1"

// remote
"akka.tcp://[email protected]:5678/user/service-b"

//clustered (Future Extension)
"cluster://my-cluster/service-c"

有两种方式可以获得Actor Reference:创建Actor或查找Actor。

要创建Actor,可以调用ActorSystem.actorOf(),它创建的Actor在guardian actor之下;接着可以调用ActorContext.actorOf()在刚才创建的Actor内生成Actor树。这些方法会返回新创建的Actor的引用。每个Actor都可以直接访问Actor Context来或得它自身、Parent以及所有Children的引用。

要查找Actor Reference,则可调用ActorSystem.actorSelection()方法。要获得限定到特定Actor的生命周期中的ActorRef,可以使用sender引用来发送一条消息如内建的Identity消息给Actor。

在查找ActorRef时,可以使用绝对路径或相对路径。如果是相对路径,可以用两个点(..)表示parent actor。例如:

context.actorSelection("../brother") ! msg

使用绝对路径的例子:

context.actorSelection("/user/ServiceA") ! msg

还可以使用通配符查询逻辑的Actor层级,例如下面的例子就是发送消息给除当前Actor之外的所有同级Actor(因为..代表parent,所以这里就意味找当前Actor的parent的下级Actor):

context.actorSelection("../*") ! msg

通过调用ActorRef的path属性可以获得一个Actor的Path:

val actorRef = actorSystem.actorOf(Props[MyActor], "myActor")
println(actorRef.path)
区别:actorOf vs. actorSelection vs. actorFor

actorOf:创建一个新的Actor。创建的Actor为调用该方法时所属的Context下的直接子Actor;
actorSelection:当消息传递来时,只查找现有的Actor,而不会创建新的Actor;在创建了selection时,也不会验证目标Actors是否存在;
actorFor(已经被actorSelection所deprecated):只会查找现有的Actor,而不会创建新的Actor。

远程部署的相互影响

当一个Actor创建一个Child时,Actor的系统部署器会决定这个新的Actor究竟属于同一个JVM,还是另一个节点。如果是后一种情况,Actor的创建就会通过在不同JVM的网络连接而触发,这属于不同的Actor系统。远程系统会将新的Actor放在一个特定的路径下,且新Actor的Supervisor应该是一个远程的Actor引用。而且,context.parent(Supervisor的引用)与context.path.parent(actor path的父节点)表示的不是同一个Actor。如下图所示:

Remote Actor

注意图中展现的两个不同的Actor系统之间的Route关系。在左边的Actor系统中,Child Actor属于Remote ActorRef,它指向了右边远端Actor系统中的一个Actor节点,该Actor对于右边的Actor系统而言,属于Local ActorRef,但它的Parent Actor却是一个Remote ActorRef,它指向了左边对应的Local ActorRef。

Actor Path的Top-Level Scopes

Actor路径的根为“/”,而后续层级包括:“/user”, “/system”, “deadLetters”, “/temp”, “/remote”。

这里体现了Akka遵循“简单”原则的设计目标:层级中的任何事物都是Actor,且所有Actor的功能都采用同样的方式。

Actor的层级

Actor的整个体系就像是一家层级森严的企业组织,层次越高,管理权限与职责就更大。在AKKA中,parent actor就是child actor的supervisior,这意味着parent actor能够掌控child actor的整个生命周期。而这种分级的模式也能够更好地支持系统的容错。

如果要创建child actor,就不再调用ActorSystem的actorOf()方法。正如在前文介绍Acgor模型时提到的,ActorSystem会默认创建Root Guardian、Guardian和System Guardian这三个Actor,其中,Guardian Actor是用户自定义Actor的parent,当我们调用ActorSystem的actorOf()方法来创建Actor时,创建的actor就会成为Guardian Actor的child actor。

AKKA提供的方法是在Parent Actor内部,通过调用ActorContext的actorOf()方法来创建它自身的child actor。例如:

class ParentActor extend Actor {
    val child = context.actorOf(Props[ChildActor], "ChildActor")
}

当我们创建了父子层次的Actor后,我们就可以将执行任务分解到不同的Actor中。例如,以ParentActor作为任务的发起者,在其内部通过ChildActor发送消息,此时的ParentActor相当于ChildActor的sender。例如:

case class ExecutionMessage(msg: String)
case class StartMessage(msg: String)
case class FinishedMessage(msg: String)

class ChildActor extends Actor {
    def receive = {
        case ExecutionMessage(msg) =>
            println(msg) //fake real execution logic
            sender ! FinishedMessage("kill me")
    }
}
class ParentActor extends Actor {
    val child = context.actorOf(Props[ChildActor], "ChildActor")

    def receive = {
        case StartMessage(msg) =>
            println(msg)
            child ! ExecutionMessage("executing child actor")
        case FinishedMessage(msg) =>
            println(msg)
    }
}

object TestApp extends App {
    val system = ActorSystem("testParentChildActors")
    val parentActor = system.actorOf(Props[ParentActor], "ParentActor")

    parentActor ! StartMessage("start")
    system.shutdown
}

在TestApp中,通过ActorSystem创建ParentActor的ActorRef对象,然后通过它发送StartMessage消息。此时,ParentActor在receive方法中通过模式匹配到StartMessage分支,从而调用ChildActor的ActorRef对象去发送ExecutionMessage消息。

消息进入了ChildActor。仍然是在Actor的receive方法中,模式匹配了ExecutionMessage消息。它首先打印了ExecutionMessage传递过来的消息(可以认为这里是真正的执行逻辑),然后调用sender去发送FinishedMessage。注意,这里的sender实则就是parentActor。故而当FinishedMessage被sender发送后,ParentActor会接收到该消息,进入FinishedMessage分支,打印结束的消息。整个执行结果如下所示:

start
executing child actor
kill me

Supervision

除了作为任务的分派以及消息的通信之外,父Actor作为子Actor的supervisor,会对其下的所有子Actor进行监控。当某个子Actor执行失败抛出异常后,父Actor要负责处理该异常,例如对子Actor进行Restart/Resume/Escalate,当然也包括Stop。

  • Resume:子Actor会忽略引起异常的消息,继续处理mailbox里的后续消息;
  • Restart:终止引发异常的子Actor,然后初始化一个全新的Actor。但是,存储在原有Actor的mailbox中的消息会转移到这个新Actor中,并继续处理这些消息。对外而言,除了引起异常的消息没有处理成功,其他处理行为看起来就像没有发生错误一般;
  • Stop:终止引发异常的子Actor,后续发来的消息将不被处理,而是被转到deadLetters队列中;
  • Escalate:当前子Actor的supervisor不处理该失败,而是转交给更上一级的supervisor来处理。

在AKKA中,仅将监控Actor的权力赋予该Actor的创建者。这种监控方式在AKKA中被称之为Parental Supervisior。

AKKA为Actor的Supervision提供了SupervisorStrategy,分为AllForOneStrategy和OneForOneStrategy。默认情况下,Actor的SupervisorStrategy被设置为OneForOneStrategy。

trait Actor {
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
}

object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
  final val defaultDecider: Decider = {
    case _: ActorInitializationExceptionStop
    case _: ActorKilledExceptionStop
    case _: DeathPactExceptionStop
    case _: ExceptionRestart
  }

  final val defaultStrategy: SupervisorStrategy = {
    OneForOneStrategy()(defaultDecider)
  }
}

默认的策略传入了同样定义在SupervisorStrategy中的defaultDecider。默认的decider有不同的分支策略,除了规定的ActorInitializationException、ActorKilledException、DeathPactException三种异常,需要停止Actor,其余情况都是进行重启(Restart)。

在自定义Actor时,可以通过重写supervisorStrategy来改变策略。

SupervisorStrategy在AKKA中被定义为一个抽象类,所以我们也可以根据自己的情况实现符合项目需要的SupervisorStrategy。当然,也可以重写AllForOneStrategy或OneForOneStrategy的方法。例如,在akka.io中定义了SelectorBasedManager类,它重写了OneForOneStrategy的logFailure()实现,并传入了不同的decider。

private[io] abstract class SelectorBasedManager(selectorSettings: SelectionHandlerSettings, nrOfSelectors: Int) extends Actor {

    override def supervisorStrategy = connectionSupervisorStrategy

  private[io] final val connectionSupervisorStrategy: SupervisorStrategy =
    new OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider) {
      override def logFailure(context: ActorContext, child: ActorRef, cause: Throwable,
                              decision: SupervisorStrategy.Directive): Unit =
        if (cause.isInstanceOf[DeathPactException]) {
          try context.system.eventStream.publish {
            Logging.Debug(child.path.toString, getClass, "Closed after handler termination")
          } catch { case NonFatal(_) ⇒ }
        } else super.logFailure(context, child, cause, decision)
    }
}

object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
  final val stoppingStrategy: SupervisorStrategy = {
    def stoppingDecider: Decider = {
      case _: ExceptionStop
    }
    OneForOneStrategy()(stoppingDecider)
  }
}

上述代码重写了OneForOneStrategy的logFailure()方法,并将定义在SupervisorStrategy中的stoppingDecider作为Strategy的构造函数参数传入,取代了默认的decider。这意味着当Actor抛出异常时,就要停止该Actor,而非重启。

OneForOneStrategy与AllForOneStrategy实现了两种迥然不同的策略。 前者仅仅将supervision strategy运用到失败的Child Actor,而后者则是将strategy运用到包括失败的Child Actor在内的所有兄弟节点。多数情况下,我们会优先选择OneForOneStrategy,除非父Actor与所有子Actor之间存在非常紧密的内在依赖,我们才会选择AllForOneStrategy。例如,每个Actor是处理数据的其中一个步骤,如排序,只要一个步骤失败,就可能导致它与其他Actor的状态不一致。因此,一旦某个步骤失败,就应该重启所有的Actor,以确保状态的一致性。

注意,即使actor hierarchy使用这种策略,其中一个Actor的失败会导致其Supervisor会发送一条命令对其执行停止和重启,但并不意味着它的所有兄弟actor会执行重启。这些兄弟Actor需要由supervisor显式地执行重启。

Monitoring