為什么80%的碼農都做不了架構師?>>> ??
一、前言
? ? Elastic-Job是一個優秀的分布式作業調度框架。
????Elastic-Job是一個分布式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。
????Elastic-Job-Lite定位為輕量級無中心化解決方案,使用jar包的形式提供分布式任務的協調服務。
????Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及進程隔離等服務。
1. Elastic-Job-Lite
-
分布式調度協調
-
彈性擴容縮容
-
失效轉移
-
錯過執行作業重觸發
-
作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
-
自診斷并修復分布式不穩定造成的問題
-
支持并行調度
-
支持作業生命周期操作
-
豐富的作業類型
-
Spring整合以及命名空間提供
-
運維平臺
2. Elastic-Job-Cloud
-
應用自動分發
-
基于Fenzo的彈性資源分配
-
分布式調度協調
-
彈性擴容縮容
-
失效轉移
-
錯過執行作業重觸發
-
作業分片一致性,保證同一分片在分布式環境中僅一個執行實例
-
支持并行調度
-
支持作業生命周期操作
-
豐富的作業類型
-
Spring整合
-
運維平臺
-
基于Docker的進程隔離(TBD)
二、導讀
? ? 1、Elastic-Job的核心思想
? ? 2、Elastic-Job的基本使用
三、Elastic-Job的核心思想
? ? 對于分布式計算而言,分片是最基本的思想,Elastic-Job也是沿用了這個思想,每個job跑部分數據,所有job執行完成,便是全量數據,官網給出的SimpleJob例子如下:
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext context) {switch (context.getShardingItem()) {case 0: // do something by sharding item 0break;case 1: // do something by sharding item 1break;case 2: // do something by sharding item 2break;// case n: ...}}
}
? ? 用switch case循環來對應分片的業務邏輯,case分片的index,進入業務邏輯執行。當然這里也有不適應的場景,類似于MapReduce需要shuffle的場景就不適合了,比方說,要根據某一個字段全局分組聚合求結果,這時候怎么分片都可能會不合理,因為每個分片只能處理N分之一的數據,沒辦法shuffle再聚合,這一點,也要根據具體的業務來使用。
? ?那么ShardingContext可以拿到那些信息呢?源碼如下
????
public final class ShardingContext {/*** 作業名稱.*/private final String jobName;/*** 作業任務ID.*/private final String taskId;/*** 分片總數.*/private final int shardingTotalCount;/*** 作業自定義參數.* 可以配置多個相同的作業, 但是用不同的參數作為不同的調度實例.*/private final String jobParameter;/*** 分配于本作業實例的分片項.*/private final int shardingItem;/*** 分配于本作業實例的分片參數.*/private final String shardingParameter;public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {jobName = shardingContexts.getJobName();taskId = shardingContexts.getTaskId();shardingTotalCount = shardingContexts.getShardingTotalCount();jobParameter = shardingContexts.getJobParameter();this.shardingItem = shardingItem;shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);}
}
? ? 以上代碼,jobParameter和shardingItem是最有用的參數,shardingItem決定switch case循環的走向,shardingParameter可以用業務的查詢條件,也可以用字符串拼接的方式組裝很復雜的參數用于特定的業務。
四、Elastic-Job的基本使用
? ? 1、Job配置項
public class ElasticJobConfig {private static CoordinatorRegistryCenter createRegistryCenter() {ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);regCenter.init();return regCenter;}private static LiteJobConfiguration createJobConfiguration() {JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3).shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,MyElasticJob.class.getCanonicalName());LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();return simpleJobRootConfig;}public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}
}
? ? 幾點說明:
? ? 注冊中心配置項,設置zookeeper集群地址,我這里用的本地單節點,所以只有一個,當然可以配置任務名稱,命名空間(namespace,本質上會在zk里生成一個目錄),超時時間,最大重試次數等等
????LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite參數非常重要,設置這個參數為true,修改過job配置信息才會覆蓋zookeeper里的數據,要不然不會生效。
? ? 2、SimpleJob的實現
public class MyElasticJob implements SimpleJob {@Overridepublic void execute(ShardingContext shardingContext) {switch (shardingContext.getShardingItem()) {case 0: {System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}case 1: {System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}case 2: {System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}default: {System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());break;}}}
}
? ? 上面設置每5秒鐘執行一次,執行ElasticJobConfig的main方法,執行結果如下:
????
? ? 從上面的結果,可以看出,執行每個分片的任務,其實是放到一個線程池去執行的,對應的分片信息和參數信息在shardingContext可以拿到,實現業務非常方便。
? ? 最后,如果啟動多個JVM,那么這些任務就分散到各個節點里,如果一個節點宕機,下次觸發任務時,將把該分片任務丟到健康機器執行,這里做到了節點容錯。但是某個分片的任務在執行過程中失敗了,那么這里是不會重新觸發改分片任務的執行的。
?
?
????