視頻轉碼
視頻上傳成功后需要對視頻進行轉碼處理。
- 首先我們要分清文件格式和編碼格式:
- 文件格式:是指.mp4、.avi、.rmvb等這些不同擴展名的視頻文件的文件格式 ,視頻文件的內容主要包括視頻和音頻,其文件格式是按照一定的編碼格式去編碼,并且按照該文件所規定的封裝格式將視頻、音頻、字幕等信息封裝在一起,播放器會根據它們的封裝格式去提取出編碼,然后由播放器解碼,最終播放音視頻。
- 音視頻編碼格式:通過音視頻的壓縮技術,將視頻格式轉換成另一種視頻格式,通過視頻編碼實現流媒體的傳輸。比如:一個.avi的視頻文件原來的編碼是a,通過編碼后編碼格式變為b,音頻原來為c,通過編碼后變為d
- 音視頻編碼格式各類繁多,主要有幾下幾類:
- MPEG系列
(由ISO[國際標準組織機構]下屬的MPEG[運動圖象專家組]開發)視頻編碼方面主要是Mpeg1(vcd用的就是它)、Mpeg2(DVD使用)、Mpeg4(的DVDRIP使用的都是它的變種,如:divx,xvid等)、Mpeg4 AVC(正熱門);音頻編碼方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3(大名鼎鼎的mp3)、MPEG-2 AAC 、MPEG-4 AAC等等。注意:DVD音頻沒有采用Mpeg的。
- H.26X系列
(由ITU[國際電傳視訊聯盟]主導,側重網絡傳輸,注意:只是視頻編碼)
包括H.261、H.262、H.263、H.263+、H.263++、H.264(就是MPEG4 AVC-合作的結晶)
目前最常用的編碼標準是視頻H.264,音頻AAC。
FFmpeg的基本使用
我們將視頻錄制完成后,使用視頻編碼軟件對視頻進行編碼,本項目使用FFmpeg對視頻進行編碼。
FFmpeg被許多開源項目采用,QQ影音、暴風影音、VLC等。
- 下載:FFmpeg Download FFmpeg
- 請從常用工具軟件目錄找到ffmpeg.exe,并將ffmpeg.exe加入環境變量path中。
- 測試是否正常:cmd運行 ffmpeg -version
- 安裝成功,作下簡單測試
- 官方文檔(英文):ffmpeg Documentation
- 進入視頻文件目錄, 執行如下命令:
# 將文件1.avi 轉成 mp4
ffmpeg.exe -i 1.avi 1.mp4# 將文件nacos.avi 轉成 mp3
ffmpeg -i nacos.avi nacos.mp3# 將文件nacos.avi 轉成 gif
ffmpeg -i nacos.avi nacos.gif
- 這里只是簡單的演示, 更多復雜的參數需要自己擴展, 搜索 "ffmpeg常用命令"
視頻處理工具類
將課程資料的工具類中的util拷貝至base工程。
其中Mp4VideoUtil類是用于將視頻轉為mp4格式,是我們項目要使用的工具類。
- 下邊看下這個類的代碼,并進行測試。
- 我們要通過ffmpeg對視頻轉碼,Java程序調用ffmpeg,使用java.lang.ProcessBuilder去完成,具體在Mp4VideoUtil類的63行,下邊進行簡單的測試,下邊的代碼運行本機安裝的QQ軟件。
public static void main(String[] args) throws IOException {ProcessBuilder builder = new ProcessBuilder();builder.command("D:\\UserApp\\QQ\\Bin\\QQ.exe");//將標準輸入流和錯誤輸入流合并,通過標準輸入流程讀取信息builder.redirectErrorStream(true);Process p = builder.start();}
- 對Mp4VideoUtil類需要學習使用方法,下邊代碼將一個avi視頻轉為mp4視頻,如下
public static void main(String[] args) throws IOException {//ffmpeg的路徑String ffmpeg_path = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\ffmpeg\\ffmpeg.exe";//ffmpeg的安裝位置//源avi視頻的路徑String video_path = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\test.avi";//轉換后mp4文件的名稱String mp4_name = "test.mp4";//轉換后mp4文件的路徑String mp4_path = "D:\\UserDatas\\Note\\16Project\\xue-cheng-zai-xian\\test.mp4";//創建工具類對象Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path, video_path, mp4_name, mp4_path);//開始視頻轉換,成功將返回successString s = videoUtil.generateMp4();System.out.println(s);}
- 成功返回sucess字符串, 失敗返回失敗原因
分布式任務處理
對一個視頻的轉碼可以理解為一個任務的執行,如果視頻的數量比較多,如何去高效處理一批任務呢?
- 多線程
多線程是充分利用單機的資源。
- 分布式加多線程
充分利用多臺計算機,每臺計算機使用多線程處理。
什么是分布式任務調度?
我們可以先思考一下下面業務場景的解決方案:
- 每隔24小時執行數據備份任務。
- 12306網站會根據車次不同,設置幾個時間點分批次放票。
- 某財務系統需要在每天上午10點前結算前一天的賬單數據,統計匯總。
- 商品成功發貨后,需要向客戶發送短信提醒。
類似的場景還有很多,我們該如何實現?
- 多線程方式實現:
- 學過多線程的同學,可能會想到,我們可以開啟一個線程,每sleep一段時間,就去檢查是否已到預期執行時間。
- 以下代碼簡單實現了任務調度的功能:
public static void main(String[] args) { //任務執行間隔時間final long timeInterval = 1000;Runnable runnable = new Runnable() {public void run() {while (true) {//TODO:somethingtry {Thread.sleep(timeInterval);} catch (InterruptedException e) {e.printStackTrace();}}}};Thread thread = new Thread(runnable);thread.start();
}
- 上面的代碼實現了按一定的間隔時間執行任務調度的功能。
- Timer方式實現
- Jdk也為我們提供了相關支持,如Timer、ScheduledExecutor,下邊我們了解下。
public static void main(String[] args){ Timer timer = new Timer(); timer.schedule(new TimerTask(){@Override public void run() { //TODO:something} }, 1000, 2000); //1秒后開始調度,每2秒執行一次
}
- Timer 的優點在于簡單易用,每個Timer對應一個線程,因此可以同時啟動多個Timer并行執行多個任務,同一個Timer中的任務是串行執行
- ScheduledExecutor方式實現:
public static void main(String [] agrs){ScheduledExecutorService service = Executors.newScheduledThreadPool(10);service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {//TODO:somethingSystem.out.println("todo something");}}, 1,2, TimeUnit.SECONDS);
}
- Java 5 推出了基于線程池設計的 ScheduledExecutor,其設計思想是,每一個被調度的任務都會由線程池中一個線程去執行,因此任務是并發執行的,相互之間不會受到干擾。
- Timer 和 ScheduledExecutor 都僅能提供基于開始時間與重復間隔的任務調度,不能勝任更加復雜的調度需求。比如,設置每月第一天凌晨1點執行任務、復雜調度任務的管理、任務間傳遞數據等等。
- 第三方Quartz方式實現
- 項目地址:https://github.com/quartz-scheduler/quartz
- Quartz 是一個功能強大的任務調度框架,它可以滿足更多更復雜的調度需求,Quartz 設計的核心類包括 Scheduler, Job 以及 Trigger。其中,Job 負責定義需要執行的任務,Trigger 負責設置調度策略,Scheduler 將二者組裝在一起,并觸發任務開始執行。Quartz支持簡單的按時間間隔調度、還支持按日歷調度方式,通過設置CronTrigger表達式(包括:秒、分、時、日、月、周、年)進行任務調度。
- 下邊是一個例子代碼:
public static void main(String [] agrs) throws SchedulerException {//創建一個SchedulerSchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();//創建JobDetailJobBuilder jobDetailBuilder = JobBuilder.newJob(MyJob.class);jobDetailBuilder.withIdentity("jobName","jobGroupName");JobDetail jobDetail = jobDetailBuilder.build();//創建觸發的CronTrigger 支持按日歷調度CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity("triggerName", "triggerGroupName").startNow().withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")).build();scheduler.scheduleJob(jobDetail,trigger);scheduler.start();
}public class MyJob implements Job {@Overridepublic void execute(JobExecutionContext jobExecutionContext){System.out.println("todo something");}
}
通過以上內容我們學習了什么是任務調度,任務調度所解決的問題,以及任務調度的多種實現方式。
任務調度顧名思義,就是對任務的調度,它是指系統為了完成特定業務,基于給定時間點,給定時間間隔或者給定執行次數自動執行任務。
什么是分布式任務調度?
通常任務調度的程序是集成在應用中的,比如:優惠卷服務中包括了定時發放優惠卷的的調度程序,結算服務中包括了定期生成報表的任務調度程序,由于采用分布式架構,一個服務往往會部署多個冗余實例來運行我們的業務,在這種分布式系統環境下運行任務調度,我們稱之為分布式任務調度,如下圖:
分布式調度要實現的目標:
不管是任務調度程序集成在應用程序中,還是單獨構建的任務調度系統,如果采用分布式調度任務的方式就相當于將任務調度程序分布式構建,這樣就可以具有分布式系統的特點,并且提高任務的調度處理能力:
1、并行任務調度
并行任務調度實現靠多線程,如果有大量任務需要調度,此時光靠多線程就會有瓶頸了,因為一臺計算機CPU的處理能力是有限的。
如果將任務調度程序分布式部署,每個結點還可以部署為集群,這樣就可以讓多臺計算機共同去完成任務調度,我們可以將任務分割為若干個分片,由不同的實例并行執行,來提高任務調度的處理效率。
2、高可用
若某一個實例宕機,不影響其他實例來執行任務。
3、彈性擴容
當集群中增加實例就可以提高并執行任務的處理效率。
4、任務管理與監測
對系統中存在的所有定時任務進行統一的管理及監測。讓開發人員及運維人員能夠時刻了解任務執行情況,從而做出快速的應急處理響應。
5、避免任務重復執行
當任務調度以集群方式部署,同一個任務調度可能會執行多次,比如在上面提到的電商系統中到點發優惠券的例子,就會發放多次優惠券,對公司造成很多損失,所以我們需要控制相同的任務在多個運行實例上只執行一次。
XXL-JOB
介紹
XXL-JOB是一個輕量級分布式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼并接入多家公司線上產品線,開箱即用。
- 官網:https://www.xuxueli.com/xxl-job/
- 文檔:https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B
核心概念
XXL-JOB主要有調度中心、執行器、任務:
- 調度中心:
負責管理調度信息,按照調度配置發出調度請求,自身不承擔業務代碼;
主要職責為執行器管理、任務管理、監控運維、日志管理等
- 任務執行器:
負責接收調度請求并執行任務邏輯;
主要職責是注冊服務、任務執行服務(接收到任務后會放入線程池中的任務隊列)、執行結果上報、日志服務等
- 任務:負責執行具體的業務處理。
調度中心與執行器之間的工作流程如下:
1.任務執行器根據配置的調度中心的地址,自動注冊到調度中心
2.達到任務觸發條件,調度中心下發任務
3.執行器基于線程池執行任務,并把執行結果放入內存隊列中、把執行日志寫入日志文件中
4.執行器消費內存隊列中的執行結果,主動上報給調度中心
5.當用戶在調度中心查看任務日志,調度中心請求任務執行器,任務執行器讀取任務日志文件并返回日志詳情
搭建XXL-JOB
調度中心
首先下載XXL-JOB
- GitHub:https://github.com/xuxueli/xxl-job
- 碼云:xxl-job: 一個分布式任務調度平臺,其核心設計目標是開發迅速、學習簡單、輕量級、易擴展。現已開放源代碼并接入多家公司線上產品線,開箱即用。
- 項目使用2.3.1版本: https://github.com/xuxueli/xxl-job/releases/tag/2.3.1
- 也可從課程資料目錄獲取,解壓xxl-job-2.3.1.zip
使用IDEA打開解壓后的目錄
- xxl-job-admin:調度中心
- xxl-job-core:公共依賴
- xxl-job-executor-samples:執行器Sample示例(選擇合適的版本執行器,可直接使用)
- xxl-job-executor-sample-springboot:Springboot版本,通過Springboot管理執行器,推薦這種方式;
- xxl-job-executor-sample-frameless:無框架版本;
- doc :文檔資料,包含數據庫腳本
在下發的虛擬機的MySQL中已經創建了xxl_job_2.3.1數據庫
執行sh /data/soft/restart.sh自動啟動xxl-job調度中心
- 訪問:http://192.168.101.65:8088/xxl-job-admin/
- 賬號和密碼:admin/123456
- 如果無法使用虛擬機運行xxl-job可以在本機idea運行xxl-job調度中心。
執行器
下邊配置執行器,執行器負責與調度中心通信接收調度中心發起的任務調度請求。
- 下邊進入調度中心添加執行器
點擊新增,填寫執行器信息,appname是前邊在nacos中配置xxl信息時指定的執行器的應用名。
- 首先在媒資管理模塊的service工程添加依賴,在項目的父工程已約定了版本2.3.1
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId>
</dependency>
- 在nacos下的media-service-dev.yaml下配置xxl-job
xxl:job:admin: addresses: http://192.168.101.65:8088/xxl-job-adminexecutor:appname: media-process-serviceaddress: ip: port: 9999logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30accessToken: default_token
注意配置中的appname這是執行器的應用名,port是執行器啟動的端口,如果本地啟動多個執行器注意端口不能重復。
- 配置xxl-job的執行器
將xxl-job示例工程下配置類拷貝到媒資管理的service工程下
- 到此完成媒資管理模塊service工程配置xxl-job執行器,在xxl-job調度中心添加執行器
- 下邊準備測試執行器與調度中心是否正常通信,因為接口工程依賴了service工程,所以啟動媒資管理模塊的接口工程。
- 啟動后觀察日志,出現下邊的日志表示執行器在調度中心注冊成功
- 同時觀察調度中心中的執行器界面, 在線機器地址處已顯示1個執行器。
執行任務
下邊編寫任務,參考示例工程中任務類的編寫方法,如下圖:
在媒資服務service包下新建jobhandler存放任務類,下邊參考示例工程編寫一個任務類
/*** @description 測試執行器* @author Mr.M* @date 2022/9/13 20:32* @version 1.0*/
@Component
@Slf4j
public class SampleXxlJob {/*** 1、簡單任務示例(Bean模式)*/@XxlJob("testJob")public void testJob() throws Exception {log.info("開始執行.....");}
}
下邊在調度中心添加任務,進入任務管理
- 調度類型:
- 固定速度指按固定的間隔定時調度。
- Cron,通過Cron表達式實現更豐富的定時調度策略。
- Cron表達式是一個字符串,通過它可以定義調度策略,格式如下:
- {秒數} {分鐘} {小時} {日期} {月份} {星期} {年份(可為空)}
- xxl-job提供圖形界面去配置
- 運行模式有BEAN和GLUE,
- bean模式較常用就是在項目工程中編寫執行器的任務代碼,
- GLUE是將任務代碼編寫在調度中心。
- JobHandler即任務方法名,填寫任務方法上邊@XxlJob注解中的名稱。
- 路由策略:當執行器集群部署時,調度中心向哪個執行器下發任務,這里選擇第一個表示只向第一個執行器下發任務,路由策略的其它選項稍后在分片廣播章節詳細解釋。
- 高級配置的其它配置項稍后在分片廣播章節詳細解釋。
添加成功, 啟動任務
通過調度日志查看任務執行情況
下邊啟動媒資管理的service工程,啟動執行器。觀察執行器方法的執行。
如果要停止任務需要在調度中心操作
任務跑一段時間注意清理日志
分片廣播
掌握了xxl-job的基本使用,下邊思考如何進行分布式任務處理呢?如下圖,我們會啟動多個執行器組成一個集群,去執行任務。
執行器在集群部署下調度中心有哪些路由策略呢?
查看xxl-job官方文檔,閱讀高級配置相關的內容:
高級配置:- 路由策略:當執行器集群部署時,提供豐富的路由策略,包括;FIRST(第一個):固定選擇第一個機器;LAST(最后一個):固定選擇最后一個機器;ROUND(輪詢):;RANDOM(隨機):隨機選擇在線的機器;CONSISTENT_HASH(一致性HASH):每個任務按照Hash算法固定選擇某一臺機器,且所有任務均勻散列在不同機器上。LEAST_FREQUENTLY_USED(最不經常使用):使用頻率最低的機器優先被選舉;LEAST_RECENTLY_USED(最近最久未使用):最久未使用的機器優先被選舉;FAILOVER(故障轉移):按照順序依次進行心跳檢測,第一個心跳檢測成功的機器選定為目標執行器并發起調度;BUSYOVER(忙碌轉移):按照順序依次進行空閑檢測,第一個空閑檢測成功的機器選定為目標執行器并發起調度;SHARDING_BROADCAST(分片廣播):廣播觸發對應集群中所有機器執行一次任務,同時系統自動傳遞分片參數;可根據分片參數開發分片任務;- 子任務:每個任務都擁有一個唯一的任務ID(任務ID可以從任務列表獲取),當本任務執行結束并且執行成功時,將會觸發子任務ID所對應的任務的一次主動調度,通過子任務可以實現一個任務執行完成去執行另一個任務。- 調度過期策略:- 忽略:調度過期后,忽略過期的任務,從當前時間開始重新計算下次觸發時間;- 立即執行一次:調度過期后,立即執行一次,并從當前時間開始重新計算下次觸發時間;- 阻塞處理策略:調度過于密集執行器來不及處理時的處理策略;單機串行(默認):調度請求進入單機執行器后,調度請求進入FIFO隊列并以串行方式運行;丟棄后續調度:調度請求進入單機執行器后,發現執行器存在運行的調度任務,本次請求將會被丟棄并標記為失敗;覆蓋之前調度:調度請求進入單機執行器后,發現執行器存在運行的調度任務,將會終止運行中的調度任務并清空隊列,然后運行本地調度任務;- 任務超時時間:支持自定義任務超時時間,任務運行超時將會主動中斷任務;- 失敗重試次數;支持自定義任務失敗重試次數,當任務失敗時將會按照預設的失敗重試次數主動進行重試;
下邊要重點說的是分片廣播策略,分片是指是調度中心以執行器為維度進行分片,將集群中的執行器標上序號:0,1,2,3...,廣播是指每次調度會向集群中的所有執行器發送任務調度,請求中攜帶分片參數。
- 每個執行器收到調度請求同時接收分片參數。
- xxl-job支持動態擴容執行器集群從而動態增加分片數量,當有任務量增加可以部署更多的執行器到集群中,調度中心會動態修改分片的數量。
作業分片適用哪些場景呢?
? 分片任務場景:10個執行器的集群來處理10w條數據,每臺機器只需要處理1w條數據,耗時降低10倍;
? 廣播任務場景:廣播執行器同時運行shell腳本、廣播集群節點進行緩存更新等。
所以,廣播分片方式不僅可以充分發揮每個執行器的能力,并且根據分片參數可以控制任務是否執行,最終靈活控制了執行器集群分布式處理任務。
使用說明:
"分片廣播" 和普通任務開發流程一致,不同之處在于可以獲取分片參數進行分片業務處理。
- Java語言任務獲取分片參數方式:
- BEAN、GLUE模式(Java),可參考Sample示例執行器中的示例任務"ShardingJobHandler":
- 定義作業分片的任務方法
/*** @author Mr.M* @version 1.0* @description 測試執行器* @date 2022/9/13 20:32*/
@Component
@Slf4j
public class SampleXxlJob {/*** 2、分片廣播任務*/@XxlJob("shardingJobHandler")public void shardingJobHandler() throws Exception {// 分片序號,從0開始int shardIndex = XxlJobHelper.getShardIndex();// 分片總數int shardTotal = XxlJobHelper.getShardTotal();log.info("分片參數:當前分片序號 = {}, 總分片數 = {}", shardIndex, shardTotal);}}
- 在調度中心添加任務
- 啟動任務,觀察日志
- 下邊啟動兩個執行器實例,觀察每個實例的執行情況
- 首先在nacos中配置media-service的本地優先配置:
- 將media-service啟動兩個實例, 兩個實例的在啟動時注意端口不能沖突:
- 實例1 在VM options處添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998
- 實例2 在VM options處添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999
- 觀察任務調度中心,稍等片刻執行器有兩個
- 觀察兩個執行實例的日志:
- 從日志可以看每個實例的分片序號不同。如果其中一個執行器掛掉,只剩下一個執行器在工作,稍等片刻調用中心發現少了一個執行器將動態調整總分片數為1。
技術方案
作業分片方案
掌握了xxl-job的分片廣播調度方式,下邊思考如何分布式去執行學成在線平臺中的視頻處理任務。
任務添加成功后,對于要處理的任務會添加到待處理任務表中,現在啟動多個執行器實例去查詢這些待處理任務,此時如何保證多個執行器不會查詢到重復的任務呢?
XXL-JOB并不直接提供數據處理的功能,它只會給執行器分配好分片序號,在向執行器任務調度的同時下發分片總數以及分片序號等參數,執行器收到這些參數根據自己的業務需求去利用這些參數。
下圖表示了多個執行器獲取視頻處理任務的結構:
每個執行器收到廣播任務有兩個參數:分片總數、分片序號。每個執行從數據表取任務時可以讓任務id 模上分片總數,如果等于分片序號則執行此任務。
上邊兩個執行器實例那么分片總數為2,序號為0、1,從任務1開始,如下:
1 % 2 = 1 執行器2執行
2 % 2 = 0 執行器1執行
3 % 2 = 1 執行器2執行
以此類推.
保證任務不重復執行
通過作業分片方案保證了執行器之間查詢到不重復的任務,如果一個執行器在處理一個視頻還沒有完成,此時調度中心又一次請求調度,為了不重復處理同一個視頻該怎么辦?
- 首先配置調度過期策略:
- 查看文檔如下:
- 調度過期策略:調度中心錯過調度時間的補償處理策略,包括:忽略、立即補償觸發一次等;
- 忽略:調度過期后,忽略過期的任務,從當前時間開始重新計算下次觸發時間;
- 立即執行一次:調度過期后,立即執行一次,并從當前時間開始重新計算下次觸發時間;
- 阻塞處理策略:調度過于密集執行器來不及處理時的處理策略;
- 這里我們選擇忽略,如果立即執行一次就可能重復執行相同的任務。
- 其次,再看阻塞處理策略,阻塞處理策略就是當前執行器正在執行任務還沒有結束時調度中心進行任務調度,此時該如何處理。
- 查看文檔如下:
--單機串行(默認):調度請求進入單機執行器后,調度請求進入FIFO隊列并以串行方式運行;
--丟棄后續調度:調度請求進入單機執行器后,發現執行器存在運行的調度任務,本次請求將會被丟棄并標記為失敗;
--覆蓋之前調度:調度請求進入單機執行器后,發現執行器存在運行的調度任務,將會終止運行中的調度任務并清空隊列,然后運行本地調度任務; - 這里如果選擇覆蓋之前調度則可能重復執行任務,這里選擇丟棄后續調度或單機串行方式來避免任務重復執行。
- 只做這些配置可以保證任務不會重復執行嗎?
- 做不到,還需要保證任務處理的冪等性,什么是任務的冪等性?任務的冪等性是指:對于數據的操作不論多少次,操作的結果始終是一致的。在本項目中要實現的是不論多少次任務調度同一個視頻只執行一次成功的轉碼。
- 什么是冪等性?它描述了一次和多次請求某一個資源對于資源本身應該具有同樣的結果。
- 冪等性是為了解決重復提交問題,比如:惡意刷單,重復支付等。
- 解決冪等性常用的方案:
- 1)數據庫約束,比如:唯一索引,主鍵。
- 2)樂觀鎖,常用于數據庫,更新數據時根據樂觀鎖狀態去更新。
- 3)唯一序列號,操作傳遞一個唯一序列號,操作時判斷與該序列號相等則執行。
- 基于以上分析,在執行器接收調度請求去執行視頻處理任務時要實現視頻處理的冪等性,要有辦法去判斷該視頻是否處理完成,如果正在處理中或處理完則不再處理。這里我們在數據庫視頻處理表中添加處理狀態字段,視頻處理完成更新狀態為完成,執行視頻處理前判斷狀態是否完成,如果完成則不再處理。
視頻處理方案
確定了分片方案,下邊梳理整個視頻上傳及處理的業務流程。
上傳視頻成功向視頻處理待處理表添加記錄。
視頻處理的詳細流程如下:
1、任務調度中心廣播作業分片。
2、執行器收到廣播作業分片,從數據庫讀取待處理任務,讀取未處理及處理失敗的任務。
3、執行器更新任務為處理中,根據任務內容從MinIO下載要處理的文件。
4、執行器啟動多線程去處理任務。
5、任務處理完成,上傳處理后的視頻到MinIO。
6、將更新任務處理結果,如果視頻處理完成除了更新任務處理結果以外還要將文件的訪問地址更新至任務處理表及文件表中,最后將任務完成記錄寫入歷史表。
查詢待處理任務
需求分析
查詢待處理任務只處理未提交及處理失敗的任務,任務處理失敗后進行重試,最多重試3次。
任務處理成功將待處理記錄移動到歷史任務表。
下圖是待處理任務表:
歷史任務表與待處理任務表的結構相同。
添加待處理任務
上傳視頻成功向視頻處理待處理表添加記錄,暫時只添加對avi視頻的處理記錄。
根據MIME Type去判斷是否是avi視頻,下邊列出部分MIME Type
avi視頻的MIME Type是video/x-msvideo
修改文件信息入庫方法,如下:
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {//從數據庫查詢文件MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);if (mediaFiles == null) {... ...//添加到待處理任務表addWaitingTask(mediaFiles);log.debug("保存文件信息到數據庫成功,{}", mediaFiles.toString());}return mediaFiles;}/*** 添加待處理任務* @param mediaFiles 媒資文件信息*/
private void addWaitingTask(MediaFiles mediaFiles){//文件名稱String filename = mediaFiles.getFilename();//文件擴展名String exension = filename.substring(filename.lastIndexOf("."));//文件mimeTypeString mimeType = getMimeType(exension);//如果是avi視頻添加到視頻待處理表if(mimeType.equals("video/x-msvideo")){MediaProcess mediaProcess = new MediaProcess();BeanUtils.copyProperties(mediaFiles,mediaProcess);mediaProcess.setStatus("1");//未處理mediaProcess.setFailCount(0);//失敗次數默認為0mediaProcessMapper.insert(mediaProcess);}
}
查詢待處理任務
如何保證查詢到的待處理視頻記錄不重復?
編寫根據分片參數獲取待處理任務的DAO方法,定義DAO接口如下:
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** @description 根據分片參數獲取待處理任務* @param shardTotal 分片總數* @param shardindex 分片序號* @param count 任務數* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>* @author Mr.M* @date 2022/9/14 8:54*/@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}")List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);}
定義Service接口,查詢待處理
package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒資文件處理業務方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** @param shardIndex 分片序號* @param shardTotal 分片總數* @param count 獲取記錄數* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>* @description 獲取待處理任務* @author Mr.M* @date 2022/9/14 14:49*/public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count);}
service接口實現
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;@Overridepublic List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {List<MediaProcess> mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);return mediaProcesses;}}
開始執行任務
分布式鎖
為了避免多線程去爭搶同一個任務可以使用synchronized同步鎖去解決,如下代碼:
synchronized(鎖對象){執行任務...
}
synchronized只能保證同一個虛擬機中多個線程去爭搶鎖。
如果是多個執行器分布式部署,并不能保證同一個視頻只有一個執行器去處理。
現在要實現分布式環境下所有虛擬機中的線程去同步執行就需要讓多個虛擬機去共用一個鎖,虛擬機可以分布式部署,鎖也可以分布式部署,如下圖:
虛擬機都去搶占同一個鎖,鎖是一個單獨的程序提供加鎖、解鎖服務。
該鎖已不屬于某個虛擬機,而是分布式部署,由多個虛擬機所共享,這種鎖叫分布式鎖。
現分布式鎖的方案有很多,常用的如下:
1、基于數據庫實現分布鎖
利用數據庫主鍵唯一性的特點,或利用數據庫唯一索引、行級鎖的特點,多個線程同時去更新相同的記錄,誰更新成功誰就搶到鎖。
2、基于redis實現鎖
redis提供了分布式鎖的實現方案,比如:SETNX、set nx、redisson等。
拿SETNX舉例說明,SETNX命令的工作過程是去set一個不存在的key,多個線程去設置同一個key只會有一個線程設置成功,設置成功的的線程拿到鎖。
3、使用zookeeper實現
zookeeper是一個分布式協調服務,主要解決分布式程序之間的同步的問題。zookeeper的結構類似的文件目錄,多線程向zookeeper創建一個子目錄(節點)只會有一個創建成功,利用此特點可以實現分布式鎖,誰創建該結點成功誰就獲得鎖。
開啟任務
下邊基于數據庫方式實現分布鎖,開始執行任務將任務執行狀態更新為4表示任務執行中。
- 下邊的sql語句可以實現更新操作:
update media_process m set m.status='4' where m.id=?
如果是多個線程去執行該sql都將會執行成功,但需求是只能有一個線程搶到鎖,所以此sql無法滿足需求。
- 下邊使用樂觀鎖的方式實現更新操作:
update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=?
多個線程同時執行上邊的sql只會有一個線程執行成功。
- 什么是樂觀鎖、悲觀鎖?
- synchronized是一種悲觀鎖,在執行被synchronized包裹的代碼時需要首先獲取鎖,沒有拿到鎖則無法執行,是總悲觀的認為別的線程會去搶,所以要悲觀鎖。
- 樂觀鎖的思想是它不認為會有線程去爭搶,盡管去執行,如果沒有執行成功就再去重試。
定義mapper
package com.xuecheng.media.mapper;/*** <p>* Mapper 接口* </p>** @author itcast*/
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {/*** 開啟一個任務* @param id 任務id* @return 更新記錄數*/@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")int startTask(@Param("id") long id);}
service方法
package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒資文件處理業務方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** 開啟一個任務* @param id 任務id* @return true開啟任務成功,false開啟任務失敗*/public boolean startTask(long id);}
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;//實現如下public boolean startTask(long id) {int result = mediaProcessMapper.startTask(id);return result<=0?false:true;}}
更新任務狀態
任務處理完成需要更新任務處理結果,任務執行成功更新視頻的URL、及任務處理結果,將待處理任務記錄刪除,同時向歷史任務表添加記錄。
在MediaFileProcessService接口添加方法
package com.xuecheng.media.service;/*** @author Mr.M* @version 1.0* @description 媒資文件處理業務方法* @date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** @description 保存任務結果* @param taskId 任務id* @param status 任務狀態* @param fileId 文件id* @param url url* @param errorMsg 錯誤信息* @return void* @author Mr.M* @date 2022/10/15 11:29*/void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);}
service接口方法實現如下:
package com.xuecheng.media.service.impl;/*** @author Mr.M* @version 1.0* @description TODO* @date 2022/9/14 14:41*/
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {@AutowiredMediaFilesMapper mediaFilesMapper;@AutowiredMediaProcessMapper mediaProcessMapper;@AutowiredMediaProcessHistoryMapper mediaProcessHistoryMapper;@Transactional@Overridepublic void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查出任務,如果不存在則直接返回MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);if(mediaProcess == null){return ;}//處理失敗,更新任務處理結果LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);//處理失敗if(status.equals("3")){MediaProcess mediaProcess_u = new MediaProcess();mediaProcess_u.setStatus("3");mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()+1);mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug("更新任務處理狀態為失敗,任務信息:{}",mediaProcess_u);return ;}//任務處理成功MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);if(mediaFiles!=null){//更新媒資文件中的訪問urlmediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//處理成功,更新url和狀態mediaProcess.setUrl(url);mediaProcess.setStatus("2");mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);//添加到歷史記錄MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//刪除mediaProcessmediaProcessMapper.deleteById(mediaProcess.getId());}}
視頻處理
視頻采用并發處理,每個視頻使用一個線程去處理,每次處理的視頻數量不要超過cpu核心數。
所有視頻處理完成結束本次執行,為防止代碼異常出現無限期等待則添加超時設置,到達超時時間還沒有處理完成仍結束任務。
定義任務類VideoTask 如下:
package com.xuecheng.media.jobhandler;/*** 視頻處理任務類** @author Mr.M* @version 1.0* @description TODO* @date 2022/10/15 11:58*/
@Slf4j
@Component
public class VideoTask {@AutowiredMediaFileProcessService mediaFileProcessService;@AutowiredMediaFileService mediaFileService;@Value("${videoprocess.ffmpegpath}")private String ffmpeg_path;@XxlJob("videoJobHandler")public void videoJobHandler() throws Exception {// 分片參數int shardIndex = XxlJobHelper.getShardIndex(); // 執行器序號,從0開始int shardTotal = XxlJobHelper.getShardTotal(); // 執行器總數//取出cpu核心數作為一次處理數據的條數int processors = Runtime.getRuntime().availableProcessors();//查詢待處理的任務List<MediaProcess> mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);//實際查到的任務數int size = mediaProcessList.size();log.debug("取到視頻處理任務數:" + size);if (size <= 0) {return;}//創建線程池ExecutorService executorService = Executors.newFixedThreadPool(size);// 使用的計數器CountDownLatch countDownLatch = new CountDownLatch(size);mediaProcessList.forEach(mediaProcess -> {// 將任務加入線程池executorService.execute(() -> {try {Long taskId = mediaProcess.getId(); // 任務id// 開啟任務boolean b = mediaFileProcessService.startTask(taskId);if (!b) {log.debug("搶占任務失敗, 任務id: {}", taskId);return;}// 準備參數String bucket = mediaProcess.getBucket(); //桶String filePath = mediaProcess.getFilePath(); //存儲路徑String fileId = mediaProcess.getFileId(); //原始視頻的md5值//將要處理的文件下載到服務器上File originalFile = mediaFileService.downloadFileFromMinIO(bucket, filePath);if (originalFile == null) {log.debug("下載待處理文件失敗,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下載待處理文件失敗");return;}//處理結束的視頻文件File mp4File = null;try {mp4File = File.createTempFile("mp4", ".mp4");} catch (IOException e) {log.error("創建mp4臨時文件失敗");mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "創建mp4臨時文件失敗");return;}//視頻處理結果String result = "";try {//開始處理視頻Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpeg_path, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());//開始視頻轉換,成功將返回successresult = videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error("處理視頻文件:{},出錯:{}", mediaProcess.getFilePath(), e.getMessage());}if (!result.equals("success")) {//記錄錯誤信息log.error("處理視頻失敗,視頻地址:{},錯誤信息:{}", bucket + filePath, result);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);return;}//將mp4上傳至minio//mp4在minio的存儲路徑String objectName = getFilePath(fileId, ".mp4");//訪問urlString url = "/" + bucket + "/" + objectName;try {mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);//將url存儲至數據,并更新狀態為成功,并將待處理視頻記錄刪除存入歷史mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);} catch (Exception e) {log.error("上傳視頻失敗或入庫失敗,視頻地址:{},錯誤信息:{}", bucket + objectName, e.getMessage());//最終還是失敗了mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "處理后視頻上傳或入庫失敗");}} finally {// 計數器減1countDownLatch.countDown();}});});//等待,給一個充裕的超時時間,防止無限等待,到達超時時間還沒有處理完成則結束任務countDownLatch.await(30, TimeUnit.MINUTES);}private String getFilePath(String fileMd5, String fileExt) {return fileMd5.substring(0, 1) + "/" + fileMd5.substring(1, 2) + "/" + fileMd5 + "/" + fileMd5 + fileExt;}}
測試
基本測試
進入xxl-job調度中心添加執行器和視頻處理任務
- 添加執行器
- 視頻處理任務
- 在xxl-job配置任務調度策略:
-
- 1)配置阻塞處理策略為:丟棄后續調度。
- 2)配置視頻處理調度時間間隔不用根據視頻處理時間去確定,可以配置的小一些,如:5分鐘,即使到達調度時間如果視頻沒有處理完會丟棄調度請求。
- 配置完成開始測試視頻處理:
- 首先上傳至少4個視頻,非mp4格式。
- 在xxl-job啟動視頻處理任務
- 觀察媒資管理服務后臺日志
失敗測試
1、先停止調度中心的視頻處理任務。
2、上傳視頻,手動修改待處理任務表中file_path字段為一個不存在的文件地址
3、啟動任務
觀察任務處理失敗后是否會重試,并記錄失敗次數。
搶占任務測試
1、修改調度中心中視頻處理任務的阻塞處理策略為“覆蓋之間的調度”
2、在搶占任務代碼處打斷點并選擇支持多線程方式
3、在搶占任務代碼處的下邊兩行代碼分別打上斷點,避免觀察時代碼繼續執行。
4、啟動任務
此時多個線程執行都停留在斷點處
依次放行,觀察同一個任務只會被一個線程搶占成功。