1. 創建線程池和線程管理策略分析
// 在開發中使用Rxjava來完成線程切換會調用到以下方法(還有幾個就不一一列舉了,原理一樣的),那么就從這里開始分析
Schedulers.io()
Schedulers.computation()
Schedulers.newThread()
AndroidSchedulers.mainThread()
當我們調用以上方法中的任意一個,都會調到Schedulers類中,Schedulers使用策略模式封裝了所有線程切換策略(因此后面以io()分析)。
// 1. Schedulers類中,靜態創建IOTask(),當調用Schedulers.io()的時候,就是返回這個Callable.
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
// 2.創建IoScheduler
static final class IOTask implements Callable {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
// 3.創建線程池
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference(NONE);
start();
}
public void start() {
// CachedWorkerPool任務池,里面持有任務隊列和線程池
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
//4. CachedWorkerPool構造方法中創建線程池,并且暴露get()提供需要執行的任務
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
......
if (unit != null) {
// 創建線程池
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
......
}
ThreadWorker get() {
.....
while (!expiringWorkerQueue.isEmpty()) {
// 任務隊列不為空,從隊列中取一個并返回
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// 如果任務隊列是空的,就創建一個并返回
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
......
}
用一張圖可能說明得比較清楚一些。
Schedulers調度過程.png
2. Rxjava上游任務在子線程中執行分析
// 上游線程切換使用過程
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("JackOu");
}
})
// ObservableCreate.subscribeOn
.subscribeOn(Schedulers.io())
// ObservableSubscribeOn.subscribe
.subscribe(new Observer() {
......
@Override
public void onNext(String s) {
}
......
});
從上面使用過程的代碼看下面的圖,分析Rxjava封裝任務和拋任務到線程池的過程。
上游任務在線程池執行流程圖.png
當我們一訂閱(調用subscribe(Observer)方法)的時候,Rxjava將會把上游需要執行的任務和下游的觀察者經過層層包裹,包裹好之后,就會得到一個Scheduler.Worker任務對象。當調用發射器的onNext的方式的時候,結合第一小節的圖片,ObservableSubscribeOn就會將任務拋到線程池執行,在子線程中執行任務并且返回,從而完成線程切換功能。
3. Rxjava下游任務在主線程中執行分析
3.1 創建AndroidSchedulers.mainThread的過程
如第一節Schedulers的創建流程一樣,當調用AndroidSchedulers.mainThread()之后,最終會創建HandlerScheduler。
// 1.創建HandlerScheduler,并且傳入MainLooper
public final class AndroidSchedulers {
private static final class MainHolder {
// 創建HandlerScheduler
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
}
// 2.當創建任務的時候,創建HandlerWorker
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
}
// 3.當執行任務的時候
private static final class HandlerWorker extends Worker {
private final Handler handler;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
......
// 包裝任務
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 創建Message包裝任務
Message message = Message.obtain(handler, scheduled);
message.obj = this;
// 發送任務到MainLooper中,該任務就在主線程中執行了
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
......
return scheduled;
}
}
其實真正將任務放在主線程中執行就是上面三個步驟,但是Rxjava增加了很多其他功能,例如解除訂閱(將任務包裝在Disposable中),增加hook功能(在任務外面在包裝了ScheduledRunnable)等等,其最內層的本質就是我們需要執行的任務。細化的包裹情況如下圖:
主線程執行任務.png
4.多個線程切換,以哪個為準
如下面代碼,我們作死得切換線程,那么哪些線程會最終執行我們的任務呢
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
e.onNext("JackOu");
}
})
.subscribeOn(Schedulers.io()) // 上游切換,靠近上游的生效
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread()) // 下游切換,靠近下游的生效
.subscribe(new Observer() {
......
@Override
public void onNext(String s) {
}
......
});
我們可以從第二節和第三節看出,當我們每調用一次subscribeOn方法,上游就會多包裝一層Scheduler,在訂閱之后,解包裹的時候越靠近“待執行任務”的subscribeOn越后解包,所以最靠近任務的subscribeOn調用會是最終被執行,也就是最終被執行的線程。
因此我們可以總結得到:
總結一: 在多次調用線程切換的時候,第一次調用subscribeOn的線程切換會是最后執行任務的線程;最后調用observeOn切換的線程會是最后執行的線程。
總結二:從調用關系來看,越靠近上游的線程切換,將是最終執行任務的線程;越靠近下游的線程切換,將是最終執行任務的線程。