加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala:Akka演员没有在Play Framework 2.2.0中死亡

发布时间:2020-12-16 18:35:40 所属栏目:安全 来源:网络整理
导读:我有一个例子,我正在使用Play Framework 2.2.0- scala构建,它使用WebSockets将数据流式传输到客户端.我遇到的问题是,无论出于何种原因,其中一个父级Actor的孩子没有正确关闭.所有日志都表明它正在停止并且已经关闭,但我发现通过向其发布数据实际上并没有停止
我有一个例子,我正在使用Play Framework 2.2.0- scala构建,它使用WebSockets将数据流式传输到客户端.我遇到的问题是,无论出于何种原因,其中一个父级Actor的孩子没有正确关闭.所有日志都表明它正在停止并且已经关闭,但我发现通过向其发布数据实际上并没有停止.这是一些代码,首先是我的控制器操作:

def scores(teamIds: String) = WebSocket.async[JsValue] { request =>
    val teamIdsArr:Array[String] = teamIds.split(",").distinct.map { el =>
        s"nfl-streaming-scores-${el}"
    }

    val scoresStream = Akka.system.actorOf(Props(new ScoresStream(teamIdsArr)))
    ScoresStream.join(scoresStream)
  }

因此,每次客户端连接时,它们都会加入ScoresStream,后者返回WebSocket.async所需的相应Iteratee,Enumerator.实际的ScoresStream对象如下所示:

object ScoresStream {

  implicit val timeout = Timeout(5 seconds)

