今天我們講講分布式定時任務調度—ElasticJob。
一、概述
1、什么是分布式任務調度
我們可以思考?下下?業務場景的解決?案:
-
某電商平臺需要每天上午10點,下午3點,晚上8點發放?批優惠券
-
某銀?系統需要在信?卡到期還款?的前三天進?短信提醒
-
某財務系統需要在每天凌晨0:10分結算前?天的財務數據,統計匯總
以上場景就是任務調度所需要解決的問題
任務調度是為了?動完成特定任務,在約定的特定時刻去執?任務的過程
2、為什么需要分布式調度
我們在之前使?過Spring中提供的定時任務注解@Scheduled,在業務類中?法中貼上這個注解然后在啟動類上貼上 @EnableScheduling 注解
那為什么又需要分布式調度?
感覺Spring給我們提供的這個注解可以完成任務調度的功能,好像已經完美解決問題了,為什么還需要分布式呢?
主要有如下這?點原因:
1.單機處理極限:原本1分鐘內需要處理1萬個訂單,但是現在需要1分鐘內處理10萬個訂單;原來?個統計需要1?時,現在業務?需要10分鐘就統計出來。你也許會說,你也可以多線程、單機多進程處理。的確,多線程并?處理可以提?單位時間的處理效率,但是單機能?畢竟有限(主要是CPU、內存和磁盤),始終會有單機處理不過來的情況。
2.?可?:單機版的定式任務調度只能在?臺機器上運?,如果程序或者系統出現異常就會導致功能不可?。雖然可以在單機程序實現的?夠穩定,但始終有機會遇到?程序引起的故障,?這個對于?個系統的核?功能來說是不可接受的。
3.防?重復執?: 在單機模式下,定時任務是沒什么問題的。但當我們部署了多臺服務,同時?每臺服務?有定時任務時,可能會出現任務重復執行
這個時候就需要分布式的任務調度來實現了。
3、Elastic-Job介紹
Elastic-Job是?個分布式調度的解決?案,由當當?開源,它由兩個相互獨?的?項?Elastic-job-Lite和
Elastic-Job-Cloud組成,使?Elastic-Job可以快速實現分布式任務調度。
Elastic-Job的地址: ElasticJob - Distributed scheduled job solution
功能列表:
-
分布式調度協調
在分布式環境中,任務能夠按照指定的調度策略執?,并且能夠避免同?任務多實例重復執?。
-
豐富的調度策略:
基于成熟的定時任務作業框架Quartz cron表達式執?定時任務。
-
彈性拓容縮容
當集群中增加?個實例,它應當能夠被選舉被執?任務;當集群減少?個實例時,他所執?的任務能被轉移到別的示例中執?。
-
失效轉移
某示例在任務執?失敗后,會被轉移到其他實例執?。
-
錯過執?任務重觸發
若因某種原因導致作業錯過執?,?動記錄錯誤執?的作業,并在下次次作業完成后?動觸發。
-
?持并?調度
?持任務分?,任務分?是指將?個任務分成多個?任務在多個實例同時執?。
-
作業分??致性
當任務被分?后,保證同?分?在分布式環境中僅?個執?實例。
-
?持作業?命周期操作
可以動態對任務進?開啟及停?操作。
-
豐富的作業類型
?持Simple、DataFlow、Script三種作業類型
系統運行架構圖
二、Zookeeper下載
建議:zookeeper3.4.6以上版本,JDK1.7以上,maven在3.0.4以上
我這里將Zookeeper安裝到了Linux系統上,以我自己安裝為例,可自行選擇。
1.上傳,將zookeeper-3.4.11.tar.gz
上傳到/usr/local/src/soft/zookeeper
目錄下
2.解壓文件到指定目錄
tar -zxvf /usr/local/src/soft/zookeeper-3.4.11.tar.gz -C /usr/local/src/soft/zookeeper
3.拷貝配置文件
cp /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/src/soft/zookeeper/zookeeper-3.4.11/conf/zoo.cfg
4.啟動
/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh start
5.檢查進程是否開啟
這個命令不一定有效
jps
可以試試這個命令,查看狀態
/usr/local/src/soft/zookeeper/apache-zookeeper-3.5.6-bin/bin/zkServer.sh status
三、任務分片參數
1、分片的概念
作業分片是指任務的分布式執行,需要將一個任務拆分為多個獨立的任務項,然后由分布式的應用實例分別執行某一個或者幾個分布項。
例如:Elastic-job快速入門中文件備份的案例,現有兩臺服務器,每臺服務器分別跑一個應用實例。為了快速執行作業,那么可以講任務分成4片,每個應用實例都執行兩片。作業遍歷數據邏輯應為:實例1查找text和image類型文件執行備份,實例2査找radio和vedio類型文件執行備份。如果由于服務器拓容應用實例數量增加為4,則作業諞歷數據的邏輯應為: 4個實例分別處理text,image,radio,video類型的文件。
可以看到,通過對任務的合理分片化,從而達到任務并行處理的效果.
-
當只有一臺機器時,給定時任務分片四個,在機器中啟動四個線程,分別處理四個分片的內容
-
當只有兩臺機器時,分片由兩臺機器進行分配,A負責索引為0,1的內容,B負責索引為2,3的內容
-
三臺機器,如圖
-
四臺機器,平均分攤
集群之后,可以分攤CPU的處理壓力,提高數據處理的速度,那到底幾臺機器好呢?
分片數建議是機器個數的倍數
在秒殺項目中,我們將秒殺商品的場次分成了10、12、14三個場次。在這里我們就根據場次將其分成三片
2、分片分配機制
ElasticJob的分片分配機制
-
Zookeeper 協調:ElasticJob 通過 Zookeeper 協調任務實例的分片分配。每個任務實例啟動時,會向 Zookeeper 注冊自己,并獲取分配給自己的分片信息。
-
動態分片分配:如果任務實例的數量發生變化(比如新增或減少實例),Zookeeper 會重新分配分片,確保每個分片都有任務實例處理。
-
分片參數傳遞:分片參數通過
ShardingContext
傳遞給任務實例,任務實例根據分片參數執行對應的邏輯。
ElasticJob 通過 Zookeeper 協調,將分片分配給不同的任務實例。每個任務實例啟動時,會從 Zookeeper 獲取分配給自己的分片信息(分片編號和分片參數)。
三、項目集成
1、依賴添加
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-spring</artifactId><version>2.1.5</version>
</dependency>
<!--zookeeper客戶端-->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>2.10.0</version>
</dependency>
2、分布式調度配置
1)注冊中心配置
獲取zk的地址和任務名稱,將任務注冊到zk注冊中心中
@Configuration
public class RegistryCenterConfig {@Bean(initMethod = "init")public CoordinatorRegistryCenter createRegistryCenter(@Value("${elasticjob.zookeeper-url}") String zookeeperUrl, @Value("${elasticjob.group-name}") String groupName) {//zk的配置ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration(zookeeperUrl,groupName);//設置zk超時時間zookeeperConfiguration.setSessionTimeoutMilliseconds(100);//創建注冊中心CoordinatorRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);return zookeeperRegistryCenter;}
}
2)分布式調度參數配置
#分布式定時任務配置
elasticjob:zookeeper-url: 192.168.88.130:2181group-name: shop-job-groupjobCron:#3分鐘執行一次seckillProduct: 0 0/3 * * * ?
3)定時任務配置
需要使用定時任務的服務可能不止一個,不同的定時任務,表達式和分片參數、個數都不一樣
將不同的定時任務創建不同的LiteJobConfiguration對象,指定不同的參數類型,將其創建為一個Bean,交給spring容器管理
@Configuration
public class BusinessJobConfig {
@Bean(initMethod = "init")public SpringJobScheduler initSPJob(CoordinatorRegistryCenter registryCenter, SeckillProductJob seckillProductJob){LiteJobConfiguration jobConfiguration = ElasticJobUtil.createJobConfiguration(seckillProductJob.getClass(),seckillProductJob.getCron(),//任務類的cron表達式seckillProductJob.getShardingTotalCount(), //分片個數seckillProductJob.getShardingParameters(), //分片參數seckillProductJob.isDataflowType());//不是dataflow類型SpringJobScheduler springJobScheduler = new SpringJobScheduler(seckillProductJob, registryCenter,jobConfiguration );return springJobScheduler;}
}
4)分布式調度工具類
主要是用于創建LiteJobConfiguration對象,為不同的定時任務定義不同的配置類型
-
指定定時任務類
-
任務類的cron表達式
-
分片個數
-
分片參數
-
是否為dataflow類型
public class ElasticJobUtil {public static LiteJobConfiguration createJobConfiguration(final Class<? extends SimpleJob> jobClass,final String cron,final int shardingTotalCount,
final String shardingItemParameters,boolean dataflowType) {// 定義作業核心配置JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount);if(!StringUtils.isEmpty(shardingItemParameters)){jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);}JobTypeConfiguration jobConfig = null;if(dataflowType){jobConfig = new DataflowJobConfiguration(jobCoreConfigurationBuilder.build(),jobClass.getCanonicalName(),true);}else {// 定義SIMPLE類型配置jobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), jobClass.getCanonicalName());}// 定義Lite作業根配置LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(jobConfig).overwrite(true).build();return simpleJobRootConfig;}public static LiteJobConfiguration createDefaultSimpleJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {// 創建默認的SIMPLE類型作業配置return createJobConfiguration(jobClass,cron,1,null,false);}public static LiteJobConfiguration createDefaultDataFlowJobConfiguration(final Class<? extends SimpleJob> jobClass, final String cron) {// 創建默認的DataFlow類型作業配置return createJobConfiguration(jobClass,cron,1,null,true);}
}
5)任務分片參數配置
-
刪除之前的數據
-
查詢當天的數據同步Redis,定時任務每天執行一次
-
給定時任務分片處理,分三片
給定時任務做分片處理:0=10,1=12,2=14(第10場秒殺任務,第12場.........)
jobSharding:seckillProduct:shardingParameters: 0=10,1=12,2=14shardingTotalCount: 3dataflowType: false
3、定時任務類
初始化秒殺商品定時任務
@Data
@RefreshScope
@Component
public class SeckillProductJob implements SimpleJob {//表達式@Value("${elasticjob.jobCron.seckillProduct}")private String cron;//分片參數@Value("${jobSharding.seckillProduct.shardingParameters}")private String shardingParameters;//分片個數@Value("${jobSharding.seckillProduct.shardingTotalCount}")private int shardingTotalCount;@Value("${jobSharding.seckillProduct.dataflowType}")private boolean dataflowType;@Resourceprivate SeckillProductFeignApi seckillProductFeignApi;@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic void execute(ShardingContext shardingContext) {String time = shardingContext.getShardingParameter();//遠程調用商品服務獲取秒殺列表集合Result<List<SeckillProductVo>> result = seckillProductFeignApi.queryByTime(Integer.valueOf(time));if (result==null||result.hasError()) {//通知管理員return;}List<SeckillProductVo> seckillProductVoList = result.getData();//獲取秒殺商品key-seckillProductHash:10String key = JobRedisKey.SECKILL_PRODUCT_HASH.getRealKey(time);//刪除之前的緩存redisTemplate.delete(key);HashMap<String, String> seckillProductMap = new HashMap<>();//存儲集合數據到Redis中for (SeckillProductVo vo : seckillProductVoList) {seckillProductMap.put(vo.getId().toString(), JSON.toJSONString(vo));}redisTemplate.opsForHash().putAll(key, seckillProductMap);System.out.println("分布式商品秒殺任務執行...............");}
}
秒殺列表緩存成功
4、任務分片處理邏輯
分片參數設置為 0=10,1=12,2=14
,表示分片0處理時間參數為10的任務,分片1處理時間參數為12的任務,分片2處理時間參數為14的任務。ElasticJob 會根據分片參數將任務分片,并將每個分片分配給不同的任務實例執行。
-
分片總數(ShardingTotalCount):表示任務的總分片數。比如你有3場秒殺活動,可以將分片總數設置為3,每個分片處理一場秒殺活動。
-
分片參數(ShardingParameters):可以為每個分片指定參數,比如分片0處理第一場秒殺,分片1處理第二場秒殺,分片2處理第三場秒殺。
ElasticJob 通過 Zookeeper 協調,將分片分配給不同的任務實例。每個任務實例啟動時,會從 Zookeeper 獲取分配給自己的分片信息(分片編號和分片參數)。
今天的分享結束,感興趣的兄弟請點贊、收藏,關注我不迷路!下期再見!