簡述JAVA線程調度的原理,Rxjava原理(二)--線程調度

1. 創建線程池和線程管理策略分析

// 在開發中使用Rxjava來完成線程切換會調用到以下方法(還有幾個就不一一列舉了,原理一樣的),那么就從這里開始分析

Schedulers.io()

Schedulers.computation()

Schedulers.newThread()

AndroidSchedulers.mainThread()

當我們調用以上方法中的任意一個,都會調到Schedulers類中,Schedulers使用策略模式封裝了所有線程切換策略(因此后面以io()分析)。

// 1. Schedulers類中,靜態創建IOTask(),當調用Schedulers.io()的時候,就是返回這個Callable.

static {

SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

}

// 2.創建IoScheduler

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

}

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

// 3.創建線程池

public IoScheduler(ThreadFactory threadFactory) {

this.threadFactory = threadFactory;

this.pool = new AtomicReference(NONE);

start();

}

public void start() {

// CachedWorkerPool任務池,里面持有任務隊列和線程池

CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);

if (!pool.compareAndSet(NONE, update)) {

update.shutdown();

}

}

//4. CachedWorkerPool構造方法中創建線程池,并且暴露get()提供需要執行的任務

static final class CachedWorkerPool implements Runnable {

private final long keepAliveTime;

private final ConcurrentLinkedQueue expiringWorkerQueue;

final CompositeDisposable allWorkers;

private final ScheduledExecutorService evictorService;

private final Future> evictorTask;

private final ThreadFactory threadFactory;

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {

......

if (unit != null) {

// 創建線程池

evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);

task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);

}

......

}

ThreadWorker get() {

.....

while (!expiringWorkerQueue.isEmpty()) {

// 任務隊列不為空,從隊列中取一個并返回

ThreadWorker threadWorker = expiringWorkerQueue.poll();

if (threadWorker != null) {

return threadWorker;

}

}

// 如果任務隊列是空的,就創建一個并返回

ThreadWorker w = new ThreadWorker(threadFactory);

allWorkers.add(w);

return w;

}

......

}

用一張圖可能說明得比較清楚一些。

ce19d5012d66

Schedulers調度過程.png

2. Rxjava上游任務在子線程中執行分析

// 上游線程切換使用過程

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

// ObservableCreate.subscribeOn

.subscribeOn(Schedulers.io())

// ObservableSubscribeOn.subscribe

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

從上面使用過程的代碼看下面的圖,分析Rxjava封裝任務和拋任務到線程池的過程。

ce19d5012d66

上游任務在線程池執行流程圖.png

當我們一訂閱(調用subscribe(Observer)方法)的時候,Rxjava將會把上游需要執行的任務和下游的觀察者經過層層包裹,包裹好之后,就會得到一個Scheduler.Worker任務對象。當調用發射器的onNext的方式的時候,結合第一小節的圖片,ObservableSubscribeOn就會將任務拋到線程池執行,在子線程中執行任務并且返回,從而完成線程切換功能。

3. Rxjava下游任務在主線程中執行分析

3.1 創建AndroidSchedulers.mainThread的過程

如第一節Schedulers的創建流程一樣,當調用AndroidSchedulers.mainThread()之后,最終會創建HandlerScheduler。

// 1.創建HandlerScheduler,并且傳入MainLooper

public final class AndroidSchedulers {

private static final class MainHolder {

// 創建HandlerScheduler

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

}

private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(

new Callable() {

@Override public Scheduler call() throws Exception {

return MainHolder.DEFAULT;

}

});

public static Scheduler mainThread() {

return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);

}

}

// 2.當創建任務的時候,創建HandlerWorker

final class HandlerScheduler extends Scheduler {

private final Handler handler;

HandlerScheduler(Handler handler) {

this.handler = handler;

}

@Override

public Worker createWorker() {

return new HandlerWorker(handler);

}

}

// 3.當執行任務的時候

private static final class HandlerWorker extends Worker {

private final Handler handler;

HandlerWorker(Handler handler) {

this.handler = handler;

}

@Override

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {

......

// 包裝任務

run = RxJavaPlugins.onSchedule(run);

ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

// 創建Message包裝任務

Message message = Message.obtain(handler, scheduled);

message.obj = this;

// 發送任務到MainLooper中,該任務就在主線程中執行了

handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

......

return scheduled;

}

}

其實真正將任務放在主線程中執行就是上面三個步驟,但是Rxjava增加了很多其他功能,例如解除訂閱(將任務包裝在Disposable中),增加hook功能(在任務外面在包裝了ScheduledRunnable)等等,其最內層的本質就是我們需要執行的任務。細化的包裹情況如下圖:

ce19d5012d66

主線程執行任務.png

4.多個線程切換,以哪個為準

