【Akka扩展】Akka Extensions
如果你想为 Akka 添加功能,那么有一个非常优雅且功能强大的机制。它被称为 Akka 扩展(Akka Extensions),由2个基本组件组成: 每个 由于扩展是钩入( hook into) Akka 本身的一种方式,扩展的实现者需要确保扩展的线程安全。 创建一个扩展因此,让我们创建一个示例扩展,使我们计算发生事件的次数。 首先,我们定义我们的 import akka.actor.Extension class CountExtensionImpl extends Extension { //由于这个扩展是一个共享实例 // 每个 ActorSystem 我们需要线程安全 private val counter = new AtomicLong(0) //这是这个 Extension 提供的操作 def increment() = counter.incrementAndGet() } 然后,我们需要为我们的扩展创建一个 import akka.actor.ActorSystem import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.actor.ExtendedActorSystem object CountExtension extends ExtensionId[CountExtensionImpl] with ExtensionIdProvider { //ExtensionIdProvider需要查找方法, // 所以我们在这里返回,这允许我们 // 配置我们的扩展程序以便 // 在ActorSystem启动时加载 override def lookup = CountExtension //这个方法将被Akka调用 // 来实例化我们的扩展 override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl /** * Java API:检索给定系统的计数扩展。 */ override def get(system: ActorSystem): CountExtensionImpl = super.get(system) } 真顽皮!现在我们需要做的就是实际使用它: CountExtension(system).increment 或者从 Akka Actor 内部使用: class MyActor extends Actor { def receive = { case someMessage ? CountExtension(context.system).increment() } } 你还可以隐藏 traits 后面的 extension: trait Counting { self: Actor ? def increment() = CountExtension(context.system).increment() } class MyCounterActor extends Actor with Counting { def receive = { case someMessage ? increment() } } 这一切就是这么简单! 从配置加载为了能够从 Akka 配置中加载扩展,您必须在您提供给您的 akka { extensions = ["docs.extension.CountExtension"] } 适应性没有限制,一切都是可能的(The sky is the limit)!顺便说一句,你知道 Akka 的 应用程序特定配置该配置可用于应用程序特定的设置。 一个好的做法是将这些设置放 Extention 中。 示例配置: myapp { db { uri = "mongodb://example1.com:27017,example2.com:27017" } circuit-breaker { timeout = 30 seconds } } 然后 import akka.actor.ActorSystem import akka.actor.Extension import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.actor.ExtendedActorSystem import scala.concurrent.duration.Duration import com.typesafe.config.Config import java.util.concurrent.TimeUnit class SettingsImpl(config: Config) extends Extension { val DbUri: String = config.getString("myapp.db.uri") val CircuitBreakerTimeout: Duration = Duration( config.getMilliseconds("myapp.circuit-breaker.timeout"),TimeUnit.MILLISECONDS) } object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider { override def lookup = Settings override def createExtension(system: ExtendedActorSystem) = new SettingsImpl(system.settings.config) /** * Java API: retrieve the Settings extension for the given system. */ override def get(system: ActorSystem): SettingsImpl = super.get(system) } 使用它: class MyActor extends Actor { val settings = Settings(context.system) val connection = connect(settings.DbUri,settings.CircuitBreakerTimeout) // ...... } 库扩展第三部分库可以在 actor 系统启动时注册它的自动加载扩展,方法是将其附加到 akka.library-extensions += "docs.extension.ExampleExtension" 由于没有办法有选择地删除这样的扩展名,所以应该小心使用它,并且只有在用户不希望禁用该扩展名的情况下或者对禁用这些子特性具有特定支持时才应使用它。有一个重要的例子:比如在测试中。
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |