扩展Actor
扩展Actor既体现了面向对象设计中常见的扩展机制,又使用了函数式语言的偏函数特性。
首先,Actor是一个trait,我们可以通过继承它对其进行扩展。Actor除了一些与生命周期有关的钩子方法之外,主要的方法为receive,其类型为PartialFunction[Any, Unit]:
trait Actor {
def receive: Actor.Receive
}
object Actor {
type Receive = PartialFunction[Any, Unit]
}
我们可以定义一个通用的Actor基类,它继承了Actor,并实现了receive()方法,并且将该方法作为模板方法[1]。同时,利用偏函数可以组合的特点,定义多个返回类型同样为Actor.Receive的方法,相当于模板方法模式中的基本方法。如下,是AKKA官方文档中给出的例子:
abstract class GenericActor extends Actor {
// to be defined in subclassing actor
def specificMessageHandler: Receive
// generic message handler
def genericMessageHandler: Receive = {
case event ⇒ printf("generic: %s\n", event)
}
def receive = specificMessageHandler orElse genericMessageHandler
}
class SpecificActor extends GenericActor {
def specificMessageHandler = {
case event: MyMsg ⇒ printf("specific: %s\n", event.subject)
}
}
case class MyMsg(subject: String)
我们还可以结合Scala中的trait结合装饰器模式和偏函数做出对Actor更为精巧的扩展。例如在开源项目spark-jobserver中,定义了一个相当于Actor扩展基类的trait:ActorStack:
trait ActorStack extends Actor {
def wrappedReceive: Receive
def receive: Receive = {
case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
}
}
与前面的例子相似,将receive方法作为了模板方法,而将wrappedReceive方法作为基本方法留给继承的子类去实现。代码中的isDefinedAt(x)是由偏函数定义的,可以用以确定wrappedReceive是否能接受给定的x参数。unhandled()方法定义在Actor中,当Actor的当前行为无法处理传递过来的消息时,就会调用unhandled方法。如果是Actor被终止了,就会获得Terminated消息,unhandled就会将该消息发送到dead letter中,方法是抛出DeathPactException异常;否则就通过EventStream发布UnhandledMessage消息:
def unhandled(message: Any): Unit = {
message match {
case Terminated(dead) ⇒ throw new DeathPactException(dead)
case _ ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
}
}
spark-jobserver为Actor提供了度量和日志的功能,这两个功能是可以自由组合的,因此被分别定义在两个trait中,相当于是装饰器模式中的装饰器[2]:
trait ActorMetrics extends ActorStack {
val metricReceiveTimer = Metrics.newTimer(getClass, "message-handler",
TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
override def receive: Receive = {
case x =>
val context = metricReceiveTimer.time()
try {
super.receive(x)
} finally {
context.stop()
}
}
}
trait Slf4jLogging extends ActorStack {
val logger = LoggerFactory.getLogger(getClass)
private[this] val myPath = self.path.toString
withAkkaSourceLogging {
logger.info("Starting actor " + getClass.getName)
}
override def receive: Receive = {
case x =>
withAkkaSourceLogging {
super.receive(x)
}
}
private def withAkkaSourceLogging(fn: => Unit) {
try {
org.slf4j.MDC.put("akkaSource", myPath)
fn
} finally {
org.slf4j.MDC.remove("akkaSource")
}
}
}
虽然这两个trait都重写了ActorStack的receive()方法,但在重写的实现中,都通过super去调用了ActorStack的receive(),所以才能够完成对原有receive()方法的装饰。
现在就可以根据具体场景,选择性地去继承ActorMetrics和Slf4jLogging,以获得装饰好的度量与日志功能。
当然,我们还可以通过运用scala的自类型去限制继承的子类必须为Actor类型,然后重写isDefinedAt()和apply()方法,并定义类型为Actor.Receive类型的基本方法,实现扩展。Spark中就展现了这样一种扩展方式:
private[spark] trait ActorLogReceive {
self: Actor =>
override def receive: Actor.Receive = new Actor.Receive {
private val _receiveWithLogging = receiveWithLogging
override def isDefinedAt(o: Any): Boolean = {
val handled = _receiveWithLogging.isDefinedAt(o)
if (!handled) {
log.debug(s"Received unexpected actor system event: $o")
}
handled
}
override def apply(o: Any): Unit = {
if (log.isDebugEnabled) {
log.debug(s"[actor] received message $o from ${self.sender}")
}
val start = System.nanoTime
_receiveWithLogging.apply(o)
val timeTaken = (System.nanoTime - start).toDouble / 1000000
if (log.isDebugEnabled) {
log.debug(s"[actor] handled message ($timeTaken ms) $o from ${self.sender}")
}
}
}
def receiveWithLogging: Actor.Receive
protected def log: Logger
}
ActorLogReceive提供了日志功能,而将receive处理的主体功能转交给receiveWithLogging来完成,从而有效地分离了主要逻辑与次要逻辑(相当于是一个横切关注点),既做到了有效扩展,又能避免造成重复代码,简化实现。例如在Spark的Master Actor中,就只需要重写receiveWithLogging方法即可,它自然就拥有了日志功能:
private[master] class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends Actor with ActorLogReceive with Logging with LeaderElectable {
override def receiveWithLogging: PartialFunction[Any, Unit] = {
case ElectedLeader => {
val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
RecoveryState.ALIVE
} else {
RecoveryState.RECOVERING
}
logInfo("I have been elected leader! New state: " + state)
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedDrivers, storedWorkers)
recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
CompleteRecovery)
}
}
//其他case
}
}
参考文献:
[2]Decorator模式