  def join(scoresStream:ActorRef):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = {

    (scoresStream ? BeginStreaming).map {

      case Connected(enumerator) => 
        val iteratee = Iteratee.foreach[JsValue] { _ => 
          Logger.info("Ignore iteratee input.")
        }.map { _ => 
          Logger.info("Client quitting - killing Actor.")
          scoresStream ! UnsubscribeAll
          scoresStream ! PoisonPill
        }
        (iteratee,enumerator)
}

这里的想法是在客户端断开连接时终止主Actor,ScoresStream.我通过使用scoresStream来做到这一点! PoisonPill.

ScoresStream依次创建Pub和Sub实例,这些实例是连接到Redis的包装器,用于发布/编写消息,这里是Actor代码:

class ScoresStream(teamIds: Array[String]) extends Actor with CreatePubSub with akka.actor.ActorLogging {

  val (scoresEnumerator,scoresChannel) = Concurrent.broadcast[JsValue]

  case class Message(kind: String,user: String,message: String)
  implicit val messageReads = Json.reads[Message]
  implicit val messageWrites = Json.writes[Message]

  val sub = context.child("sub") match {
    case None    => createSub(scoresChannel)
    case Some(c) => c
  }

  val pub = context.child("pub") match {
     case None     => createPub(teamIds)
     case Some(c)  => c
  }

  def receive = {
    case BeginStreaming => {
      log.info("hitting join...")
      sub ! RegisterCallback
      sub ! SubscribeChannel(teamIds)
      sender ! Connected(scoresEnumerator)
    }

    case UnsubscribeAll => {
      sub ! UnsubscribeChannel(teamIds)
    }
  }

}

trait CreatePubSub { self:Actor =>
  def createSub(pChannel: Concurrent.Channel[JsValue]) = context.actorOf(Props(new Sub(pChannel)),"sub")
  def createPub(teamIds: Array[String]) = context.actorOf(Props(new Pub(teamIds)),"pub")
}

最后,这是实际的Sub Actor代码:( Pub在这里似乎没有关系,因为它关闭正常):

class Sub(pChannel: Concurrent.Channel[JsValue]) extends Actor with CreatePublisherSubscriber with ActorLogging {
  val s = context.child("subscriber") match {
    case None    => createSubscriber
    case Some(c) => c
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
    case S(channel,no) => println("subscribed to " + channel + " and count = " + no)
    case U(channel,no) => println("unsubscribed from " + channel + " and count = " + no)
    case M(channel,msg) => 
      msg match {
        // exit will unsubscribe from all channels and stop subscription service
        case "exit" => 
          println("unsubscribe all ..")
          pChannel.end
          r.unsubscribe

        // message "+x" will subscribe to channel x
        case x if x startsWith "+" => 
          val s: Seq[Char] = x
          s match {
            case Seq('+',rest @ _*) => r.subscribe(rest.toString){ m => }
          }

        // message "-x" will unsubscribe from channel x
        case x if x startsWith "-" => 
          val s: Seq[Char] = x
          s match {
            case Seq('-',rest @ _*) => r.unsubscribe(rest.toString)
                                        pChannel.end
          }

        case x => 
         try {
            log.info("Just got a message: " + x)
            pChannel.push(Json.parse(x))
          } 
          catch {
            case ex: com.fasterxml.jackson.core.JsonParseException => {
              log.info("Malformed JSON sent.")
            }
          }
      }
  }

  def receive = {
    case RegisterCallback => {
      log.info("Creating a subscriber and registering callback")  
      s ! Register(callback)
    }
    case SubscribeChannel(teamIds) => {
      teamIds.foreach { x => log.info("subscribing to channel " + x + " ") }
      //sub ! Subscribe(Array("scores-5","scores-6"))
      s ! Subscribe(teamIds)
    }
    case UnsubscribeChannel(teamIds) => {
      teamIds.foreach { x => log.info("unsubscribing from channel " + x + " ") }
      s ! Unsubscribe(teamIds)
    }
    case true => println("Subscriber successfully received message.")
    case false => println("Something went wrong.")
  }
}

trait CreatePublisherSubscriber { self:Actor =>
  def r = new RedisClient("localhost",6379)
  def createSubscriber = context.actorOf(Props(new Subscriber(r)),"subscriber")
  def createPublisher = context.actorOf(Props(new Publisher(r)),"publisher")
}

现在,当客户端连接时,启动消息看起来很健康:

[DEBUG] [10/20/2013 00:35:53.618] [application-akka.actor.default-dispatcher-12] [akka://application/user] now supervising Actor[akka://application/user/$c#-54456921]
[DEBUG] [10/20/2013 00:35:53.619] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] started (com.example.stream.models.ScoresStream@131a9310)
[DEBUG] [10/20/2013 00:35:53.620] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/sub#1376180991]
[DEBUG] [10/20/2013 00:35:53.621] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/pub/publisher] started (com.redis.Publisher@3b34c0a6)
[DEBUG] [10/20/2013 00:35:53.622] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] started (com.redis.Subscriber@453f0a8)
Subscriber successfully received message.
Subscriber successfully received message.
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] started (com.example.stream.models.Sub@6165ab39)
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] now supervising Actor[akka://application/user/$c/sub/subscriber#-1562348862]
subscribed to nfl-streaming-scores-5 and count = 1
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/pub#-707418539]
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] hitting join...
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] Creating a subscriber and registering callback
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] subscribing to channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] started (com.example.stream.models.Pub@48007a17)
[DEBUG] [10/20/2013 00:35:53.703] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] now supervising Actor[akka://application/user/$c/pub/publisher#1509054514]

断开连接看起来很健康:

[info] application - Client quitting - killing Actor.
unsubscribed from nfl-streaming-scores-5 and count = 0
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] received AutoReceiveMessage Envelope(PoisonPill,Actor[akka://application/deadLetters])
[INFO] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] unsubscribing from channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopping
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] stopping
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/pub/publisher] stopped
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] stopped
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] stopped
[INFO] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] Message [java.lang.Boolean] from Actor[akka://application/user/$c/sub/subscriber#-1562348862] to Actor[akka://application/user/$c/sub#1376180991] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopping
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopped
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopped

这就是问题,在客户端断开连接后,我将发送一条消息,表示当前关闭的Actor已订阅:

redis-cli publish "nfl-streaming-scores-5" "{"test":"message"}"

在这里它出现了,当它不应该出现时,这个演员应该在技术上死了.之前到处的其他演员也会收到消息,标有$a / $b的演员.我可以确认没有其他客户端连接.

[INFO] [10/20/2013 00:38:33.097] [Thread-7] [akka://application/user/$c/sub] Just got a message: {"test":"message"}

还有一个奇怪的指标是地址名称永远不会被重复使用.断开/连接时,我一直看到类似以下名称的趋势:

akka://application/user/$c
akka://application/user/$d
akka://application/user/$e

永远不会看到旧的引用被重用.

我在这里的假设是与Redis的连接并未完全关闭.它没有解释为什么日志说Actor已经停止但仍然存在,但我确实看到在运行netstat之后建立了与redis的连接,即使在所有Actors都可能已经死亡之后.当我完全停止运行应用程序时,这些连接就会清除.这就好像取消订阅是静默失败并且保持Actor活着以及连接,这实际上是由于多种原因造成的,因为最终系统将耗尽文件描述符和/或存在内存泄漏.这里有什么明显的东西我做错了吗?

解决方法

仅仅因为你停止了演员并不意味着该演员拥有的任何资源都会被自动清理.如果有一个RedisClient绑定到该actor实例,并且需要停止此连接以便正确清理,那么您应该在postStop方法中执行类似的操作.我也同意@Schleichardt的意思,你应该将def r = new RedisClient更改为val或lazy val(取决于初始化顺序和需求).这样您知道每个订阅者实例只有一个RedisClient可以清理.我不知道您正在使用的RedisClient的API,但是假设它有一个关闭方法,它将终止其连接并清理其资源.然后你可以简单地将postStop添加到订阅者actor,如下所示:

override def postStop {
  r.shutdown
}

假设您将def更改为val,这可能就是您要寻找的.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读