RxJS
《深入淺出RxJS》讀書筆記
遺留問題
- Observable的HOT與COLD對應的實際場景,以及在編碼中的體現
chapter1
html部分
測試你對時間的感覺按住我一秒鐘然后松手你的時間:毫秒
-
jquery
實現var time = new Date().getTime(); $("#hold-me").mousedown(function(event) {time = new Date().getTime();}).mouseup(function(event) {if (time) {var elapse = new Date().getTime() - time;$("#hold-time").text(elapse);// 重置time 避免直接觸發mouseup事件,例如在A處點擊然后在B處uptime = null;}});
-
RxJS
實現const holdMeButton = document.getElementById("hold-me");const mouseDown$ = Rx.Observable.fromEvent(holdMeButton,"mousedown");const mouseUp$ = Rx.Observable.fromEvent(holdMeButton,"mouseup");// 獲取間隔時間const holdTime$ = mouseUp$.timestamp().withLatestFrom(mouseDown$.timestamp(),(mouseUpEvent,mouseDownEvent)=>{return mouseUpEvent.timestamp - mouseDownEvent.timestamp});holdTime$.subscribe(ms=>{document.getElementById("hold-time").innerText = ms;})holdTime$.flatMap(ms=>{return Rx.Observable.ajax('https://timing-sense-score-board.herokuapp.com/score/'+ms)}).map(e=>e.response).subscribe(res=>{document.getElementById("rank").innerText = `你超過了${res.rank}% 的用戶`})
chapter2
Koa2的使用
主要用來加載靜態資源,所以使用到了koa
,koa-static
const path = require("path");
const koa = require("koa");
const serve = require("koa-static");
const app = new koa();app.use(async function (ctx,next) {console.log("收到請求...")await next()console.log(`"${ctx.path}"請求 已處理...`)
})app.use(serve(path.resolve(__dirname, "../src"))).listen(3001,function(err){if(err) throw err;console.log("程序啟動成功")
});
Observable
和 Observer
Observable 可被觀察的對象,Observer觀察者,Observer通過subscribe來觀察Observable對象
RxJS的數據流就是Observable對象:
- 觀察者模式
- 迭代器模式
舉個栗子
// 使用 deep-link方式引入函數
const Observable = require("rxjs").Observable;/** 定義Observable對象的行為,會產生數據,調用訂閱者的next方法* 1. 此處的Observer與訂閱者行為 theObserver并不是同一個對象,而是對theObserver的包裝* 2. 如果observer.error被調用,之后的complete或者next就不會被調用啦,同理,complete被調用之后,也不會* 再調用next或者error* 3. 如果error或者complete一直未調用,則observer就一直在內存中等待被調用
*/
const onSubscribe = observer =>{observer.next(1);observer.error(2);observer.complete(3);
}
// 產生一個Observable對象
const source$ = new Observable(onSubscribe);
// 定義觀察者的行為 消費Observable對象產生的數據
const theObserver = {next:item => console.log(item),error:item => console.error(item),complete:item => console.log("已完成"),
}
// 建立Observable與Observer的關系
source$.subscribe(theObserver)
退訂subscribe
在訂閱一段事件之后observer不再響應吐出的信息了,這時可以退訂,但是Observeable還會一直產生數據
const Observable = require("rxjs").Observable;const onSubscribe = observer =>{let n = 1;const handle = setInterval(()=>{console.log(`in onSubscribe ${n}`)// if(n>3){// observer.complete()// }observer.next(n++);},1000)return {unsubscribe(){// clearInterval(handle)}}
}const source$ = new Observable(onSubscribe);const theObserver = {next:item => console.log(item)
}let subscription = source$.subscribe(theObserver)setTimeout(()=>{// 此處的unsubscribe也是封裝過的subscription.unsubscribe()
},3500)
在node
中執行,會一直打印 in onSubscribe *
,但是source$不會再響應
Chapter3 操作符基礎
const Observable = require("rxjs/Observable").Observable;
const of = require("rxjs/observable/of").of;
const map = require("rxjs/operator/map").map;
// 新建一個操作符
// 此處this是外部變量,導致此operator不再是純函數
Observable.prototype.double = function(){// return this::map(x=>x*2)return map.call(this,x=>x*2)
}const source$ = of(1,3,4);
const result$ = source$.double();result$.subscribe(value=>console.log(value))
lettable/pipeable操作符
解決需要使用call或者bind改變this的操作,這樣是依賴外部環境的,不屬于純函數,也會喪失TS的類型檢查優勢
-
lettable
將Observable
對象傳遞給下文,避免使用this
const Observable = require("rxjs/Observable").Observable; require("rxjs/add/observable/of").of; require("rxjs/add/operator/map").map; require("rxjs/add/operator/let").let;const source$ = Observable.of(1,2,3); const double$ = obs$ => obs$.map(v=>v*2); // 接受上文,傳遞到下文 const result$ = source$.let(double$);result$.subscribe(console.log)
不引入`map`補丁,開發**lettable**寫法的操作符
// ES5實現
function map(project){
return function(obj$){// 通過上面的Observable生成一個新Observablereturn new Observable(observer=>{return obj$.subscribe({next:value=>observer.next(project(value)),error:err=>observer.error(err),complete:()=>observer.complete()})})
}
}
// 添加操作符
var result$ = source$.let(map(x => x * 3));
// ES6實現
const map6 = fn => obj$ =>
new Observable(observer =>obj$.subscribe({next: value => observer.next(fn(value)),error: err => observer.error(err),complete: () => observer.complete()}));
// 添加操作符
var result$ = source$.let(map6(x => x * 4));
`pipeable`是`lettable`的別稱,方便對于`lattable`的理解,V6以上才支持## Chapter4 創建數據流> 大多數的操作符是靜態操作符### 基礎操作符1. `create`簡單的返回一個Observable對象
Observable.create = function(subscribe){return new Observable(subscribe)
}
```
-
of
列舉數據import {Observable} from "rxjs/Observable"; import "rxjs/add/observable/of" // 依次吐出數據,一次性emit const source$ = Observable.of(1,2,3); // 訂閱 // 第一個參數是next,第二個參數是error回調,第三個參數是complete回調 source$.subscribe(console.log,null,()=>{console.log("Complete")})
-
range
產生指定范圍的數據const sourc$ = Observable.range(/*初始值*/1,/*個數*/100); // 每次只能步進 1
-
generate
循環創建相當于for循環
const source$ = Observable.generate(// 初始值2,// 判斷條件value=> value < 10,// 步進value=> value+0.5,// 函數體,產生的結果value=> value*value )
使用
generate
代替rangeconst range = function(min,count){const max = min + count;return Observable.generate(min,v=>vv+1,v=>v*v) }
-
repeat
重復數據的數據流實例操作符,通過
import 'rxjs/add/operator/repeat'
引入- V4版本中repeat是靜態屬性,這樣在使用
Observable.repeat(1,2)
重復1兩次,這樣數據就夠靈活 - V5版本中改為實例屬性之后,Observable.of(1,2,4).repeat(2),將產生的1,2,3重復兩次,功能更加強大
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeat");const source$ = Observable.create(observer => {setTimeout(() => {observer.next(1);}, 1000);setTimeout(() => {observer.next(2);}, 2000);setTimeout(() => {observer.next(3);}, 3000);setTimeout(() => {observer.complete(1);}, 4000);return {unsubscribe(){console.log("on Unsubscribe")}} });const repeat$ = source$.repeat(2)repeat$.subscribe(console.log,null,()=>{console.log("Complete")})// 1 // 2 // 3 // on Unsubscribe // 1 // 2 // 3 // Complete // on Unsubscribe
-
如果沒有
observer.complete()
repeat不會被調用repeat以complete為契機會再次執行數據源,如果上游一直沒有complete下游就不會執行
- 因為
repeat
的存在,第一次數據源執行完(以complete為契機)后并不會執行observer的complete回調
- V4版本中repeat是靜態屬性,這樣在使用
-
empty
,throw
,never
創建異步數據的Observable對象
-
interval
和timer
interval
類似于setIntervalrequire('rxjs/add/observable/interval') // 每隔1000ms產生一個數據,初始值為0,步進為1 Observable.interval(1000)'
timer
是setTimeout的超集// 1000ms后開始產生數據,之后每隔1000ms產生一個數據,功能相當于interval Observable.timer(1000,1000) // 指定日期 Observable.time(new Date(new Date().getTime() + 12000))
-
from
把一切轉化為Observable
- 將所有的Iterable的對象都轉化為Observable對象
- 可以將Promise對象轉化為Observable對象,功能與fromPromise相同
-
fromPromise
異步處理的對接const Observable = require("rxjs").Observable; require("rxjs/add/observable/fromPromise");const promise = Promise.resolve(123); Observable.fromPromise(promise).subscribe(console.log, null, () =>console.log("Complete") ); //123 //Complete const promise1 = Promise.reject("error"); Observable.from(console.log,err => console.log("catch", err),() => console.log("Complete!") ); // 未捕獲的Promise錯誤 // (node:765) UnhandledPromiseRejectionWarning: error // (node:765) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing // inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). ( // rejection id: 1) // (node:765) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
-
fromEvent
連接DOM與RxJS的橋梁const event$ = Observable.fromEvent(document.getElementById("btn"),"click"); event$.subscribe(event=>{// Do something })
在NodeJs中可以與
EventEmitter
交互const Observable = require("rxjs").Observable; const EventEmitter = require("events"); require("rxjs/add/observable/fromEvent")const emitter = new EventEmitter();const source$ = Observable.fromEvent(emitter,"msg"); source$.subscribe(console.log,null,()=>console.log("Complete"))emitter.emit("msg",1) // 1 emitter.emit("msg","haha") // haha emitter.emit("a-msg","haha") // emitter.emit("msg",'nihao') // nihao
fromEvent
是Hot Observable,也就是數據的產生和訂閱無關,對于fromEvent
來說,數據源是外部產生的,不受RxJS
控制,這是Hot Observable
對象的特點 -
fromEventPattern
針對不規范的事件源規范的事件源:DOM事件,EventEmitter事件
-
ajax
見最上面的例子 -
repeatWhen
例如 在上游事件結束之后的一段時間再重新訂閱
const Observable = require("rxjs").Observable; require("rxjs/add/operator/repeatWhen")const notifier = ()=>{return Observable.interval(1000); }const source$ = Observable.of(1,2,3); // const source$ = Observable.create(observer=>{ // observer.next(111); // return { // unsubscribe(){ // console.log("on Unsubscribe") // } // } // }); const repeat$ = source$.repeatWhen(notifier);repeat$.subscribe(console.log,null,()=>console.log("Complete")) // 每隔一秒產生一次 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // 1 // 2 // 3 // ^C
-
defer
延遲創建Observable針對Observable占用內存比較大的情況,懶加載
const Observable = require("rxjs").Observable; require("rxjs/add/observable/defer"); require("rxjs/add/observable/of");const observableFactory = ()=>Observable.of(1,2,3); const source$ = Observable.defer(observableFactory)
合并數據流
功能需求 操作符 把多個數據流以首尾相連的方式合并 concat
,concatAll
把多個數據流以先到先得的方式合并 merge
,mergeAll
把多個數據流中的數據以一一對應的方式合并 zip
和zipAll
持續合并多個數據流中最新產生的數據 combineLatest
,combineAll
,withLatestFrom
從多個數據流中選取第一個產生內容的數據流 race
在數據流前面添加一個指定數據 startWith
只獲取多個數據流最后產生的數據 forkJoin
從高階數據流中切換數據源 switch
,exhaust
-
concat
- 實例方法
- 靜態方法,如果兩個數據沒有先后關系,推薦使用此方法
-
實例方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/operator/concat")const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6);source$1.concat(source$2).subscribe(console.log,null,()=>console.log("Complete"))
-
靜態方法
const Observable = require("rxjs").Observable; require("rxjs/add/operator/of") require("rxjs/add/observable/concat")const source$1 = Observable.of(1,2,3); const source$2 = Observable.of(4,5,6);Observable.concat(source$1,source$2).subscribe(console.log,null,()=>console.log("Complete"))
`concat`在將上一個數據源傳遞下去的時候會調用上一個`Observable`的`unsubscribe`,如果上一個`Observable`一直為完結,后續的都不會被調用```javascript
const source$1 = Observable.internal(1000);
const source$2 = Observable.of(1);
const concated$ = Observable.concat(source$1,source$2);
// 此時 source$2永遠不會被調用
```在此推測:`rxjs/add/operator/*`下的屬性都是實例屬性,`rxjs/add/observable/*`下的屬性都是實例屬性
-
merge
先到先得merge用在同步數據的情況下和concat表現只,不建議使用
const Observable = require("rxjs").Observable; require("rxjs/add/operator/merge"); require("rxjs/add/operator/map"); require("rxjs/add/observable/timer");const source$1 = Observable.timer(0, 1000).map(x => x + "A"); const source$2 = Observable.timer(500, 1000).map(x => x + "B"); const source$3 = Observable.timer(1000, 1000).map(x => x + "C");// 此時 source$1與source$2永遠不會停止,所以 source$1.merge(source$2, source$3, /*此參數限制了合并的Observable的個數*/ 2).subscribe(console.log, null, () => console.log("Complete"));// 0A // 0B // 1A // 1B // 2A // 2B // 3A // 3B // 4A // 4B // ^C