目錄
先了解什么是任務調度?
什么是分布式任務調度?
了解XXL-JOB分布式任務調度平臺 ?
如何搭建XXL-JOB?
分片廣播
作業分片方案
最近學習在項目的媒資管理模塊如何高效處理大量視頻,上傳單個視頻可能涉及到轉碼,內容審核等多個處理部分
如果采用傳統的單線程處理模式可能會導致資源利用率低下,所有操作依次排隊會導致執行處理視頻耗時較長,效率不高的問題出現
因此,在處理大量視頻文件時,我們需要學習更高效的任務處理方式,如何高效滿足這些視頻處理需求?
1.采用多線程
- 在處理大量視頻任務時,單線程處理效率較低,無法充分利用系統資源。為了提高處理效率可以采用多線程技術來并發處理任務
核心思想是:將一個任務拆分為多個子任務,并發執行這些子任務,充分利用CPU和內存資源,提高任務處理效率
2.采用分布式架構+多線程
- 通過分布式架構和多線程技術的結合,最大化利用計算資源,高效處理大規模任務,例如我們所說的處理大量視頻文件
采用分布式+線程可擴展性更強,同時它也是一種 分布式任務調度 的處理方案
先了解什么是任務調度?
任務調度:顧名思義就是對任務的調度,它是指系統為了完成特定的業務,基于給定的時間點,給定的時間間隔或者給定執行次數自動執行任務
我們思考一下下面業務場景
- 某財務系統需要在每天上午10點結算前一天的賬單數據,統計匯總
- 12306網站會根據車次不同,設置幾個時間點分批放票
- 商品成功發貨后,需要向客戶發送短信提醒
以上這些場景,就是 任務調度 所需要解決的問題,類似場景還有很多,我們該如何實現?這里我舉例三種實現方法
1.多線程方式實現
回顧多線程思想,當我們可以開啟一個線程,每sleep一段時間,就去檢查是否已經到預期執行時間
簡單實現任務調度的功能(按一定的間隔時間執行任務調度的功能):
public static void main(String[] args){//任務執行時間間隔final long timeInterval = 1000;Runnable runnable = new Runnable()}{public void run(){while(true){//TODO:somethingtry{Thread.sleep(timeInterval);}catch(InterruptedException e){e.printStackTrace(); } }}};Thread thread = new Thread(runnable); thread.start(); }
}
2.Timer?和 ScheduledExecutor(JDK提供的相關支持)是用于定時任務調度的工具類,幫助我們在指定的時間點或周期性的執行任務
Timer的核心功能:
- 單次任務:在指定的延遲后執行一次任務
- 周期性任務:以固定的時間間隔重復執行任務
Timer的優點在于簡單易用,每個Timer對應一個線程,因此可以同時啟動多個Timer并行執行多個任務,同一個Timer中的任務是串行執行
Timer方法的實現:
public static void main(String[] args){Timer timer = new Timer();timer.schedule(new TimerTask(){@Overridepublic void run(){//TODO:someting }},1000,2000); //一秒開始調度,每2秒執行一次}
ScheduledExecutor方式實現:
public static void main(String [] args){//創建一個固定大小為10的線程池,用于執行定時任務(線程池中的線程可以并發執行多個任務)ScheduledExecutorService service = Executors.newScheduledThreadPool(10);service.scheduleAtFixedRate(new Runnable(){@Override//任務邏輯是打印“todo something”public void run(){//TODO:somethingSystem.out.print("todo something");}//任務將在一秒后首次執行,之后,任務以每2秒執行一次},1,2,TimeUnit.SECONDS);
}
scheduleAtFixedRate:
是用于以固定的速率調度任務,任務會在指定的初始延遲后開始執行,之后以固定的時間間隔重復執行
基于線程池設計的ScheduledExecutor,其設計思想是,每一個被調度的任務都會由線程池中一個線程去執行,因為任務是并發執行的,相互之間不會受到打擾
Timer和ScheduledExecutor都僅能提供基于開始時間與重復間隔的任務調度,不能勝任更加復雜的調度需求 — 比如,設置每月第一天凌晨1點執行任務,復雜調度任務的管理,任務間傳遞數據等等
3.再學習另一個功能強大的 任務調度框架Quartz,它可以滿足更多更復雜的調度需求,Quartz設計的核心類包括Scheduler,Job以及Trigger,其中,Job負責定義需要執行的任務,Trigger負責設置調度策略,Scheduler將兩者組裝起來,并觸發任務開始執行,Quartz支持簡單的按時間間隔調度,還支持按日歷調度方式,通過設置CronTrigger表達式(包括:秒,分,時,日,月,周,年)進行任務調度
public static void main(String [] agrs) throws SchedulerException {//創建一個SchedulerSchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();//創建JobDetailJobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);jobDetailBuilder.withIdentity("jobName","jobGroupName");JobDetail jobDetail = jobDetailBuilder.build();//創建觸發的CronTrigger 支持按日歷調度CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("triggerName", "triggerGroupName").startNow().withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")).build();scheduler.scheduleJob(jobDetail,trigger);scheduler.start();
}public class MyJob implements Job {@Overridepublic void execute(JobExecutionContext jobExecutionContext){System.out.println("todo something");}
}
通過以上內容學習了什么是任務調度,任務調度所解決的問題,以及任務調度的多種實現方式
什么是分布式任務調度?
? ?通常任務調度的程序是集成在應用中的,比如:優惠卷服務中包括了定時發放優惠卷的的調度程序,結算服務中包括了定期生成報表的任務調度程序,由于采用分布式架構,一個服務往往會部署多個冗余實例來運行我們的業務,在這種分布式系統環境下運行任務調度,我們稱之為分布式任務調度,如下圖:
分布式調度要實現的目標:
不管是任務調度程序集成在應用程序中,還是單獨構建的任務調度系統,如果采用分布式調度任務的方式就相當于將任務調度程序分布式構建,這樣就可以具有分布式系統的特點,并且提高任務的調度處理能力
1.并行任務調度
并行任務調度實現靠多線程,如果有大量任務需要調度,此時光靠多線程就會有瓶頸了,因為一臺計算機CPU的處理能力是有限的
如果將任務調度程序分布式部署,每個結點還可以部署為集群,這樣就可以讓多臺計算機共同去完成任務調度,我們可以將任務分割為若干個分片,由不同的實例并行執行,來提高任務調度的處理效率。
2、高可用
若某一個實例宕機,不影響其他實例來執行任務。
3、彈性擴容
當集群中增加實例就可以提高并執行任務的處理效率。
4、任務管理與監測
對系統中存在的所有定時任務進行統一的管理及監測。讓開發人員及運維人員能夠時刻了解任務執行情況,從而做出快速的應急處理響應。
5、避免任務重復執行
當任務調度以集群方式部署,同一個任務調度可能會執行多次,比如在上面提到的電商系統中到點發優惠券的例子,就會發放多次優惠券,對公司造成很多損失,所以我們需要控制相同的任務在多個運行實例上只執行一次。
了解XXL-JOB分布式任務調度平臺 ?
XXL-JOB是一個 輕量級分布式任務調度平臺,專門解決分布式系統中的定時任務調度問題,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展
官網:https://www.xuxueli.com/xxl-job/
XXL-JOB(如圖所示)主要有調度中心,執行器,任務:
1.調度中心(XXL-JOB Admin):
調度中心是XXL-JOB的核心管理平臺,負責對任務進行統一的管理和調度,它提供了一個可視化的Web界面,方便開發者進行任務的配置,監控和管理?
2.執行器(XXL-JOB Executor)
執行器是實際執行任務的組件,它負責接收調度中心發送的任務請求,并執行具體的業務邏輯,執行器可以部署在多個節點上,實現分布式任務執行
3.任務
負責執行具體的業務邏輯
調度中心與執行器之間的工作流程如下:
執行流程:
1.任務執行器根據配置的調度中心的地址,自動注冊到調度中心
2.達到任務觸發條件,調度中心下發任務
3.執行器基于線程池執行任務,并將執行結果放到內存隊列中,把執行日志寫入日志文件中
4.執行器消費內存隊列中的執行結果,主動上報給調度中心
5.當用戶在調度中心查看任務日志,調度中心請求任務執行器,任務執行器讀取任務日志文件并返回日志詳情?
如何搭建XXL-JOB?
首先需要下載XXL-JOB,可以從GitHub地址或者Gitee地址進行下載
GitHub 地址:
-
倉庫地址:GitHub - xuxueli/xxl-job: A distributed task scheduling framework.(分布式任務調度平臺XXL-JOB)
Gitee 地址:
-
倉庫地址:xxl-job: 一個分布式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼并接入多家公司線上產品線,開箱即用。
如何集成XXL-JOB?我在我的媒資管理模塊項目中進行演示
1.當下載好后,用IDEA打開目錄顯示界面如下
其中的 xxl-job-admin為調度中心,xxl-job-core為公共依賴,xxl-job-executor-samples為執行器(推薦使用Springboot版本,通過Springboot管理執行器)
2.再創建一個命名為xxl_job_2.3.1的數據庫
CREATE DATABASE xxl_job_2.3.1 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
根據數據庫腳本創建數據庫,修改數據庫連接信息和端口,啟動xxl-job-admin,訪問http://local:18088/xxl-job-admin/(分布式任務調度平臺 — 管理和調度分布式環境中的定時任務)
使用用戶名:admin / 密碼:123456 登錄
下面配置執行器,執行器負責與調度中心通信接收調度中心發起的任務調度請求
1.進入調度中心添加執行器
啟動成功之后,可以選擇在Linux上運行
使用maven命令,將xxl-job-admin打包,然后將其上傳至Linux中,使用命令啟動
nohup java -jar /絕對路徑/xxl-job-admin-2.3.1.jar &
添加完執行器后,接著配置執行器,執行器負責與調度中心通信,接收調度中心發起的任務調度請求
1.首先在media-service工程中添加依賴(父工程中完成了版本控制,這里的版本是2.3.1)
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId>
</dependency>
2.在nacos下的media-service-dev.yaml下配置xxl-job
注意這里配置的appname是執行器的應用名,稍后會在調度中心配置執行器的時候使用
xxl:job:admin:addresses: http://192.168.101.128:18088/xxl-job-admin/executor:appname: media-process-serviceaddress:ip:port: 9999logpath: /data/applogs/xxl-job-jobhandlerlogretentiondays: 30accessToken: default_token
3.配置xxl-job的執行器
將xxl-job-executor-sample-springboot示例工程下的配置類拷貝到媒資管理的service工程下:
該類中的屬性就是獲取配置文件中的配置得到的,同時提供了一個執行器的Bean,用于初始化XXL-JOB執行器的核心組件XxIJobSpringExecutor
并將其注冊為Spring Bean,使得執行器能夠連接到XXL-JOB管理平臺并接收任務調度請求
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}/*** 針對多網卡、容器內部署等情況,可借助 "spring-cloud-commons" 提供的 "InetUtils" 組件靈活定制注冊IP;** 1、引入依賴:* <dependency>* <groupId>org.springframework.cloud</groupId>* <artifactId>spring-cloud-commons</artifactId>* <version>${version}</version>* </dependency>** 2、配置文件,或者容器啟動變量* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'** 3、獲取IP* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();*/}
再次進入調度中心,添加執行器
重啟媒資管理服務模塊,就可以看到執行器在調度中心注冊成功
下面我們編寫任務演示如何使用,在media-service下新建包com.project.media.service.jobhandler,在該包下定義我們的任務類
package com.xuecheng.media.service.jobhandler;import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** @description 測試執行器* @author Mr.M* @date 2022/9/13 20:32* @version 1.0*/@Component@Slf4j
public class SampleJob {/*** 1、簡單任務示例(Bean模式)*/@XxlJob("testJob")public void testJob() throws Exception {log.info("開始執行.....");}}
然后進入調度中心添加任務,進入任務管理,新增任務信息
調度類型:固定速度指按固定的間隔定時調度
Cron:通過Cron表達式實現更豐富的定時調度策略,它的表達式是一個字符串,通過它可以定義調度策略,格式如下:
{秒數}{分數}{小時}{日期}{月份}{星期}{年份(可為空)}
比如:
30 10 1**?表示每天1點10分30秒觸發
0/30****? 每30秒觸發一次
* 0/10***?每10分鐘觸發一次
添加成功,啟動任務
下面啟動媒資管理的service工程,啟動執行器并觀察執行器方法的執行
掌握了xxL-Job的基本使用,繼續思考如何進行分布式任務處理?比如說我們會啟動多個執行器組成一個集群,去執行任務
結合以上XXL-JOB的實現,我們可以利用 分片廣播策略?來提高處理效率尤其是處理數據庫中大量數據(比如項目中媒資管理模塊需要處理大量視頻文件)
分片廣播
分片廣播是分布式任務調度中一種常見模式,主要用于將一個大任務拆分成多個子任務(分片),并將這些子任務 并發分發 到多個執行器節點上執行,從而提高任務處理效率
以上圖示是 調度中心以執行器為維度進行分片,將集群中的執行器標上序號:0,1,2,3....,廣播是指每次調度會向集群中的所有執行器發送任務調度,請求中攜帶分片參數,每個執行器收到調度請求同時接收分片參數
像這種分片廣播適用于哪些場景?
分片任務場景:10個執行器的集群來處理10w條數據(數據之多),每臺機器只需要處理1w條數據(分片執行),耗時降低10倍;
廣播任務場景:廣播執行器同時運行shell腳本,廣播集群節點進行緩存更新等
"分片廣播" 和普通任務開發流程一致,不同之處在于可以獲取分片參數進行分片業務處理。
?測試分片廣播策略
1.定義作業分片的任務方法
/***分片廣播任務*/@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception{//分片參數int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();Log.info("分片參數:當前分片序號 = {},總分片數 = {}",shardIndex,shardTotal);
}
2.在調度中心添加任務
回到任務調度界面,可以看到該項任務添加成功
3.啟動任務,觀察日志
下面需要啟動兩個執行器實例,觀察每個實例的執行情況
1.首先在nacos中配置media-service的本地優先部署
#配置本地優先
spring:cloud:config:override-none: true
2.將media-service啟動兩個實例,兩個實例在啟動時端口不能沖突
例如:在VM options處添加:-Dserver.port=63051 - Dxxl.job.executor.port=9998
在VM options處添加: -Dserver.port=63050 - Dxxl.job.executor.port=9999
3.啟動兩個實例,觀察任務調度中心,稍等片刻執行器會有兩個
分別觀察這兩個執行實例的日志
日志一:
日志二:
從日志可以看到每個實例的分片序號不同,如果其中一個執行器掛掉,只剩下一個執行器在工作,稍等片刻調用中心發現少了一個執行器將動態調整總分片數為1
作業分片方案
以上任務添加成功后,對于要處理的任務會添加到待處理任務表中,現在啟動多個執行器實例去查詢這些待處理任務,如何保證多個執行器不會查詢到重復的任務呢?
XXL-JOB并不直接提供數據處理的功能,它只會給執行器分配好分片序號,在向執行器任務調度的同時下發分片總數以及分片序號等這些參數,執行器收到這些參數后根據自己的業務需求去利用這些參數
下圖表示了多個執行器獲取視頻處理任務的結構:
每個執行器收到廣播任務會有兩個參數:分片總數,分片序號
每次執行從數據表取任務時可以讓任務id 模上(取模運算) 分片總數,如果等于分片?序號則能執行此任務
假設我們有6個待處理的視頻任務,任務ID依次為1-6,同時有3個執行器實例,那么分片總數就是3,對應的分片序號則為0,1,2。具體任務分配情況如下:
任務序號 | 任務 ID? 分片總數(求模運算) | 應執行的執行器分片序號 |
---|---|---|
1 | 1 % 3 = 1 | 執行器 2 |
2 | 2 % 3 = 2 | 執行器 3 |
3 | 3 % 3 = 0 | 執行器 1 |
4 | 4 % 3 = 1 | 執行器 2 |
5 | 5 % 3 = 2 | 執行器 3 |
6 | 6 % 3 = 0 | 執行器 1 |
從上述表格中可以清晰地看到,每個任務都根據其任務ID的取模結果被精準地分配到了對應的執行器上,而不會出現多個執行器獲取到同一個任務的情況
通過這種XXL-JOB的分片機制,可以用于處理視頻轉碼,視頻審核,視頻分發等任務,將大量視頻處理任務均勻分配給多個執行器,保證任務的高效執行和不會重復處理