3章 RxJava操作符

本篇文章已授權微信公眾號 YYGeeker 獨家發布轉載請標明出處

CSDN學院課程地址

  • RxJava2從入門到精通-初級篇:edu.csdn.net/course/deta…
  • RxJava2從入門到精通-中級篇:edu.csdn.net/course/deta…
  • RxJava2從入門到精通-進階篇:edu.csdn.net/course/deta…
  • RxJava2從入門到精通-源碼分析篇:edu.csdn.net/course/deta…

3. RxJava操作符

RxJava操作符也是其精髓之一,可以通過一個簡單的操作符,實現復雜的業務邏輯,甚至還可以將操作符組合起來(即RxJava的組合過程),完成更為復雜的業務需求。比如我們前面用到的.create().subscribeOn().observeOn().subscribe()都是RxJava的操作符之一,下面我們將對RxJava的操作符進行分析

掌握RxJava操作符前,首先要學會看得懂RxJava的圖片,圖片是RxJava主導的精髓,下面我們通過例子說明

這張圖片我們先要分清楚概念上的東西,上下兩行橫向的直線區域代表著事件流,上面一行(上游)是我們的被觀察者Observable,下面一行(下游)是我們的觀察者Observer,事件流就是從上游的被觀察者發送給下游的觀察者的。而中間一行的flatMap區域則是我們的操作符部分,它可以對我們的數據進行變換操作。最后,數據流則是圖片上的圓形、方形、菱形等區域,也是從上游流向下游的,不同的形狀代表著不同的數據類型

這張圖片并不是表示沒有被觀察者Observable,而是Create方法本身就是創建了被觀察者,所以可以將被觀察者的上游省略。在進行事件的onNext()分發后,執行onComplete()事件,這樣就表示事件流已經結束,后續如果上游繼續發事件,則下游表示不接收。當事件流的onCompleted()或者onError()正好被調用過一次后,此后就不能再調用觀察者的任何其它回調方法

在理解RxJava操作符之前,需要將這幾個概念弄明白,整個操作符的章節都是圍繞這幾個概念進行的

  • 事件流:通過發射器發射的事件,從發射事件到結束事件的過程,這一過程稱為事件流
  • 數據流:通過發射器發射的數據,從數據輸入到數據輸出的過程,這一過程稱為數據流
  • 被觀察者:事件流的上游,即Observable,事件流開始的地方和數據流發射的地方
  • 觀察者:事件流的下游,即Observer,事件流結束的地方和數據流接收的地方

3.1 Creating Observables (創建操作符)

1、create

Observable最原始的創建方式,創建出一個最簡單的事件流,可以使用發射器發射特定的數據類型

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {e.onNext(i);}e.onComplete();}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onComplete
復制代碼

2、from

創建一個事件流并發出特定類型的數據流,其發射的數據流類型有如下幾個操作符

public static void main(String[] args) {Observable.fromArray(new Integer[]{1, 2, 3, 4, 5}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

3、just

just操作符和from操作符很像,只是方法的參數有所差別,它可以接受多個參數

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

4、defer

defer與just的區別是,just是直接將發射當前的數據流,而defer會等到訂閱的時候,才會去執行它的call()回調,再去發射當前的數據流。復雜點的理解就是:defer操作符是將一組數據流在原有的事件流基礎上緩存一個新的事件流,直到有人訂閱的時候,才會創建它緩存的事件流

public static void main(String[] args) {i = 10;Observable<Integer> just = Observable.just(i, i);Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {@Overridepublic ObservableSource<?> call() throws Exception {//緩存新的事件流return Observable.just(i, i);}});i = 15;just.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});defer.subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + (int) o);}});i = 20;defer.subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + (int) o);}});
}
復制代碼

輸出

onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20
復制代碼

5、interval

interval操作符是按固定的時間間隔發射一個無限遞增的整數數據流,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行,interval默認在computation調度器上執行

public void interval() {Observable.interval(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
......
復制代碼

6、range

range操作符發射一個范圍內的有序整數數據流,你可以指定范圍的起始和長度

public static void main(String[] args) {Observable.range(1, 5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

7、repeat

repeat操作符可以重復發送指定次數的某個事件流,repeat操作符默認在trampoline調度器上執行

public static void main(String[] args) {Observable.just(1).repeat(5).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=1
onNext=1
onNext=1
onNext=1
復制代碼

8、timer

timer操作符可以創建一個延時的事件流,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行,默認在computation調度器上執行

public void timer() {Observable.timer(5, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
復制代碼

輸出

onNext=0
復制代碼

9、小結

  1. create():創建最簡單的事件流
  2. from():創建事件流,可發送不同類型的數據流
  3. just():創建事件流,可發送多個參數的數據流
  4. defer():創建事件流,可緩存可激活事件流
  5. interval():創建延時重復的事件流
  6. range():創建事件流,可發送范圍內的數據流
  7. repeat():創建可重復次數的事件流
  8. timer():創建一次延時的事件流

補充:interval()、timer()、delay()的區別

  1. interval():用于創建事件流,周期性重復發送
  2. timer():用于創建事件流,延時發送一次
  3. delay():用于事件流中,可以延時某次事件流的發送

3.2 Transforming Observables (轉換操作符)

1、map

map操作符可以將數據流進行類型轉換

public static void main(String[] args) {Observable.just(1).map(new Function<Integer, String>() {@Overridepublic String apply(Integer integer) throws Exception {return "發送過來的數據會被變成字符串" + integer;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
復制代碼

輸出

onNext=發送過來的數據會被變成字符串1
復制代碼

2、flatMap

flatMap操作符將數據流進行類型轉換,然后將新的數據流傳遞給新的事件流進行分發,這里通過模擬請求登錄的延時操作進行說明,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void flatMap() {Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {@Overridepublic ObservableSource<String> apply(UserParams userParams) throws Exception {return Observable.just(userParams.username + "登錄成功").delay(2, TimeUnit.SECONDS);}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println(s);}});
}public static class UserParams {public UserParams(String username, String password) {this.username = username;this.password = password;}public String username;public String password;
}
復制代碼

輸出

hensen登錄成功
復制代碼

補充:

  • concatMap與flatMap功能一樣,唯一的區別就是concatMap是有序的,flatMap是亂序的

3、groupBy

groupBy操作符可以將發射出來的數據項進行分組,并將分組后的數據項保存在具有key-value映射的事件流中。groupBy具體的分組規則由groupBy操作符傳遞進來的函數參數Function所決定的,它可以將key和value按照Function的返回值進行分組,返回一個具有分組規則的事件流GroupedObservable,注意這里分組出來的事件流是按照原始事件流的順序輸出的,我們可以通過sorted()對數據項進行排序,然后輸出有序的數據流。

public static void main(String[] args) {Observable.just("java", "c++", "c", "c#", "javaScript", "Android").groupBy(new Function<String, Character>() {@Overridepublic Character apply(String s) throws Exception {return s.charAt(0);//按首字母分組}}).subscribe(new Consumer<GroupedObservable<Character, String>>() {@Overridepublic void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {//排序后,直接訂閱輸出key和valuecharacterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);}});}});
}
復制代碼

輸出

onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript
復制代碼

4、scan

scan操作符會對發射的數據和上一輪發射的數據進行函數處理,并返回的數據供下一輪使用,持續這個過程來產生剩余的數據流。其應用場景有簡單的累加計算,判斷所有數據的最小值等

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer < integer2 ? integer : integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer item) throws Exception {System.out.println("onNext=" + item);}});
}
復制代碼

輸出

onNext=8
onNext=2
onNext=2
onNext=1
onNext=1
復制代碼

5、buffer

buffer操作符可以將發射出來的數據流,在給定的緩存池中進行緩存,當緩存池中的數據項溢滿時,則將緩存池的數據項進行輸出,重復上述過程,直到將發射出來的數據全部發射出去。如果發射出來的數據不夠緩存池的大小,則按照當前發射出來的數量進行輸出。如果對buffer操作符設置了skip參數,則buffer每次緩存池溢滿時,會跳過指定的skip數據項,然后再進行緩存和輸出。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).buffer(5).subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {System.out.println("onNext=" + integers.toString());}
});
復制代碼

輸出

onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]
復制代碼

6、window

window操作符和buffer操作符在功能上實現的效果是一樣的,但window操作符最大區別在于同樣是緩存一定數量的數據項,window操作符最終發射出來的是新的事件流integerObservable,而buffer操作符發射出來的是新的數據流,也就是說,window操作符發射出來新的事件流中的數據項,還可以經過Rxjava其他操作符進行處理。

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).window(2, 1).subscribe(new Consumer<Observable<Integer>>() {@Overridepublic void accept(Observable<Integer> integerObservable) throws Exception {integerObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9
復制代碼

7、小結

  1. map():對數據流的類型進行轉換
  2. flatMap():對數據流的類型進行包裝成另一個數據流
  3. groupby():對所有的數據流進行分組
  4. scan():對上一輪處理過后的數據流進行函數處理
  5. buffer():緩存發射的數據流到一定數量,隨后發射出數據流集合
  6. window():緩存發射的數據流到一定數量,隨后發射出新的事件流

3.3 Filtering Observables (過濾操作符)

1、debounce

debounce操作符會去過濾掉發射速率過快的數據項,下面的例子onNext事件可以想象成按鈕的點擊事件,如果在2秒種內頻繁的點擊,則其點擊事件會被忽略,當i為3的除數的時候,發射的事件的時間會超過規定忽略事件的時間,那么則允許觸發點擊事件。這就有點像我們頻繁點擊按鈕,但始終只會觸發一次點擊事件,這樣就不會導致重復去響應點擊事件

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(ObservableEmitter<Integer> emitter) throws Exception {for (int i = 0; i < 100; i++) {if (i % 3 == 0) {Thread.sleep(3000);} else {Thread.sleep(1000);}emitter.onNext(i);}}}).debounce(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......
復制代碼

2、distinct

distinct操作符會過濾重復發送的數據項

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).distinct().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
復制代碼

3、elementAt

elementAt操作符只取指定的角標的事件

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
復制代碼

4、filter

filter操作符可以過濾指定函數的數據項

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).filter(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer > 2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=3
onNext=4
onNext=3
復制代碼

5、first

first操作符只發射第一項數據項

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).first(7).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
復制代碼

6、ignoreElements

ignoreElements操作符不發射任何數據,只發射事件流的終止通知

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).ignoreElements().subscribe(new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
復制代碼

輸出

onComplete
復制代碼

7、last

last操作符只發射最后一項數據

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 1, 2, 3).last(7).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=3
復制代碼

8、sample

sample操作符會在指定的事件內從數據項中采集所需要的數據,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void sample() {Observable.interval(1, TimeUnit.SECONDS).sample(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
復制代碼

輸出

onNext=2
onNext=4
onNext=6
onNext=8
復制代碼

9、skip

skip操作符可以忽略事件流發射的前N項數據項,只保留之后的數據

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).skip(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
復制代碼

輸出

onNext=4
onNext=5
onNext=6
onNext=7
onNext=8
復制代碼

10、skipLast

skipLast操作符可以抑制事件流發射的后N項數據

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).skipLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

11、take

take操作符可以在事件流中只發射前面的N項數據

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).take(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
復制代碼

12、takeLast

takeLast操作符事件流只發射數據流的后N項數據項,忽略前面的數據項

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5, 6, 7, 8).takeLast(3).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer i) throws Exception {System.out.println("onNext=" + i);}});
}
復制代碼

輸出

onNext=6
onNext=7
onNext=8
復制代碼

還有一個操作符叫takeLastBuffer,它和takeLast類似,,唯一的不同是它把所有的數據項收集到一個List再發射,而不是依次發射一個

13、小結

  1. debounce():事件流只發射規定范圍時間內的數據項
  2. distinct():事件流只發射不重復的數據項
  3. elementAt():事件流只發射第N個數據項
  4. filter():事件流只發射符合規定函數的數據項
  5. first():事件流只發射第一個數據項
  6. ignoreElements():忽略事件流的發射,只發射事件流的終止事件
  7. last():事件流只發射最后一項數據項
  8. sample():事件流對指定的時間間隔進行數據項的采樣
  9. skip():事件流忽略前N個數據項
  10. skipLast():事件流忽略后N個數據項
  11. take():事件流只發射前N個數據項
  12. takeLast():事件流只發射后N個數據項

3.4 Combining Observables (組合操作符)

1、merge/concat

merge操作符可以合并兩個事件流,如果在merge操作符上增加延時發送的操作,那么就會導致其發射的數據項是無序的,會跟著發射的時間點進行合并。雖然是將兩個事件流合并成一個事件流進行發射,但在最終的一個事件流中,發射出來的卻是兩次數據流。由于concat操作符和merge操作符的效果是一樣的,這里只舉一例

merge和concat的區別

  • merge():合并后發射的數據項是無序的
  • concat():合并后發射的數據項是有序的
public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {@Overridepublic void accept(Serializable serializable) throws Exception {System.out.println("onNext=" + serializable.toString());}});
}
復制代碼

