项目简介:
该项目始于2012年,孕育时间较长。2013年出现Reactor1.x版本。该版本成功部署到不同的组织,不仅有开源组织如MeltDown、还有商业机构如Pivotal RTI。2014年我们实现了新的"Reactive流标准",并在2015年的4月开始了版本2.0的大规模重构目标。Reactive流标准拉近了分发机制的鸿沟:控制多少线程传递多少数据。
同时我们也决定重新调整我们的一些事件驱动和任务协调API的来应对日益流行、记录的reactive扩展。
Reactor由Pivotal赞助支持,有两个核心提交者。因为Pivotal同时也是spring框架的东家,我们的很多同事也是不同spring项目的核心贡献者,所以我们也提供从Reactor到spring的集成同时也支持spring框架的一些重要功能如spring消息模块的STOMP代理。也就是说,我们不会强迫仅仅想使用Reactor的人去适应spring。我们保留了一个大容量Reactive的内嵌工具。事实上,Reactor的目标之一是在你解决异步和功能性问题时保持公正的态度。
Reactor遵循Apache 2.0 licensed,可以通过GitHub获取。
使用要求:
Reactor需要jdk7及以上版本。
但完整的功能组合表达式需要java8的lambdas支持。
作为后备,支持spring clojure和groovy的扩展。
Reactor需要jvm支持Unsafe方式获取(如:android不支持)时才能表现最全的功能。
当Unsafe获取不支持是所有基于RingBuffer的特定将不能工作。
Reactor打包成传统的jar形式存在于maven中央库中,可以使用你喜欢的工具来拉取这个依赖包。
架构总览:
Reactor基本代码划分为几个子模块,这样你可以单独使用某一模块而抛弃不需要的模块。
下述是一些使用Reactor模块和其它混合的Reactive技术示例,完成异步目标:
-
Spring XD + Reactor-Net (Core/Stream) : 使用Reactor 作为Sink/Source IO 驱动.
-
Grails | Spring + Reactor-Stream (Core) : 使用Stream和 Promise作为后台处理程序。
-
Spring Data + Reactor-Bus (Core) : 生产数据库事件(Save/Delete/…?).
-
Spring集成Java DSL + Reactor Stream (Core) : Microbatch MessageChannel from Spring Integration.
-
RxJavaReactiveStreams + RxJava + Reactor-Core : Combine rich composition with efficient asynchronous IO Processor
-
RxJavaReactiveStreams + RxJava + Reactor-Net (Core/Stream) : Compose input data with RxJava and gate with Async IO drivers.
Reactive stream
Reactive stream是一个新的标准,不同的厂商和技术组织包括Netflix,oracle,Pivotal,TypeSafe支持这个标准。该标准有望被java9或者以后的版本的标准收录进去。
该标准的目标是提供一种同步或者异步的具有控制流机制的数据序列。这个标准是轻量级的,第一个目标是JVM。它提供了4个java接口,一个Tck和一系列的示例。根据需要,4个java接口的实现非常直接,这个项目的内涵是Tck对操作的校验。
图三 Reactive stream协约
Reactive Streams接口
org.reactivestreams.Pubslisher: 数据源(从0到N个数据,n是任意的).它提供两个可选的中断事件:error和completion。
org.reactivestreams.Subscriber: 数据序列的消费者(从0到N个数据,n是任意的).它在初始化时接收一个Subscription,subscription获取Subscriber将要处理多少数据。 与其它数据时序信号交互的其它回调有: next (新消息)和可选的completion/error.
org.reactivestreams.Subscription:在初始化时传给Subscriber的一个小的追踪器。.它控制着我们准备消费掉多少数据和什么时候停止消费(取消).
org.reactivestreams.Processor: 既是Subscriber和Publisher的组件标识!
图四:Reactive Stream发布协约
通过传递Subscriber,一个请求数据从subscriber到publisher有两种方式:
无限制的: 在订阅时, 仅调用Subscription的request(Long.MAX_VALUE)方法.
有限制的: 在订阅时, 保留subscription的引用,并且当subscriber准备处理数据时调用request(long)方法。
通常,在订阅时Subscribers将请求一组初始数据或者甚至1个数据。
然后,onNext认为执行成功(例如后面的Commit,Flush等等?), 请求更多的数据。
建议使用线性组的请求。为避免请求重叠,例如每次下次请求时请求10个或者更多的数据。
表1 目前为止,Reactor直接使用的Reactive stream接口及实现
Reactive Streams |
Reactor模块 |
实现 |
说明 |
Processor |
reactor-core,reactor-stream |
reactor.core.processor.*,reactor.rx.* |
在core模块,提供了RingBuffer处理器,在stream模块,提供了一整组操作和Broadcaster。 |
Publisher |
在core模块,处理器继承了Publisher.在bus模块,发布一个不限制的路由事件,在stream模块,stream扩展直接继承Publisher. 在net模块,Chanel继承了Publisher来消费请求数据,同时也提供了具有flush和close的回调的providers. |
Subscriber |
在core模块,处理器继承了Subscriber. 在bus模块,提供了无限制的Publisher/Subscriber能力.在stream模块,Subscribers计算特定的回调行为.在Net模块,subscriber的IO层实现处理写、关闭和flush. |
Subscription |
reactor-stream,Arial; word-break:break-all; line-height:1.5; border:1px solid silver; border-collapse:collapse; padding:3px"> reactor.rx.subscription.*,Arial; word-break:break-all; line-height:1.5; border:1px solid silver; border-collapse:collapse; padding:3px"> 在stream模块,提供了一个优化过的PushSubscriptions和 buffering-ready ReactiveSubscription. 在Net模块,使用自定义Subscription实现背压的方式实现异步IO读。 |
从reactor 2启动时我们就一直遵循这个标准,并且随着标准的改变而改变直到1.0.0正式版准备发布。现在可以通过maven中央库及其流行的镜像可以找到该标准,你将发现它作为过渡,依赖于reactor-core模块。
Reactive扩展
Reactive扩展或者通常称作Rx,是一种定义完备的功能api,这些api扩展了观察者模式到一个史诗的程度。
Rx模式支持实现了使用少数设计的关键字来处理Reactive 数据序列:
使用回调链来抽象实时及延迟:当可以获得到数据时调用。
抽象了一直使用的线程模式:同步或者异步仅仅是我们处理的Observable/Stream。
控制错误传递及停止:错误和完成信号及数据的有效负载信号传递到链中。
在多个预先定义的api中解决了多个扩展-聚合及其它组合问题。
Reactive扩展的标准Jvm实现是RxJava。它提供了一个功能丰富的Api。
Reactor 2 提供了一个特定模块实现了Reactive扩展的一部分功能。建议需要使用Reactive stream全部功能的用户使用RxJava。最后,当组合完整的RxJava系统时,用户可以从Reactor提供的强大的异步和IO的中获益。
表2:Rx和Reactor stream的不同点:
rx |
reactor-stream |
说明 |
Observable |
reactor.rx.Stream |
Reactive Stream Publisher的实现 |
Operator |
reactor.rx.action.Action |
Reactive Stream Processor的实现 |
Observable with 1 data at most |
reactor.rx.Promise |
返回唯一结果的类型,Reactive Stream Processor实现并提供了可选的异步分发功能。 |
Factory API (just,from,merge…?.) |
reactor.rx.Streams |
和core模块的 data-focused 子类一样,返回 Stream |
Functional API (map,filter,take…?.) |
和core模块的data-focused 子类一样,返回Stream |
Schedulers |
reactor.core.Dispatcher,org.reactivestreams.Processor |
Reactor Stream计算无限制的共享Dispatcher或者有限的Processor的操作。 |
Observable.observeOn() |
Stream.dispatchOn() |
只是dispatcher参数的一个适配命名。 |
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!