Scala – 如何在 EventBus 中实现对 actor 的弱引用?
发布时间:2020-12-16 18:46:41 所属栏目:安全 来源:网络整理
导读:不再被引用的 actor 仍然订阅着事件流,这种说法对吗?至少,这是我在 Akka 中做实验得到的结论…… 我正在尝试在 EventBus 场景中对 actor 使用弱引用。在这类场景中,事件监听器/actor 通常是来来去去的,不像那些应当一直存在的独立 actor。显式取消订阅当然可行,但我并不总是能够判断出这样做的正确时机……
不再被引用的 actor 仍然订阅着事件流,这种说法对吗?至少,这是我在 Akka 中做实验得到的结论……
我正在尝试在 EventBus 场景中对 actor 使用弱引用。在这类场景中,事件监听器/actor 通常是来来去去的,不像那些应当一直存在的独立 actor。显式取消订阅当然可行,但我并不总是能够判断出这样做的正确时机。Akka 是否为这类用例提供了支持? val as = ActorSystem.create("weak") var actor = as.actorOf(Props[ExceptionHandler]) as.eventStream.subscribe(actor,classOf[Exception]) // an event is published & received as.eventStream.publish(new KnownProblem) //session expires or whatever that makes the actor redundant actor = null (1 to 30).foreach(_ => System.gc) // an event is published & STILL received as.eventStream.publish(new KnownProblem) 解决方法
好吧,我其实没能真正实现它(弱引用的 EventBus),不过下面的 actor 确实会在 GC 之后停止。使用的是 Scala 2.9.2(REPL)和 Akka 2.0.3。
带有WeakReference的EventBus [ActorRef]没有帮助 – 因为在Akka中你也有一个带有ChildrenContainer(self.children)的地牢,也可能有Monitor对生命周期事件的订阅.我没有尝试的事情 – 用调度员创建只知道我们新的闪亮的WeakEventBus的演员 – 所以也许我错过了这一点? 这里是REPL的代码(使用适当的导入启动它,并且:分两步粘贴它):

// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"

// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID

// ---------------------------------------------------------------------------
// Message protocol
// ---------------------------------------------------------------------------

// NOTE(review): PostMessage is a case class extending the case class Message.
// Case-to-case inheritance is deprecated in Scala 2.9 and rejected by later
// compilers; it is kept because this listing targets the 2.9.2 REPL.
case class Message(val id: String, val timestamp: Long)

case class PostMessage(
  override val id: String = UUID.randomUUID().toString(),
  override val timestamp: Long = new java.util.Date().getTime(),
  text: String) extends Message(id, timestamp)

/** Event carried by the weak bus; `channel` is what subscribers are classified by. */
case class MessageEvent(val channel: String, val message: Message)

case class StartServer(nodeName: String)
case class ServerStarted(nodeName: String, actor: ActorRef)
case class IsAlive(nodeName: String)
case class IsAliveWeak(nodeName: String)
case class AmAlive(nodeName: String, actor: ActorRef)

// The code below sends and matches the companion object `GcCheck`, never an
// instance `GcCheck()`; both sides agree, so the message is delivered.
case class GcCheck()
case class GcCheckScheduled(isScheduled: Boolean, gcFlag: WeakReference[AnyRef])

// ---------------------------------------------------------------------------
// Weakly-referencing event bus
// ---------------------------------------------------------------------------

/**
 * Lookup classification that stores every subscriber behind a
 * `scala.ref.WeakReference`, so the bus itself does not keep a subscriber
 * strongly reachable.
 */
trait WeakLookupClassification { this: WeakEventBus =>

  protected final val subscribers = new Index[Classifier, WeakReference[Subscriber]](
    mapSize(),
    new Comparator[WeakReference[Subscriber]] {
      // Each reference is read exactly once, so a referent cleared between two
      // reads cannot cause a failure. Cleared references sort before live ones
      // and compare equal to each other, which keeps the ordering consistent
      // for the underlying index.
      def compare(a: WeakReference[Subscriber], b: WeakReference[Subscriber]): Int =
        (a.get, b.get) match {
          case (Some(x), Some(y)) => compareSubscribers(x, y)
          case (None, None)       => 0
          case (None, _)          => -1
          case (_, None)          => 1
        }
    })

  /** Initial size of the subscriber index. */
  protected def mapSize(): Int

  /** Ordering of two live subscribers. */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /** Maps an event to the classifier its subscribers are registered under. */
  protected def classify(event: Event): Classifier

  /** Delivers `event` to one subscriber. */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    subscribers.put(to, new WeakReference(subscriber))

  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    subscribers.remove(from, new WeakReference(subscriber))

  def unsubscribe(subscriber: Subscriber): Unit =
    subscribers.removeValue(new WeakReference(subscriber))

  /**
   * Publishes `event` to every subscriber of its classifier that is still
   * reachable. References whose subscriber has already been collected are
   * skipped instead of being dereferenced.
   */
  def publish(event: Event): Unit = {
    val i = subscribers.valueIterator(classify(event))
    while (i.hasNext) i.next().get.foreach(subscriber => publish(event, subscriber))
  }
}

/** Event bus that classifies `MessageEvent`s by channel name and delivers them to actors. */
class WeakEventBus extends EventBus with WeakLookupClassification {
  type Event = MessageEvent
  type Classifier = String
  type Subscriber = ActorRef

  protected def compareSubscribers(a: ActorRef, b: ActorRef) = a compareTo b
  protected def mapSize(): Int = 10
  protected def classify(event: Event): Classifier = event.channel
  protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event
}

lazy val weakEventBus = new WeakEventBus

implicit val timeout = akka.util.Timeout(1000)

// The configuration needs its line breaks: the entries have no other separators.
lazy val actorSystem = ActorSystem("serversys", ConfigFactory.parseString("""
akka {
  loglevel = "DEBUG"
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    debug {
      receive = on
      autoreceive = on
      lifecycle = on
      event-stream = on
    }
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    log-sent-messages = on
    log-received-messages = on
  }
}
serverconf {
  include "common"
  akka {
    actor {
      deployment {
        /root {
          remote = "akka://serversys@127.0.0.1:2552"
        }
      }
    }
    remote {
      netty {
        hostname = "127.0.0.1"
        port = 2552
      }
    }
  }
}
""").getConfig("serverconf"))

// ---------------------------------------------------------------------------
// Server actor: stops itself once its weakly-held flag object has been collected
// ---------------------------------------------------------------------------

class Server extends Actor {
  private[this] val scheduled = new AtomicBoolean(false)
  private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()
  val gcCheckPeriod = Duration(5000, "millis")

  // Hands the scheduling state to the restarted instance via its own mailbox.
  override def preRestart(reason: Throwable, message: Option[Any]) {
    self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
    super.preRestart(reason, message)
  }

  /** Sends one `GcCheck` to `who` after `period`. */
  def schedule(period: Duration, who: ActorRef) =
    actorSystem.scheduler.scheduleOnce(period)(who ! GcCheck)

  def receive = {
    case StartServer(nodeName) =>
      sender ! ServerStarted(nodeName, self)
      if (scheduled.compareAndSet(false, true)) schedule(gcCheckPeriod, self)
      // The flag object is referenced only weakly after this handler returns,
      // so a later GC can clear it; GcCheck observes that below.
      val gcFlagObj = new AnyRef()
      gcFlagRef.set(new WeakReference(gcFlagObj))
      weakEventBus.subscribe(self, nodeName)
      actorSystem.eventStream.unsubscribe(self)

    case GcCheck =>
      val gcFlag = gcFlagRef.get
      if (gcFlag == null) { sys.error("gcFlag") }
      gcFlag.get match {
        case Some(gcFlagObj) =>
          // Flag still alive: keep polling.
          scheduled.set(true)
          schedule(gcCheckPeriod, self)
        case None =>
          println("Actor stopped because of GC: " + self)
          context.stop(self)
      }

    case GcCheckScheduled(isScheduled, gcFlag) =>
      if (isScheduled && scheduled.compareAndSet(false, isScheduled)) {
        gcFlagRef.compareAndSet(null, gcFlag)
        schedule(gcCheckPeriod, self)
      }

    case IsAlive(nodeName) =>
      println("Im alive (default EventBus): " + nodeName)
      sender ! AmAlive(nodeName, self)

    case e: MessageEvent =>
      println("Im alive (weak EventBus): " + e)
  }
}

// :paste 2/2

/** Parent actor that creates `Server` children and relays liveness probes to them. */
class Root extends Actor {
  def receive = {
    case start @ StartServer(nodeName) =>
      val server = context.actorOf(Props[Server], nodeName)
      context.watch(server)
      Await.result(server ? start, timeout.duration)
        .asInstanceOf[ServerStarted] match {
          case started @ ServerStarted(nodeName, _) =>
            sender ! started
          case _ =>
            throw new RuntimeException("[S][FAIL] Could not start server: " + start)
        }

    case isAlive @ IsAlive(nodeName) =>
      Await.result(context.actorFor(nodeName) ? isAlive, timeout.duration)
        .asInstanceOf[AmAlive] match {
          case AmAlive(nodeName, _) =>
            println("[S][SUCC] Server is alive : " + nodeName)
          case _ =>
            throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
        }

    case isAliveWeak @ IsAliveWeak(nodeName) =>
      // Publish on both buses so the output shows which one still reaches the server.
      actorSystem.eventStream.publish(MessageEvent(nodeName, PostMessage(text = "isAlive-default")))
      weakEventBus.publish(MessageEvent(nodeName, PostMessage(text = "isAlive-weak")))
  }
}

lazy val rootActor = actorSystem.actorOf(Props[Root], "root")

/** REPL-facing helpers that talk to `rootActor`. */
object Root {
  /** Starts a server named `nodeName`, blocking for the reply; returns its ActorRef. */
  def start(nodeName: String) = {
    val msg = StartServer(nodeName)
    var startedActor: Option[ActorRef] = None
    Await.result(rootActor ? msg, timeout.duration)
      .asInstanceOf[ServerStarted] match {
        case succ @ ServerStarted(nodeName, actor) =>
          println("[S][SUCC] Server started: " + succ)
          startedActor = Some(actor)
        case _ =>
          throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
      }
    startedActor
  }

  def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)
  def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)
}

////////////////
// actual test
Root.start("weak")
Thread.sleep(7000L)
System.gc()
Root.isAlive("weak")

(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |