現在很多Android App的開發開始使用Rxjava,但是Rxjava以學習曲線陡峭著稱,入門有些困難。經過一段時間的學習和使用,這里來介紹一下我對Rxjava的理解。
說到Rxjava首先需要了解的兩個東西,一個是Observable(被觀察者,事件源)和 Subscriber(觀察者,是 Observer的子類)。Observable發出一系列事件,Subscriber處理這些事件。首先來看一個基本的例子,我們如何創建并使用Observable。
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); } }).subscribe(new Subscriber<String>(){ @Override public void onCompleted() {} @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.d("rx-info", s); } });
創建Observable的最基本的方法是通過Observable.create() 來進行,當有Subscriber通過Observable.subscribe() 方法進行訂閱之后Observable就會發射事件,注意必須要有訂閱者訂閱才會發射事件。發射的方式是通過調用 Observable中的 OnSubsribe 類型的成員來實現(每個Observable有一個final OnSubscribe<T> onSubscribe 成員,該成員是一個接口,后面詳細說),在 Onsubsribe類型成員中調用 call() 方法,注意,這個call方法的參數就是 Observable.subscribe() 方法傳入的 Subsriber實例。總的一句話就是在Obsubscribe 的 call方法中執行訂閱者(Subscriber)的三個方法 onNext(), onCompleted() 和 onError()。
一開始就是一堆 Observable , Subscriber,subscribe() , OnSubscribe 估計看得頭暈,因此我們需要先來對這些東西有一個了解。這里只列出一個幫助理解的大概。
public class Observable<T> {final OnSubscribe<T> onSubscribe;protected Observable(OnSubscribe<T> f) {this.onSubscribe = f;}public final static <T> Observable<T> create(OnSubscribe<T> f) {return new Observable<T>(hook.onCreate(f));}public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {// cover for generics insanity}public final Subscription subscribe(Subscriber<? super T> subscriber) {return Observable.subscribe(subscriber, this);}public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {// cover for generics insanity}
}
public interface Action1<T> extends Action {void call(T t);
}
public interface Subscription {void unsubscribe();boolean isUnsubscribed();
}
public interface Observer<T> {void onCompleted();void onError(Throwable e);void onNext(T t);
}public abstract class Subscriber<T> implements Observer<T>, Subscription {//...
}
通過上面的代碼幫助理清楚 Observable, Observer, Subscriber, OnSubsriber, subscribe() 之間的關系。這里額外提一下 Observable.subscribe() 方法有多個重載:
Subscription subscribe()
Subscription subscribe(Action1<? super T> onNext)
Subscription subscribe(Action1<? super T> onNext, Action1< java.lang .Throwable> onError)
Subscription subscribe(Action1<? super T> onNext, Action1< java.lang .Throwable> onError, Action0 onComplete)
Subscription subscribe(Observer<? super T> observer)
Subscription subscribe(Subscriber<? super T> subscriber)
其它的ActionX 和 FuncX 請大家自行去查閱定義。
介紹了基本的創建Observable和 Observable是怎么發射事件的之后,來介紹一下Rxjava的Operator和Operator的原理。
Rxjava的Operator常見的有map, flatMap, concat, merge之類的。這里就不介紹Operator的使用了,介紹一下其原理。介紹原理還是來看源碼,以map為例。
首先看一下使用map的例子:
Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("hello");}
})
.map(new Func1<String, String>() {@Overridepublic String call(String s) {return s + "word";}
})
.subscribe(new Subscriber<String>() {@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onNext(String s) {Log.d("info-rx", s);}
});
繼續來看 map的定義:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {return lift(new OperatorMap<T, R>(func));}
簡單說一下Func1,其中的T表示傳入的參數類型,R表示方法返回的參數類型。Operator的操作原理最核心的就是lift的實現。
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {return new Observable<R>(new OnSubscribe<R>() {@Overridepublic void call(Subscriber<? super R> o) {try {Subscriber<? super T> st = hook.onLift(operator).call(o);try {// new Subscriber created and being subscribed with so 'onStart' itst.onStart();onSubscribe.call(st);} catch (Throwable e) {// localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then// prevents onErrorResumeNext and other similar approaches to error handlingExceptions.throwIfFatal(e);st.onError(e);}} catch (Throwable e) {Exceptions.throwIfFatal(e);// if the lift function failed all we can do is pass the error to the final Subscriber// as we don't have the operator available to uso.onError(e);}}});}
lift方法看起來太過復雜,稍作簡化:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {return new Observable<R>(...);
}
lift方法實際是產生一個新的 Observable。在map()調用之后,我們操作的就是新的Observable對象,我們可以把它取名為Observable$2,我們這里調用subscribe就是Observable$2.subscribe,繼續看到subscribe里,重要的幾個調用:
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
注意,這里的observable是Observable$2!!也就是說,這里的onSubscribe是,lift中定義的!!
回過頭來看lift方法中創建新Observable的過程:
return new Observable<R>(new OnSubscribe<R>() {@Overridepublic void call(Subscriber<? super R> o) {try {Subscriber<? super T> st = hook.onLift(operator).call(o);try {// new Subscriber created and being subscribed with so 'onStart' itst.onStart();onSubscribe.call(st); //請注意我!! 這個onSubscribe是原始的OnSubScribe對象!!} catch (Throwable e) {// localized capture of errors rather than it skipping all operators // and ending up in the try/catch of the subscribe method which then// prevents onErrorResumeNext and other similar approaches to error handlingif (e instanceof OnErrorNotImplementedException) {throw (OnErrorNotImplementedException) e;}st.onError(e);}} catch (Throwable e) {if (e instanceof OnErrorNotImplementedException) {throw (OnErrorNotImplementedException) e;}// if the lift function failed all we can do is pass the error to the final Subscriber// as we don't have the operator available to uso.onError(e);}}
});
一定一定要注意這段函數執行的上下文!,這段函數中的onSubscribe對象指向的是外部類,也就是第一個Observable的onSubScribe!而不是Observable$2中的onSubscribe,接下來看:
Subscriber<? super T> st = hook.onLift(operator).call(o);
這行代碼,就是定義operator,生成一個經過operator操作過的Subscriber,看下OperatorMap這個類中的call方法:
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {return new Subscriber<T>(o) {@Overridepublic void onCompleted() {o.onCompleted();}@Overridepublic void onError(Throwable e) {o.onError(e);}@Overridepublic void onNext(T t) {try {o.onNext(transformer.call(t));} catch (Throwable e) {Exceptions.throwIfFatal(e);onError(OnErrorThrowable.addValueAsLastCause(e, t));}}};
}
沒錯,對傳入的Subscriber做了一個代理,把轉換后的值傳入。這樣就生成了一個代理的Subscriber,最后我們最外層的OnSubscribe對象對我們代理的Subscriber進行了調用:
@Override
public void call(Subscriber<? super String> subscriber) {//此處的subscriber就是被map包裹(wrapper)后的對象。subscriber.onNext("hello");
}
然后這個subscriber傳入到內部,鏈式的通知,最后通知到我們在subscribe函數中定義的對象。
分析lift的原理,其實還是回到了一開始介紹的Observable必須要有訂閱者進行訂閱才能發射事件。lift方法會產生一個新的Observable,并且這個Observable位于原始Observable和后面的Subsriber之間,因此lift方法也需要提供一個新的Subscriber來使得新產生的Observable發射事件,這個新的Subsriber就是對事件鏈后方的Subscriber就行包裝做一個代理。
詳細使用Rxjava可參見:
1.?給 Android 開發者的 RxJava 詳解
2.Rxjava使用基礎合集
?