浅谈命令查询职责分离(CQRS)模式
在常用的三层架构中,通常都是通过数据访问层来修改或者查询数据,一般修改和查询使用的是相同的实体。在一些业务逻辑简单的系统中可能没有什么问题,但是随着系统逻辑变得复杂,用户增多,这种设计就会出现一些性能问题。虽然在DB上可以做一些读写分离的设计,但在业务上如果在读写方面混合在一起的话,仍然会出现一些问题。 本文介绍了命令查询职责分离模式(Command Query Responsibility Segregation,CQRS),该模式从业务上分离修改 (Command,增,删,改,会对系统状态进行修改)和查询(Query,查,不会对系统状态进行修改)的行为。从而使得逻辑更加清晰,便于对不同部分进行针对性的优化。文章首先简要介绍了传统的CRUD方式存在的问题,接着介绍了CQRS模式,最后以一个简单的在线日记系统演示了如何实现CQRS模式。要谈到读写操作,首先我们来看传统的CRUD的问题。 一 CRUD方式的问题 通常对DB执行的增,删,改,查(CRUD)都是针对的系统的实体对象。如通过数据访问层获取数据,然后通过数据传输对象DTO传给表现层。或者,用户需要更新数据,通过DTO对象将数据传给Model,然后通过数据访问层写回数据库,系统中的所有交互都是和数据查询和存储有关,可以认为是数据驱动(Data-Driven)的,如下图: 对于一些比较简单的系统,使用这种CRUD的设计方式能够满足要求。特别是通过一些代码生成工具及ORM等能够非常方便快速的实现功能。 但是传统的CRUD方法有一些问题: 使用同一个对象实体来进行数据库读写可能会太粗糙,大多数情况下,比如编辑的时候可能只需要更新个别字段,但是却需要将整个对象都穿进去,有些字段其实是不需要更新的。在查询的时候在表现层可能只需要个别字段,但是需要查询和返回整个实体对象。 要从业务上将读和写分离,就是接下来要介绍的命令查询职责分离模式。 二 什么是CQRS 命令(Command):不返回任何结果(void),但会改变对象的状态。 private int i = 0; private void IncreaseCommand(int value) CQRS是对CQS模式的进一步改进成的一种简单模式。 它由Greg Young在CQRS,Task Based UIs,Event Sourcing agh! 这篇文章中提出。“CQRS只是简单的将之前只需要创建一个对象拆分成了两个对象,这种分离是基于方法是执行命令还是执行查询这一原则来定的(这个和CQS的定义一致)”。 CQRS使用分离的接口将数据查询操作(Queries)和数据修改操作(Commands)分离开来,这也意味着在查询和更新过程中使用的数据模型也是不一样的。这样读和写逻辑就隔离开来了。 使用CQRS分离了读写职责之后,可以对数据进行读写分离操作来改进性能,可扩展性和安全。如下图: 主数据库处理CUD,从库处理R,从库的的结构可以和主库的结构完全一样,也可以不一样,从库主要用来进行只读的查询操作。在数量上从库的个数也可以根据查询的规模进行扩展,在业务逻辑上,也可以根据专题从主库中划分出不同的从库。从库也可以实现成ReportingDatabase,根据查询的业务需求,从主库中抽取一些必要的数据生成一系列查询报表来存储。 使用ReportingDatabase的一些优点通常可以使得查询变得更加简单高效: ReportingDatabase的结构和数据表会针对常用的查询请求进行设计。 三 什么时候可以考虑CQRS 分工明确,可以负责不同的部分 当在业务逻辑层有很多操作需要相同的实体或者对象进行操作的时候。CQRS使得我们可以对读和写定义不同的实体和方法,从而可以减少或者避免对某一方面的更改造成冲突 领域模型或者业务逻辑比较简单,这种情况下使用CQRS会把系统搞复杂。 五 CQRS的简单实现 上图很清晰的说明了CQRS在读写方面的分离,在读方面,通过QueryFacade到数据库里去读取数据,这个库有可能是ReportingDB。在写方面,比较复杂,操作通过Command发送到CommandBus上,然后特定的CommandHandler处理请求,产生对应的Event,将Eevnt持久化后,通过EventBus特定的EevntHandler对数据库进行修改等操作。 例子代码可以到codeproject上下载,整体结构如下: 由三个项目构成,Diary.CQRS包含了所有的Domain和消息对象。Configuration通过使用一个名为StructMap的IOC来初始化一些变量方便Web调用,Web是一个简单的MVC3项目,在Controller中有与CQRS交互的代码。 下面分别看Query和Command方面的实现: Query方向的实现 public ActionResult Index() public ActionResult Edit(Guid id) public class ReportDatabase : IReportDatabase public DiaryItemDto GetById(Guid id) { return items.Where(a => a.Id == id).FirstOrDefault(); } public void Add(DiaryItemDto item) { items.Add(item); } public void Delete(Guid id) { items.RemoveAll(i => i.Id == id); } public List<DiaryItemDto> GetItems() { return items; } } Query方面的代码很简单。在实际的应用中,这一块就是直接对DB进行查询,然后通过DTO对象返回,这个DB可能是应对特定场景的报表数据库,这样可以提升查询性能。 下面来看Command方向的实现: Command方向的实现 在MVC的Control中,可以看到Add的Controller中只调用了一句话: [HttpPost] return RedirectToAction("Index"); } public class CreateItemCommand:Command public CreateItemCommand(Guid aggregateId,string title,string description,int version,DateTime from,DateTime to) : base(aggregateId,version) { Title = title; Description = description; From = from; To = to; } } public class CommandBus:ICommandBus public CommandBus(ICommandHandlerFactory commandHandlerFactory) { _commandHandlerFactory = commandHandlerFactory; } public void Send<T>(T command) where T : Command { var handler = _commandHandlerFactory.GetHandler<T>(); if (handler != null) { handler.Execute(command); } else { throw new UnregisteredDomainCommandException("no handler registered"); } } } public class StructureMapCommandHandlerFactory : ICommandHandlerFactory var cmdHandler = handlers.Select(handler => (ICommandHandler<T>)ObjectFactory.GetInstance(handler)).FirstOrDefault(); return cmdHandler; } private IEnumerable<Type> GetHandlerTypes<T>() where T : Command { var handlers = typeof(ICommandHandler<>).Assembly.GetExportedTypes() .Where(x => x.GetInterfaces() .Any(a => a.IsGenericType && a.GetGenericTypeDefinition() == typeof(ICommandHandler<>) )) .Where(h=>h.GetInterfaces() .Any(ii=>ii.GetGenericArguments() .Any(aa=>aa==typeof(T)))).ToList(); return handlers; } } public class CreateItemCommandHandler : ICommandHandler public CreateItemCommandHandler(IRepository<DiaryItem> repository) { _repository = repository; } public void Execute(CreateItemCommand command) { if (command == null) { throw new ArgumentNullException("command"); } if (_repository == null) { throw new InvalidOperationException("Repository is not initialized."); } var aggregate = new DiaryItem(command.Id,command.Title,command.Description,command.From,command.To); aggregate.Version = -1; _repository.Save(aggregate,aggregate.Version); } } 现在CommandBus中,找到了处理特定Command的Handler。然后执行该类型的Execute方法。 可以看到在该类型中实例化了一个名为aggregate的DiaryItem对象。这个和我们之前查询所用到的DiaryItemDto有所不同,这个一个领域对象,里面包含了一系列事件。 public class DiaryItem : AggregateRoot, public DateTime From { get; set; } public DateTime To { get; set; } public string Description { get; set; } public DiaryItem() { } public DiaryItem(Guid id,DateTime to) { ApplyChange(new ItemCreatedEvent(id,title,description,from,to)); } public void ChangeTitle(string title) { ApplyChange(new ItemRenamedEvent(Id,title)); } public void Handle(ItemCreatedEvent e) { Title = e.Title; From = e.From; To = e.To; Id = e.AggregateId; Description = e.Description; Version = e.Version; } public void Handle(ItemRenamedEvent e) { Title = e.Title; } ... } public class ItemCreatedEvent:Event public ItemCreatedEvent(Guid aggregateId,DateTime to) { AggregateId = aggregateId; Title = title; From = from; To = to; Description = description; } } ApplyChange方法在AggregateRoot对象中,他是聚集根,这是DDD中的概念。通过这个根可以串起所有对象。 该类实现了IEventProvider接口,他保存了所有在_changes中的所有没有提交的变更,其中的ApplyChange的用来为特定的Event查找Eventhandler的方法: public abstract class AggregateRoot : IEventProvider public Guid Id { get; internal set; } public int Version { get; internal set; } public int EventVersion { get; protected set; } protected AggregateRoot() { _changes = new List<Event>(); } public IEnumerable<Event> GetUncommittedChanges() { return _changes; } public void MarkChangesAsCommitted() { _changes.Clear(); } public void LoadsFromHistory(IEnumerable<Event> history) { foreach (var e in history) ApplyChange(e,false); Version = history.Last().Version; EventVersion = Version; } protected void ApplyChange(Event @event) { ApplyChange(@event,true); } private void ApplyChange(Event @event,bool isNew) { dynamic d = this; d.Handle(Converter.ChangeTo(@event,@event.GetType())); if (isNew) { _changes.Add(@event); } } } 然后Command继续执行,然后调用了_repository.Save(aggregate,aggregate.Version);这个方法。先看这个Repository对象。 public class Repository : IRepository where T : AggregateRoot,new() public Repository(IEventStorage storage) { _storage = storage; } public void Save(AggregateRoot aggregate,int expectedVersion) { if (aggregate.GetUncommittedChanges().Any()) { lock (_lockStorage) { var item = new T(); if (expectedVersion != -1) { item = GetById(aggregate.Id); if (item.Version != expectedVersion) { throw new ConcurrencyException(string.Format("Aggregate {0} has been previously modified",item.Id)); } } _storage.Save(aggregate); } } } public T GetById(Guid id) { IEnumerable<Event> events; var memento = _storage.GetMemento<BaseMemento>(id); if (memento != null) { events = _storage.GetEvents(id).Where(e=>e.Version>=memento.Version); } else { events = _storage.GetEvents(id); } var obj = new T(); if(memento!=null) ((IOriginator)obj).SetMemento(memento); obj.LoadsFromHistory(events); return obj; } } IEventStorage用来存储所有的事件,其实现类型为InMemoryEventStorage。 public class InMemoryEventStorage:IEventStorage private readonly IEventBus _eventBus; public InMemoryEventStorage(IEventBus eventBus) { _events = new List<Event>(); _mementos = new List<BaseMemento>(); _eventBus = eventBus; } public IEnumerable<Event> GetEvents(Guid aggregateId) { var events = _events.Where(p => p.AggregateId == aggregateId).Select(p => p); if (events.Count() == 0) { throw new AggregateNotFoundException(string.Format("Aggregate with Id: {0} was not found",aggregateId)); } return events; } public void Save(AggregateRoot aggregate) { var uncommittedChanges = aggregate.GetUncommittedChanges(); var version = aggregate.Version; foreach (var @event in uncommittedChanges) { version++; if (version > 2) { if (version % 3 == 0) { var originator = (IOriginator)aggregate; var memento = originator.GetMemento(); memento.Version = version; SaveMemento(memento); } } @event.Version=version; _events.Add(@event); } foreach (var @event in uncommittedChanges) { var desEvent = Converter.ChangeTo(@event,@event.GetType()); _eventBus.Publish(desEvent); } } public T GetMemento<T>(Guid aggregateId) where T : BaseMemento { var memento = _mementos.Where(m => m.Id == aggregateId).Select(m=>m).LastOrDefault(); if (memento != null) return (T) memento; return null; } public void SaveMemento(BaseMemento memento) { _mementos.Add(memento); } } 然后在foreach循环中,对于所有的没有提交的变更,EventBus将该事件发布出去。 现在,所有的发生变更的事件已经记录下来了。事件已经被发布到EventBus上,然后对应的EventHandler再处理对应的事件,然后与DB交互。现在来看EventBus的Publish方法。 public class EventBus:IEventBus public EventBus(IEventHandlerFactory eventHandlerFactory) { _eventHandlerFactory = eventHandlerFactory; } public void Publish<T>(T @event) where T : Event { var handlers = _eventHandlerFactory.GetHandlers<T>(); foreach (var eventHandler in handlers) { eventHandler.Handle(@event); } } } public class StructureMapEventHandlerFactory : IEventHandlerFactory { public IEnumerable (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
- Swift 周报 #73
- ruby-on-rails – Rails 3.1.0.rc5 Rake问题
- XML 和 ini作为配置文件优缺点
- 详解swiper在vue中的应用(以3.0为例)
- postgres启动服务器时missing or erroneous pg_hba.conf fi
- c – “隐式共享”是否可以从Qt类中删除?
- 使用刷机软件Flash_tools.exe,设备驱动安装后一闪而过
- postgresql – “”psql“”和“”postgres“”命令有什么区
- xml – XSD错误:不允许使用字符内容,因为内容类型为空
- c# – 无法从Activator.CreateInstance中捕获异常