本篇文章帶大家了解Angular中的可觀察對(duì)象(Observable)、觀察者(observer)和RxJS操作符,希望對(duì)大家有所幫助!
Observable(可觀察對(duì)象)
Observable
(可觀察對(duì)象),是RxJS
庫(kù)里面的一個(gè)對(duì)象,可以用來(lái)處理異步事件,例如HTTP請(qǐng)求(實(shí)際上,在Angular中,所有的HTTP請(qǐng)求返回的都是Observable)?!鞠嚓P(guān)教程推薦:《angular教程》】
或許,你以前接觸過(guò)一個(gè)叫promise
的東西,它們本質(zhì)上面是相同的:都是生產(chǎn)者主動(dòng)向消費(fèi)者“push”產(chǎn)品,而消費(fèi)者是被動(dòng)接收的,但是他們兩者還是有很大區(qū)別的:Observable
可以發(fā)送任意多值,并且,在被訂閱之前,它是不會(huì)執(zhí)行的!這是promise
不具備的特點(diǎn)。
Observable
用于在發(fā)送方和接收方之間傳輸消息,你可以將這些消息看成是流- 在創(chuàng)建
Observable
對(duì)象時(shí),需要傳入一個(gè)函數(shù)作為構(gòu)造函數(shù)的參數(shù),這個(gè)函數(shù)叫訂閱者函數(shù),這個(gè)函數(shù)也就是生產(chǎn)者向消費(fèi)者推送消息的地方 - 在被消費(fèi)者
subscribe
(訂閱)之前,訂閱者函數(shù)不會(huì)被執(zhí)行,直到subscribe()
函數(shù)被調(diào)用,該函數(shù)返回一個(gè)subscription
對(duì)象,里面有一個(gè)unsubscribe()
函數(shù),消費(fèi)者可以隨時(shí)拒絕消息的接收! subscribe()
函數(shù)接收一個(gè)observer(觀察者)
對(duì)象作為入?yún)?/li>- 消息的發(fā)送可以是同步的,也可以是異步的
observer(觀察者)
有了可觀察對(duì)象(發(fā)送方)
,就需要一個(gè)觀察者(接收方)
來(lái)觀察可觀察對(duì)象,觀察者要實(shí)現(xiàn)observer
接口,它是一個(gè)對(duì)象,其中包含三個(gè)屬性,它們都是函數(shù),如下:
通知類(lèi)型 | 說(shuō)明 |
---|---|
next | 必要。以接收的值作為入?yún)?,在正常情況下執(zhí)行??赡軋?zhí)行零次或多次。 |
error | 可選。出錯(cuò)的情況下執(zhí)行。錯(cuò)誤會(huì)中斷這個(gè)可觀察對(duì)象實(shí)例的執(zhí)行過(guò)程。 |
complete | 可選。傳輸完成的情況下執(zhí)行。 |
訂閱
只有當(dāng)有人訂閱 Observable
的實(shí)例時(shí),它才會(huì)開(kāi)始發(fā)布值。 訂閱時(shí)要先調(diào)用可觀察對(duì)象的 subscribe()
方法,并把一個(gè)觀察者對(duì)象傳給它,用來(lái)接收通知。如下:
為了展示訂閱的原理,需要先創(chuàng)建新的可觀察對(duì)象。它有一個(gè)構(gòu)造函數(shù)可以用來(lái)創(chuàng)建新實(shí)例,但是為了更簡(jiǎn)明,也可以使用
Observable
上定義的一些靜態(tài)方法來(lái)創(chuàng)建一些常用的簡(jiǎn)單可觀察對(duì)象:
of(...items)
:返回一個(gè)Observable
實(shí)例,它用同步的方式把參數(shù)中提供的這些值一個(gè)一個(gè)
發(fā)送出來(lái)。from(iterable)
: 把它的參數(shù)轉(zhuǎn)換成一個(gè)Observable
實(shí)例。 該方法通常用于把一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)(發(fā)送多個(gè)值的)可觀察對(duì)象。
import { of } from "rxjs"; // 1、通過(guò) of() 方法返回一個(gè)可觀察對(duì)象,并準(zhǔn)備將1,2,3三個(gè)數(shù)據(jù)發(fā)送出去 const observable = of(1, 2, 3); // 2、實(shí)現(xiàn) observer 接口,觀察者 const observer = { next: (num: number) => console.log(num), error: (err: Error) => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), } // 3、訂閱。調(diào)用可觀察對(duì)象的 subscribe() 方法訂閱,subscribe() 方法中傳入的對(duì)象就是一個(gè)觀察者 observable.subscribe(observer);
運(yùn)行結(jié)果如下:
上面訂閱的寫(xiě)法可以直接改為如下:參數(shù)不是對(duì)象
observable.subscribe( num => console.log(num), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification') );
訂閱者函數(shù)
在上面的例子中使用的是of()
方法來(lái)創(chuàng)建可觀察對(duì)象,這節(jié)使用構(gòu)造函數(shù)創(chuàng)建可觀察對(duì)象。
Observable
構(gòu)造函數(shù)可以創(chuàng)建任何類(lèi)型的可觀察流。 當(dāng)執(zhí)行可觀察對(duì)象的subscribe()
方法時(shí),這個(gè)構(gòu)造函數(shù)就會(huì)把它接收到的參數(shù)作為訂閱函數(shù)
來(lái)運(yùn)行。 訂閱函數(shù)會(huì)接收一個(gè)Observer
對(duì)象,并把值發(fā)布給觀察者的next()
方法。
// 1、自定義訂閱者函數(shù) function sequenceSubscriber(observer: Observer<number>) { observer.next(1); // 發(fā)送數(shù)據(jù) observer.next(2); // 發(fā)送數(shù)據(jù) observer.next(3); // 發(fā)送數(shù)據(jù) observer.complete(); return {unsubscribe() {}}; } // 2、通過(guò)構(gòu)造函數(shù)創(chuàng)建一個(gè)新的可觀察對(duì)象,參數(shù)就是一個(gè)訂閱者函數(shù) const sequence = new Observable(sequenceSubscriber); // 3、訂閱 sequence.subscribe({ next(num) { console.log(num); }, // 接受數(shù)據(jù) complete() { console.log('Finished sequence'); } });
運(yùn)行結(jié)果如下:
上面一個(gè)例子演示了如何自定義訂閱函數(shù),那么既然可以自定義訂閱者函數(shù),我們就可以將異步代碼封裝進(jìn)可觀察對(duì)象的訂閱者函數(shù)中,待異步代碼執(zhí)行完再發(fā)送數(shù)據(jù)。如下:
import { Observable } from 'rxjs' // 異步函數(shù) function fn(num) { return new Promise((reslove, reject) => { setTimeout(() => { num++ reslove(num) }, 1000) }) } // 創(chuàng)建可觀察對(duì)象,并傳入訂閱者函數(shù) const observable = new Observable((x) => { let num = 1 fn(num).then( res => x.next(res) // 異步代碼執(zhí)行完成,發(fā)送數(shù)據(jù) ) }) // 訂閱,接收數(shù)據(jù),可以改為鏈?zhǔn)秸{(diào)用 observable.subscribe(data => console.log(data)) // 2
多播
https://angular.cn/guide/observables#multicasting
RxJS操作符
我們可以使用一系列的RxJS操作符
,在這些消息被接收方接收之前,對(duì)它們進(jìn)行一系列的處理、轉(zhuǎn)換,因?yàn)檫@些操作符都是純函數(shù)。
import { of } from 'rxjs'; import { map } from 'rxjs/operators'; // 1、創(chuàng)建可觀察對(duì)象,并發(fā)送數(shù)據(jù) const nums = of(1, 2, 3); // 2、創(chuàng)建函數(shù)以接受可觀察對(duì)象 const squareValues = map((val: number) => val * val); const squaredNums = squareValues(nums); squaredNums.subscribe(x => console.log(x));
上面的方式我看不懂且難以接受,一般常用下面這種,使用pipe
把多個(gè)操作符鏈接起來(lái)
import { map, Observable, filter } from 'rxjs' // 創(chuàng)建可觀察對(duì)象,并傳入訂閱者函數(shù) const observable = new Observable((x) => { x.next(1) x.next(2) x.next(3) x.next(4) }).pipe( map(value => value*100), // 操作符 filter(value => value == 200) // 操作符 ) .subscribe(data => console.log(data)) // 200
錯(cuò)誤處理
RxJS
還提供了catchError
操作符,它允許你在管道中處理已知錯(cuò)誤。
假設(shè)你有一個(gè)可觀察對(duì)象,它發(fā)起 API 請(qǐng)求,然后對(duì)服務(wù)器返回的響應(yīng)進(jìn)行映射。如果服務(wù)器返回了錯(cuò)誤或值不存在,就會(huì)生成一個(gè)錯(cuò)誤。如果你捕獲這個(gè)錯(cuò)誤并提供了一個(gè)默認(rèn)值,流就會(huì)繼續(xù)處理這些值,而不會(huì)報(bào)錯(cuò)。如下:
import { map, Observable, filter, catchError, of } from 'rxjs' const observable = new Observable((x) => { x.next(1) // 發(fā)送數(shù)據(jù) 1 和 2 x.next(2) }).pipe( map(value => { if (value === 1) { // 1、當(dāng)發(fā)送的數(shù)據(jù)為 1 時(shí),將其乘以 100 return value*100 } else { // 2、否則拋出錯(cuò)誤 throw new Error('拋出錯(cuò)誤'); } }), // 3、此處捕獲錯(cuò)誤并處理錯(cuò)誤,對(duì)外發(fā)送數(shù)據(jù) 0 catchError((err) => { console.log(err) return of(0) }) ) .subscribe( data => console.log(data), // 4、由于上面拋出的錯(cuò)誤被 catchError 操作符處理(重新發(fā)送數(shù)據(jù))了,所以這里能順利訂閱到數(shù)據(jù)而不報(bào)錯(cuò) err => console.log('接受不到數(shù)據(jù):', err) )
最后的運(yùn)行結(jié)果如下: