加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – 实现弱引用的Eventbus演员?

发布时间:2020-12-16 18:46:41 所属栏目:安全 来源:网络整理
导读:说未引用的演员仍然订阅了事件流是否正确?至少,这是我从Akka实验中得到的…… 我正在尝试在EventBus场景中为actor实现弱引用.在这些情况下,事件监听器/演员通常会来来去去.与应该一直存在的独立演员不同.显然取消注册当然可行.但我并不总是能够认识到这一点
不再被引用的 actor 仍然保持着对事件流(event stream)的订阅,这种说法对吗?至少,这是我用 Akka 做实验得到的结论……

我正在尝试在 EventBus 场景中为 actor 实现弱引用。在这类场景里,事件监听器(actor)通常来来去去,不像那些应当一直存在的独立 actor。显式取消订阅当然可行,但我并不总能知道该在什么时机取消订阅。

Akka 是否支持这类用例?

// Create the actor system, start one handler actor and subscribe it to
// every event of type Exception on the system's event stream.
val as = ActorSystem("weak")
var actor = as.actorOf(Props[ExceptionHandler])
as.eventStream.subscribe(actor, classOf[Exception])

// First publication: an event is published & received.
as.eventStream.publish(new KnownProblem)

// The session expires (or anything else makes the actor redundant):
// drop our only reference to it and request a garbage collection 30 times.
actor = null
for (_ <- 1 to 30) System.gc()

// Second publication: the event is published & STILL received.
as.eventStream.publish(new KnownProblem)

解决方法

好吧,我其实没能真正实现弱引用订阅,不过下面代码里的 actor 会在发生 GC 之后自行停止。环境:Scala 2.9.2(REPL)和 Akka 2.0.3。

使用 WeakReference[ActorRef] 的 EventBus 并没有帮助——因为 Akka 内部(所谓的“dungeon”)还有 ChildrenContainer(self.children)持有着 actor 的引用,此外还可能存在 Monitor 对生命周期事件的订阅。我没有尝试过的做法是:用一个只知道我们这个崭新的 WeakEventBus 的 dispatcher 来创建 actor——所以也许我漏掉了什么?

下面是可在 REPL 中运行的代码(先带上合适的类路径启动 REPL,然后用 :paste 分两步粘贴):

// Start REPL with something like:
// scala -Yrepl-sync -classpath "/opt/akka-2.0.3/lib/akka/akka-actor-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/akka-remote-2.0.3.jar:
// /opt/akka-2.0.3/lib/akka/config-0.3.1.jar:
// /opt/akka-2.0.3/lib/akka/protobuf-java-2.4.1.jar:
// /opt/akka-2.0.3/lib/akka/netty-3.5.3.Final.jar"

// :paste 1/2
import akka.actor._
import akka.pattern._
import akka.event._
import akka.util._
import com.typesafe.config.ConfigFactory
import akka.util.Timeout
import akka.dispatch.Await
import scala.ref.WeakReference
import java.util.Comparator
import java.util.concurrent.atomic._
import java.util.UUID

/** Base message: an identifier plus a timestamp. */
// NOTE(review): PostMessage below is a case class extending this case class;
// case-to-case inheritance is rejected by newer Scala compilers — confirm the
// target Scala version before reusing this outside the 2.9 REPL.
case class Message(id: String, timestamp: Long)

/**
 * A message carrying text.
 *
 * `id` defaults to a freshly generated random UUID string and `timestamp`
 * defaults to the current time in milliseconds, so callers normally pass only
 * `text` (by name, because it follows the defaulted parameters).
 */
case class PostMessage(
  override val id: String = UUID.randomUUID().toString,
  override val timestamp: Long = new java.util.Date().getTime,
  text: String) extends Message(id, timestamp)

/** A message together with the channel it is published on. */
case class MessageEvent(channel: String, message: Message)

// ---- Control protocol exchanged between the REPL, Root and Server ----

/** Request to start (and name) a server node. */
case class StartServer(nodeName: String)

