MapReduce簡單應用(三)——高級WordCount

目錄

  • 1. 高級WordCount
    • 1.1 IntWritable降序排列
    • 1.2 輸入輸出格式
    • 1.3 處理流程
  • 2. 代碼和結果
    • 2.1 pom.xml中依賴配置
    • 2.2 工具類util
    • 2.3 高級WordCount
    • 2.4 結果
  • 參考

??本文引用的Apache Hadoop源代碼基于Apache許可證 2.0,詳情請參閱 Apache許可證2.0。

1. 高級WordCount

??文本內容就是下文2.3中的代碼,目標是要實現文本計數,并且數量在前,文本在后,同時數量要升序排列。

1.1 IntWritable降序排列

??IntWritable類型中實現一個升序排列的比較器,代碼如下。而實現IntWritable降序排序只需要定義一個新類,繼承IntWritable.Comparator,并且重載public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2),使其返回值為父類該方法返回值的相反數。此外,如果你想要讓作為鍵的IntWritable類型進行降序排列,還需要在MapReduce任務調度代碼中設置Job.setSortComparatorClass(比較器.class)

/*** Licensed to the Apache Software Foundation (ASF) under one* or more contributor license agreements.  See the NOTICE file* distributed with this work for additional information* regarding copyright ownership.  The ASF licenses this file* to you under the Apache License, Version 2.0 (the* "License"); you may not use this file except in compliance* with the License.  You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/public static class Comparator extends WritableComparator {public Comparator() {super(IntWritable.class);}@Overridepublic int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {int thisValue = readInt(b1, s1);int thatValue = readInt(b2, s2);return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));}}

1.2 輸入輸出格式

java類名輸入/輸出功能
org.apache.hadoop.mapreduce.lib.input.TextInputFormatMapReduce默認的輸入格式將輸入文件按行分割,每一行作為<key, value>對,其中key是行的偏移量(從0開始),value 是行的內容
org.apache.hadoop.mapreduce.lib.output.TextOutputFormatMapReduce默認的輸出格式將輸出寫成文本文件,每個<key, value>對占一行,key和value之間用制表符(\t)分隔
org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormatSequenceFile的輸入格式讀取Hadoop的二進制文件格式SequenceFile
org.apache.hadoop.mapreduce.lib.input.SequenceFileOutputFormatSequenceFile的輸出格式將輸出寫成Hadoop的二進制文件格式SequenceFile

??(Hadoop定義的SequenceFile是一種高效、可分割的二進制文件格式,支持壓縮)
??(Hadoop定義了好多輸入輸出格式,由于我沒有詳細使用,這里就不介紹了)
??如果要進行多次MapReduce作業,中間結果可以以SequenceFile的形式存儲,加速作業的運行。

1.3 處理流程

??首先高級WordCount也要像普通WordCount一樣對文本進行計數,因此Reduce函數輸入的鍵值對為<Text,IntWritable>。而最終要求的結果鍵值對為<IntWritable, Text>,如果把Reduce函數的輸出鍵值對直接變為<IntWritable, Text>并且在該任務中只使用一個作業的話,你會發現無法完成IntWritable降序排列(盡管你可以已經設置SortComparatorClass),那是因為Shuffle過程的排序只會發生在Map結束后Reduce發生前,這時鍵的類型是Text而非IntWritable。
??為了解決這個任務,需要進行兩次作業,第一次作業負責計數,并以SequenceFile的格式輸出,Map的輸出、Reduce的輸入和輸出均為<Text, IntWritable>,最終文件輸出格式選擇SequenceFileOutputFormat;第二次作業負責交換鍵值對,并以SequenceFile的個數讀入,然后再對鍵進行降序排列,這就需要使用Hadoop自帶的org.apache.hadoop.mapreduce.lib.map.InverseMapper,它能交換鍵值對。這次作業的輸入格式選擇SequenceFileInputFormat,Map輸入和Map輸出分別是<Text, IntWritable>、<IntWritable, Text>,這時設置SortComparatorClass就可以實現IntWritable降序排列。

2. 代碼和結果

2.1 pom.xml中依賴配置

  <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.3.6</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>3.3.6</version><type>pom</type></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-jobclient</artifactId><version>3.3.6</version></dependency></dependencies>

2.2 工具類util

import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;public class util {public static FileSystem getFileSystem(String uri, Configuration conf) throws Exception {URI add = new URI(uri);return FileSystem.get(add, conf);}public static void removeALL(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println("Delete Output Folder? " + isDeleted);}}public static void removeALL(String uri, Configuration conf, String[] pathList) throws Exception {FileSystem fs = getFileSystem(uri, conf);for (String path : pathList) {if (fs.exists(new Path(path))) {boolean isDeleted = fs.delete(new Path(path), true);System.out.println(String.format("Delete %s? %s", path, isDeleted));}}}public static void showResult(String uri, Configuration conf, String path) throws Exception {FileSystem fs = getFileSystem(uri, conf);String regex = "part-r-";Pattern pattern = Pattern.compile(regex);if (fs.exists(new Path(path))) {FileStatus[] files = fs.listStatus(new Path(path));for (FileStatus file : files) {Matcher matcher = pattern.matcher(file.getPath().toString());if (matcher.find()) {System.out.println(file.getPath() + ":");FSDataInputStream openStream = fs.open(file.getPath());IOUtils.copyBytes(openStream, System.out, 1024);openStream.close();}}}}
}

2.3 高級WordCount

import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;public class App {public static class IntWritableDecreaseingComparator extends IntWritable.Comparator {@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return -super.compare(b1, s1, l1, b2, s2, l2);}}public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] splitStr = value.toString().split("\\s+");for (String str : splitStr) {context.write(new Text(str), new IntWritable(1));}}}public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int sum = 0;for (IntWritable val : values) {sum += val.get();}context.write(key, new IntWritable(sum));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String tempPath = "hdfs://localhost:9000/user/developer/Temp";String[] myArgs = {"file:///home/developer/CodeArtsProjects/advanced-word-count/AdvancedWordCount.txt","hdfs://localhost:9000/user/developer/AdvancedWordCount/output"};util.removeALL("hdfs://localhost:9000", conf, new String[] { tempPath, myArgs[myArgs.length - 1] });Job job = Job.getInstance(conf, "AdvancedWordCount");job.setJarByClass(App.class);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setNumReduceTasks(2);for (int i = 0; i < myArgs.length - 1; i++) {FileInputFormat.addInputPath(job, new Path(myArgs[i]));}FileOutputFormat.setOutputPath(job, new Path(tempPath));int res1 = job.waitForCompletion(true) ? 0 : 1;if (res1 == 0) {Job sortJob = Job.getInstance(conf, "Sort");sortJob.setJarByClass(App.class);sortJob.setMapperClass(InverseMapper.class);sortJob.setInputFormatClass(SequenceFileInputFormat.class);sortJob.setOutputKeyClass(IntWritable.class);sortJob.setOutputValueClass(Text.class);sortJob.setSortComparatorClass(IntWritableDecreaseingComparator.class);FileInputFormat.addInputPath(sortJob, new Path(tempPath));FileOutputFormat.setOutputPath(sortJob, new Path(myArgs[myArgs.length - 1]));int res2 = sortJob.waitForCompletion(true) ? 0 : 1;if (res2 == 0) {System.out.println("高級WordCount結果為:");util.showResult("hdfs://localhost:9000", conf, myArgs[myArgs.length - 1]);}System.exit(res2);}System.exit(res1);}
}

2.4 結果

在這里插入圖片描述
??結果文件內容如下:

64
14      {
13      }
12      import
8       int
7       public
7       =
4       static
4       class
4       -
4       new
4       @Override
3       for
3       :
3       void
3       throws
3       extends
2       l1,
2       1;
2       0;
2       String[]
2       s2,
2       s1,
2       i
2       context.write(new
2       context)
2       conf,
2       InterruptedException
2       key,
2       IntWritable,
2       return
2       IOException,
2       b2,
2       sum
2       Context
2       protected
2       myArgs[myArgs.length
2       Text,
2       1]);
1       };
1       values,
1       values)
1       value.toString().split("\\s+");
1       value,
1       val.get();
1       val
1       util.showResult("hdfs://localhost:9000",
1       util.removeALL("hdfs://localhost:9000",
1       str
1       splitStr)
1       splitStr
1       res
1       reduce(Text
1       org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
1       org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
1       org.apache.hadoop.mapreduce.Reducer;
1       org.apache.hadoop.mapreduce.Mapper;
1       org.apache.hadoop.mapreduce.Job;
1       org.apache.hadoop.io.WritableComparable;
1       org.apache.hadoop.io.Text;
1       org.apache.hadoop.io.LongWritable;
1       org.apache.hadoop.io.IntWritable;
1       org.apache.hadoop.fs.Path;
1       org.apache.hadoop.conf.Configuration;
1       myArgs.length
1       myArgs
1       map(LongWritable
1       main(String[]
1       l2);
1       l2)
1       key);
1       job.waitForCompletion(true)
1       job.setSortComparatorClass(IntWritableDecreaseingComparator.class);
1       job.setReducerClass(MyReducer.class);
1       job.setOutputValueClass(Text.class);
1       job.setOutputKeyClass(IntWritable.class);
1       job.setMapperClass(MyMapper.class);
1       job.setJarByClass(App.class);
1       job.setCombinerClass(MyReducer.class);
1       job
1       java.io.IOException;
1       if
1       i++)
1       compare(byte[]
1       compare(WritableComparable
1       byte[]
1       b1,
1       b);
1       b)
1       args)
1       a,
1       WritableComparable
1       Text>
1       Text(str),
1       Text
1       System.out.println("高級WordCount結果為:");
1       System.exit(res);
1       Reducer<Text,
1       Path(myArgs[myArgs.length
1       Path(myArgs[i]));
1       MyReducer
1       MyMapper
1       Mapper<LongWritable,
1       Job.getInstance(conf,
1       Job
1       Iterable<IntWritable>
1       IntWritableDecreaseingComparator
1       IntWritable>
1       IntWritable.Comparator
1       IntWritable(sum),
1       IntWritable(1));
1       FileOutputFormat.setOutputPath(job,
1       FileInputFormat.addInputPath(job,
1       Exception
1       Configuration();
1       Configuration
1       App
1       ?
1       ==
1       <
1       1]));
1       0)
1       0
1       -super.compare(b1,
1       -super.compare(a,
1       +=
1       (res
1       (int
1       (String
1       (IntWritable
1       "hdfs://localhost:9000/user/developer/AdvancedWordCount/output"
1       "file:///home/developer/CodeArtsProjects/AdvancedWordCount.txt",
1       "AdvancedWordCount");
1       conf

參考

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/895174.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/895174.shtml
英文地址,請注明出處:http://en.pswp.cn/news/895174.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

智慧機房解決方案(文末聯系,領取整套資料,可做論文)

智慧機房解決方案-軟件部分 一、方案概述 本智慧機房解決方案旨在通過硬件設備與軟件系統的深度整合&#xff0c;實現機房的智能化管理與服務&#xff0c;提升機房管理人員的工作效率&#xff0c;優化機房運營效率&#xff0c;確保機房設備的安全穩定運行。軟件部分包括機房管…

(五)Spring Boot學習——spring security +jwt使用(前后端分離模式)

一定要熟悉spring security原理和jwt無狀態原理&#xff0c;理解了才知道代碼作用。 在 Spring Security JWT 認證流程中&#xff0c;通常的做法是&#xff1a; 用戶提交用戶名和密碼Spring Security 認證管理器 (AuthenticationManager) 進行認證如果認證成功&#xff0c;生…

清華DeepSeek手冊:從入門到精通(網頁版便于閱讀)

目錄 一、產品概述二、清華DeepSeek從入門到精通三、PDF文件路徑 一、產品概述 DeepSeek是國產領先的人工智能技術平臺&#xff0c;提供從數據分析到大型語言模型的全棧解決方案。其核心產品包括網頁端數據分析工具[1] 、視覺語言模型(DeepSeek-VL)[2] 和670億參數大型語言模型…

阿里云百煉初探DeepSeek模型調用

阿里云百煉初探DeepSeek模型調用 阿里云百煉為什么選擇百煉開始使用百煉方式一&#xff1a;文本對話方式二&#xff1a;文本調試方式三&#xff1a;API調用 DeepSeek調用1、搜索模型2、查看API調用3、開始調用安裝依賴查看API Key運行以下代碼 4、流式輸出 總結 阿里云百煉 阿…

【網絡安全】服務器安裝Docker及拉取鏡像教程

文章目錄 1. 安裝 Docker2. 拉取鏡像3. 運行 Ubuntu 容器4. 執行相關操作5. 退出并停止容器1. 安裝 Docker # 更新軟件包索引 sudo apt update# 安裝必要的依賴 sudo apt install -y ca-certificates curl gnupg

AI刷題-子數組和的最大值問題

目錄 問題描述 輸入格式 輸出格式 輸入樣例 輸出樣例 說明 數據范圍 解題思路&#xff1a; 問題理解 數據結構選擇 算法步驟 具體步驟 代碼實現&#xff1a; 1.特判&#xff1a; 不需要刪除元素的時候 2.在前面的判斷結束后&#xff1a;k1&#xff0c;&#xff…

pytest.fixture

pytest.fixture 是 pytest 測試框架中的一個非常強大的功能,它允許你在測試函數運行前后執行一些設置或清理代碼。以下是關于 pytest.fixture 的詳細介紹: 一、定義與用途 pytest.fixture 是一個裝飾器,用于標記一個函數為 fixture。Fixture 函數中的代碼可以在測試函數運…

Swift的方法派發機制

1. 靜態派發&#xff08;Static Dispatch&#xff09; 靜態派發在編譯時確定方法的具體實現&#xff0c;調用時直接跳轉到該實現。靜態派發的優點是性能高&#xff0c;因為不需要運行時查找方法實現。 適用場景&#xff1a; 值類型&#xff08;Struct 和 Enum&#xff09;&am…

C++并發編程指南 09(共享數據)

文章目錄 第3章 共享數據本章主要內容共享數據的問題使用互斥保護數據保護數據的替代方案 3.1 共享數據的問題共享數據的核心問題不變量的重要性示例&#xff1a;刪除雙鏈表中的節點多線程環境中的問題條件競爭的后果總結3.1.1 條件競爭3.1.2 避免惡性條件競爭 3.2 使用互斥量3…

ZooKeeper 技術全解:概念、功能、文件系統與主從同步

引言 隨著分布式系統變得越來越復雜&#xff0c;對協調服務的需求也在不斷增長。ZooKeeper 作為一個由 Apache 維護的開源分布式協調服務框架&#xff0c;廣泛用于 Hadoop 生態系統和其他需要協調的分布式環境中。這一系統旨在解決分布式應用中常見的挑戰&#xff0c;如配置管…

設計方案主要做哪些事情?

目錄 1. 需求分析 2. 系統架構設計 3. 數據庫設計 4. 接口設計 5. 緩存設計 6. 安全設計 7. 性能優化 8. 高可用與容災 9. 監控與日志 10. 測試方案 11. 部署方案 12. 文檔編寫 13. 風險評估 14. 項目管理 總結 設計方案是項目開發的關鍵步驟,確保項目按計劃進…

【語法】C++的內存管理 模板

內存管理&#xff1a; 在C語言中&#xff0c;動態開辟空間可以用malloc&#xff0c;calloc&#xff0c;realloc這三個函數&#xff0c;下面先來復習一下這三者的區別 malloc和calloc都是用來開辟新空間&#xff0c;calloc在malloc的基礎上還會初始化該空間為0&#xff0c;用法…

30~32.ppt

目錄 30.導游小姚-介紹首都北京? 題目? 解析 31.小張-旅游產品推廣文章 題目 解析 32.小李-水的知識? 題目? 解析 30.導游小姚-介紹首都北京? 題目 解析 新建幻燈片-從大綱-重置-檢查設計→主題對話框→瀏覽主題&#xff1a;考生文件夾&#xff08;注意&#x…

深度學習-交易預測

下面為你詳細介紹如何使用Python結合深度學習庫TensorFlow和Keras來構建一個簡單的交易預測模型。在這個示例中&#xff0c;我們以股票價格預測為例&#xff0c;假設我們要根據過去一段時間的股票價格數據來預測未來的價格走勢。 步驟分析 數據準備&#xff1a;獲取股票價格數…

C++ STL Map 學習學案(提高版)

C++ STL Map 學案(初中生版) 一、學習目標 深入理解 STL 中 map 容器的概念、特點和用途。熟練掌握 map 容器的基本操作,如插入、查找、刪除和遍歷元素。能夠運用 map 容器解決實際編程問題,提升邏輯思維和編程實踐能力。二、知識講解 引入 在日常生活中,我們常常會遇到…

uniapp實現人臉識別(不使用三方插件)

uniapp實現人臉識別 內容簡介功能實現上傳身份證進行人臉比對 遇到的問題 內容簡介 1.拍攝/相冊將身份證照片上傳到接口進行圖片解析 2.使用live-pusher組件拍攝人臉照片&#xff0c;上傳接口與身份證人臉進行比對 功能實現 上傳身份證 先看下效果 點擊按鈕調用chooseImage…

Evaluating Very Long-Term Conversational Memory of LLM Agents 論文

Abstract : 長期開放域對話的現有作品著重于評估不超過五個聊天會議的上下文中的模型響應。盡管LongContext大語言模型&#xff08;LLM&#xff09;和檢索增強發電&#xff08;RAG&#xff09;技術的進步&#xff0c;但在長期對話中的功效仍未得到探索。為了解決這一研究差距&a…

相對收益-固定收益組合歸因-Campisi模型

固定收益組合歸因-Campisi模型 1 Campisi模型11.1 Campisi歸因框架1.2 Campisi模型絕對收益分解1.2.1 票息收益1. 2.2 收斂收益1. 2.3 騎乘收益1. 2.4 平移收益1. 2.5 扭曲收益1. 2.6 利差收益1. 2.7 殘差收益 1.3 Campisi模型超額收益分解 2 Campisi模型22.1 分解框架2.2 模型…

IntelliJ IDEA使用經驗(十三):使用Git克隆github的開源項目

文章目錄 問題背景辦法1、設置git代理&#xff1b;2、再次克隆項目&#xff1b;3、再次按常規方式進行git克隆即可。 問題背景 由于github在國外&#xff0c;很多時候我們在使用idea克隆開源項目的時候&#xff0c;沒辦法檢出&#xff0c;提示 連接重置。 辦法 1、設置git代…

JAVA安全之Java Agent打內存馬

基本介紹 Java Agent是一種特殊的Java程序&#xff0c;它允許開發者在Java虛擬機(JVM)啟動時或運行期間通過java.lang.instrument包提供的Java標準接口進行代碼插樁&#xff0c;從而實現在Java應用程序類加載和運行期間動態修改已加載或者未加載的類&#xff0c;包括類的屬性、…