一: Hadoop Streaming詳解
1、Streaming的作用
Hadoop Streaming框架,最大的好處是,讓任何語言編寫的map, reduce程序能夠在hadoop集群上運行;map/reduce程序只要遵循從標準輸入stdin讀,寫出到標準輸出stdout即可
其次,容易進行單機調試,通過管道前后相接的方式就可以模擬streaming, 在本地完成map/reduce程序的調試
# cat inputfile | mapper | sort | reducer > output
最后,streaming框架還提供了作業提交時的豐富參數控制,直接通過streaming參數,而不需要使用java語言修改;很多mapreduce的高階功能,都可以通過steaming參數的調整來完成
?
?2、Streaming的局限
Streaming默認只能處理文本數據Textfile,對于二進制數據,比較好的方法是將二進制的key, value進行base64編碼,轉化為文本
Mapper和reducer的前后都要進行標準輸入和標準輸出的轉化,涉及數據拷貝和解析,帶來了一定的開銷
?
3、Streaming命令的相關參數? ? (普通選項、streaming選項)
Streaming命令的形式如下:
#? /usr/local/src/hadoop-1.2.1/bin/hadoop jar ?hadoop-streaming.jar \
? ?[普通選項]? [Streaming選項]????? ???# ?注意:普通選項一定要寫在streaming選項前面
?
普通選項
參數 | 可選/必選 | 解釋 |
-conf? 配置文件 | 可選 | 指定一個應用程序配置文件 |
-fs? host:port or local | 可選 | 指定一個namenode |
-jt?? host:port? or local | 可選 | 指定一個jobtracker |
-files 文件1,文件2, ? -files? hdfs://192.168.179.100:9000/file1.txt, hdfs://192.168.179.100:9000/file2.txt ? 將代替-cacheFile選項 | 可選 | 類似-file, 不同的 1)將HDFS中的多個文件進行分發 ? 2)文件已經位于HDFS上 ? 3)框架會在該作業attemps目錄內創建一個符號鏈接,指向該作業的jar目錄(放置所有分發文件) |
-archives ? 框架會在作業的attempt目錄創建符號鏈接,指向作業的jar目錄,jar目錄中才是分發到本地的壓縮文件 ? -archives hdfs://host:fs_port/user/testfile.tgz#tgzdir testfile.tgz是用戶上傳到HDFS的打包壓縮文件 #后的tgzdir是別名,hadoop-1.2.1中必須要別名 | 可選 | 逗號分隔的多個壓縮文件,已經位于HDFS上 ? 框架自動分發壓縮文件到計算節點,并且Inputformat會自動進行解壓 |
-D?? property=value | 可選 | 重點,很多屬性通過-D指定 |
?
插曲1: mapred-site.xml 指定map的slot,reduce的slot
Map和reduce在datanode上的運行,會受到slot的限制,并且有各自的slot限制; 每個Datanode讀取相應的配置文件, 從而確定每個datanode上能運行的最大map,reduce個數,以及節點能力是否充分發揮
Hadoop1.0中,slot在mapred-site.xml中配置(mapreduce作業前配置好), 基本上每個slot在運行1個map, reduce作業后會占用1個CPU core,? ?最激進的做法是設置map和reduce的slot都是CPU core-1 (Map執行完后才會進行reduce),? 預留1個CPU core給tasktracker(比如上報心跳等),? 但通常reducer的slot要比reducer少,考慮大多數情況下mapper要比reducer多
默認map的slot為2,reduce的slot也為2
<configuration>
??????? <property>
??????????????? <name>mapred.job.tracker</name>
??????????????? <value>http://192.168.179.100:9001</value>
??????? </property>
??????? <property>
?????????????? <name>mapred.tasktracker.map.tasks.maximum</name>
?????????????? <value>15</value>
??????? </property>
??????? <property>
?????????????? <name>mapreduce.tasktracker.tasks.reduce.maximum</name>
?????????????? <value>10</value>
??????? </property>
</configuration>
?
插曲二: mapred-site.xml 指定map最終輸出的merge文件的存放路徑
<configuration>
??????? <property>
??????????????? <name>mapred.job.tracker</name>
??????????????? <value>http://192.168.179.100:9001</value>
??????? </property>
??????? <property>
?????????????? <name>mapred.local.dir</name>
?????????????? <value>/usr/loca/src/hadoop-1.2.1/tmp/mapoutput</value>
??????? </property>
</configuration>
?
當1個作業被提交并在tasktracer的管理下開始運行時,會對每個job創建1個目錄,所有分發的文件,都放置在這里
${mapred.local.dir}/taskTracker/$user/jobcache/$jobid/jars/
?
普通選項中的-D? property=value
-D??? 普通選項,使用最多的高級參數,替代-jobconf(參數將被廢棄),需要注意的是 -D選項要放在streaming參數的前面,一般我會放在參數的開頭
類別 | ? | ? | ? |
? 指定目錄 | -D? dfs.data.dir=/tmp | 修改本地臨時目錄 | ? |
-D? mapred.local.dir=/tmp/local -D? mapred.system.dir=/tmp/system -D? mapred.tmp.dir=/tmp/tmp | 指定額外的本地臨時目錄 | ? | |
指定作業名 | -D? mapred.job.name=”Test001” | ? | ? |
指定只有map的作業 | -D? mapred.reduce.tasks=0 | 該作業只有mapper, mapper的輸出直接作為作業的輸出 | ? |
指定reducer個數 | -D? mapred.reduce.tasks=2 | ? | ? |
指定mapper個數 | -D? mapred.map.tasks=2 | 指定了不一定生效輸入文件為壓縮文件時,mapper和壓縮文件個數一一對應, | 輸入數據為壓縮文件時,mapper和文件個數一一對應,比較好的控制Mapper數量的方法 |
指定Mapper輸出的key,value分隔符 | -D stream.map.output.field.separator=. -D stream.num.map.output.key.fields=4 | Mapper的輸出使用.做分割符,并且第4個.之前的部分作為key, 剩余的部分作為value (包含剩余的.) 如果mapper的輸出沒有4個., 則整體一行作為key, value為空 | 默認: 使用 \t做分隔符,第1個\t之前的部分作為key, 剩余為value, 如果mapper輸出沒有\t,則整體一行作為key,value為空 |
指定reducer輸出的value, key分隔符 | -D stream.reduce.output.field.seperator=. -D stream.num.reduce.output.key.fields=4 | 指定reduce輸出根據.分割,直到第4個.之前的內容為key,其他為value | Reducer程序要根據指定進行key,value的構造 |
不常用 | -D stream.map.input.field.seperator | Inputformat如何分行,默認\n | ? |
不常用 | -D stream.reduce.input.field.seperator | ? | ? |
作業優先級 | -D? mapred.job.priority=HIGH | VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH | ? |
最多同時運行的map任務數 | -D mapred.job.map.capacity=5 | ? | ? |
最多同時運行的reduce任務數 | -D mapred.job.reduce.capacity=3 | ? | ? |
Task沒有響應(輸入輸出)的最大時間 | -D mapred.task.timeout=6000 | 毫秒 | 超時后,該task被終止 |
Map的輸出是否壓縮 | -D mapred.compress.map.output=True | ? | ? |
Map的輸出的壓縮方式 | -D mapred.map.output.comression.codec= | ? | ? |
Reduce的輸出是否壓縮 | -D mapred.output.compress=True | ? | ? |
Reducer的輸出的壓縮方式 | -D mapred.output.compression.codec= | ? | ? |
?
-D 指定job名稱
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D mapred.job.name=”Test001”
-D 指定reduce任務、map任務個數
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.job.name=”Teset001”
-D mapred.reduce.tasks=2 # reduce task個數,一定生效
-D mapred.map.tasks=5 # map task個數,不一定生效
-D 指定mapper的輸出分隔符
-D stream.map.output.field.seperator=. # 指定mapper每條輸出key,value分隔符 -D stream.num.map.output.key.fields=4 # 第4個.之前的部分為key,剩余為value -D map.output.key.field.separator=. # 設置map輸出中,Key內部的分隔符
?
-D 指定基于哪些key進行分桶
基于指定的Key進行分桶,打標簽
指定列數
-D num.key.fields.for.partition=1 # 只用1列Key做分桶
-D num.key.fields.for.partition=2 # 使用1,2共兩列key做分桶
指定某些字段做key
-D mapred.text.key.partitioner.option =-k1,2 # 第1,2列Key做分桶
-D mapred.text.key.partitioner.option =-k2,2 # 第2列key做分桶
都要修改partition為能夠只基于某些Key進行分桶的類
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-D 指定將reducer的輸出進行壓縮
-D mapred.output.compress=true-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-D 指定將mapper的輸出進行壓縮
-D mapred.compress.map.output=true-D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
?
-D 指定Comparator對key進行數字、倒序排序
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ # 使用keyFieldBasedComparator進行key排序-D stream.map.output.field.separator=. \-D stream.num.map.output.key.fields=4 \-D map.output.key.field.separator=. \-D mapred.text.key.comparator.options=-k2,2nr \# -k2,2只用第二列排序,n數字排序,r倒序(從大到小)-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer
?
-D 指定每個reduce task申請的內存數量
-D mapreduce.reduce.memory.mb=512 #單位為M
?
?
Streaming選項
參數 | 可選/必選 | 參數描述 |
-input <HDFS目錄或文件路徑> 支持*通配符,指定多個文件或目錄,多次-input,指定多個輸入文件/目錄 | 必選 | Mapper的輸入數據,文件要在任務提交前手動上傳到HDFS |
-output <HDFS目錄> # 路徑不能已存在,否則認為是其他job的輸出 | 必選 | reducer輸出結果的HDFS存放路徑, ?不能已存在,但腳本中一定要配置 |
-mapper <可執行命令或java類> ? -mapper “python map.py” -mapper “bash map.sh” -mapper “perl map.perl” | 必選 | Mapper程序 |
-reducer <可執行命令或java類> ? -reducer “python reducer.py” -reducer “bash reducer.sh” -reducer “perl reducer.sh” | 可選 | Reducer程序,不需要reduce處理就不指定 |
-combiner <可執行命令或java類> ? -combiner “python map.py” -combiner “bash map.sh” -combiner “perl map.perl” | 可選 | 處理mapper輸出的combiner程序 |
-file <本地mapper、reducer程序文件、程序運行需要的其他文件> ? -file map.py -file reduce.py -file white_list | 可選??????????????????????????? 文件在本地,小文件 | 將本地文件分發給計算節點 ? 文件作為作業的一部分,一起被打包并提交,所有分發的文件最終會被放置在datanode該job的同一個專屬目錄下:jobcache/job_xxx/jar ? |
-cacheFile “hdfs://master:9000/cachefile_dir/white_list” | ? | 分發HDFS文件 ? Job運行需要的程序,輔助文件都先放到HDFS上,指定HDFS文件路徑,將HDFS文件拷貝到計算節點,也是都放置在job的同一個專屬目錄下: jobcache/job_xxx/jar |
-cacheArchive ? “hdfs://master:9000/w.tar.gz#WLDIR” | ? | 分發HDFS壓縮文件、壓縮文件內部具有目錄結構 ? ? |
-numReduceTasks ?<數字> ? -numReduceTasks? 2 | 可選 | 指定該任務的reducer個數 |
-inputformat? <Java類名> | 可選 | 指定自己定義的inputformat類,默認TextInputformat類 |
-outputformat? <Java類名> | 可選 | 指定自己定義的outputformat類,默認TextOutputformat類 |
-cmdenv? name=value | 可選 | 傳遞給streaming命令的環境變量 |
?
?
二、Mapper輸入/輸出,根據哪些key分桶,根據哪些key進行排序
?
先看看Hadoop-1.2.1 文檔原文中的解釋
As the mapper task runs, it converts its inputs into lines and feed the lines to the stdin of the process. In the meantime, the mapper collects the line oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the?prefix of a line up to the first tab character?is the?key?and the rest of the line (excluding the tab character) will be the?value. If there is no tab character in the line, then entire line is considered as key and the value is null. However, this can be customized, as discussed later.
?
Mapper輸入:
每一個mapper開始運行時,輸入文件會被轉換成多行(TextInputformat根據\n來進行分行),并將每一行傳遞給stdin, 作為Mapper的輸入, mapper直接對stdin中的每行內容做處理
?
Mapper輸出分隔符:
默認情況下hadoop設置mapper輸出的key, value通過tab進行分隔,可以重新指定
-D stream.map.output.field.seperator=.? ? # 指定mapper每條輸出key,value分隔符
-D stream.num.map.output.key.fields=4? ?# 第4個.之前的部分為key,剩余為value
?
mapper的輸出會經歷:
1、 partition前,根據mapper輸出分隔符分離出Key和Value;
-D stream.map.output.field.separator=. ? ?# 指定mapper每條輸出key,value分隔符
-D stream.num.map.output.key.fields=4 ? # 第4個.之前的為key, 剩下的為value
-D? map.output.key.field.separator=. ? ? ? ?# 設置map輸出中,Key內部的分隔符
?
2、 根據 “分桶分隔符”,確定哪些key被用來做partition(默認是用所有key, 只有1列; 或者是Mapper輸出分隔符分離出的所有key都被用于Partition)
基于指定的Key進行分桶,打標簽
指定列數
-D num.key.fields.for.partition=1?? ?????# 只用1列Key做分桶,也就是第一列
-D num.key.fields.for.partition=2?????? # 使用1,2共兩列key做分桶(列數)
?
指定某些字段做key
-D mapred.text.key.partitioner.option =-k1,2? ?# 第1,2列Key做分桶
-D mapred.text.key.partitioner.option =-k2,2?? # 第2列key做分桶
?
#都要修改partition為能夠只基于某些Key進行分桶的類
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
?
3、Spill時根據Partition標簽和所有Key進行排序;
4、Partition標簽和key之間,也是通過mapper輸出分隔符來隔離
5、reducer前的文件會刪除partition標簽,并根據Mapper輸出分隔符確定出key, 進行Reducer前的歸并排序;(reducer前的歸并排序,基于所有mapper的key進行排序)
因此如果要定義新的Mapper輸出分隔符就要做到:1)mapper代碼中根據新分隔符來構建輸出到stdout的內容;2)提交作業時,通過—D 指定新的Mapper輸出分隔符,以及第幾個分隔符來分離Key
?
Reducer的輸入:
每個Reducer的每條輸入,就是去除Partition標簽(根據Mapper分隔符分離出partition標簽)后的內容,和Mapper輸出到stdout中的內容相同,但不同記錄之間已經做了排序;因此如果重新指定了Mapper的輸出分隔符,Reducer程序就要修改為根據新的Mapper輸出分隔符來分離Key,value;
?
Reducer的輸出:
Reducer的輸出,默認也是根據tab來分離key,value, 這也是reducer程序要根據tab來組合key,value輸出給stdout的原因; ?當Reducer輸出分隔符重新指定,Reducer程序中輸出給stdout的內容也要配合新的分隔符來構造(Reducer->stdout-> outputformat ->file, ?outputformat根據reducer的輸出分隔符來分離key,value, ?并寫入文件)
-D stream.reduce.output.field.seperator=.?? ???# reducer輸出key,value間的分隔符
-D stream.num.reduce.output.key.fields=4? ???# 第4個.之前的內容為key, 其他為value