輸出

onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

2、zip

zip操作符是將兩個數據流進行指定的函數規則合并

public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");Observable.zip(just1, just2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) throws Exception {return s + s2;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
復制代碼

輸出

onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5
復制代碼

3、startWith

startWith操作符是將另一個數據流合并到原數據流的開頭

public static void main(String[] args) {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");just1.startWith(just2).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
復制代碼

4、join

join操作符是有時間期限的合并操作符,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void join() {Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);just1.join(just2, new Function<String, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(String s) throws Exception {return Observable.timer(3, TimeUnit.SECONDS);}}, new Function<Long, ObservableSource<Long>>() {@Overridepublic ObservableSource<Long> apply(Long l) throws Exception {return Observable.timer(8, TimeUnit.SECONDS);}}, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
復制代碼

join操作符有三個函數需要設置

  • 第一個函數:規定just2的過期期限
  • 第二個函數:規定just1的過期期限
  • 第三個函數:規定just1和just2的合并規則

由于just2的期限只有3秒的時間,而just2延時1秒發送一次,所以just2只發射了2次,其輸出的結果就只能和just2輸出的兩次進行合并,其輸出格式有點類似我們的排列組合

onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1
復制代碼

5、combineLatest

conbineLatest操作符會尋找其他事件流最近發射的數據流進行合并,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public static String[] str = {"A", "B", "C", "D", "E"};public void combineLatest() {Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {@Overridepublic String apply(Long aLong) throws Exception {return str[(int) (aLong % 5)];}});Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {@Overridepublic String apply(String s, Long l) throws Exception {return s + l;}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {System.out.println("onNext=" + s);}});
}
復制代碼

輸出

onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5
復制代碼

6、小結

  1. merge()/concat():無序/有序的合并兩個數據流
  2. zip():兩個數據流的數據項合并成一個數據流一同發出
  3. startWith():將待合并的數據流放在自身前面一同發出
  4. join():將數據流進行排列組合發出,不過數據流都是有時間期限的
  5. combineLatest():合并最近發射出的數據項成數據流一同發出

3.5 Error Handling Operators(錯誤處理操作符)

1、onErrorReturn

onErrorReturn操作符表示當錯誤發生時,它會忽略onError的回調且會發射一個新的數據項并回調onCompleted()

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onError crash"));}e.onNext(i);}}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
復制代碼

2、onErrorResumeNext

onErrorResumeNext操作符表示當錯誤發生時,它會忽略onError的回調且會發射一個新的事件流并回調onCompleted()

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onError crash"));}e.onNext(i);}}}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {@Overridepublic ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {return Observable.just(-1);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
復制代碼

3、onExceptionResumeNext

onExceptionResumeNext操作符表示當錯誤發生時,如果onError收到的Throwable不是一個Exception,它會回調onError方法,且不會回調備用的事件流,如果onError收到的Throwable是一個Exception,它會回調備用的事件流進行數據的發射

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if(i == 4){e.onError(new Exception("onException crash"));//e.onError(new Error("onError crash"));}e.onNext(i);}}}).onExceptionResumeNext(new ObservableSource<Integer>() {@Overridepublic void subscribe(Observer<? super Integer> observer) {//備用事件流observer.onNext(8);}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=8
復制代碼

4、retry

retry操作符表示當錯誤發生時,發射器會重新發射

public static void main(String[] args) {Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if (i == 4) {e.onError(new Exception("onError crash"));}e.onNext(i);}}}).retry(1).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
復制代碼
  • retry():表示重試無限次
  • retry(long times):表示重試指定次數
  • retry(Func predicate):可以根據函數參數中的Throwable類型和重試次數決定本次需不需要重試

5、retryWhen

retryWhen操作符和retry操作符相似,區別在于retryWhen將錯誤Throwable傳遞給了函數進行處理并產生新的事件流進行處理,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

private static int retryCount = 0;
private static int maxRetries = 2;public void retryWhen(){Observable.create(new ObservableOnSubscribe<Integer>() {@Overridepublic void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {for (int i = 1; i < 5; i++) {if (i == 4) {e.onError(new Exception("onError crash"));}e.onNext(i);}}}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(Throwable throwable) throws Exception {if (++retryCount <= maxRetries) {// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);return Observable.timer(1, TimeUnit.SECONDS);}return Observable.error(throwable);}});}}).onErrorReturn(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable throwable) throws Exception {return -1;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}}, new Action() {@Overridepublic void run() throws Exception {System.out.println("onComplete");}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
復制代碼

6、小結

  • onErrorReturn():當錯誤發生時,它會忽略onError的回調且會發射一個新的數據項并回調onCompleted()
  • onErrorResumeNext():當錯誤發生時,它會忽略onError的回調且會發射一個新的事件流并回調onCompleted()
  • onExceptionResumeNext():當錯誤發生時,如果onError收到的Throwable不是一個Exception,它會回調onError方法,且不會回調備用的事件流,如果onError收到的Throwable是一個Exception,它會回調備用的事件流進行數據的發射
  • retry():當錯誤發生時,發射器會重新發射
  • retryWhen():當錯誤發生時,根據Tharowble類型決定發射器是否重新發射

3.6 Observable Utility Operators(輔助性操作符)

1、delay

delay操作符可以延時某次事件發送的數據流,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void deley() {Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

delay和delaySubscription的效果是一樣的,只不過delay是對數據流的延時,而delaySubscription是對事件流的延時

2、do

do操作符可以監聽整個事件流的生命周期,do操作符分為多個類型,而且每個類型的作用都不同

  1. doOnNext():接收每次發送的數據項
  2. doOnEach():接收每次發送的數據項
  3. doOnSubscribe():當事件流被訂閱時被調用
  4. doOnDispose():當事件流被釋放時被調用
  5. doOnComplete():當事件流被正常終止時被調用
  6. doOnError():當事件流被異常終止時被調用
  7. doOnTerminate():當事件流被終止之前被調用,無論正常終止還是異常終止都會調用
  8. doFinally():當事件流被終止之后被調用,無論正常終止還是異常終止都會調用
public static void main(String[] args) {Observable.just(1, 2, 3).doOnNext(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("doOnNext");}}).doOnEach(new Consumer<Notification<Integer>>() {@Overridepublic void accept(Notification<Integer> integerNotification) throws Exception {System.out.println("doOnEach");}}).doOnSubscribe(new Consumer<Disposable>() {@Overridepublic void accept(Disposable disposable) throws Exception {System.out.println("doOnSubscribe");}}).doOnDispose(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnDispose");}}).doOnTerminate(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnTerminate");}}).doOnError(new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("doOnError");}}).doOnComplete(new Action() {@Overridepublic void run() throws Exception {System.out.println("doOnComplete");}}).doFinally(new Action() {@Overridepublic void run() throws Exception {System.out.println("doFinally");}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally
復制代碼

3、materialize/dematerialize

materialize操作符將發射出的數據項轉換成為一個Notification對象,而dematerialize操作符則是跟materialize操作符相反,這兩個操作符有點類似我們Java對象的裝箱和拆箱功能

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).materialize().subscribe(new Consumer<Notification<Integer>>() {@Overridepublic void accept(Notification<Integer> integerNotification) throws Exception {System.out.println("onNext=" + integerNotification.getValue());}});Observable.just(1, 2, 3, 4, 5).materialize().dematerialize().subscribe(new Consumer<Object>() {@Overridepublic void accept(Object object) throws Exception {System.out.println("onNext=" + object.toString());}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

輸出的時候,materialize會輸出多個null,是因為null的事件為onCompleted事件,而dematerialize把onCompleted事件給去掉了,這個原因也可以從圖片中看出來

4、serialize

serialize操作符可以將異步執行的事件流進行同步操作,直到事件流結束

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).serialize().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

5、timeInterval

timeInterval操作符可以將發射的數據項轉換為帶有時間間隔的數據項,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void timeInterval(){Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS).subscribe(new Consumer<Timed<Long>>() {@Overridepublic void accept(Timed<Long> longTimed) throws Exception {System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());}});
}
復制代碼

輸出

onNext=0 timeInterval=2
onNext=1 timeInterval=2
onNext=2 timeInterval=2
onNext=3 timeInterval=2
onNext=4 timeInterval=2
復制代碼

6、timeout

timeout操作符表示當發射的數據項超過了規定的限制時間,則發射onError事件,這里直接讓程序超過規定的限制時間,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void timeOut(){Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}}, new Consumer<Throwable>() {@Overridepublic void accept(Throwable throwable) throws Exception {System.out.println("onError");}});
}
復制代碼

輸出

onError
復制代碼

7、timestamp

timestamp操作符會給每個發射的數據項帶上時間戳,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void timeStamp() {Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS).subscribe(new Consumer<Timed<Long>>() {@Overridepublic void accept(Timed<Long> longTimed) throws Exception {System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());}});
}
復制代碼

輸出

onNext=0 timeInterval=1525755132132
onNext=1 timeInterval=1525755134168
onNext=2 timeInterval=1525755136132
onNext=3 timeInterval=1525755138132
復制代碼

8、using

using操作符可以讓你的事件流存在一次性的數據項,即用完就將資源釋放掉

using操作符接受三個參數:

  • 一個用戶創建一次性資源的工廠函數
  • 一個用于創建一次性事件的工廠函數
  • 一個用于釋放資源的函數
public static class UserBean {String name;int age;public UserBean(String name, int age) {this.name = name;this.age = age;}
}public static void main(String[] args) {Observable.using(new Callable<UserBean>() {@Overridepublic UserBean call() throws Exception {//從網絡中獲取某個對象return new UserBean("俊俊俊", 22);}}, new Function<UserBean, ObservableSource<?>>() {@Overridepublic ObservableSource<?> apply(UserBean userBean) throws Exception {//拿出你想要的資源return Observable.just(userBean.name);}}, new Consumer<UserBean>() {@Overridepublic void accept(UserBean userBean) throws Exception {//釋放對象userBean = null;}}).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + o.toString());}});
}
復制代碼

輸出

onNext=俊俊俊
復制代碼

9、to

to操作符可以將數據流中的數據項進行集合的轉換,to操作符分為多個類型,而且每個類型的作用都不同

  1. toList():轉換成List類型的集合
  2. toMap():轉換成Map類型的集合
  3. toMultimap():轉換成一對多(即<A類型,List<B類型>>)的Map類型的集合
  4. toSortedList():轉換成具有排序的List類型的集合
public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).toList().subscribe(new Consumer<List<Integer>>() {@Overridepublic void accept(List<Integer> integers) throws Exception {System.out.println("onNext=" + integers.toString());}});
}
復制代碼

輸出

onNext=[1, 2, 3, 4, 5]
復制代碼

10、小結

  1. delay():延遲事件發射的數據項
  2. do():監聽事件流的生命周期
  3. materialize()/dematerialize():對事件流進行裝箱/拆箱
  4. serialize():同步事件流的發射
  5. timeInterval():對事件流增加時間間隔
  6. timeout():對事件流增加限定時間
  7. timestamp():對事件流增加時間戳
  8. using():對事件流增加一次性的資源
  9. to():對數據流中的數據項進行集合的轉換

3.7 Conditional and Boolean Operators(條件和布爾操作符)

1、all

all操作符表示對所有數據項進行校驗,如果所有都通過則返回true,否則返回false

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).all(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer > 0;}}).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}
復制代碼

輸出

onNext=true
復制代碼

2、contains

contains操作符表示事件流中發射的數據項當中是否包含有指定的數據項

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).contains(2).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}
復制代碼

輸出

onNext=true
復制代碼

3、amb

amb操作符在多個事件流中只發射最先發出數據的事件流,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void amb(){List<Observable<Integer>> list = new ArrayList<>();list.add(Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS));list.add(Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS));list.add(Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS));Observable.amb(list).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=7
onNext=8
onNext=9
復制代碼

4、defaultIfEmpty

defaultIfEmpty操作符會在事件流沒有發射任何數據時,發射一個指定的默認值

public static void main(String[] args) {Observable.empty().defaultIfEmpty(-1).subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {System.out.println("onNext=" + o.toString());}});
}
復制代碼

輸出

onNext=-1
復制代碼

5、sequenceEqual

sequenceEqual操作符可以判斷兩個數據流是否完全相等

public static void main(String[] args) {Observable<Integer> just1 = Observable.just(1, 2, 3);Observable<Integer> just2 = Observable.just(1, 2, 3);Observable.sequenceEqual(just1, just2).subscribe(new Consumer<Boolean>() {@Overridepublic void accept(Boolean aBoolean) throws Exception {System.out.println("onNext=" + aBoolean);}});
}
復制代碼

輸出

onNext=true
復制代碼

6、skipUntil/skipWhile

skipUtils操作符是在兩個事件流發射的時候,第一個事件流會等到第二個事件流開始發射的時候,第一個事件流才開始發射出數據項,它會忽略之前發射過的數據項,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void skipUntil(){Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);just1.skipUntil(just2).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
復制代碼

輸出

onNext=2
onNext=3
onNext=4
onNext=5
......
復制代碼

skipWhile操作符是在一個事件流中,從第一項數據項開始判斷是否符合某個特定條件,如果判斷值返回true,則不發射該數據項,繼續從下一個數據項執行同樣的判斷,直到某個數據項的判斷值返回false時,則終止判斷,發射剩余的所有數據項。需要注意的是,這里只要一次判斷為false則后面的所有數據項都不判斷

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).skipWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=3
onNext=4
onNext=5
復制代碼

7、takeUntil/takeWhile

takeUntil操作符跟skipUntil類似,skip表示跳過的意思,而take表示取值的意思,takeUntil操作符是在兩個事件流發射的時候,第一個事件流會等到第二個事件流開始發射的時候,第一個事件流停止發射數據項,它會忽略之后的數據項,由于這段代碼的的延時操作都是非阻塞型的,所以在Java上運行會導致JVM的立馬停止,只能把這段代碼放在Android來運行

public void takeUntil(){Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);just1.takeUntil(just2).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
復制代碼

輸出

onNext=0
onNext=1
復制代碼

takeWhile操作符是在一個事件流中,從第一項數據項開始判斷是否符合某個特定條件,如果判斷值返回true,則發射該數據項,繼續從下一個數據項執行同樣的判斷,直到某個數據項的判斷值返回false時,則終止判斷,且剩余的所有數據項不會發射。需要注意的是,這里只要一次判斷為false則后面的所有數據項都不判斷

public static void main(String[] args) {Observable.just(1, 2, 3, 4, 0).takeWhile(new Predicate<Integer>() {@Overridepublic boolean test(Integer integer) throws Exception {return integer < 3;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
復制代碼

8、小結

  1. all():對所有數據項進行校驗
  2. contains():所有數據項是否包含指定數據項
  3. amb():多個事件流中,只發射最先發出的事件流
  4. defaultIfEmpty():如果數據流為空則發射默認數據項
  5. sequenceEqual():判斷兩個數據流是否完全相等
  6. skipUntil():當兩個事件流發射時,第一個事件流的數據項會等到第二個事件流開始發射時才進行發射
  7. skipWhile():當發射的數據流達到某種條件時,才開始發射剩余所有數據項
  8. takeUntil():當兩個事件流發射時,第一個事件流的數據項會等到第二個事件流開始發射時終止發射
  9. takeWhile():當發射的數據流達到某種條件時,才停止發射剩余所有數據項

3.8 Mathematical and Aggregate Operators(數學運算及聚合操作符)

數學運算操作符比較簡單,對于數學運算操作符會放在小結中介紹,下面是對聚合操作符做介紹

1、reduce

reduce操作符跟scan操作符是一樣的,會對發射的數據和上一輪發射的數據進行函數處理,并返回的數據供下一輪使用,持續這個過程來產生剩余的數據流。reduce與scan的唯一區別在于reduce只輸出最后的結果,而scan會輸出每一次的結果,這點從圖片中也能看出來

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).reduce(new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) throws Exception {return integer < integer2 ? integer : integer2;}}).subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer item) throws Exception {System.out.println("onNext=" + item);}});
}
復制代碼

輸出

onNext=1
復制代碼

2、collect

collect操作符跟reduce操作符類似,只不過collect增加了一個可改變數據結構的函數供我們處理

public static void main(String[] args) {Observable.just(8, 2, 13, 1, 15).collect(new Callable<String>() {@Overridepublic String call() throws Exception {return "A";}}, new BiConsumer<String, Integer>() {@Overridepublic void accept(String s, Integer integer) throws Exception {System.out.println("onNext=" + s + "  " + integer);}}).subscribe(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String s, Throwable throwable) throws Exception {System.out.println("onNext2=" + s);}});
}
復制代碼

輸出

onNext=A  8
onNext=A  2
onNext=A  13
onNext=A  1
onNext=A  15
onNext2=A
復制代碼

3、小結

數學運算操作符的使用需要在gradle中添加rxjava-math的依賴

implementation 'io.reactivex:rxjava-math:1.0.0'
復制代碼
  1. average():求所有數據項的平均值
  2. max/min():求所有數據項的最大或最小值
  3. sum():求所有數據項的總和
  4. reduce():對上一輪處理過后的數據流進行函數處理,只返回最后的結果
  5. collect():對上一輪處理過后的數據流進行函數處理,可改變原始的數據結構

3.9 Connectable Observable(連接操作符)

1、publish

publish操作符是將普通的事件流轉化成可連接的事件流ConnectableObservable,它與普通的事件流不一樣,ConnectableObservable在沒有調用connect()進行連接的情況下,事件流是不會發射數據的

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

復制代碼

2、connect

connect操作符是將可連接的事件流進行連接并開始發射數據。這個方法需要注意的是,connect操作符必須在所有事件流被訂閱后才開始發射數據。如果放在subscribe之前的話,則訂閱者是無法收到數據的。如果后面還有訂閱者將訂閱此次事件流,則會丟失已經調用了connect后,發射出去的數據項

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});connectableObservable.connect();
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

3、refCount

refCount操作符可以將可連接的事件流轉換成普通的事件流

public static void main(String[] args) {ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();connectableObservable.refCount().subscribe(new Consumer<Integer>() {@Overridepublic void accept(Integer integer) throws Exception {System.out.println("onNext=" + integer);}});
}
復制代碼

輸出

onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
復制代碼

4、replay

replay操作符將彌補connect操作符的缺陷,由于connect會讓后面進行訂閱的訂閱者丟失之前發射出去的數據項,所以使用replay操作符可以將發射出去的數據項進行緩存,這樣使得后面的訂閱者都可以獲得完整的數據項。這里需要注意的是,replay操作符不能和publish操作符同時使用,否則將不會發射數據。例子中,讀者可以將replay操作符換成publish操作符,這時候的輸出就會丟失前2秒發射的數據項

public void replay(){ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).replay();connectableObservable.connect();connectableObservable.delaySubscription(3, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {@Overridepublic void accept(Long aLong) throws Exception {System.out.println("onNext=" + aLong);}});
}
復制代碼

輸出

onNext=0
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
......
復制代碼

5、小結

  1. publish():將普通的事件流轉換成可連接的事件流
  2. connect():將可連接的事件流進行連接并發射數據
  3. refCount():將可連接的事件流轉換成普通的事件流
  4. replay():緩存可連接的事件流中的所有數據項

轉載于:https://juejin.im/post/5cd8dc55f265da039f0f3030

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/387704.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/387704.shtml
英文地址,請注明出處:http://en.pswp.cn/news/387704.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

virtualbox 使用

實現文件拖拽功能 1、設備 -- 安裝增強功能 -- /bin/sh VboxLinuxaddition.run -- reboot 2、設備 -- 拖放 -- 雙向 3、虛擬機 -- 設置 -- 存儲 -- 控制器&#xff1a;SATA -- 勾選 使用主機輸入輸出&#xff08;I\O 緩存&#xff09; 4、虛擬機硬盤 -- 勾選固態驅動器 轉載于…

linux安裝mysql 5.6.33

.到MySQL官網下載mysql編譯好的二進制安裝包&#xff0c;在下載頁面Select Platform:選項選擇linux-generic&#xff0c;然后把頁面拉到底部&#xff0c;64位系統下載Linux - Generic (glibc 2.5) (x86, 64-bit)&#xff0c;下載后文件名&#xff1a;mysql-5.6.33-linux-glibc2…

Go 函數特性和網絡爬蟲示例

爬取頁面 這篇通過網絡爬蟲的示例&#xff0c;來了解 Go 語言的遞歸、多返回值、延遲函數調用、匿名函數等方面的函數特性。首先是爬蟲的基礎示例&#xff0c;下面兩個例子展示通過 net/http 包來爬取頁面的內容。 獲取一個 URL 下面的程序展示從互聯網獲取信息&#xff0c;獲…

Qt的安裝和使用中的常見問題(詳細版)

對于太長不看的朋友&#xff0c;可參考Qt的安裝和使用中的常見問題&#xff08;簡略版&#xff09;。 目錄 1、引入2、Qt簡介3、Qt版本 3.1 查看安裝的Qt版本3.2 查看當前項目使用的Qt版本3.3 查看當前項目使用的QtCreator版本3.4 Linux命令行下查看和使用不同版本的Qt4、Qt模塊…

python與C#的互相調用

python與C#的互相調用一、C#調用python新建一個項目&#xff0c;添加引用&#xff1a;IronPython.dll&#xff0c;Microsoft.Scripting.dll&#xff08;在IronPython的安裝目錄中&#xff09;。創建一個文本文件命名為hello.py,把該文件添加的當前的項目中,并設置為總是輸出。#…

各行業大數據可視化界面參考

轉載于:https://www.cnblogs.com/wangsongbai/p/10178096.html

mysql遠程連接 Host * is not allowed to connect to this MySQL server

localhost改成% 進入mysql的BIN目錄 代碼如下 復制代碼 mysql -u root -p mysql>use mysql; mysql>update user set host ’%where user ’root’; mysql>flush privileges; 具體分析 1、在本機登入mysql后&#xff0c;更改“mysql”數據庫里的“user”表里的“h…

