Scala – Akka死信与问问模式
发布时间:2020-12-16 21:34:00 所属栏目:安全 来源:网络整理
导读:如果这似乎令人困惑,我提前道歉,因为我在这里倾销了很多.基本上,我有一个小服务抓住一些Json,解析并提取它到案例类,然后写入数据库.该服务需要按照日程安排运行,而这个进度正由Akka调度程序处理得很好.我的数据库不喜欢当Slick试图在同一时间要求一个新的Aut
如果这似乎令人困惑,我提前道歉,因为我在这里倾销了很多.基本上,我有一个小服务抓住一些Json,解析并提取它到案例类,然后写入数据库.该服务需要按照日程安排运行,而这个进度正由Akka调度程序处理得很好.我的数据库不喜欢当Slick试图在同一时间要求一个新的AutoInc id,所以我建立了一个Await.result阻止发生.所有这一切都很好,但我的问题从这里开始:这些服务中有7个运行,所以我想使用类似的Await.result系统来阻止每个.每次我尝试将请求的结束时间作为响应发送回(在其他块的末尾),它将被发送到死信而不是分发者.基本上:为什么发信人!时间去死信,而不是经销商.这是一个很长的问题,一个简单的问题,但这是发展如何…
ClickActor.scala import java.text.SimpleDateFormat import java.util.Date import Message._ import akka.actor.{Actor,ActorLogging,Props} import akka.util.Timeout import com.typesafe.config.ConfigFactory import net.liftweb.json._ import spray.client.pipelining._ import spray.http.{BasicHttpCredentials,HttpRequest,HttpResponse,Uri} import akka.pattern.ask import scala.concurrent.{Await,Future} import scala.concurrent.duration._ case class ClickData(recipient : String,geolocation : Geolocation,tags : Array[String],url : String,timestamp : Double,campaigns : Array[String],`user-variables` : JObject,ip : String,`client-info` : ClientInfo,message : ClickedMessage,event : String) case class Geolocation(city : String,region : String,country : String) case class ClientInfo(`client-name`: String,`client-os`: String,`user-agent`: String,`device-type`: String,`client-type`: String) case class ClickedMessage(headers : ClickHeaders) case class ClickHeaders(`message-id` : String) class ClickActor extends Actor with ActorLogging{ implicit val formats = DefaultFormats implicit val timeout = new Timeout(3 minutes) import context.dispatcher val con = ConfigFactory.load("connection.conf") val countries = ConfigFactory.load("country.conf") val regions = ConfigFactory.load("region.conf") val df = new SimpleDateFormat("EEE,dd MMM yyyy HH:mm:ss -0000") var time = System.currentTimeMillis() var begin = new Date(time - (12 hours).toMillis) var end = new Date(time) val pipeline : HttpRequest => Future[HttpResponse] = ( addCredentials(BasicHttpCredentials("api",con.getString("mailgun.key"))) ~> sendReceive ) def get(lastrun : Long): Future[String] = { if(lastrun != 0) { begin = new Date(lastrun) end = new Date(time) } val uri = Uri(con.getString("mailgun.uri")) withQuery("begin" -> df.format(begin),"end" -> df.format(end),"ascending" -> "yes","limit" -> "100","pretty" -> "yes","event" -> "clicked") val request = Get(uri) val futureResponse = pipeline(request) return futureResponse.map(_.entity.asString) } def receive = { case lastrun : Long => { val start = System.currentTimeMillis() val responseFuture = get(lastrun) responseFuture.onSuccess { case payload: String => val json = parse(payload) //println(pretty(render(json))) val elements = (json "items").children if (elements.length == 0) { log.info("[ClickActor: " + this.hashCode() + "] did not find new events between " + begin.toString + " and " + end.toString) sender ! time context.stop(self) } else { for (item <- elements) { val data = item.extract[ClickData] var tags = "" if (data.tags.length != 0) { for (tag <- data.tags) tags += (tag + ",") } var campaigns = "" if (data.campaigns.length != 0) { for (campaign <- data.campaigns) campaigns += (campaign + ",") } val timestamp = (data.timestamp * 1000).toLong val msg = new ClickMessage( data.recipient,data.geolocation.city,regions.getString(data.geolocation.country + "." + data.geolocation.region),countries.getString(data.geolocation.country),tags,data.url,timestamp,campaigns,data.ip,data.`client-info`.`client-name`,data.`client-info`.`client-os`,data.`client-info`.`user-agent`,data.`client-info`.`device-type`,data.`client-info`.`client-type`,data.message.headers.`message-id`,data.event,compactRender(item)) val csqla = context.actorOf(Props[ClickSQLActor]) val future = csqla.ask(msg) val result = Await.result(future,timeout.duration).asInstanceOf[Int] if (result == 1) { log.error("[ClickSQLActor: " + csqla.hashCode() + "] shutting down due to lack of system environment variables") context.stop(csqla) } else if(result == 0) { log.info("[ClickSQLActor: " + csqla.hashCode() + "] successfully wrote to the DB") } } sender ! time log.info("[ClickActor: " + this.hashCode() + "] processed |" + elements.length + "| new events in " + (System.currentTimeMillis() - start) + " ms") } } } } } Distributor.scala import akka.actor.{Props,ActorSystem} import akka.event.Logging import akka.util.Timeout import akka.pattern.ask import scala.concurrent.duration._ import scala.concurrent.Await class Distributor { implicit val timeout = new Timeout(10 minutes) var lastClick : Long = 0 def distribute(system : ActorSystem) = { val log = Logging(system,getClass) val clickFuture = (system.actorOf(Props[ClickActor]) ? lastClick) lastClick = Await.result(clickFuture,timeout.duration).asInstanceOf[Long] log.info(lastClick.toString) //repeat process with other events (open,unsub,etc) } } 解决方法
原因是因为“发件人”(它是检索该值的方法)的值在离开接收块之后不再有效,但是上述示例中使用的未来将仍然在运行,并且到完成演员将离开接收块并爆炸;无效的发件人导致消息进入死信队列.
修复是要么不使用未来,要么在组合期货时,演员和发信人都会在触发未来之前捕获发信人的价值. val s = sender val responseFuture = get(lastrun) responseFuture.onSuccess { .... s ! time } (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |