MapReduce編程實踐

一、MapReduce編程思想

學些MapRedcue主要是學習它的編程思想,在MR的編程模型中,主要思想是把對數據的運算流程分成map和reduce兩個階段:

Map階段:讀取原始數據,形成key-value數據(map方法)。即,負責數據的過濾分發

Reduce階段:把map階段的key-value數據按照相同的key進行分組聚合(reduce方法)。即,數據的計算歸并

它其實是一種數據邏輯運算模型,對于這樣的運算模型,有一些成熟的具體軟件實現,比如hadoop中的mapreduce框架、spark等,例如在hadoop的mr框架中,對map階段的具體實現是map task,對reduce階段的實現是reduce task。這些框架已經為我們提供了一些通用功能的實現,讓我們專注于數據處理的邏輯,而不考慮分布式的具體實現,比如讀取文件、寫文件、數據分發等。我們要做的工作就是在這些編程框架下,來實現我們的具體需求。

下面我們先介紹一些map task和reduce task中的一些具體實現:

二、MapTask和ReduceTask

2.1 Map Task

讀數據:利用InputFormat組件完成數據的讀取。

    InputFormat-->TextInputFormat 讀取文本文件的具體實現

            -->SequenceFileInputFormat 讀取Sequence文件

            -->DBInputFormat 讀數據庫

處理數據:這一階段將讀取到的數據按照規則進行處理,生成key-value形式的結果。maptask通過調用用Mapper類的map方法實現對數據的處理。

分區:這一階段主要是把map階段產生的key-value數據進行分區,以分發給不同的reduce task來處理,使用的是Partitioner類。maptask通過調用Partitioner類的getPartition()方法來決定如何劃分數據給不同的reduce task。

排序:這一階段,對key-value數據做排序。maptask會按照key對數據進行排序,排序時調用key.compareTo()方法來實現對key-value數據排序。

2.2 Reduce Task

讀數據:這一階段通過http方式從maptask產生的數據文件中下載屬于自己的“區”的數據。由于一個區的數據可能來自多個maptask,所以reduce還要把這些分散的數據進行合并(歸并排序)

處理數據:一個reduce task中,處理剛才下載到自己本地的數據。通過調用GroupingComparator的compare()方法來判斷文件中的哪些key-value屬于同一組。然后將這一組數傳給Reducer類的reduce()方法聚合一次。

輸出結果:調用OutputFormat組件將結果key-value數據寫出去。

    Outputformat --> TextOutputFormat 寫文本文件(會把一個key-value對寫一行,分隔符為制表符\t

          --> SequenceFileOutputFormat 寫Sequence文件(直接將key-value對象序列化到文件中)

          --> DBOutputFormat?

下面介紹下利用MapReduce框架下的一般編程過程。我們要做的 工作就是把我們對數據的處理邏輯加入到框架的業務邏輯中。我們編寫的MapReduce的job客戶端主要包括三個部分,Mapper 、 Reducer和JobSubmitter,三個部分分別完成MR程序的map邏輯、reduce邏輯以及將我們編寫的job程序提交給集群。下面分別介紹這三個部分如何實現。

三、Hadoop中MapReduce框架下的一般編程步驟

Mapper:創建類,該類要實現Mapper父類,復寫read()方法,在方法內實現當前工程中的map邏輯。

Reducer:創建類,繼承Reducer父類,復寫reduce()方法,方法內實現當前工程中的reduce邏輯。

jobSubmitter:這是job在集群上實際運行的類,主要是通過main方法,封裝job相關參數,并把job提交。jobsubmitter內一般包括以下操作

step1:創建Configuration對象,并通過創建的對象對集群進行配置,同時支持用戶自定義一些變量并配置。這一步有些像我們集群搭建的時候對$haoop_home/etc/hadoop/*下的一些文件進行的配置。

step2:獲得job對象,并通過job對象對我們job運行進行一些配置。例如,設置集群運行的jar文件、設置實際執行map和reduce的類等,下面列出一些必要設置和可選設置。

        Configuration conf = new Configuration(); //創建集群配置對象。Job job = Job.getInstance(conf);//根據配置對象獲取一個job客戶端實例。job.setJarByClass(JobSubmitter.class);//設置集群上job執行的類job.setMapperClass(FlowCountMapper.class);//設置job執行時使用的Mapper類job.setReducerClass(FlowCountReducer.class);//設置job執行時使用的Reducer類job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input"));FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri"));//設置maptask做數據分發時使用的分發邏輯類,如果不指定,默認使用hashparjob.setPartitionerClass(ProvincePartitioner.class);job.setNumReduceTasks(4);//自定義的分發邏輯下,可能產生n個分區,所以reducetask的數量需要是nboolean res = job.waitForCompletion(true);System.exit(res ? 0:-1);

?一般實踐中,可以定義一個類,其中添加main方法對job進行提交,并在其中定義靜態內部類maper和reduce類。

四、MapReduce框架中的可自定義項

<不小心刪除以后就沒有再補充了,挺重要的。。。。補上吧。。。。>

總結,你要把bean寫到文本嗎?重寫toString方法

要傳輸嗎?實現Writable接口

要排序嗎?實現writablecompareble接口

?

遇到一些復雜的需求,需要我們自定義實現一些組件

2.1 自定義序列化數據類型

MapReduce框架為我們提供了基本數據類型的序列化類型,如String的Text類型,int的IntWritalbe類型,null的NullWritable類型等。但是有時候會有一些我們自定義的類型需要我們在map和reduce之間進行傳輸或者需要寫到hdfs上。hadoop提供了自己的序列化機制,實現自定義類型的序列化和反序列化將自定義的類實現hadoop提供的Writable接口。

自定義類實現Writable接口,實現readFields(in)write(out)方法。

同時,重寫toString()方法,可以自定義在寫到文件系統時候寫入的字段內容。

     * hadoop系統在序列化該類的對象時要調用的方法*/@Overridepublic void write(DataOutput out) throws IOException {out.writeInt(upFlow);out.writeUTF(phone);out.writeInt(dFlow);out.writeInt(amountFlow);}/*** hadoop系統在反序列化該類的對象時要調用的方法*/@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readInt();this.phone = in.readUTF();this.dFlow = in.readInt();this.amountFlow = in.readInt();}@Overridepublic String toString() {return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;}
View Code

2.2 自定義排序規則

MapReduce中提供了一個排序機制,map worker 和reduce worker ,都會對數據按照key的大小來排序,所以map和reduce階段輸出的記錄都是經過排序的(按照key排序)。我們在實踐中有時候需要對計算出來的結果進行排序,比如一個這樣的需求:計算每個頁面訪問次數,并按照訪問量倒序輸出。我們可以在統計了每個頁面訪問次數之后進行排序,但是我們還可以直接應用MR自身的排序特性,在MR處理的時候按照我們的需求進行排序。這時候就需要我們自定義排序規則。

自定義類,實現WritableComparable接口,實現其中的compareTo()方法,在其中自定義排序的規則。同時一般還要實現readFields(in) 和write(out)和toString()方法。

public class PageCount implements WritableComparable<PageCount>{private String page;private int count;public void set(String page, int count) {this.page = page;this.count = count;}public String getPage() {return page;}public void setPage(String page) {this.page = page;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic int compareTo(PageCount o) {return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(this.page);out.writeInt(this.count);}@Overridepublic void readFields(DataInput in) throws IOException {this.page= in.readUTF();this.count = in.readInt();}@Overridepublic String toString() {return this.page + "," + this.count;}}
View Code

總結:

實現Writable接口,是為了bean能夠傳輸,能夠寫到文件系統中。

實現WritableComparable還為了bean能夠按照你定義的規則進行排序。

2.2 自定義分區規則

我們知道,map計算出來的結果會分發給不同的reduce任務去進一步處理。MR中提供了一個默認的數據分發規則,會按照map的輸出中的key的hashcode,然后模除reduce task的數量,模除的結果就是數據的分區。我們可以通過自定義map數據分發給reduce的規則,實現把數據按照自己的需求記錄到不同的數據中。比如實現這樣的需求,有一個通話記錄的文件,按照歸屬地分別存儲數據。

?自定義類,繼承Partitioner父類(類的泛型為MapTask的輸出的key,value的類型),重寫?getPartition(<>key, <>value, int numPartitions)?方法,在其中自定義分區的規則,方法返回計算出來的分區數。MapTask每處理一行數據都會調用getPartition方法。因此最好不要在方法中創建可以給很多數據行共同使用的對象。在jobsubmitter中,設置maptask在做數據分區時使用的分區邏輯類,?job.setPartitonerClass(your.class)?,同時注意設置reduceTask的任務數量為我們在分區邏輯中定義的規則下回產生的分區數量,?job.setNumReduceTasks(numOfPartition);?

/*** 本類是提供給MapTask用的* MapTask通過這個類的getPartition方法,來計算它所產生的每一對kv數據該分發給哪一個reduce task* @author ThinkPad**/
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{static HashMap<String,Integer> codeMap = new HashMap<>();static{codeMap.put("135", 0);codeMap.put("136", 1);codeMap.put("137", 2);codeMap.put("138", 3);codeMap.put("139", 4);}@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = codeMap.get(key.toString().substring(0, 3));return code==null?5:code;}}
Partitioner
public class JobSubmitter {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JobSubmitter.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);// 設置參數:maptask在做數據分區時,用哪個分區邏輯類  (如果不指定,它會用默認的HashPartitioner)job.setPartitionerClass(ProvincePartitioner.class);// 由于我們的ProvincePartitioner可能會產生6種分區號,所以,需要有6個reduce task來接收job.setNumReduceTasks(6);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\flow\\input"));FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\flow\\province-output"));job.waitForCompletion(true);}}
JobSubmitter

2.3 自定義分組規則

MapTask每調用一次map就會產生一個k-v,多次調用后,生成多個k-v,具有相同key的的記錄稱為一組,會存入一個partition中,注意一個patition可以包含多個組。

?

一個ReduceTask處理一個partition,在處理的時候 ,按照key的順序進行。調用一次reduce會聚合一組數據,就是reduce方法中傳入的一個Itetor。為了確認一個分區中的兩條記錄是不是同一個組,會調用一個工具類GroupingCompatator的compare(01,02)方法,用來判斷兩個key是否相同,如果兩個key相等,則為同一組。利用這樣的機制,我們可以自定義一個分組規則。

自定義類,實現?WritableComparator?類實現?compare?方法,在其中告知MapTask如何判斷兩個 記錄是不是屬于同一個組。調用父類構造函數,指定比較的類。

public class OrderIdGroupingComparator extends WritableComparator {pbulic OrderIdGroupingComparator(){//通過構造函數指定要比較的類super(OrderBean.class, true);//
     }@Overridepublic int compare(WritableComparable a, WritableComparable b) {//參數中將來會傳入我們自定義的繼承了WritableComparable的bean,把a、b向下轉型為我們自定義類型的bean,才能比較a和bOrderBean o1 = (OrderBean)a;OrderBean o2 = (OrderBean)b;return o1.getOrderId().compareTo(o2.getOrderID);//id相同就是同一組
    }
}
View Code

在jobSubmiter中指定分組規則,

job.setGroupingComparatorClass(OrderIdGroupingComparator.class);

注意:關于區分分區和分組:

分區比分組的范圍更加大。分區是指,在map task結束之后,中間結果數據會被分給哪些reduce task,而分組是指,同一個分區中(即一個reduce task處理的數據中)數據的分組。在默認的計算分區的方法中,不同key的hash code對reduce task取模計算出來的結果可能相同,這樣的數據會被分到同一個分區;這一個分區中的key的haashcode不同,這樣就在一個區中分了不同組。

那么什么時候使用分區,什么時候使用分組呢?

再如在計算每個訂單中總金額最大的3筆中的案例中,可以考慮進行倒序排序,然后取前三;按照id進行倒序排序嗎?不現實,因為訂單id太多,不可能啟動那么多的reduce task。那么就要把多個訂單的數據存儲到第一個分區中,同時保證同一個訂單的數據全部在一個分區中,這時候,就需要自定義分區規則(保證同一訂單中的數據在同一個分區),但是又要分組排序,所以這時候就需要自定義分組規則(保證該分區中同一訂單在一組,不同訂單在不同組)

2.3自定義MapTask的局部聚合規則

默認情況下,map計算的結果逐條保存到磁盤中,傳輸給reduce之后也是分條的記錄,這樣可能造成一個問題就是如果某個分區下的數據較多,而有的分區下數據較少,就導致出現reduce task之間任務量差距較大,即出現數據傾斜的情況。一個解決辦法是在形成map結果文件的時候進行一次局部聚合。

使用Combiner組件可以實現在每個MapTask中對數據進行一次局部聚合。這個局部聚合的邏輯其實和Reducer的邏輯是一樣的,都是對map計算出的kv數據進行聚合,只不過如果是maptask來調用我們定義的Reducer實現類,則聚合的是當前這個maptask運行的結果,如果是reducetask來調用我們定義的Reducer實現類,則聚合的是全部maptask的運行結果。

定義類局部聚合類XXCombationer,繼承Rducer復寫reduce方法,在方法中實現具體的聚合邏輯;在jobSubmitter的job中設置mapTask端的局部聚合類為我們定義的類?job.setCombinerClass(XXCombiner.class)?。

?

2.4 控制輸入輸出格式。。。

?

?

五、MR程序的調試、執行方式

?

5.1 提交到linux運行

?

5.2 Win本地執行

?

轉載于:https://www.cnblogs.com/Jing-Wang/p/10886890.html

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/248921.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/248921.shtml
英文地址,請注明出處:http://en.pswp.cn/news/248921.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

webpack基礎+webpack配置文件常用配置項介紹+webpack-dev-server - QxQstar - 博客園

一.webpack基礎 1.在項目中生成package.json&#xff1a;在項目根目錄中輸入npm init&#xff0c;根據提示輸入相應信息。&#xff08;也可以不生成package.json文件&#xff0c;但是package.json是很有用的&#xff0c;所有建議生成&#xff09; 2.安裝webpaack a.在全局中安裝…

(十)

空轉載于:https://www.cnblogs.com/shanae/p/10034479.html

編譯原理--NFA/DFA

現成的, 講義: https://www.cnblogs.com/AndyEvans/p/10240790.html https://www.cnblogs.com/AndyEvans/p/10241031.html 一個例子, 寫得非常好. 一下子就全明白了, 尤其是像我這種沒有聽過編譯原理課程的人. https://blog.csdn.net/tyler_download/article/details/53139240 …

Python中pass的用法

空語句 do nothing保證格式完整保證語義完整以if語句為例&#xff0c;在c或c/java中&#xff1a; if(true); //do nothingelse{ //do something}對應于python就要這樣寫&#xff1a; if true: pass #do nothingelse: #do something 1 pass語句在函數中的作用當你在編寫一個程序…

express路由管理的幾種自動化方法分享-js教程-PHP中文網

我們平時在使用express寫代碼的過程中&#xff0c;會根據類別&#xff0c;將路由分為多個不同的文件&#xff0c;然后在項目的入口文件&#xff08;例如app.js&#xff09;中將其依次掛載&#xff0c;例如&#xff1a; 1 2 3 4 5 6 7 const index require(./routes/index) con…

dotnet不是內部或外部的命令,也不是可運行的程序或批處理文件

>>這臺電腦>>屬性>>高級系統設置>>環境變量>>系統變量>>Path>>編輯>> 變量值中添加 %SystemRoot%\system32;%SystemRoot%;%SystemRoot%\System32\Wbem; 即可。轉載于:https://www.cnblogs.com/ZCrystal/p/10894591.html

轉載 vue的基礎使用

轉載https://www.cnblogs.com/majj/p/9957597.html#top vue的介紹 前端框架和庫的區別nodejs的簡單使用vue的起步指令系統組件的使用過濾器的使用watch和computed鉤子函數漸進式的JavaScript框架 vue react angualr作者:尤雨溪 facebook 谷歌公…

Express實現路由分發控制、RESTful API

Express實現路由分發控制、RESTful API 標簽&#xff08;空格分隔&#xff09;&#xff1a; Node.js 最近在用Express作為自己的WEB應用框架&#xff0c;其中最為迷惑的就是Express的路由控制和分發&#xff0c;在網上搜了很多資料&#xff0c;但是大部分的資料都是將Express的…

springboot運行jar包時候加載指定目錄的其他jar支持包

最近發生一個小故障&#xff0c;調試好的項目&#xff0c;發布成jar包后無法找到oracle的驅動&#xff0c;研究了一下解決了。記錄一下。寫了一個run.sh腳本 #!/bin/bash cd ~ cd app nohup java -Djava.ext.dirs./lib -Doracle.jdbc.thinLogonCapabilityo3 -jar -Xms512m -Xmx…

OpenLayers3關于Map Export的Canvas跨域

一 Canvas跨域現象 地圖導出是地圖中常用的功能&#xff0c;并且OpenLayers3中也提供了兩個地圖導出的例子:http://openlayers.org/en/latest/examples/export-map.html http://openlayers.org/en/latest/examples/export-pdf.html。 看到這兩個例子我們都很興奮&#xff0c;直…

typescript-koa-postgresql 實現一個簡單的rest風格服務器 —— 連接 postgresql 數據庫...

接上一篇&#xff0c;這里使用 sequelize 來連接 postgresql 數據庫 1、安裝 sequelize&#xff0c;數據庫驅動 pg yarn add sequelize sequelize-typescript pg reflect-metadata 2、新建配置文件夾 conf 及 配置文件 db.conf.ts /*** name: 數據庫配置* param : undefined* r…

SmartGit使用教程

說明 官網的客戶端是命令行形式的&#xff0c;有興趣可以去了解下。這里針對圖形界面的smartgit做一個使用說明。 軟件下載和安裝 下載地址[2016.12.16測試可以] 按需選擇,如果不知道自己電腦是什么系統的&#xff0c;那我沒話說了https://www.syntevo.com/smartgit/ 安裝 …

jquery 下拉框 select2 運用 筆記

1,添加select2 樣式 參考&#xff08;https://select2.org/ &#xff09; 2,Html: <select id"txtType" name"Type" class"form-control select2" multiple"multiple"> </select> 3,jquery section scripts{ $(documen…

Asp.Net MVC中Action跳轉小結

首先我覺得action的跳轉大致可以這樣歸一下類&#xff0c;跳轉到同一控制器內的action和不同控制器內的action、帶有參數的action跳轉和不帶參數的action跳轉。 一、RedirectToAction(“Index”);//一個參數時在本Controller下&#xff0c;不傳入參數。 二、RedirectToAction(A…

獲取瀏覽器屏幕高度(js,jq) - 進擊的小牛牛 - 博客園

javascript IE中&#xff1a; document.body.clientWidth > BODY對象寬度 document.body.clientHeight > BODY對象高度 document.documentElement.clientWidth > 可見區域寬度 document.documentElement.clientHeight > 可見區域高度 FireFox中&#xff1a; docum…

589-N叉樹的前序遍歷

N階二叉樹&#xff1a; class Tree {public int val;public List<Tree> children;public Tree() {}public Tree(int _val, List<Tree> _children) {val _val;children _children;}}迭代法遍歷&#xff1a;public List<Integer> preorder(Tree root) {List&…

解析URL參數

1、拿到一個完整url后&#xff0c;如何解析該url得到里面的參數。 /*** 解析url中參數信息&#xff0c;返回參數數組*/ function convertUrlQuery($query) {$queryParts explode(&, $query);$params array();foreach ($queryParts as $param) {$item explode(, $param);…

第一個爬蟲和測試

Python測試函數的方法之一是用&#xff1a;try……except def gameover(a,b):if a>10 and b>10 and abs(a-b)2:return Trueif (a>11 and b<11) or (a<11 and b>11):return Truereturn False try:agameover(10,11)print(a) except:print("Error") g…

JS組件系列——Bootstrap 樹控件使用經驗分享 - 懶得安分 - 博客園

前言&#xff1a;很多時候我們在項目中需要用到樹&#xff0c;有些樹僅僅是展示層級關系&#xff0c;有些樹是為了展示和編輯層級關系&#xff0c;還有些樹是為了選中項然后其他地方調用選中項。不管怎么樣&#xff0c;樹控件都是很多項目里面不可或缺的組件之一。今天&#xf…

算法第4章實踐報告

1.實踐題目&#xff1a;最小刪數問題 2.問題描述&#xff1a;給定n位正整數a&#xff0c;去掉其中任意k≤n 個數字后&#xff0c;剩下的數字按原次序排列組成一個新 的正整數。對于給定的n位正整數a和正整數 k&#xff0c;設計一個算法找出剩下數字組成的新數最 小的刪數方案。…