Observable详解
在介绍 Observable 之前,我们要先了解两个设计模式:
这两个模式是 Observable 的基础,下面我们先来介绍一下 Observer Pattern。 Observer Pattern观察者模式定义
观察者模式又叫发布订阅模式(Publish/Subscribe),它定义了一种一对多的关系,让多个观察者对象同时监听某一个主题对象,这个主题对象的状态发生变化时就会通知所有的观察者对象,使得它们能够自动更新自己。 我们可以使用日常生活中,期刊订阅的例子来形象地解释一下上面的概念。期刊订阅包含两个主要的角色:期刊出版方和订阅者,他们之间的关系如下:
在观察者模式中也有两个主要角色:Subject (主题) 和 Observer (观察者) 。它们分别对应例子中的期刊出版方和订阅者。接下来我们来看张图,从而加深对上面概念的理解。
观察者模式优缺点观察者模式的优点:
观察者模式的缺点:
观察者模式的应用在前端领域,观察者模式被广泛地使用。最常见的例子就是为 DOM 对象添加事件监听,具体示例如下: <button id="btn">确认</button> function clickHandler(event) { console.log('用户已点击确认按钮!'); } document.getElementById("btn").addEventListener('click',clickHandler); 上面代码中,我们通过 addEventListener API 监听 button 对象上的点击事件,当用户点击按钮时,会自动执行我们的 观察者模式实战Subject 类定义: class Subject { constructor() { this.observerCollection = []; } registerObserver(observer) { this.observerCollection.push(observer); } unregisterObserver(observer) { let index = this.observerCollection.indexOf(observer); if(index >= 0) this.observerCollection.splice(index,1); } notifyObservers() { this.observerCollection.forEach((observer)=>observer.notify()); } } Observer 类定义: class Observer { constructor(name) { this.name = name; } notify() { console.log(`${this.name} has been notified.`); } } 使用示例: let subject = new Subject(); // 创建主题对象 let observer1 = new Observer('semlinker'); // 创建观察者A - 'semlinker' let observer2 = new Observer('lolo'); // 创建观察者B - 'lolo' subject.registerObserver(observer1); // 注册观察者A subject.registerObserver(observer2); // 注册观察者B subject.notifyObservers(); // 通知观察者 subject.unregisterObserver(observer1); // 移除观察者A subject.notifyObservers(); // 验证是否成功移除 以上代码成功运行后控制台的输出结果: semlinker has been notified. # 输出一次 2(unknown) lolo has been notified. # 输出两次 需要注意的是,在观察者模式中,通常情况下调用注册观察者后,会返回一个函数,用于移除监听,有兴趣的读者,可以自己尝试一下。(备注:在 Angular 1.x 中调用 $scope.$on() 方法后,就会返回一个函数,用于移除监听) Iterator Pattern迭代器模式定义迭代器(Iterator)模式,又叫做游标(Cursor)模式。它提供一种方法顺序访问一个聚合对象中的各个元素,而又不需要暴露该对象的内部表示。迭代器模式可以把迭代的过程从业务逻辑中分离出来,在使用迭代器模式之后,即使不关心对象的内部构造,也可以按顺序访问其中的每个元素。 迭代器模式的优缺点迭代器模式的优点:
迭代器模式的缺点:
ECMAScript 迭代器
一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含
详细信息可以参考 - 可迭代协议和迭代器协议 ES 5 迭代器接下来我们来创建一个 makeIterator 函数,该函数的参数类型是数组,当调用该函数后,返回一个包含 next() 方法的 Iterator 对象, 其中 next() 方法是用来获取容器对象中下一个元素。具体示例如下: function makeIterator(array){ var nextIndex = 0; return { next: function(){ return nextIndex < array.length ? {value: array[nextIndex++],done: false} : {done: true}; } } } 一旦初始化,next() 方法可以用来依次访问可迭代对象中的元素: var it = makeIterator(['yo','ya']); console.log(it.next().value); // 'yo' console.log(it.next().value); // 'ya' console.log(it.next().done); // true ES 6 迭代器在 ES 6 中我们可以通过 let arr = ['a','b','c']; let iter = arr[Symbol.iterator](); 调用 > iter.next() { value: 'a',done: false } > iter.next() { value: 'b',done: false } > iter.next() { value: 'c',done: false } > iter.next() { value: undefined,done: true } ES 6 中可迭代的对象:
ObservableRxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables 与 Observer。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。 Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:
Proposal Observable
自定义 Observable如果你想真正了解 Observable,最好的方式就是自己写一个。其实 Observable 就是一个函数,它接受一个 它的基本特征:
它的作用: 作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。接下来我们来看一下 Observable 的基础实现: DataSource - 数据源 class DataSource { constructor() { let i = 0; this._id = setInterval(() => this.emit(i++),200); // 创建定时器 } emit(n) { const limit = 10; // 设置数据上限值 if (this.ondata) { this.ondata(n); } if (n === limit) { if (this.oncomplete) { this.oncomplete(); } this.destroy(); } } destroy() { // 清除定时器 clearInterval(this._id); } } myObservable function myObservable(observer) { let datasource = new DataSource(); // 创建数据源 datasource.ondata = (e) => observer.next(e); // 处理数据流 datasource.onerror = (err) => observer.error(err); // 处理异常 datasource.oncomplete = () => observer.complete(); // 处理数据流终止 return () => { // 返回一个函数用于,销毁数据源 datasource.destroy(); }; } 使用示例: const unsub = myObservable({ next(x) { console.log(x); },error(err) { console.error(err); },complete() { console.log('done')} }); /** * 移除注释,可以测试取消订阅 */ // setTimeout(unsub,500); 具体运行结果,可以查看线上示例。 SafeObserver - 更好的 Observer上面的示例中,我们使用一个包含了 next、error、complete 方法的普通 JavaScript 对象来定义观察者。一个普通的 JavaScript 对象只是一个开始,在 RxJS 5 里面,为开发者提供了一些保障机制,来保证一个更安全的观察者。以下是一些比较重要的原则:
为了完成上述目标,我们得把传入的匿名 class SafeObserver { constructor(destination) { this.destination = destination; } next(value) { // 尚未取消订阅,且包含next方法 if (!this.isUnsubscribed && this.destination.next) { try { this.destination.next(value); } catch (err) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw err; } } } error(err) { // 尚未取消订阅,且包含error方法 if (!this.isUnsubscribed && this.destination.error) { try { this.destination.error(err); } catch (e2) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw e2; } this.unsubscribe(); } } complete() { // 尚未取消订阅,且包含complete方法 if (!this.isUnsubscribed && this.destination.complete) { try { this.destination.complete(); } catch (err) { // 出现异常时,取消订阅释放资源,再抛出异常 this.unsubscribe(); throw err; } this.unsubscribe(); } } unsubscribe() { // 用于取消订阅 this.isUnsubscribed = true; if (this.unsub) { this.unsub(); } } } myObservable - 使用 SafeObserver function myObservable(observer) { const safeObserver = new SafeObserver(observer); // 创建SafeObserver对象 const datasource = new DataSource(); // 创建数据源 datasource.ondata = (e) => safeObserver.next(e); datasource.onerror = (err) => safeObserver.error(err); datasource.oncomplete = () => safeObserver.complete(); safeObserver.unsub = () => { // 为SafeObserver对象添加unsub方法 datasource.destroy(); }; // 绑定this上下文,并返回unsubscribe方法 return safeObserver.unsubscribe.bind(safeObserver); } 使用示例: const unsub = myObservable({ next(x) { console.log(x); },complete() { console.log('done')} }); 具体运行结果,可以查看线上示例。 Operators - 也是函数Operator 是一个函数,它接收一个 Observable 对象,然后返回一个新的 Observable 对象。当我们订阅新返回的 Observable 对象时,它内部会自动订阅前一个 Observable 对象。接下来我们来实现常用的 map 操作符: Observable 实现: class Observable { constructor(_subscribe) { this._subscribe = _subscribe; } subscribe(observer) { const safeObserver = new SafeObserver(observer); safeObserver.unsub = this._subscribe(safeObserver); return safeObserver.unsubscribe.bind(safeObserver); } } map 操作符实现: function map(source,project) { return new Observable((observer) => { const mapObserver = { next: (x) => observer.next(project(x)),error: (err) => observer.error(err),complete: () => observer.complete() }; return source.subscribe(mapObserver); }); } 具体运行结果,可以查看线上示例。 改进 Observable - 支持 Operator 链式调用如果把 map(map(myObservable,(x) => x + 1),(x) => x + 2); 对于上面的代码,想象一下有 5、6 个嵌套着的 你也可以试下 Texas Toland 提议的简单版管道实现,合并压缩一个数组的 pipe(myObservable,map(x => x + 1),map(x => x + 2)); 理想情况下,我们想将代码用更自然的方式链起来: myObservable.map(x => x + 1).map(x => x + 2); 幸运的是,我们已经有了这样一个 Observable.prototype.map = function (project) { return new Observable((observer) => { const mapObserver = { next: (x) => observer.next(project(x)),complete: () => observer.complete() }; return this.subscribe(mapObserver); }); }; 现在我们终于有了一个还不错的实现。这样实现还有其他好处,例如:可以写子类继承 接下来我们来总结一下该部分的内容:Observable 就是函数,它接受 Observer 作为参数,又返回一个函数。如果你也写了一个函数,接收一个 Observer 作为参数,又返回一个函数,那么,它是异步的、还是同步的 ?其实都不是,它就只是一个函数。任何函数的行为都依赖于它的具体实现,所以当你处理一个 Observable 时,就把它当成一个普通函数,里面没有什么黑魔法。当你要构建 Operator 链时,你需要做的其实就是生成一个函数将一堆 Observers 链接在一起,然后让真正的数据依次穿过它们。 Rx.Observable.createvar observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext observer.next('Lolo'); }); // 订阅这个 Observable observable.subscribe(function(value) { console.log(value); }); 以上代码运行后,控制台会依次输出 'Semlinker' 和 'Lolo' 两个字符串。 需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下: var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext observer.next('Lolo'); }); console.log('start'); observable.subscribe(function(value) { console.log(value); }); console.log('end'); 以上代码运行后,控制台的输出结果: start Semlinker Lolo end 当然我们也可以用它处理异步行为: var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); // RxJS 4.x 以前的版本用 onNext observer.next('Lolo'); setTimeout(() => { observer.next('RxJS Observable'); },300); }) console.log('start'); observable.subscribe(function(value) { console.log(value); }); console.log('end'); 以上代码运行后,控制台的输出结果: start Semlinker Lolo end RxJS Observable 从以上例子中,我们可以得出一个结论 - Observable 可以应用于同步和异步的场合。 Observable - Creation OperatorRxJS 中提供了很多操作符,用于创建 Observable 对象,常用的操作符如下:
上面的例子中,我们已经使用过了 create 操作符,接下来我们来看一下其它的操作符: ofvar source = Rx.Observable.of('Semlinker','Lolo'); source.subscribe({ next: function(value) { console.log(value); },complete: function() { console.log('complete!'); },error: function(error) { console.log(error); } }); 以上代码运行后,控制台的输出结果: Semlinker Lolo complete! fromvar arr = [1,2,3]; var source = Rx.Observable.from(arr); // 也支持字符串,如 "Angular 2 修仙之路" source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } }); 以上代码运行后,控制台的输出结果: 1 2 3 complete! fromEventRx.Observable.fromEvent(document.querySelector('button'),'click'); fromPromisevar source = Rx.Observable .fromPromise(new Promise((resolve,reject) => { setTimeout(() => { resolve('Hello RxJS!'); },3000) })); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } }); 以上代码运行后,控制台的输出结果: Hello RxJS! complete! emptyvar source = Rx.Observable.empty(); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } }); 以上代码运行后,控制台的输出结果: complete! empty 操作符返回一个空的 Observable 对象,如果我们订阅该对象,它会立即返回 complete 信息。 nevervar source = Rx.Observable.never(); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log(error); } }); never 操作符会返回一个无穷的 Observable,当我们订阅它后,什么事情都不会发生,它是一个一直存在却什么都不做的 Observable 对象。 throwvar source = Rx.Observable.throw('Oop!'); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } }); 以上代码运行后,控制台的输出结果: Throw Error: Oop! throw 操作如,只做一件事就是抛出异常。 intervalvar source = Rx.Observable.interval(1000); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } }); 以上代码运行后,控制台的输出结果: 0 1 2 ... interval 操作符支持一个数值类型的参数,用于表示定时的间隔。上面代码表示每隔 1s,会输出一个递增的值,初始值从 0 开始。 timervar source = Rx.Observable.timer(1000,5000); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } }); 以上代码运行后,控制台的输出结果: 0 # 1s后 1 # 5s后 2 # 5s后 ... timer 操作符支持两个参数,第一个参数用于设定发送第一个值需等待的时间,第二个参数表示第一次发送后,发送其它值的间隔时间。此外,timer 操作符也可以只传递一个参数,具体如下: var source = Rx.Observable.timer(1000); source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } }); 以上代码运行后,控制台的输出结果: 0 complete! Subscription有些时候对于一些 Observable 对象 (如通过 interval、timer 操作符创建的对象),当我们不需要的时候,要释放相关的资源,以避免资源浪费。针对这种情况,我们可以调用 var source = Rx.Observable.timer(1000,1000); // 取得subscription对象 var subscription = source.subscribe({ next: function(value) { console.log(value); },error: function(error) { console.log('Throw Error: ' + error); } }); setTimeout(() => { subscription.unsubscribe(); },5000); RxJS - ObserverObserver (观察者) 是一个包含三个方法的对象,每当 Observable 触发事件时,便会自动调用观察者的对应方法。 Observer 接口定义: interface Observer<T> { closed?: boolean; // 标识是否已经取消对Observable对象的订阅 next: (value: T) => void; error: (err: any) => void; complete: () => void; } Observer 中的三个方法的作用:
接下来我们来看个具体示例: var observable = Rx.Observable .create(function(observer) { observer.next('Semlinker'); observer.next('Lolo'); observer.complete(); observer.next('not work'); }); // 创建一个观察者 var observer = { next: function(value) { console.log(value); },error: function(error) { console.log(error); },complete: function() { console.log('complete'); } } // 订阅已创建的observable对象 observable.subscribe(observer); 以上代码运行后,控制台的输出结果: Semlinker Lolo complete 上面的例子中,我们可以看出,complete 方法执行后,next 就会失效,所以不会输出 另外观察者可以不用同时包含 next、complete、error 三种方法,它可以只包含一个 next 方法,具体如下: var observer = { next: function(value) { console.log(value); } }; 有时候 Observable 可能是一个无限的序列,例如 click 事件,对于这种场景,complete 方法就永远不会被调用。 我们也可以在调用 Observable 对象的 observable.subscribe( value => { console.log(value); },error => { console.log('Error: ',error); },() => { console.log('complete'); } ); Pull vs PushPull 和 Push 是数据生产者和数据的消费者两种不同的交流方式。 什么是Pull?在 "拉" 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。 每一个 JavaScript 函数都是一个 "拉" 体系,函数是数据的生产者,调用函数的代码通过 ''拉出" 一个单一的返回值来消费该数据。 const add = (a,b) => a + b; let sum = add(3,4); ES6介绍了 iterator迭代器 和 Generator生成器 — 另一种 "拉" 体系,调用 什么是Push?在 "推" 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。 Promise(承诺) 是当今 JS 中最常见的 "推" 体系,一个Promise (数据的生产者)发送一个 resolved value (成功状态的值)来执行一个回调(数据消费者),但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。 RxJS 引入了 Observables (可观察对象),一个全新的 "推" 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 "推送给" Observer (观察者)。
接下来我们来看张图,从而加深对上面概念的理解:
Observable vs PromiseObservable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。
延迟计算 & 渐进式取值延迟计算所有的 Observable 对象一定会等到订阅后,才开始执行,如果没有订阅就不会执行。 var source = Rx.Observable.from([1,3,4,5]); var example = source.map(x => x + 1); 上面的示例中,因为 example 对象还未被订阅,所以不会进行运算。这跟数组不一样,具体如下: var source = [1,5]; var example = source.map(x => x + 1); 以上代码运行后,example 中就包含已运算后的值。 渐进式取值数组中的操作符如:filter、map 每次都会完整执行并返回一个新的数组,才会继续下一步运算。具体示例如下: var source = [1,5]; var example = source .filter(x => x % 2 === 0) // [2,4] .map(x => x + 1) // [3,5] 关于数组中的 map、filter 的详细信息,可以参考 - RxJS Functional Programming 为了更好地理解数组操作符的运算过程,我们可以参考下图:
虽然 Observable 运算符每次都会返回一个新的 Observable 对象,但每个元素都是渐进式获取的,且每个元素都会经过操作符链的运算后才输出,而不会像数组那样,每个阶段都得完整运算。具体示例如下: var source = Rx.Observable.from([1,5]); var example = source .filter(x => x % 2 === 0) .map(x => x + 1) example.subscribe(console.log); 以上代码的执行过程如下:
为了更好地理解 Observable 操作符的运算过程,我们可以参考下图:
学习资源
参考资源
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |