RxBus的核心功能是基于Rxjava的,在RxJava中有個Subject類,它繼承Observable類,同時實現了Observer接口,因此Subject可以同時擔當訂閱者和被訂閱者的角色,這里我們使用Subject的子類PublishSubject來創建一個Subject對象(PublishSubject只有被訂閱后才會把接收到的事件立刻發送給訂閱者),在需要接收事件的地方,訂閱該Subject對象,之后如果Subject對象接收到事件,則會發射給該訂閱者,此時Subject對象充當被訂閱者的角色。完成了訂閱,在需要發送事件的地方將事件發送給之前被訂閱的Subject對象,則此時Subject對象做為訂閱者接收事件,然后會立刻將事件轉發給訂閱該Subject對象的訂閱者,以便訂閱者處理相應事件,到這里就完成了事件的發送與處理。最后就是取消訂閱的操作了,Rxjava中,訂閱操作會返回一個Subscription對象,以便在合適的時機取消訂閱,防止內存泄漏,如果一個類產生多個Subscription對象,我們可以用一個CompositeSubscription存儲起來,以進行批量的取消訂閱。
首先添加類庫
// RxAndroid
compile 'io.reactivex:rxandroid:1.2.1'
// RxJava
compile 'io.reactivex:rxjava:1.2.4'
1、接下來結合實現代碼再做進一步的解釋:
package com.example.rxbus;
import java.util.HashMap;
import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subscriptions.CompositeSubscription;
/**
* @author: lijuan
* @description:
* @date: 2017-06-07
* @time: 17:24
*/
public class RxBus {
private static volatile RxBus mInstance;
private SerializedSubject mSubject;
private HashMap mSubscriptionMap;
private RxBus() {
mSubject = new SerializedSubject<>(PublishSubject.create());
}
public static RxBus getInstance() {
if (mInstance == null) {
synchronized (RxBus.class) {
if (mInstance == null) {
mInstance = new RxBus();
}
}
}
return mInstance;
}
/**
* 發送事件
*
* @param o
*/
public void post(Object o) {
mSubject.onNext(o);
}
/**
* 返回指定類型的Observable實例
*
* @param type
* @param
* @return
*/
public Observable tObservable(final Class type) {
return mSubject.ofType(type);
}
/**
* 是否已有觀察者訂閱
*
* @return
*/
public boolean hasObservers() {
return mSubject.hasObservers();
}
/**
* 一個默認的訂閱方法
*
* @param type
* @param next
* @param error
* @param
* @return
*/
public Subscription doSubscribe(Class type, Action1 next, Action1 error) {
return tObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next, error);
}
/**
* 保存訂閱后的subscription
*
* @param o
* @param subscription
*/
public void addSubscription(Object o, Subscription subscription) {
if (mSubscriptionMap == null) {
mSubscriptionMap = new HashMap<>();
}
String key = o.getClass().getName();
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).add(subscription);
} else {
CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(subscription);
mSubscriptionMap.put(key, compositeSubscription);
}
}
/**
* 取消訂閱
*
* @param o
*/
public void unSubscribe(Object o) {
if (mSubscriptionMap == null) {
return;
}
String key = o.getClass().getName();
if (!mSubscriptionMap.containsKey(key)) {
return;
}
if (mSubscriptionMap.get(key) != null) {
mSubscriptionMap.get(key).unsubscribe();
}
mSubscriptionMap.remove(key);
}
}
先看一下這個私有的構造函數:
private RxBus() {
mSubject = new SerializedSubject<>(PublishSubject.create());
}
由于Subject類是非線程安全的,所以我們通過它的子類SerializedSubject將PublishSubject轉換成一個線程安全的Subject對象。之后可通過單例方法getInstance()進行RxBus的初始化。
在toObservable()根據事件類型,通過mSubject.ofType(type);得到一個Observable對象,讓其它訂閱者來訂閱。其實ofType()方法,會過濾掉不符合條件的事件類型,然后將滿足條件的事件類型通過cast()方法,轉換成對應類型的Observable對象,這點可通過源碼查看。
同時封裝了一個簡單的訂閱方法doSubscribe(),只需要傳入事件類型,相應的回調即可。其實可以根據需求在RxBus中擴展滿足自己需求的doSubscribe()方法,來簡化使用時的代碼邏輯。
在需要發送事件的地方調用post()方法,它間接的通過mSubject.onNext(o);將事件發送給訂閱者。
同時RxBus提供了addSubscription()、unSubscribe()方法,分別來保存訂閱時返回的Subscription對象,以及取消訂閱。
接下我們在具體的場景中測試一下:
1、我們在Activity的onCreate()方法中進行進行訂閱操作:
public static final int SEND = 0x131;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
doSubscribe();
}
/**
* 訂閱事件監聽
*/
public void doSubscribe() {
Subscription subscription = RxBus.getInstance()
.tObservable(NewsModel.class)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1() {
@Override
public void call(NewsModel model) {
switch (model.getStatus()) {
case SEND:
Log.e("rxbus", model.getMsg());
break;
default:
break;
}
}
});
RxBus.getInstance().addSubscription(this, subscription);
}
可以看到我們設定事件類型為NewsModel實體類,當然我們可以設定事件類型為為String或者Integer,并且Subscriber的回調發生在主線程,同時保存了Subscription對象。
2、現在通過一個Button發送事件:
RxBus.getInstance().post(new NewsModel(SEND, "發送一條信息"));
3、最后不要忘了在onDestory()中對廣播進行取消注冊,以及取消訂閱
protected void onDestroy() {
super.onDestroy();
RxBus.getInstance().unSubscribe(this);
}
其它場景有興趣的可自行測試!本篇文章就這樣子啦,存在總結不到位的地方還望指導,感謝_
參考資料:Android 用RxJava模擬一個EventBus ———RxBus