scala – Akka Persistence Query事件流和CQRS
我正在尝试在ES-CQRS架构中实现读取端.假设我有一个像这样的持久性演员:
object UserWrite { sealed trait UserEvent sealed trait State case object Uninitialized extends State case class User(username: String,password: String) extends State case class AddUser(user: User) case class UserAdded(user: User) extends UserEvent case class UserEvents(userEvents: Source[(Long,UserEvent),NotUsed]) case class UsersStream(fromSeqNo: Long) case object GetCurrentUser def props = Props(new UserWrite) } class UserWrite extends PersistentActor { import UserWrite._ private var currentUser: State = Uninitialized override def persistenceId: String = "user-write" override def receiveRecover: Receive = { case UserAdded(user) => currentUser = user } override def receiveCommand: Receive = { case AddUser(user: User) => persist(UserAdded(user)) { case UserAdded(`user`) => currentUser = user } case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo) case GetCurrentUser => sender() ! currentUser } def publishUserEvents(fromSeqNo: Long) = { val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) val userEvents = readJournal .eventsByPersistenceId("user-write",fromSeqNo,Long.MaxValue) .map { case EventEnvelope(_,_,seqNo,event: UserEvent) => seqNo -> event } sender() ! UserEvents(userEvents) } } 据我所知,每次事件持续存在时,我们都可以通过Akka Persistence Query发布它.现在,我不确定订阅这些事件的正确方法是什么,所以我可以将它保存在我的读取数据库中?其中一个想法是最初从我的读取方actor向UserWrite actor发送一条UsersStream消息,并在该read actor中发送“sink”事件. 编辑 根据@cmbaxter的建议,我以这种方式实现了read端: object UserRead { case object GetUsers case class GetUserByUsername(username: String) case class LastProcessedEventOffset(seqNo: Long) case object StreamCompleted def props = Props(new UserRead) } class UserRead extends PersistentActor { import UserRead._ var inMemoryUsers = Set.empty[User] var offset = 0L override val persistenceId: String = "user-read" override def receiveRecover: Receive = { // Recovery from snapshot will always give us last sequence number case SnapshotOffer(_,LastProcessedEventOffset(seqNo)) => offset = seqNo case RecoveryCompleted => recoveryCompleted() } // After recovery is being completed,events will be projected to UserRead actor def recoveryCompleted(): Unit = { implicit val materializer = ActorMaterializer() PersistenceQuery(context.system) .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier) .eventsByPersistenceId("user-write",offset + 1,Long.MaxValue) .map { case EventEnvelope(_,event: UserEvent) => seqNo -> event } .runWith(Sink.actorRef(self,StreamCompleted)) } override def receiveCommand: Receive = { case GetUsers => sender() ! inMemoryUsers case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username) // Match projected event and update offset case (seqNo: Long,UserAdded(user)) => saveSnapshot(LastProcessedEventOffset(seqNo)) inMemoryUsers += user } } 有一些问题,如:事件流似乎很慢.即UserRead actor可以在保存新添加的用户之前使用一组用户进行回答. 编辑2 我增加了cassandra查询日志的刷新间隔,更少解决了慢事件流的问题.看来Cassandra事件日志是默认情况下,每3秒轮询一次.在我的application.conf中我添加了: cassandra-query-journal { refresh-interval = 20ms } 编辑3 实际上,不要减少刷新间隔.这将增加内存使用量,但这并不危险,也不是一点.通常,CQRS的概念是写入和读取侧是异步的.因此,在您写入数据后,将永远无法立即进行读取.处理用户界面?我只是打开流并在读取端确认后通过服务器发送的事件推送数据. 解决方法
有一些方法可以做到这一点.例如,在我的应用程序中,我的查询端有一个actor,它一直在寻找变化的PersistenceQuery,但是你也可以拥有一个具有相同查询的线程.问题是要保持流打开,以便能够在发生持续事件时立即读取它
val readJournal = PersistenceQuery(system).readJournalFor[CassandraReadJournal]( CassandraReadJournal.Identifier) // issue query to journal val source: Source[EventEnvelope,NotUsed] = readJournal.eventsByPersistenceId(s"MyActorId",Long.MaxValue) // materialize stream,consuming events implicit val mat = ActorMaterializer() source.map(_.event).runForeach{ case userEvent: UserEvent => { doSomething(userEvent) } } 而不是这个,你可以有一个提升PersistenceQuery并存储新事件的计时器,但我认为打开一个流是最好的方法 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |