加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 综合聚焦 > 服务器 > 安全 > 正文

scala – Akka Persistence Query事件流和CQRS

发布时间:2020-12-16 09:28:15 所属栏目:安全 来源:网络整理
导读:我正在尝试在ES-CQRS架构中实现读取端.假设我有一个像这样的持久性演员: object UserWrite { sealed trait UserEvent sealed trait State case object Uninitialized extends State case class User(username: String,password: String) extends State case
我正在尝试在ES-CQRS架构中实现读取端.假设我有一个像这样的持久性演员:

object UserWrite {

  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String,password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long,UserEvent),NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser

  def props = Props(new UserWrite)
}

class UserWrite extends PersistentActor {

  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  def publishUserEvents(fromSeqNo: Long) = {
    val readJournal = PersistenceQuery(context.system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId("user-write",fromSeqNo,Long.MaxValue)
      .map { case EventEnvelope(_,_,seqNo,event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}

据我所知,每次事件持续存在时,我们都可以通过Akka Persistence Query发布它.现在,我不确定订阅这些事件的正确方法是什么,所以我可以将它保存在我的读取数据库中?其中一个想法是最初从我的读取方actor向UserWrite actor发送一条UsersStream消息,并在该read actor中发送“sink”事件.

编辑

根据@cmbaxter的建议,我以这种方式实现了read端:

object UserRead {

  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted

  def props = Props(new UserRead)
}

class UserRead extends PersistentActor {
  import UserRead._

  var inMemoryUsers = Set.empty[User]
  var offset        = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from snapshot will always give us last sequence number
    case SnapshotOffer(_,LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted                                 => recoveryCompleted()
  }

  // After recovery is being completed,events will be projected to UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write",offset + 1,Long.MaxValue)
      .map {
        case EventEnvelope(_,event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self,StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers                    => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset
    case (seqNo: Long,UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}

有一些问题,如:事件流似乎很慢.即UserRead actor可以在保存新添加的用户之前使用一组用户进行回答.

编辑2

我增加了cassandra查询日志的刷新间隔,更少解决了慢事件流的问题.看来Cassandra事件日志是默认情况下,每3秒轮询一次.在我的application.conf中我添加了:

cassandra-query-journal {
  refresh-interval = 20ms
}

编辑3

实际上,不要减少刷新间隔.这将增加内存使用量,但这并不危险,也不是一点.通常,CQRS的概念是写入和读取侧是异步的.因此,在您写入数据后,将永远无法立即进行读取.处理用户界面?我只是打开流并在读取端确认后通过服务器发送的事件推送数据.

解决方法

有一些方法可以做到这一点.例如,在我的应用程序中,我的查询端有一个actor,它一直在寻找变化的PersistenceQuery,但是你也可以拥有一个具有相同查询的线程.问题是要保持流打开,以便能够在发生持续事件时立即读取它

val readJournal =
PersistenceQuery(system).readJournalFor[CassandraReadJournal](
  CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope,NotUsed] =
  readJournal.eventsByPersistenceId(s"MyActorId",Long.MaxValue)

// materialize stream,consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach{
  case userEvent: UserEvent => {
    doSomething(userEvent)
  }
}

而不是这个,你可以有一个提升PersistenceQuery并存储新事件的计时器,但我认为打开一个流是最好的方法

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读