2019獨角獸企業重金招聘Python工程師標準>>>
提交作業并監控
JobClient是用戶作業與JobTracker交互的主要接口,它提供了提交作業,跟蹤作業進度、訪問任務報告及logs、以及獲取MR集群狀態信息等方法。
提交作業流程包括:
- 檢查作業的輸入輸出
- 計算作業的輸入分片(InputSplit)
- 如果需要,為DistributedCache設置必須的賬戶信息
- 將作業用到的jar包文件和配置信息拷貝至文件系統(一般為HDFS)上的MR系統路徑中
- 提交作業到JobTracker,并可監控作業狀態
作業歷史(Job History)文件會記錄在hadoop.job.history.user.location指定的位置,默認在作業輸出路徑下的logs/history/路徑下。因此歷史日志默認在mapred.output.dir/logs/history下。用戶可以將hadoop.job.history.user.location值設置為none來不記錄作業歷史。
使用命令來查看歷史日志:
1 | $hadoop job -history output-dir |
上面命令會顯示作業的詳細信息、失敗的被kill的任務(tip)的詳細信息。使用下面命令可以查看作業更詳細的信息:
1 | $hadoop job -history all output-dir |
可以使用OutputLogFilter從輸出路徑中過濾日志文件。
一般,我們創建應用,通過JobConf設置作業的各種屬性,然后使用JobClient提交作業并監控進度。
作業控制
有時可能需要一個作業鏈完成復雜的任務。這點是可以輕松實現的,因為作業輸出一般都在分布式文件系統上,作業輸出可以當做下個作業的輸入,這樣就形成了鏈式作業。
這種作業成功是否依賴于客戶端。客戶端可以使用以下方式來控制作業的執行:
- runJob(JobConf):提交作業并僅在作業完成時返回
- submitJob(JobConf):提交作業后立即返回一個RunningJob的引用,使用它可以查詢作業狀態并處理調度邏輯。
- JobConf.setJobEndNotificationURI(String):設置作業完成時通知
你也可以使用Oozie來實現復雜的作業鏈。
作業輸入
下面講作業輸入的內容。
InputFormat?描述MR作業的輸入信息。InputFormat有以下作用:
- 驗證作業的輸入信息
- 將輸入文件拆分為邏輯上的輸入分片(InputSplit),每個輸入分片被分配到一個獨立的Mapper
- 提供RecordReader實現類從輸入分片中搜集輸入記錄供Mapper處理
基于文件的InputFormat實現類即FileInputFormal是通過計算輸入文件的總大小(以字節為單位)來分裂成邏輯分片的。然而文件系統的塊大小又作為輸入分片大小的上限,下限可以通過mapred.min.split.size來設定。
基于輸入大小進行邏輯分片機制在很多情況下是不適合的,還需要注意記錄邊界。在這些情況下,應用應該實現RecordReader來處理記錄邊界,為獨立的mapper任務提供面向行的邏輯分片。
TextInputFormat是默認的輸入格式。
如果作業輸入格式為TextInputFormat,MR框架可以檢測以.gz擴展名的輸入文件并自動使用合適的壓縮算法來解壓文件。特別注意的是,.gz格式的壓縮文件不能被分片,每個文件作為一個輸入分片被一個mapper處理。
輸入分片(InputSplit)
InputSplit的數據會被一個獨立的mapper來處理。一般輸入分片是對輸入基于面向字節為單位的。可以使用RecordReader來處理提供面向行的輸入分片。
FileSplit是默認的InputSplit,通過設置map.input.file設置輸入文件路徑。
RecordReader
RecordReader從InputSplit讀入數據對。RecordReader將InputSplit提供得面向字節的的輸入轉換為面向行的分片給Mapper實現類來處理。RecordReader的責任就是處理行邊界并以kv方式將數據傳給mapper作業。
作業輸出
OutFormat描述作業的輸出信息。MR框架使用OutputFormat來處理:
- 驗證作業的輸出信息。例如驗證輸出路徑是否存在.
- 提供RecordWriter實現來寫作業的輸出文件。輸出文件存儲在文件系統中。TextOutputFormat是默認的輸出格式。
OutputCommitter
OutputCommitter是MR作業的輸出提交對象。MR框架使用它來處理:
- 初始化時準備作業。例如,作業初始化時創建臨時輸出路徑。作業準備階段通過一個獨立的任務在作業的PREP狀態時完成,然后初始化作業的所有任務。一旦準備階段完成,作業狀態切換到Running狀態。
- 作業完成后清理作業。例如,在作業完成后清理輸出臨時路徑。作業清理階段通過一個獨立的任務在作業最后完成。作業在清理階段完成后設置為成功/失敗/中止。
- 設置任務的臨時輸出。任務準備階段在任務初始化時作為同一個任務的一部分。
- 檢查任務是否需要一個提交對象。目的是為了在任務不需要時避免一個提交過程。
- 提交任務輸出。一旦任務完成,如果需要的話,任務將提交其輸出物。
- 銷毀任務的提交。如果任務失敗或者被中止,輸出將被清理。如果不能被清理(如異常的塊),與該任務相同的任務id會啟動并處理清理。
FileOutputCommitter是默認的OutputCommiter實現。作業準備及清理任務占用同一個TaskTracker上空閑的map或者reduce槽。作業清理任務、任務清理任務以及作業準備任務按照該順序擁有最高的優先級。
任務副作用文件
在一些應用中,任務需要創建和/或寫入文件,這些文件不同于實際的作業輸出文件。
在這種情況下,相同的2個Mapper或者Reducer有可能同時運行(比如推測執行的任務),并在文件系統上嘗試打開和或者寫入相同文件或路徑。所以你必須為每個任務的執行使用唯一名字(比如使用attapid,attempt2007092218120001m000000_0)。
為了避免這樣的情況,當OutputCommiter為FileOutputCommiter時,MR框架通過${mapred.work.output.dir}為每個任務執行嘗試維護一個特殊的路徑:${mapred.output.dir}/temporary/${taskid},這個路徑用來存儲任務執行的輸出。在任務執行成功完成時,${mapred.output.dir}/temporary/${taskid} 下的文件被移到${mapred.output.dir}下。當然框架丟棄不成功的任務執行的子路徑。這個進程對用戶應用透明。
MR應用開發者可以利用這個特性在執行過程中通過FileOutputFormat.getWorkOutputPath()在${mapred.work.output.dir} 下創建需要的site files,框架會像成功的任務試執行那樣處理,這樣就避免為每個試執行任務取唯一名字。
注意:特殊試執行任務執行過程中${mapred.work.output.dir} 值實際是${mapred.output.dir}/temporary/${taskid},該值被框架定制。所以直接在FileOutputFormat.getWorkOutputPath()返回的路徑中創建site-files,來使用這個特性。
對于只有mapper的作業,side-files將直接進入hdfs。
RecordWriter
RecordWriter將輸出的kv值寫入輸出文件。RecordWriter實現類將作業輸出寫入文件系統。
其他特性
提交作業到隊列
用戶提交的作業到隊列中,隊列是一個作業的集合,允許MR提供一些特定功能。隊列使用ACL控制哪些用戶提交作業。隊列一般和Hadoop Scheduler調度器一起使用。
Hadoop 安裝后默認配置了名稱為default的隊列,這個隊列是必須的。隊列名稱可以在hadoop配置文件中mapred.queue.names屬性里配置。一些作業調度器比如Capacity Scheduler支持多個隊列。
作業可以通過mapred.job.queue.name或者通過setQueueName(Stirng)設置隊列名稱,這是可選的。如果作業名稱沒有設置隊列名稱,則提交到default的隊列中。
計數器Counters
計數器用來描述MR中所有的計數,可以是MR框架定義也可以是應用提供。每個計數器可以是任何Enum類型。一組特定的Enum被聚合成一個組即為Counters.Group.
應用可以定義Enum類型的計數器,并可以通過Reporter.incrCounter(Enum,long或者Reporter.incrCounter(String,String,long在map和或者reduce中更新這個計數器的值。然后這些計數器統一被框架聚合。
DistributedCache
DistributedCache可以高效地將應用明細、大的只讀文件發布。DistributedCache是MR框架提供的緩存文件(如文本、壓縮包、jar包等)的高效工具。
應用需要在JobConf里使用hdfs://指定地址進行緩存。這些文件必須在文件系統中已經存在。
MR作業的任務在某個節點上執行前,MR框架會拷貝必需的文件到這個節點。特別說明的是,作業必需的所有文件只需要拷貝一次,支持壓縮包,并在各個節點上解壓,有助于提高MR執行效率。
DistributedCache會跟蹤所有緩存文件的修改時間戳。作業執行過程中應用或者外部不應該修改被緩存的文件。
通常,DistributedCache被用來緩存簡單、只讀的數據或者文本文件以及像壓縮包/jar包等復雜文件。壓縮包比如zip\tar\tgz\tar.gz文件到節點上被解壓。所有文件都有執行權限。
屬性 mapred.cache.{files|archives}配置的文件和包等可以被發布,多個文件可以使用“,”來分隔。也可以調用 DistributedCache.addCacheFile(URI,conf)或者DistributedCache.addCacheArchive(URI,conf) ,DistributedCache.setCacheFiles(URIs,conf)/DistributedCache.setCacheArchives(URIs,conf) 設置,其中URI為hdfs://host:port/absolute-path#link-name形式。如果使用streaming方式,可以通過 -cacheFile/-cacheArchive來設置。
另外,用戶也可以通過DistributedCache.createSymlink(Configuration)?API 將分布式緩存的文件使用符號鏈接到當前工作路徑。或者也可以通過設置mapred.create.symlink 為yes。這樣分布式緩存將URI的最末部分作為符號鏈接。例如hdfs://namenode:port/lib.so.1#lib.so 的符號鏈接為lib.so.
分布式緩存也可以用作在map,reduce階段中基本的軟件分布式處理機制,可以用來分布式處理jar包或者本地庫。DistributedCache.addArchiveToClassPath(Path, Configuration)或者DistributedCache.addFileToClassPath(Path, Configuration)用來將需要緩存的文件或者jar包加入到作業運行的子jvm的classpath中。也可以通過mapred.job.classpath.{files|archives}來設置。同樣這些分布式緩存的文件或jar包也可以使用符號鏈接到當前工作路徑,當然也支持本地庫并可被作業在執行中加載。
工具類(Tool)
Tool接口支持常用的Hadoop命令行參數的處理操作。它是MR工具或者應用的標準工具。應用應該通過ToolRunner.run(Tool,Stirng[])將標準命令行的參數使用GenericOptionsParser來處理。只處理應用的自定義參數。
Hadoop中命令行里通用的參數有:
1 2 3 4 | -conf -D -fs -jt |
IsolationRunner
IsolationRunner是MR應用的調試工具。
要使用它,需要先設置keep.failed.task.files = true,另外請注意屬性keep.tasks.files.pattern。
下一步,到失敗作業的某個失敗節點上,cd到TT的本地路徑并運行IsolationRunner。
1 2 | $ cd /taskTracker/${taskid}/work $ hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml |
IsolationRunner 會再單獨的jvm中對相同的輸入上運行失敗的任務并可以debug。
性能分析(Profiling)
Profiling可以用來對作業的map,reduce任務使用java內置的性能分析工具進行采樣,得出具有代表性(2個或3個試樣)的采樣結果。
用戶可以通過設置mapred.task.profile屬性指定框架是否對作業的任務收集性能信息。也可以通過API JobConf.setProfileEnabled(boolean)來指定。如果值為true,任務性能分析就啟用了。采樣結果保存在用戶日志目錄中,默認性能采樣不開啟。
一旦需要性能分析,可以通過屬性mapred.task.profile.{maps|reduces} 設置采樣任務的范圍。使用JobConf.setProfileTaskRange(boolean,String)一樣可以設置。默認的range是0-2.
可以設置mapred.task.profile.params確定分析參數,api方式為 JobConf.setProfileParams(String)。如果參數中含有“%s”占位符,MR框架會使用采樣結果輸出文件名替代它。這些參數被傳到子JVM.默認值為-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s.
調試工具(debugging)
MR框架支持用戶自定義腳本調試工具。例如當MR任務失敗時,用戶可以運行debug腳本處理任務日志。腳本可以訪問任務的輸出和錯誤日志,系統日志以及作業配置信息。調試腳本輸出和錯誤流作為作業輸出一部分顯示到控制臺診斷信息中。
下面講討論如何為作業提交調試腳本。當然腳本需要分發并提交到Hadoop中MR中。
如何分發腳本文件
使用DistributedCache分發并且為腳本文件建立符號鏈接。
如何提交腳本
一種便捷的途徑是通過設置mapred.map.task.debug.script和mapred.reduce.task.debug.script分別實現對map和reduce指定調試腳本。也可以通過API方式,分別是JobConf.setMapDebugScript(String)和JobConf.setReduceDebugScript(String)。如果是streaming方式,可以通過命令行里指定-mapdebug 和-reducedebug實現。
腳本的參數是任務的輸出流、錯誤流、系統日志以及job配置文件。調試模式下,以下命令在任務失敗的節點上運行:
1 | $script $stdout $stderr $syslog $jobconf |
Pipes程序在命令形式上以第五個參數傳入。如:
1 | $script $stdout $stderr $syslog $jobconf $program |
默認行為
對pipes,使用默認的腳本以gdb方式處理核心轉儲(core dumps),打印棧軌跡,并給出正在運行線程的信息。
JobControl
JobControl可以對一組MR作業和他們的依賴環境進行集中管理。
數據壓縮
Hadoop Mapreduce可以對中間結果(map輸出)和作業最終輸出以指定壓縮方式壓縮。通常都會啟用中間結果的壓縮,因為可以顯著提高作業運行效率并且不需要對作業本身進行改變。只有MR shuffle階段生成的臨時數據文件被壓縮(最終結果可能或者不被壓縮)。
Hadoop支持zlib,gzip,snappy壓縮算法。推薦Snappy,因為其壓縮和解壓相比其他算法要高效。
因為性能和java庫不可用原因,Hadoop也提供本地的zlib和gzip實現。詳情可以參考這個文檔
中間輸出物
應用可以通過JobConf.setCOmpressMapOutput(boolean)設置是否對中間結果進行壓縮,使用JobConf.setMapOutputCompressorClass(Class)設置壓縮算法。
作業輸出
通過FileOutputFormat.setCompressOutput(JobConf,boolean)設置是否壓縮最終產出物,通過FileOutputFormat.setOutputComressorClass(JobConf,Class)設置壓縮方式。
如果作業輸出以SequenceFileOutputFormat格式存儲。則可以通過SequenceFileOutputFormat.setOutputCompressionType(Jobconf,SequenceFile.CompressionType)API 設定壓縮方式.壓縮方式有RECORD/BLOCK默認是RECORD。
跳過損壞的記錄
Hadoop提供了一個選項,在MR處理map階段時跳過被損壞的輸入記錄。應用可以通過SkipBadRecords類使用這個特性。
作業處理時可能對確定的輸入集上map任務會失敗。通常是map函數存在bug,這時需要fix這些bug。但有時卻無法解決這種特殊情況。比如這個bug可能是第三方庫導致的。這時這些任務在經過若干嘗試后仍然無法成功完成,作業失敗。這時跳過這些記錄集,對作業最終結果影響不大,仍然可以接受(比如對非常大的數據進行統計時)。
默認該特性沒有開啟。可以通過SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)開啟。
開啟后,框架對一定數目的map失敗使用skipping 模式。更多信息可以查看SkipBadRecords.setAttemptsToStartSkipping(Configuration, int)。skipping模式中,map任務維護一個正被處理的記錄范圍值。這個過程框架通過被處理的記錄行數計數器實現。可以了解SkipBadRecords.COUNTERMAPPROCESSED_RECORDS和SkipBadRecords.COUNTERREDUCEPROCESSED_GROUPS。這個計數器讓框架知道多少行記錄被成功處理了,哪些范圍的記錄導致了map的失敗,當然這些壞記錄就被跳過了。
被跳過的記錄數目依賴應用被處理的記錄計數器統計頻率。建議當每行記錄被處理時就增加該計數器。但是這個在一些應用中無法實現。這時框架可能也跳過了壞記錄附近的記錄。用戶可以通過SkipBadRecords.setMapperMaxSkipRecords(Configuration,long)和SkipBadRecords.setReducerMaxSkipGroups(Configuration,long)控制被跳過的記錄數。框架會使用類似二分搜索的方式努力去減小被跳過的記錄范圍。被跳過的范圍會分為2部分,并對其中的一部分執行。一旦失敗了,框架可以知道這部分含有損壞的記錄。任務將重新執行直到遇到可接受的數據或者所有嘗試機會用完。如果要提高任務嘗試次數,可以通過JobConf.setMaxMapAttampts(int)和JobConf.setMaxReduceAttempts(int)設置。
被跳過的記錄隨后會以sequence file格式寫入hdfs以便于后面可能的分析。路徑可以通過SkipBadRecords.setSkipOutputPath(JobConf,Path)設定。
?
http://www.importnew.com/4736.html