加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

Scala – Akka死信与问问模式

发布时间:2020-12-16 21:34:00 所属栏目:安全 来源:网络整理
导读:如果这似乎令人困惑,我提前道歉,因为我在这里倾销了很多.基本上,我有一个小服务抓住一些Json,解析并提取它到案例类,然后写入数据库.该服务需要按照日程安排运行,而这个进度正由Akka调度程序处理得很好.我的数据库不喜欢当Slick试图在同一时间要求一个新的Aut
如果这似乎令人困惑,我提前道歉,因为我在这里倾销了很多.基本上,我有一个小服务抓住一些Json,解析并提取它到案例类,然后写入数据库.该服务需要按照日程安排运行,而这个进度正由Akka调度程序处理得很好.我的数据库不喜欢当Slick试图在同一时间要求一个新的AutoInc id,所以我建立了一个Await.result阻止发生.所有这一切都很好,但我的问题从这里开始:这些服务中有7个运行,所以我想使用类似的Await.result系统来阻止每个.每次我尝试将请求的结束时间作为响应发送回(在其他块的末尾),它将被发送到死信而不是分发者.基本上:为什么发信人!时间去死信,而不是经销商.这是一个很长的问题,一个简单的问题,但这是发展如何…

ClickActor.scala

import java.text.SimpleDateFormat
    import java.util.Date
    import Message._
    import akka.actor.{Actor,ActorLogging,Props}
    import akka.util.Timeout
    import com.typesafe.config.ConfigFactory
    import net.liftweb.json._
    import spray.client.pipelining._
    import spray.http.{BasicHttpCredentials,HttpRequest,HttpResponse,Uri}
    import akka.pattern.ask
    import scala.concurrent.{Await,Future}
    import scala.concurrent.duration._

case class ClickData(recipient : String,geolocation : Geolocation,tags : Array[String],url : String,timestamp : Double,campaigns : Array[String],`user-variables` : JObject,ip : String,`client-info` : ClientInfo,message : ClickedMessage,event : String)
  case class Geolocation(city : String,region : String,country : String)
  case class ClientInfo(`client-name`: String,`client-os`: String,`user-agent`: String,`device-type`: String,`client-type`: String)
  case class ClickedMessage(headers : ClickHeaders)
    case class ClickHeaders(`message-id` : String)

class ClickActor extends Actor with ActorLogging{

  implicit val formats = DefaultFormats
  implicit val timeout = new Timeout(3 minutes)
  import context.dispatcher

  val con = ConfigFactory.load("connection.conf")
  val countries = ConfigFactory.load("country.conf")
  val regions = ConfigFactory.load("region.conf")

  val df = new SimpleDateFormat("EEE,dd MMM yyyy HH:mm:ss -0000")
  var time = System.currentTimeMillis()
  var begin = new Date(time - (12 hours).toMillis)
  var end = new Date(time)

  val pipeline : HttpRequest => Future[HttpResponse] = (
    addCredentials(BasicHttpCredentials("api",con.getString("mailgun.key")))
      ~> sendReceive
    )

  def get(lastrun : Long): Future[String] = {

    if(lastrun != 0) {
      begin = new Date(lastrun)
      end = new Date(time)
    }

    val uri = Uri(con.getString("mailgun.uri")) withQuery("begin" -> df.format(begin),"end" -> df.format(end),"ascending" -> "yes","limit" -> "100","pretty" -> "yes","event" -> "clicked")
    val request = Get(uri)
    val futureResponse = pipeline(request)
    return futureResponse.map(_.entity.asString)
  }

  def receive = {
    case lastrun : Long => {
      val start = System.currentTimeMillis()
      val responseFuture = get(lastrun)
      responseFuture.onSuccess {
        case payload: String => val json = parse(payload)
          //println(pretty(render(json)))
          val elements = (json  "items").children
          if (elements.length == 0) {
            log.info("[ClickActor: " + this.hashCode() + "] did not find new events between " +
              begin.toString + " and " + end.toString)
            sender ! time
            context.stop(self)
          }
          else {
            for (item <- elements) {
              val data = item.extract[ClickData]
              var tags = ""
              if (data.tags.length != 0) {
                for (tag <- data.tags)
                  tags += (tag + ",")
              }
              var campaigns = ""
              if (data.campaigns.length != 0) {
                for (campaign <- data.campaigns)
                  campaigns += (campaign + ",")
              }
              val timestamp = (data.timestamp * 1000).toLong
              val msg = new ClickMessage(
                data.recipient,data.geolocation.city,regions.getString(data.geolocation.country + "." + data.geolocation.region),countries.getString(data.geolocation.country),tags,data.url,timestamp,campaigns,data.ip,data.`client-info`.`client-name`,data.`client-info`.`client-os`,data.`client-info`.`user-agent`,data.`client-info`.`device-type`,data.`client-info`.`client-type`,data.message.headers.`message-id`,data.event,compactRender(item))
              val csqla = context.actorOf(Props[ClickSQLActor])
              val future = csqla.ask(msg)
              val result = Await.result(future,timeout.duration).asInstanceOf[Int]
              if (result == 1) {
                log.error("[ClickSQLActor: " + csqla.hashCode() + "] shutting down due to lack of system environment variables")
                context.stop(csqla)
              }
              else if(result == 0) {
                log.info("[ClickSQLActor: " + csqla.hashCode() + "] successfully wrote to the DB")
              }
            }
            sender ! time
            log.info("[ClickActor: " + this.hashCode() + "] processed |" + elements.length + "| new events in " +
              (System.currentTimeMillis() - start) + " ms")
          }
      }
    }
  }
}

Distributor.scala

import akka.actor.{Props,ActorSystem}
import akka.event.Logging
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent.duration._
import scala.concurrent.Await

class Distributor {

  implicit val timeout = new Timeout(10 minutes)
  var lastClick : Long = 0

  def distribute(system : ActorSystem) = {
    val log = Logging(system,getClass)

    val clickFuture = (system.actorOf(Props[ClickActor]) ? lastClick)
    lastClick = Await.result(clickFuture,timeout.duration).asInstanceOf[Long]
    log.info(lastClick.toString)

    //repeat process with other events (open,unsub,etc)
  }
}

解决方法

原因是因为“发件人”(它是检索该值的方法)的值在离开接收块之后不再有效,但是上述示例中使用的未来将仍然在运行,并且到完成演员将离开接收块并爆炸;无效的发件人导致消息进入死信队列.

修复是要么不使用未来,要么在组合期货时,演员和发信人都会在触发未来之前捕获发信人的价值.

val s = sender

val responseFuture = get(lastrun)
    responseFuture.onSuccess {    
    ....
    s ! time
}

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读