扩展Actor

扩展Actor既体现了面向对象设计中常见的扩展机制,又使用了函数式语言的偏函数特性。

首先,Actor是一个trait,我们可以通过继承它对其进行扩展。Actor除了一些与生命周期有关的钩子方法之外,主要的方法为receive,其类型为PartialFunction[Any, Unit]:

trait Actor {
  def receive: Actor.Receive
}

object Actor {
  type Receive = PartialFunction[Any, Unit]
}

我们可以定义一个通用的Actor基类,它继承了Actor,并实现了receive()方法,并且将该方法作为模板方法[1]。同时,利用偏函数可以组合的特点,定义多个返回类型同样为Actor.Receive的方法,相当于模板方法模式中的基本方法。如下,是AKKA官方文档中给出的例子:

abstract class GenericActor extends Actor {
  // to be defined in subclassing actor
  def specificMessageHandler: Receive

  // generic message handler
  def genericMessageHandler: Receive = {
    case event ⇒ printf("generic: %s\n", event)
  }

  def receive = specificMessageHandler orElse genericMessageHandler
}

class SpecificActor extends GenericActor {
  def specificMessageHandler = {
    case event: MyMsg ⇒ printf("specific: %s\n", event.subject)
  }
}

case class MyMsg(subject: String)

我们还可以结合Scala中的trait结合装饰器模式和偏函数做出对Actor更为精巧的扩展。例如在开源项目spark-jobserver中,定义了一个相当于Actor扩展基类的trait:ActorStack:

trait ActorStack extends Actor {
  def wrappedReceive: Receive

  def receive: Receive = {
    case x => if (wrappedReceive.isDefinedAt(x)) wrappedReceive(x) else unhandled(x)
  }
}

与前面的例子相似,将receive方法作为了模板方法,而将wrappedReceive方法作为基本方法留给继承的子类去实现。代码中的isDefinedAt(x)是由偏函数定义的,可以用以确定wrappedReceive是否能接受给定的x参数。unhandled()方法定义在Actor中,当Actor的当前行为无法处理传递过来的消息时,就会调用unhandled方法。如果是Actor被终止了,就会获得Terminated消息,unhandled就会将该消息发送到dead letter中,方法是抛出DeathPactException异常;否则就通过EventStream发布UnhandledMessage消息:

  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) ⇒ throw new DeathPactException(dead)
      case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }

spark-jobserver为Actor提供了度量和日志的功能,这两个功能是可以自由组合的,因此被分别定义在两个trait中,相当于是装饰器模式中的装饰器[2]:

trait ActorMetrics extends ActorStack {
  val metricReceiveTimer = Metrics.newTimer(getClass, "message-handler",
                                            TimeUnit.MILLISECONDS, TimeUnit.SECONDS)

  override def receive: Receive = {
    case x =>
      val context = metricReceiveTimer.time()
      try {
        super.receive(x)
      } finally {
        context.stop()
      }
  }
}

trait Slf4jLogging extends ActorStack {
  val logger = LoggerFactory.getLogger(getClass)
  private[this] val myPath = self.path.toString

  withAkkaSourceLogging {
    logger.info("Starting actor " + getClass.getName)
  }

  override def receive: Receive = {
    case x =>
      withAkkaSourceLogging {
        super.receive(x)
      }
  }

  private def withAkkaSourceLogging(fn: => Unit) {
    try {
      org.slf4j.MDC.put("akkaSource", myPath)
      fn
    } finally {
      org.slf4j.MDC.remove("akkaSource")
    }
  }
}

虽然这两个trait都重写了ActorStack的receive()方法,但在重写的实现中,都通过super去调用了ActorStack的receive(),所以才能够完成对原有receive()方法的装饰。

现在就可以根据具体场景,选择性地去继承ActorMetrics和Slf4jLogging,以获得装饰好的度量与日志功能。

当然,我们还可以通过运用scala的自类型去限制继承的子类必须为Actor类型,然后重写isDefinedAt()和apply()方法,并定义类型为Actor.Receive类型的基本方法,实现扩展。Spark中就展现了这样一种扩展方式:

private[spark] trait ActorLogReceive {
  self: Actor =>

  override def receive: Actor.Receive = new Actor.Receive {

    private val _receiveWithLogging = receiveWithLogging

    override def isDefinedAt(o: Any): Boolean = {
      val handled = _receiveWithLogging.isDefinedAt(o)
      if (!handled) {
        log.debug(s"Received unexpected actor system event: $o")
      }
      handled
    }

    override def apply(o: Any): Unit = {
      if (log.isDebugEnabled) {
        log.debug(s"[actor] received message $o from ${self.sender}")
      }
      val start = System.nanoTime
      _receiveWithLogging.apply(o)
      val timeTaken = (System.nanoTime - start).toDouble / 1000000
      if (log.isDebugEnabled) {
        log.debug(s"[actor] handled message ($timeTaken ms) $o from ${self.sender}")
      }
    }
  }

  def receiveWithLogging: Actor.Receive

  protected def log: Logger
}

ActorLogReceive提供了日志功能,而将receive处理的主体功能转交给receiveWithLogging来完成,从而有效地分离了主要逻辑与次要逻辑(相当于是一个横切关注点),既做到了有效扩展,又能避免造成重复代码,简化实现。例如在Spark的Master Actor中,就只需要重写receiveWithLogging方法即可,它自然就拥有了日志功能:

private[master] class Master(
    host: String,
    port: Int,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends Actor with ActorLogReceive with Logging with LeaderElectable {
  override def receiveWithLogging: PartialFunction[Any, Unit] = {
    case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData()
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
          CompleteRecovery)
      }
    }

    //其他case
  }
}

参考文献:

[1]Template Method模式

[2]Decorator模式