如下面代碼,我們作死得切換線程,那么哪些線程會最終執行我們的任務呢

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter e) throws Exception {

e.onNext("JackOu");

}

})

.subscribeOn(Schedulers.io()) // 上游切換,靠近上游的生效

.subscribeOn(Schedulers.newThread())

.subscribeOn(Schedulers.computation())

.observeOn(Schedulers.io())

.observeOn(Schedulers.computation())

.observeOn(AndroidSchedulers.mainThread()) // 下游切換,靠近下游的生效

.subscribe(new Observer() {

......

@Override

public void onNext(String s) {

}

......

});

我們可以從第二節和第三節看出,當我們每調用一次subscribeOn方法,上游就會多包裝一層Scheduler,在訂閱之后,解包裹的時候越靠近“待執行任務”的subscribeOn越后解包,所以最靠近任務的subscribeOn調用會是最終被執行,也就是最終被執行的線程。

因此我們可以總結得到:

總結一: 在多次調用線程切換的時候,第一次調用subscribeOn的線程切換會是最后執行任務的線程;最后調用observeOn切換的線程會是最后執行的線程。

總結二:從調用關系來看,越靠近上游的線程切換,將是最終執行任務的線程;越靠近下游的線程切換,將是最終執行任務的線程。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/394930.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/394930.shtml
英文地址,請注明出處:http://en.pswp.cn/news/394930.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

[前端隨筆][css] 彈性布局

說在前面 彈性布局&#xff0c;顧名思義就是有彈性&#xff0c;能夠根據屏幕/當前空間大小自由伸縮的。使用彈性布局可以很好的適應各種尺寸的客戶端。 關鍵代碼 display:flex;    設定元素為彈性布局  <文檔傳送門> box-flex: 參數;   設定元素為彈性布局  &…

不同的模塊中定義同樣的宏為不同的值合法嗎_如何創建自定義的建模規范

本文摘要&#xff1a;主要介紹如何創建自定義的建模規范檢查&#xff0c;以及在建模規范檢查中&#xff0c;如何增加自動修正模型使之符合規范。比如我們想創建一個自定義的規則&#xff0c;對于constant模塊&#xff0c;1. 如果value是參數的話&#xff0c;則輸出數據類型必須…

DBCP連接池配置常用參數說明

參數默認值說明username\傳遞給JDBC驅動的用于建立連接的用戶名password\傳遞給JDBC驅動的用于建立連接的密碼url\傳遞給JDBC驅動的用于建立連接的URLdriverClassName\使用的JDBC驅動的完整有效的Java 類名initialSize 0初始化連接:連接池啟動時創建的初始化連接數量,1.2版本后…

科大訊飛 ai算法挑戰賽_為井字游戲挑戰構建AI算法

科大訊飛 ai算法挑戰賽by Ben Carp通過本卡爾普 為井字游戲挑戰構建AI算法 (Building an AI algorithm for the Tic-Tac-Toe challenge) As part of the freeCodeCamp curriculum, I was challenged build a Tic-Tac-Toe web app. It was a real pleasure.作為freeCodeCamp課程…

js serialize php 解,[轉]JavaScript 版本的 PHP serialize/unserialize 完整實現

