Hadoop Streaming詳解

一: Hadoop Streaming詳解

1、Streaming的作用

Hadoop Streaming框架,最大的好處是,讓任何語言編寫的map, reduce程序能夠在hadoop集群上運行;map/reduce程序只要遵循從標準輸入stdin讀,寫出到標準輸出stdout即可

其次,容易進行單機調試,通過管道前后相接的方式就可以模擬streaming, 在本地完成map/reduce程序的調試

# cat inputfile | mapper | sort | reducer > output

最后,streaming框架還提供了作業提交時的豐富參數控制,直接通過streaming參數,而不需要使用java語言修改;很多mapreduce的高階功能,都可以通過steaming參數的調整來完成

?

?2、Streaming的局限

Streaming默認只能處理文本數據Textfile,對于二進制數據,比較好的方法是將二進制的key, value進行base64編碼,轉化為文本

Mapper和reducer的前后都要進行標準輸入和標準輸出的轉化,涉及數據拷貝和解析,帶來了一定的開銷

?

3、Streaming命令的相關參數? ? (普通選項、streaming選項)

Streaming命令的形式如下:

#? /usr/local/src/hadoop-1.2.1/bin/hadoop jar ?hadoop-streaming.jar \

? ?[普通選項]? [Streaming選項]????? ???# ?注意:普通選項一定要寫在streaming選項前面

?

普通選項

參數

可選/必選

解釋

-conf? 配置文件

可選

指定一個應用程序配置文件

-fs? host:port or local

可選

指定一個namenode

-jt?? host:port? or local

可選

指定一個jobtracker

-files 文件1,文件2,

?

-files? hdfs://192.168.179.100:9000/file1.txt,

hdfs://192.168.179.100:9000/file2.txt

?

將代替-cacheFile選項

可選

類似-file, 不同的

1)將HDFS中的多個文件進行分發

?

2)文件已經位于HDFS上

?

3)框架會在該作業attemps目錄內創建一個符號鏈接,指向該作業的jar目錄(放置所有分發文件)

-archives

?

框架會在作業的attempt目錄創建符號鏈接,指向作業的jar目錄,jar目錄中才是分發到本地的壓縮文件

?

-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir

testfile.tgz是用戶上傳到HDFS的打包壓縮文件

#后的tgzdir是別名,hadoop-1.2.1中必須要別名

可選

逗號分隔的多個壓縮文件,已經位于HDFS上

?

框架自動分發壓縮文件到計算節點,并且Inputformat會自動進行解壓

-D?? property=value

可選

重點,很多屬性通過-D指定

?

插曲1: mapred-site.xml 指定mapslotreduceslot

Map和reduce在datanode上的運行,會受到slot的限制,并且有各自的slot限制; 每個Datanode讀取相應的配置文件, 從而確定每個datanode上能運行的最大map,reduce個數,以及節點能力是否充分發揮

Hadoop1.0中,slot在mapred-site.xml中配(mapreduce作業前配置好), 基本上每個slot在運行1個map, reduce作業后會占用1個CPU core,? ?最激進的做法是設置map和reduce的slot都是CPU core-1 (Map執行完后才會進行reduce),? 預留1個CPU core給tasktracker(比如上報心跳等),? 但通常reducer的slot要比reducer少,考慮大多數情況下mapper要比reducer多

默認map的slot為2,reduce的slot也為2

<configuration>

??????? <property>

??????????????? <name>mapred.job.tracker</name>

??????????????? <value>http://192.168.179.100:9001</value>

??????? </property>

??????? <property>

?????????????? <name>mapred.tasktracker.map.tasks.maximum</name>

?????????????? <value>15</value>

??????? </property>

??????? <property>

?????????????? <name>mapreduce.tasktracker.tasks.reduce.maximum</name>

?????????????? <value>10</value>

??????? </property>

</configuration>

?

插曲二: mapred-site.xml 指定map最終輸出的merge文件的存放路徑

<configuration>

??????? <property>

??????????????? <name>mapred.job.tracker</name>

??????????????? <value>http://192.168.179.100:9001</value>

??????? </property>

??????? <property>

?????????????? <name>mapred.local.dir</name>

?????????????? <value>/usr/loca/src/hadoop-1.2.1/tmp/mapoutput</value>

??????? </property>

</configuration>

?

當1個作業被提交并在tasktracer的管理下開始運行時,會對每個job創建1個目錄,所有分發的文件,都放置在這里

${mapred.local.dir}/taskTracker/$user/jobcache/$jobid/jars/

?

普通選項中的-D? property=value
-D??? 普通選項,使用最多的高級參數,替代-jobconf(參數將被廢棄),需要注意的是 -D選項要放在streaming參數的前面,一般我會放在參數的開頭

類別

?

?

?

?

指定目錄

-D? dfs.data.dir=/tmp

修改本地臨時目錄

?

-D? mapred.local.dir=/tmp/local

-D? mapred.system.dir=/tmp/system

-D? mapred.tmp.dir=/tmp/tmp

指定額外的本地臨時目錄

?

指定作業名

-D? mapred.job.name=”Test001”

?

?

指定只有map的作業

-D? mapred.reduce.tasks=0

該作業只有mapper, mapper的輸出直接作為作業的輸出

?

指定reducer個數

-D? mapred.reduce.tasks=2

?

?

指定mapper個數

-D? mapred.map.tasks=2

指定了不一定生效輸入文件為壓縮文件時,mapper和壓縮文件個數一一對應,

輸入數據為壓縮文件時,mapper和文件個數一一對應,比較好的控制Mapper數量的方法

指定Mapper輸出的key,value分隔符

-D stream.map.output.field.separator=.

-D stream.num.map.output.key.fields=4

Mapper的輸出使用.做分割符,并且第4個.之前的部分作為key, 剩余的部分作為value (包含剩余的.)

如果mapper的輸出沒有4個., 則整體一行作為key, value為空

默認:

使用

\t做分隔符,第1個\t之前的部分作為key, 剩余為value, 如果mapper輸出沒有\t,則整體一行作為key,value為空

指定reducer輸出的value, key分隔符

-D stream.reduce.output.field.seperator=.

-D stream.num.reduce.output.key.fields=4

指定reduce輸出根據.分割,直到第4個.之前的內容為key,其他為value

Reducer程序要根據指定進行key,value的構造

不常用

-D stream.map.input.field.seperator

Inputformat如何分行,默認\n

?

不常用

-D stream.reduce.input.field.seperator

?

?

作業優先級

-D? mapred.job.priority=HIGH

VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH

?

最多同時運行的map任務數

-D mapred.job.map.capacity=5

?

?

最多同時運行的reduce任務數

-D mapred.job.reduce.capacity=3

?

?

Task沒有響應(輸入輸出)的最大時間

-D mapred.task.timeout=6000

毫秒

超時后,該task被終止

Map的輸出是否壓縮

-D mapred.compress.map.output=True

?

?

Map的輸出的壓縮方式

-D mapred.map.output.comression.codec=

?

?

Reduce的輸出是否壓縮

-D mapred.output.compress=True

?

?

Reducer的輸出的壓縮方式

-D mapred.output.compression.codec=

?

?

?

-D 指定job名稱
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D mapred.job.name=”Test001”
-D 指定reduce任務、map任務個數
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.job.name=”Teset001”
-D mapred.reduce.tasks=2    # reduce task個數,一定生效
-D mapred.map.tasks=5       # map task個數,不一定生效
-D 指定mapper的輸出分隔符
-D stream.map.output.field.seperator=.  # 指定mapper每條輸出key,value分隔符
-D stream.num.map.output.key.fields=4   # 第4個.之前的部分為key,剩余為value
-D map.output.key.field.separator=.      # 設置map輸出中,Key內部的分隔符

?

-D 指定基于哪些key進行分桶

基于指定的Key進行分桶,打標簽

指定列數

-D num.key.fields.for.partition=1       # 只用1列Key做分桶
-D num.key.fields.for.partition=2       # 使用1,2共兩列key做分桶

指定某些字段做key

-D mapred.text.key.partitioner.option =-k1,2   # 第1,2列Key做分桶
-D mapred.text.key.partitioner.option =-k2,2   # 第2列key做分桶

都要修改partition為能夠只基于某些Key進行分桶的類

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-D 指定將reducer的輸出進行壓縮
-D mapred.output.compress=true-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-D 指定將mapper的輸出進行壓縮
-D mapred.compress.map.output=true-D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

?

-D 指定Comparatorkey進行數字、倒序排序
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \  # 使用keyFieldBasedComparator進行key排序-D stream.map.output.field.separator=. \-D stream.num.map.output.key.fields=4 \-D map.output.key.field.separator=. \-D mapred.text.key.comparator.options=-k2,2nr \# -k2,2只用第二列排序,n數字排序,r倒序(從大到小)-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer

?

-D 指定每個reduce task申請的內存數量
-D mapreduce.reduce.memory.mb=512  #單位為M

?

?

Streaming選項

參數

可選/必選

參數描述

-input <HDFS目錄或文件路徑>

支持*通配符,指定多個文件或目錄,多次-input,指定多個輸入文件/目錄

必選

Mapper的輸入數據,文件要在任務提交前手動上傳到HDFS

-output <HDFS目錄>

# 路徑不能已存在,否則認為是其他job的輸出

必選

reducer輸出結果的HDFS存放路徑, ?不能已存在,但腳本中一定要配置

-mapper <可執行命令或java>

?

-mapper “python map.py”

-mapper “bash map.sh”

-mapper “perl map.perl”

必選

Mapper程序

-reducer <可執行命令或java>

?

-reducer “python reducer.py”

-reducer “bash reducer.sh”

-reducer “perl reducer.sh”

可選

Reducer程序,不需要reduce處理就不指定

-combiner <可執行命令或java>

?

-combiner “python map.py”

-combiner “bash map.sh”

-combiner “perl map.perl”

可選

處理mapper輸出的combiner程序

-file

<本地mapperreducer程序文件、程序運行需要的其他文件>

?

-file map.py

-file reduce.py

-file white_list

可選??????????????????????????? 文件在本地,小文件

將本地文件分發給計算節點

?

文件作為作業的一部分,一起被打包并提交,所有分發的文件最終會被放置在datanodejob的同一個專屬目錄下:jobcache/job_xxx/jar

?

-cacheFile

hdfs://master:9000/cachefile_dir/white_list

?

分發HDFS文件

?

Job運行需要的程序,輔助文件都先放到HDFS上,指定HDFS文件路徑,將HDFS文件拷貝到計算節點,也是都放置在job的同一個專屬目錄下:

jobcache/job_xxx/jar

-cacheArchive

?

hdfs://master:9000/w.tar.gz#WLDIR

?

分發HDFS壓縮文件、壓縮文件內部具有目錄結構

?

?

-numReduceTasks ?<數字>

?

-numReduceTasks? 2

可選

指定該任務的reducer個數

-inputformat? <Java類名>

可選

指定自己定義的inputformat類,默認TextInputformat類

-outputformat? <Java類名>

可選

指定自己定義的outputformat類,默認TextOutputformat類

-cmdenv? name=value

可選

傳遞給streaming命令的環境變量

?

?

二、Mapper輸入/輸出,根據哪些key分桶,根據哪些key進行排序

?

先看看Hadoop-1.2.1 文檔原文中的解釋

As the mapper task runs, it converts its inputs into lines and feed the lines to the stdin of the process. In the meantime, the mapper collects the line oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the?prefix of a line up to the first tab character?is the?key?and the rest of the line (excluding the tab character) will be the?value. If there is no tab character in the line, then entire line is considered as key and the value is null. However, this can be customized, as discussed later.

?

Mapper輸入:

每一個mapper開始運行時,輸入文件會被轉換成多行(TextInputformat根據\n來進行分行),并將每一行傳遞給stdin, 作為Mapper的輸入 mapper直接對stdin中的每行內容做處理

?

Mapper輸出分隔符:

默認情況下hadoop設置mapper輸出的key, value通過tab進行分隔,可以重新指定

-D stream.map.output.field.seperator=.? ? # 指定mapper每條輸出key,value分隔符

-D stream.num.map.output.key.fields=4? ?# 第4個.之前的部分為key,剩余為value

?

mapper的輸出會經歷

1、 partition前,根據mapper輸出分隔符分離出KeyValue

-D stream.map.output.field.separator=. ? ?# 指定mapper每條輸出key,value分隔符

-D stream.num.map.output.key.fields=4 ? # 第4個.之前的為key, 剩下的為value

-D? map.output.key.field.separator=. ? ? ? ?# 設置map輸出中,Key內部的分隔符

?

2、 根據 “分桶分隔符”,確定哪些key被用來做partition(默認是用所有key, 只有1列; 或者是Mapper輸出分隔符分離出的所有key都被用于Partition)

基于指定的Key進行分桶,打標簽

指定列數

-D num.key.fields.for.partition=1?? ?????# 只用1列Key做分桶,也就是第一列

-D num.key.fields.for.partition=2?????? # 使用1,2共兩列key做分桶(列數)

?

指定某些字段做key

-D mapred.text.key.partitioner.option =-k1,2? ?# 第1,2列Key做分桶

-D mapred.text.key.partitioner.option =-k2,2?? # 第2列key做分桶

?

#都要修改partition為能夠只基于某些Key進行分桶的類

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

?

3、Spill時根據Partition標簽和所有Key進行排序

4Partition標簽和key之間,也是通過mapper輸出分隔符來隔離

5、reducer前的文件會刪除partition標簽,并根據Mapper輸出分隔符確定出key, 進行Reducer前的歸并排序;(reducer前的歸并排序,基于所有mapper的key進行排序

因此如果要定義新的Mapper輸出分隔符就要做到:1)mapper代碼中根據新分隔符來構建輸出到stdout的內容;2)提交作業時,通過—D 指定新的Mapper輸出分隔符,以及第幾個分隔符來分離Key

?

Reducer的輸入:

每個Reducer的每條輸入,就是去除Partition標簽(根據Mapper分隔符分離出partition標簽)后的內容,和Mapper輸出到stdout中的內容相同,但不同記錄之間已經做了排序;因此如果重新指定了Mapper的輸出分隔符,Reducer程序就要修改為根據新的Mapper輸出分隔符來分離Key,value;

?

Reducer的輸出:

Reducer的輸出,默認也是根據tab來分離key,value, 這也是reducer程序要根據tab來組合key,value輸出給stdout的原因; ?Reducer輸出分隔符重新指定,Reducer程序中輸出給stdout的內容也要配合新的分隔符來構造(Reducer->stdout-> outputformat ->file, ?outputformat根據reducer的輸出分隔符來分離key,value, ?并寫入文件

-D stream.reduce.output.field.seperator=.?? ???# reducer輸出key,value間的分隔符

-D stream.num.reduce.output.key.fields=4? ???# 第4個.之前的內容為key, 其他為value

轉載于:https://www.cnblogs.com/shay-zhangjin/p/7714868.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/391027.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/391027.shtml
英文地址,請注明出處:http://en.pswp.cn/news/391027.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

mongodb分布式集群搭建手記

一、架構簡介 目標 單機搭建mongodb分布式集群(副本集 分片集群)&#xff0c;演示mongodb分布式集群的安裝部署、簡單操作。 說明 在同一個vm啟動由兩個分片組成的分布式集群&#xff0c;每個分片都是一個PSS(Primary-Secondary-Secondary)模式的數據副本集&#xff1b; Confi…

歸約歸約沖突_JavaScript映射,歸約和過濾-帶有代碼示例的JS數組函數

歸約歸約沖突Map, reduce, and filter are all array methods in JavaScript. Each one will iterate over an array and perform a transformation or computation. Each will return a new array based on the result of the function. In this article, you will learn why …

為什么Java里面的靜態方法不能是抽象的

為什么Java里面的靜態方法不能是抽象的&#xff1f; 問題是為什么Java里面不能定義一個抽象的靜態方法&#xff1f;例如&#xff1a; abstract class foo {abstract void bar( ); // <-- this is okabstract static void bar2(); //<-- this isnt why? }回答一 因為抽…

python16_day37【爬蟲2】

一、異步非阻塞 1.自定義異步非阻塞 1 import socket2 import select3 4 class Request(object):5 def __init__(self,sock,func,url):6 self.sock sock7 self.func func8 self.url url9 10 def fileno(self): 11 return self.soc…

樸素貝葉斯實現分類_關于樸素貝葉斯分類及其實現的簡短教程

樸素貝葉斯實現分類Naive Bayes classification is one of the most simple and popular algorithms in data mining or machine learning (Listed in the top 10 popular algorithms by CRC Press Reference [1]). The basic idea of the Naive Bayes classification is very …

python:改良廖雪峰的使用元類自定義ORM

概要本文僅僅是對廖雪峰老師的使用元類自定義ORM進行改進&#xff0c;并不是要創建一個ORM框架 編寫fieldclass Field(object):def __init__(self, column_type,max_length,**kwargs):1&#xff0c;刪除了參數name&#xff0c;field參數全部為定義字段類型相關參數&#xff0c;…

2019年度年中回顧總結_我的2019年回顧和我的2020年目標(包括數量和收入)

2019年度年中回顧總結In this post were going to take a look at how 2019 was for me (mostly professionally) and were also going to set some goals for 2020! &#x1f929; 在這篇文章中&#xff0c;我們將了解2019年對我來說(主要是職業)如何&#xff0c;我們還將為20…

在Java里重寫equals和hashCode要注意什么問題

問題&#xff1a;在Java里重寫equals和hashCode要注意什么問題 重寫equals和hashCode有哪些問題或者陷阱需要注意&#xff1f; 回答一 理論&#xff08;對于語言律師或比較傾向于數學的人&#xff09;&#xff1a; equals() (javadoc) 必須定義為一個相等關系&#xff08;它…

vray陰天室內_陰天有話:第1部分

vray陰天室內When working with text data and NLP projects, word-frequency is often a useful feature to identify and look into. However, creating good visuals is often difficult because you don’t have a lot of options outside of bar charts. Lets face it; ba…

【codevs2497】 Acting Cute

這個題個人認為是我目前所做的最難的區間dp了&#xff0c;以前把環變成鏈的方法在這個題上并不能使用&#xff0c;因為那樣可能存在重復計算 我第一遍想的時候就是直接把環變成鏈了&#xff0c;wa了5個點&#xff0c;然后仔細思考一下就發現了問題 比如這個樣例 5 4 1 2 4 1 1 …

漸進式web應用程序_漸進式Web應用程序與加速的移動頁面:有什么區別,哪種最適合您?

漸進式web應用程序Do you understand what PWAs and AMPs are, and which might be better for you? Lets have a look and find out.您了解什么是PWA和AMP&#xff0c;哪一種可能更適合您&#xff1f; 讓我們看看并找出答案。 So many people own smartphones these days. T…

高光譜圖像分類_高光譜圖像分析-分類

高光譜圖像分類初學者指南 (Beginner’s Guide) This article provides detailed implementation of different classification algorithms on Hyperspectral Images(HSI).本文提供了在高光譜圖像(HSI)上不同分類算法的詳細實現。 目錄 (Table of Contents) Introduction to H…

在Java里如何給一個日期增加一天

在Java里如何給一個日期增加一天 我正在使用如下格式的日期: yyyy-mm-dd. 我怎么樣可以給一個日期增加一天&#xff1f; 回答一 這樣應該可以解決問題 String dt "2008-01-01"; // Start date SimpleDateFormat sdf new SimpleDateFormat("yyyy-MM-dd&q…

CentOS 7安裝和部署Docker

版權聲明&#xff1a;本文為博主原創文章&#xff0c;未經博主允許不得轉載。 https://blog.csdn.net/u010046908/article/details/79553227 Docker 要求 CentOS 系統的內核版本高于 3.10 &#xff0c;查看本頁面的前提條件來驗證你的CentOS 版本是否支持 Docker 。通過 uname …

JavaScript字符串方法終極指南-拆分

The split() method separates an original string into an array of substrings, based on a separator string that you pass as input. The original string is not altered by split().split()方法根據您作為輸入傳遞的separator字符串&#xff0c;將原始字符串分成子字符串…

機器人的動力學和動力學聯系_通過機器學習了解幸福動力學(第2部分)

機器人的動力學和動力學聯系Happiness is something we all aspire to, yet its key factors are still unclear.幸福是我們所有人都渴望的東西&#xff0c;但其關鍵因素仍不清楚。 Some would argue that wealth is the most important condition as it determines one’s li…

在Java里怎將字節數轉換為我們可以讀懂的格式?

問題&#xff1a;在Java里怎將字節數轉換為我們可以讀懂的格式&#xff1f; 在Java里怎將字節數轉換為我們可以讀懂的格式 像1024應該變成"1 Kb"&#xff0c;而1024*1024應該變成"1 Mb". 我很討厭為每個項目都寫一個工具方法。在Apache Commons有沒有這…

ubuntu 16.04 安裝mysql

2019獨角獸企業重金招聘Python工程師標準>>> 1) 安裝 sudo apt-get install mysql-server apt-get isntall mysql-client apt-get install libmysqlclient-dev 2) 驗證 sudo netstat -tap | grep mysql 如果有 就代表已經安裝成功。 3&#xff09;開啟遠程訪問 1、 …

shell:多個文件按行合并

paste file1 file2 file3 > file4 file1內容為&#xff1a; 1 2 3 file2內容為&#xff1a; a b c file3內容為&#xff1a; read write add file4內容為&#xff1a; 1 a read 2 b write 3 c add 轉載于:https://www.cnblogs.com/seaBiscuit0922/p/7728444.html

form子句語法錯誤_用示例語法解釋SQL的子句

form子句語法錯誤HAVING gives the DBA or SQL-using programmer a way to filter the data aggregated by the GROUP BY clause so that the user gets a limited set of records to view.HAVING為DBA或使用SQL的程序員提供了一種過濾由GROUP BY子句聚合的數據的方法&#xff…