scala – 使用确认投递(AtLeastOnceDelivery)的Akka Persistence给出了不一致的结果
我一直在尝试Akka Persistence,并编写了以下程序来检验我的理解.问题是每次运行此程序时我都会得到不同的结果.正确答案是49995000,但我并不总是得到这个结果.我在每次运行之间都清理了journal目录,但没有任何区别.有谁能看出哪里出了问题?该程序只是将从1到n的所有数字相加(其中n在下面的代码中为9999).
正确的答案是:(n * (n + 1)) / 2.对于n = 9999,这是49995000. 编辑:似乎在JDK 8上运行的结果比在JDK 7上更一致.我应该只使用JDK 8吗? package io.github.ourkid.akka.aggregator.guaranteed import akka.actor.Actor import akka.actor.ActorPath import akka.actor.ActorSystem import akka.actor.Props import akka.actor.actorRef2Scala import akka.persistence.AtLeastOnceDelivery import akka.persistence.PersistentActor case class ExternalRequest(updateAmount : Int) case class CountCommand(deliveryId : Long,updateAmount : Int) case class Confirm(deliveryId : Long) sealed trait Evt case class CountEvent(updateAmount : Int) extends Evt case class ConfirmEvent(deliveryId : Long) extends Evt class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery { override def persistenceId = "persistent-actor-ref-1" override def receiveCommand : Receive = { case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState) case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState) } override def receiveRecover : Receive = { case evt : Evt => updateState(evt) } def updateState(evt:Evt) = evt match { case CountEvent(updateAmount) => deliver(counter,id => CountCommand(id,updateAmount)) case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId) } } class FactorialActor extends Actor { var count = 0 def receive = { case CountCommand(deliveryId : Long,updateAmount:Int) => { count = count + updateAmount sender() ! Confirm(deliveryId) } case "print" => println(count) } } object GuaranteedDeliveryTest extends App { val system = ActorSystem() val factorial = system.actorOf(Props[FactorialActor]) val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor],factorial.path)) import system.dispatcher system.scheduler.schedule(0 seconds,2 seconds) { factorial ! "print" } for (i <- 1 to 9999) delActor ! 
ExternalRequest(i) } SBT文件 name := "akka_aggregator" organization := "io.github.ourkid" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.4" scalacOptions ++= Seq("-unchecked","-deprecation") resolvers ++= Seq( "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" ) val Akka = "2.3.7" val Spray = "1.3.2" libraryDependencies ++= Seq( // Core Akka "com.typesafe.akka" %% "akka-actor" % Akka,"com.typesafe.akka" %% "akka-cluster" % Akka,"com.typesafe.akka" %% "akka-persistence-experimental" % Akka,"org.iq80.leveldb" % "leveldb" % "0.7","org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8",// For future REST API "io.spray" %% "spray-httpx" % Spray,"io.spray" %% "spray-can" % Spray,"io.spray" %% "spray-routing" % Spray,"org.typelevel" %% "scodec-core" % "1.3.0",// CSV reader "net.sf.opencsv" % "opencsv" % "2.3",// Logging "com.typesafe.akka" %% "akka-slf4j" % Akka,"ch.qos.logback" % "logback-classic" % "1.0.13",// Testing "org.scalatest" %% "scalatest" % "2.2.1" % "test","com.typesafe.akka" %% "akka-testkit" % Akka % "test","io.spray" %% "spray-testkit" % Spray % "test","org.scalacheck" %% "scalacheck" % "1.11.6" % "test" ) fork := true mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor") application.conf文件 ########################################## # Akka Persistence Reference Config File # ########################################## akka { # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs # to STDOUT) loggers = ["akka.event.slf4j.Slf4jLogger"] # Log level used by the configured loggers (see "loggers") as soon # as they have been started; before that,see "stdout-loglevel" # Options: OFF,ERROR,WARNING,INFO,DEBUG loglevel = "DEBUG" # Log level for the very basic logger activated during ActorSystem startup. # This logger prints the log messages to stdout (System.out). 
# Options: OFF,DEBUG stdout-loglevel = "INFO" # Filter of log events that is used by the LoggingAdapter before # publishing log events to the eventStream. logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" # Protobuf serialization for persistent messages actor { serializers { akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" akka-persistence-message = "akka.persistence.serialization.MessageSerializer" } serialization-bindings { "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot "akka.persistence.serialization.Message" = akka-persistence-message } } persistence { journal { # Maximum size of a persistent message batch written to the journal. max-message-batch-size = 200 # Maximum size of a deletion batch written to the journal. max-deletion-batch-size = 10000 # Path to the journal plugin to be used plugin = "akka.persistence.journal.leveldb" # In-memory journal plugin. inmem { # Class name of the plugin. class = "akka.persistence.journal.inmem.InmemJournal" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.actor.default-dispatcher" } # LevelDB journal plugin. leveldb { # Class name of the plugin. class = "akka.persistence.journal.leveldb.LeveldbJournal" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" # Storage location of LevelDB files. dir = "journal" # Use fsync on write fsync = on # Verify checksum on read. checksum = off # Native LevelDB (via JNI) or LevelDB Java port native = on # native = off } # Shared LevelDB journal plugin (for testing only). leveldb-shared { # Class name of the plugin. class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" # Dispatcher for the plugin actor. 
plugin-dispatcher = "akka.actor.default-dispatcher" # timeout for async journal operations timeout = 10s store { # Dispatcher for shared store actor. store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Storage location of LevelDB files. dir = "journal" # Use fsync on write fsync = on # Verify checksum on read. checksum = off # Native LevelDB (via JNI) or LevelDB Java port native = on } } } snapshot-store { # Path to the snapshot store plugin to be used plugin = "akka.persistence.snapshot-store.local" # Local filesystem snapshot store plugin. local { # Class name of the plugin. class = "akka.persistence.snapshot.local.LocalSnapshotStore" # Dispatcher for the plugin actor. plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for streaming snapshot IO. stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" # Storage location of snapshot files. dir = "snapshots" } } view { # Automated incremental view update. auto-update = on # Interval between incremental updates auto-update-interval = 5s # Maximum number of messages to replay per incremental view update. Set to # -1 for no upper limit. auto-update-replay-max = -1 } at-least-once-delivery { # Interval between redelivery attempts redeliver-interval = 5s # Maximum number of unconfirmed messages that will be sent in one redelivery burst redelivery-burst-limit = 10000 # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` # message will be sent to the actor. warn-after-number-of-unconfirmed-attempts = 5 # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is # allowed to hold in memory. 
max-unconfirmed-messages = 100000 } dispatchers { default-plugin-dispatcher { type = PinnedDispatcher executor = "thread-pool-executor" } default-replay-dispatcher { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 2 parallelism-max = 8 } } default-stream-dispatcher { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 2 parallelism-max = 8 } } } } } 正确输出: 18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 0 18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 3974790 24064453 18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 49995000 49995000 49995000 49995000 运行不正确: 17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started 0 17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for 
message [akka.persistence.PersistentImpl] 17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 3727815 22167811 17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 49995000 51084018 51084018 52316760 52316760 52316760 52316760 52316760 另一个错误的运行 17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 0 17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 2982903 17710176 49347145 17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 51704199 51704199 55107844 55107844 55107844 55107844 解决方法
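您正在使用AtLeastOnceDelivery语义.正如Akka Persistence文档中关于至少一次投递(at-least-once delivery)的说明:
这种语义不保证保持原始的消息发送顺序,并且目的地可能会收到重复的消息.在上面的配置中redeliver-interval = 5s,因此任何在5秒内尚未被确认的CountCommand都会被重新发送,FactorialActor会把它再累加一次.
所以有些数字可能不止一次收到.您可以在FactorialActor中忽略重复的数字(例如根据deliveryId去重),或者不使用此语义. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时联系站长删除相关内容! |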