/** Reply to StartServer carrying the reference of the started actor. */
case class ServerStarted(nodeName: String, actor: ActorRef)

/** Liveness probe answered with AmAlive. */
case class IsAlive(nodeName: String)

/** Liveness probe that is delivered through the event buses instead. */
case class IsAliveWeak(nodeName: String)

/** Reply to IsAlive. */
case class AmAlive(nodeName: String, actor: ActorRef)

/** Tick asking a Server to check whether its GC flag object was collected. */
case class GcCheck()

/** Scheduling state a Server sends to itself from preRestart. */
case class GcCheckScheduled(isScheduled: Boolean, gcFlag: WeakReference[AnyRef])

/**
 * Lookup-style classification for an event bus that keeps its subscribers
 * behind `scala.ref.WeakReference`s instead of strong references.
 *
 * Subscribers are stored per classifier in an `Index`; an event is delivered
 * to every subscriber registered for `classify(event)` whose weak reference
 * has not been cleared yet.
 */
trait WeakLookupClassification { this: WeakEventBus =>

  /**
   * Classifier -> weakly referenced subscribers.
   *
   * The comparator orders two references by their referents. If either
   * reference has already been cleared it returns -1.
   */
  // NOTE(review): returning -1 whenever a referent is gone is not a symmetric
  // ordering, so cleared entries may become unreachable for remove() — confirm
  // whether stale entries need explicit pruning.
  protected final val subscribers = new Index[Classifier, WeakReference[Subscriber]](
    mapSize(),
    new Comparator[WeakReference[Subscriber]] {
      def compare(a: WeakReference[Subscriber], b: WeakReference[Subscriber]): Int =
        // Dereference each side exactly once: checking `get` and then calling
        // `get.get` again could fail if the referent is collected in between.
        (a.get, b.get) match {
          case (Some(x), Some(y)) => compareSubscribers(x, y)
          case _                  => -1
        }
    })

  /** Initial size handed to the underlying `Index`. */
  protected def mapSize(): Int

  /** Ordering of two live subscribers. */
  protected def compareSubscribers(a: Subscriber, b: Subscriber): Int

  /** Extracts the classifier an event is published under. */
  protected def classify(event: Event): Classifier

  /** Delivers `event` to one live subscriber. */
  protected def publish(event: Event, subscriber: Subscriber): Unit

  /** Registers `subscriber` for `to`, holding it only weakly. */
  def subscribe(subscriber: Subscriber, to: Classifier): Boolean =
    subscribers.put(to, new WeakReference(subscriber))

  /**
   * Removes `subscriber` from classifier `from`. A fresh weak reference is used
   * as the lookup key; the comparator matches it by referent.
   */
  def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean =
    subscribers.remove(from, new WeakReference(subscriber))

  /** Removes `subscriber` from all classifiers. */
  def unsubscribe(subscriber: Subscriber): Unit =
    subscribers.removeValue(new WeakReference(subscriber))

  /**
   * Publishes `event` to every subscriber of its classifier that is still
   * reachable. References whose referent was garbage collected are skipped
   * instead of failing with NoSuchElementException.
   */
  def publish(event: Event): Unit = {
    val i = subscribers.valueIterator(classify(event))
    while (i.hasNext) {
      i.next().get match {
        case Some(subscriber) => publish(event, subscriber)
        case None             => () // subscriber already collected
      }
    }
  }
}

/**
 * Event bus that routes MessageEvents by channel name to ActorRef
 * subscribers, using WeakLookupClassification for the bookkeeping.
 */
class WeakEventBus extends EventBus with WeakLookupClassification {
  type Event      = MessageEvent
  type Classifier = String
  type Subscriber = ActorRef

  /** Size passed to the subscriber index. */
  protected def mapSize(): Int = 10

  /** An event is classified by the channel it was posted on. */
  protected def classify(event: Event): Classifier = event.channel

  /** Subscribers are ordered by ActorRef's own comparison. */
  protected def compareSubscribers(a: ActorRef, b: ActorRef): Int = a.compareTo(b)

  /** Delivery simply sends the event to the subscribing actor. */
  protected def publish(event: Event, subscriber: Subscriber): Unit = {
    subscriber ! event
  }
}

/** The single WeakEventBus instance shared by this script; built on first use. */
lazy val weakEventBus: WeakEventBus = new WeakEventBus

// Timeout used via `timeout.duration` in the Await calls of this script and,
// presumably, as the implicit timeout of the `?` (ask) calls — confirm.
implicit val timeout: Timeout = Timeout(1000)
// Actor system named "serversys", created on first access and configured from
// the inline configuration string below.
// NOTE(review): only the `serverconf` section is selected with
// getConfig("serverconf") and handed to the ActorSystem, so the top-level
// `akka { ... }` section (log level, remote provider, debug flags) looks
// unused unless it is supplied some other way — confirm.
// NOTE(review): `include "common"` presumably expects a `common` resource to
// be resolvable — verify.
lazy val actorSystem = ActorSystem("serversys",ConfigFactory.parseString(""" 
akka {
  loglevel = "DEBUG"
  actor {
        provider = "akka.remote.RemoteActorRefProvider"
        debug {
          receive = on 
          autoreceive = on        
          lifecycle = on
          event-stream = on
        }
  }
  remote {
        transport = "akka.remote.netty.NettyRemoteTransport"
        log-sent-messages = on 
        log-received-messages = on    
  }
}
serverconf {
  include "common"
  akka {
        actor {
          deployment {
        /root {
          remote = "akka://serversys@127.0.0.1:2552"
        }    
          }
        }
        remote {
          netty {
        hostname = "127.0.0.1"
        port = 2552
          }
        }
  }
}
""").getConfig("serverconf"))

/**
 * Server actor that subscribes itself to the weak event bus and stops itself
 * once a weakly referenced marker object has been garbage collected.
 *
 * Handled messages:
 *  - StartServer: replies ServerStarted, arms the periodic GC check, creates
 *    the marker object and subscribes to `weakEventBus` under the node name.
 *  - GcCheck: re-arms the check while the marker is alive, otherwise stops.
 *  - GcCheckScheduled: resumes the check from state sent in preRestart.
 *  - IsAlive: replies AmAlive.
 *  - MessageEvent: prints the event.
 */
class Server extends Actor {
  // Whether a GC check has been armed, and the weak reference to the marker.
  private[this] val scheduled = new AtomicBoolean(false)
  private[this] val gcFlagRef = new AtomicReference[WeakReference[AnyRef]]()

  /** Delay between two consecutive GC checks. */
  val gcCheckPeriod = Duration(5000, "millis")

  /** Sends the current check state to `self` before the restart proceeds. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    self ! GcCheckScheduled(scheduled.get, gcFlagRef.get)
    super.preRestart(reason, message)
  }

  /** Sends one GcCheck tick to `who` after `period`. */
  def schedule(period: Duration, who: ActorRef) =
    actorSystem.scheduler.scheduleOnce(period) { who ! GcCheck }

  def receive = {
    case StartServer(name) =>
      sender ! ServerStarted(name, self)
      // Arm the periodic check only once.
      if (scheduled.compareAndSet(false, true)) {
        schedule(gcCheckPeriod, self)
      }
      // The marker is referenced only weakly after this handler returns.
      val marker = new AnyRef()
      gcFlagRef.set(new WeakReference(marker))
      weakEventBus.subscribe(self, name)
      actorSystem.eventStream.unsubscribe(self)

    // `GcCheck` without parentheses is the companion object, which is also
    // what `schedule` sends.
    case GcCheck =>
      val flagRef = gcFlagRef.get
      if (flagRef == null) {
        sys.error("gcFlag")
      }
      if (flagRef.get.isDefined) {
        // Marker still reachable: keep checking.
        scheduled.set(true)
        schedule(gcCheckPeriod, self)
      } else {
        println("Actor stopped because of GC: " + self)
        context.stop(self)
      }

    case GcCheckScheduled(wasScheduled, previousFlag) =>
      // Resume only if a check had been armed and none is armed here yet.
      if (wasScheduled && scheduled.compareAndSet(false, true)) {
        gcFlagRef.compareAndSet(null, previousFlag)
        schedule(gcCheckPeriod, self)
      }

    case IsAlive(name) =>
      println("Im alive (default EventBus): " + name)
      sender ! AmAlive(name, self)

    case event: MessageEvent =>
      println("Im alive (weak EventBus): " + event)
  }
}