下載: phpserializer.js/* phpserializer.js - JavaScript to PHP serialize / unserialize class.** This class is designed to convert php variables to javascript* and javascript variables to php with a php serialize unserialize* compatible way.** Copyright (C) …

Git 的 .gitignore 配置

.gitignore 配置文件用于配置不需要加入版本管理的文件&#xff0c;配置好該文件可以為我們的版本管理帶來很大的便利&#xff0c;以下是個人對于配置 .gitignore 的一些心得。 1、配置語法&#xff1a; 以斜杠“/”開頭表示目錄&#xff1b; 以星號“*”通配多個字符&#xff…

wsdl文件是怎么生成的_C++ 動態庫.dll的生成---超級詳細!!!

怎么將建好的工程生成.dll工程&#xff1f;1、在C中打開工程2、運行結果&#xff1a;輸出Print修改開始&#xff1a;1、打開屬性。2、修改以下內容&#xff1a;目標文件擴展名&#xff0c;由.exe--》.dll,直接刪除修改即可配置類型&#xff0c;由.exe--》.dll,下拉菜單可選擇最…

時鐘設置

date --set"05/31/16 18:16" 時鐘設置 設置系統時間# date --set“07/07/06 10:19" &#xff08;月/日/年 時:分:秒&#xff09;2、hwclock/clock查看硬件時# hwclock --show# clock --show設置硬件時間# hwclock --set --date"07/07/06 10:19" &…

《成為一名機器學習工程師》_成為機器學習的拉斐爾·納達爾

《成為一名機器學習工程師》by Sudharsan Asaithambi通過Sudharsan Asaithambi 成為機器學習的拉斐爾納達爾 (Become the Rafael Nadal of Machine Learning) One year back, I was a newbie to the world of Machine Learning. I used to get overwhelmed by small decisions…

HTTP基本認證(Basic Authentication)的JAVA示例

大家在登錄網站的時候&#xff0c;大部分時候是通過一個表單提交登錄信息。但是有時候瀏覽器會彈出一個登錄驗證的對話框&#xff0c;如下圖&#xff0c;這就是使用HTTP基本認證。下面來看看一看這個認證的工作過程:第一步: 客戶端發送http request 給服務器,服務器驗證該用戶…

php-fpm 內存 facebook,【百家號】臉書百科,安裝php-fpm-5.4.16-42.遇到的小問題 Web程序 - 貪吃蛇學院-專業IT技術平臺...

環境&#xff1a;redhat 7.2版本 yum源也是7.2的iso[[email protected] lnmp_soft]# yum -y install php-fpm-5.4.16-42.el7.x86_64.rpm已加載插件&#xff1a;langpacks, product-id, search-disabled-repos, subscription-managerThis system is not registered to Red Hat S…

Codeforces Round #424 (Div. 2, rated, based on VK Cup Finals)

昨晚的沒來得及打&#xff0c;最近錯過好幾場CF了&#xff0c;這場應該不算太難 A. Unimodal Arraytime limit per test1 secondmemory limit per test256 megabytesinputstandard inputoutputstandard outputArray of integers is unimodal, if: it is strictly increasing in…

python能print中文嗎_python怎么print漢字

今天就為大家分享一篇python中使用print輸出中文的方法&#xff0c;具有很好的參考價值&#xff0c;希望對大家有所幫助。看Python簡明教程&#xff0c;學習使用print打印字符串&#xff0c;試了下打印中文&#xff0c;不行。&#xff08;推薦學習&#xff1a;Python視頻教程&a…

ajax的一些相關

1、AJAX Asynchronous&#xff08;異步的&#xff09; JavaScript and XML AJAX是能不刷新整個網頁的前提下&#xff0c;更新內容。通過少量的數據交換&#xff0c;達成局部頁面刷新的效果。 而form表單提交經常是刷新整個頁面&#xff0c;很繁瑣 2、AJAX是基于現有的Internet…

select ...as_一起使用.select .map和.reduce方法可充分利用Ruby

select ...asby Declan Meehan由Declan Meehan 一起使用.select .map和.reduce方法可充分利用Ruby (Get the most out of Ruby by using the .select .map and .reduce methods together) You should absolutely never ever repeat yourself when writing code. In other word…

一些書單

僅對近來的學習做些回顧吧 學習永無止境--> 2015年已完成書單&#xff1a; 文學&#xff1a; 硅谷之火浪潮之巔天才在左瘋子在右從0到1生命咖啡館黑客與畫家奇思妙想&#xff1a;15位計算機天才及其重大發現喬布斯傳平凡的世界&#xff08;三部全&#xff09;一只iphone的全…

oracle 11gogg,【OGG】Oracle GoldenGate 11g (二) GoldenGate 11g 單向同步配置 上

Oracle GoldenGate 11g (二)GoldenGate 11g 單向同步配置 上ItemSource SystemTarget SystemPlatformRHEL6.4 - 64bitRHEL6.4 - 64bitHostnamerhel64.oracle.comora11g.oracle.comDatabaseOracle 11.2.0.3Oracle 11.2.0.3Character SetAL32UTF8AL32UTF8ORACLE_SIDPRODEMREPList…

今天聽說了一個壓縮解壓整型的方式-group-varint

group varint https://github.com/facebook/folly/blob/master/folly/docs/GroupVarint.md 這個是facebook的實現 https://www.slideshare.net/parallellabs/building-software-systems-at-google-and-lessons-learned/48-Group_Varint_Encoding_Idea_encode

Centos7-卸載自帶的jdk 安裝jdk8

卸載JDK Centos7一般都會帶有自己的openjdk,我們一般都回用oracle的jdk,所以要卸載 步驟一&#xff1a;查詢系統是否以安裝jdk #rpm -qa|grep java 或 #rpm -qa|grep jdk 或 #rpm -qa|grep gcj 步驟二&#xff1a;卸載已安裝的jdk #rpm -e --nodeps java-1.8.0-openjdk…

小豬佩奇python_python畫個小豬佩奇

#!/usr/bin/python #-*- coding: utf-8 -*-import turtleast def nose(x,y):#鼻子 t.pu() t.goto(x,y) t.pd() t.seth(-30) t.begin_fill() a0.4 for i in range(120):if 0<i<30 or 60<i<90: aa0.08t.lt(3) #向左轉3度 t.fd(a) #向前走a的步長else: aa-0.08t.lt(3)…