今日聽聞這幾款手機軟件比較火爆 果然名不虛傳!

如今的時代&#xff0c;智能手機已經成為我們生活中不可缺少的一部分&#xff0c;大家之所以這么愛玩手機&#xff0c;其實并不是手機本身有多么吸引人&#xff0c;而是安裝在手機上的各種各樣的APP&#xff0c;比如各種社交軟件、音頻軟件、購物軟件以及地圖軟件等等。下面我們…

setdefault()方法

setdefault()方法 描述 字典 setdefault() 方法和 get()方法類似,返回指定鍵的值&#xff0c;如果鍵不在字典中&#xff0c;將會添加鍵并將值設置為一個指定值&#xff0c;默認為None。 get() 和 setdefault() 區別&#xff1a; setdefault() 返回的鍵如果不在字典中&#xff0…

Hive2.1.1、Hadoop2.7.3 部署

本文以遠程模式安裝Hive2.1.1將hive的元數據放置在MySQL數據庫中。 1 安裝mysql數據庫 sudo apt-get install mysql-server11 重啟mysql服務使得配置文件生效 sudo service mysql restart11 創建hive專用賬戶 CREATE USER hive% IDENTIFIED BY 123456;11 給hive賬戶授予所有權限…

Django 的簡單ajax

需要通過ajax實現局部刷新 js代碼 $(#guo-sou-ajax).click(function(){ #獲取id為guo-sou-ajax點擊后的信號console.log($(this).attr("data-action")) $.ajax({ #調用ajaxurl: $(this).attr("data-action"), #url保存在標簽里面的data-actio…

postman提取返回值

Postman是做接口測試的&#xff0c;但是很多接口并不是直接就能測&#xff0c;有的需要一些預處理。比如說身份認證&#xff0c;需要傳遞一個token。如果做網頁測試&#xff0c;一般打開登陸界面的時候就會生成一個token&#xff0c;如果返回值是json格式&#xff0c;用Postman…

docker下用keepalived+Haproxy實現高可用負載均衡集群

啟動keepalived后宿主機無法ping通用keepalived&#xff0c;報錯&#xff1a; [rootlocalhost ~]# ping 172.18.0.15 PING 172.18.0.15 (172.18.0.15) 56(84) bytes of data. From 172.18.0.1 icmp_seq1 Destination Host Unreachable From 172.18.0.1 icmp_seq2 Destination H…

hadoop hive 2.1.1 將Hive啟動為服務

我們之前使用的Shell方式與Hive交互只是Hive交互方式中的一種&#xff0c;還有一種就是將Hive啟動為服務&#xff0c;然后運行在一個節點上&#xff0c;那么剩下的節點就可以使用客戶端來連接它&#xff0c;從而也可以使用Hive的數據分析服務。 前臺模式 可以使用下面的命令來將…

大數據學習要知道的十大發展趨勢,以及學習大數據的幾點建議

2016年&#xff0c;近40%的公司正在實施和擴展大數據技術應用&#xff0c;另有30%的公司計劃在未來12個月內采用大數據技術&#xff0c;62.5%的公司現在至少有一個大數據項目投入生產&#xff0c;只有5.4%的公司沒有大數據應用計劃&#xff0c;或者是沒有正在進行的大數據項目&…

pickle 模塊

import pickle # class Elephant:def __init__(self, name, weight, height):self.name nameself.weight weightself.height heightdef tiaoxi(self):print(f"{self.name}大象特別喜歡調戲人")# e Elephant("寶寶", "185T", "175"…

Hiv:SQuirrel連接hive配置

熟悉了Sqlserver的sqlserver management studio、Oracle的PL/SQL可視化數據庫查詢分析工具&#xff0c;在剛開始使用hive、phoenix等類sql組件時&#xff0c;一直在苦苦搜尋是否也有類似的工具&#xff0c;不負所望&#xff0c;SQuirrel Sql client 可視化數據庫工具基本可滿足…

MariaDB 數據庫索引詳解(9)

MariaDB數據庫管理系統是MySQL的一個分支,主要由開源社區在維護,采用GPL授權許可MariaDB的目的是完全兼容MySQL,包括API和命令行,MySQL由于現在閉源了,而能輕松成為MySQL的代替品.在存儲引擎方面,使用XtraDB來代替MySQL的InnoDB,MariaDB由MySQL的創始人Michael Widenius主導開發…

Kettle連接Hive2的問題解決思路

在kettle上當選擇好HIVE2連接時候有報錯 org.pentaho.di.core.exception.KettleDatabaseException: Error occured while trying to connect to the databaseError connecting to database: (using class org.apache.hive.jdbc.HiveDriver)org/apache/http/client/CookieStore…

windows下cmd常用的命令

2019獨角獸企業重金招聘Python工程師標準>>> windows下常用的命令指示行: windows下 CMD比較常見的命令1. gpedit.msc-----組策略 2. sndrec32-------錄音機 3. Nslookup-------IP地址偵測器 4. explorer-------打開資源管理器 5. logoff---------注銷命令 6. …