// :paste 2/2
/**
 * Root actor: creates Server children on request and relays liveness probes.
 *
 * Handled messages:
 *  - StartServer: creates and watches a Server child named after the node,
 *    forwards the request to it and relays its ServerStarted reply to the sender.
 *  - IsAlive: asks the actor looked up under the node name and prints the result.
 *  - IsAliveWeak: publishes a probe on both the system event stream and the
 *    weak event bus.
 *
 * Replies are matched directly rather than cast with `asInstanceOf` first, so
 * an unexpected reply raises the intended "[S][FAIL]" RuntimeException instead
 * of a ClassCastException.
 */
// NOTE(review): the child is watched but no Terminated handler is defined
// here — confirm that is intended.
class Root extends Actor {
  def receive = {
    case start @ StartServer(nodeName) =>
      val server = context.actorOf(Props[Server], nodeName)
      context.watch(server)
      // Blocks this actor until the child answers or the timeout elapses.
      Await.result(server ? start, timeout.duration) match {
        case started: ServerStarted =>
          sender ! started
        case _ =>
          throw new RuntimeException(
            "[S][FAIL] Could not start server: " + start)
      }

    case isAlive @ IsAlive(nodeName) =>
      Await.result(context.actorFor(nodeName) ? isAlive, timeout.duration) match {
        case AmAlive(aliveNode, _) =>
          println("[S][SUCC] Server is alive : " + aliveNode)
        case _ =>
          throw new RuntimeException("[S][FAIL] Wrong answer: " + nodeName)
      }

    case IsAliveWeak(nodeName) =>
      actorSystem.eventStream.publish(MessageEvent(nodeName, PostMessage(text = "isAlive-default")))
      weakEventBus.publish(MessageEvent(nodeName, PostMessage(text = "isAlive-weak")))
  }
}

/** The Root actor, created on first access under the name "root". */
lazy val rootActor = actorSystem.actorOf(Props[Root], "root")

/** Convenience entry points for driving the Root actor from the REPL. */
object Root {

  /**
   * Asks the Root actor to start a server and waits for the reply.
   *
   * @param nodeName name of the server node to start
   * @return the started actor wrapped in Some
   * @throws RuntimeException if the reply is not a ServerStarted message
   */
  def start(nodeName: String): Option[ActorRef] = {
    val msg = StartServer(nodeName)
    // Match on the reply itself: casting it first would turn an unexpected
    // reply into a ClassCastException and leave the failure branch unreachable.
    Await.result(rootActor ? msg, timeout.duration) match {
      case succ @ ServerStarted(_, actor) =>
        println("[S][SUCC] Server started: " + succ)
        Some(actor)
      case _ =>
        throw new RuntimeException("[S][FAIL] Could not start server: " + msg)
    }
  }

  /** Sends an IsAlive probe for `nodeName` to the Root actor (fire-and-forget). */
  def isAlive(nodeName: String) = rootActor ! IsAlive(nodeName)

  /** Sends an IsAliveWeak probe for `nodeName` to the Root actor (fire-and-forget). */
  def isAliveWeak(nodeName: String) = rootActor ! IsAliveWeak(nodeName)
}

////////////////
// actual test 
// Start a Server named "weak" through the Root actor, wait 7 seconds (longer
// than the Server's 5000 ms gcCheckPeriod), request a garbage collection and
// then send an IsAlive probe for "weak" to the Root actor.
Root.start("weak")
Thread.sleep(7000L)
System.gc()
Root.isAlive("weak")

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容!

    推荐文章
      热点阅读