PageRank 是 Google 創始人拉里·佩奇(Larry Page)和謝爾蓋·布林(Sergey Brin)在 1998 年提出的一種網頁排名算法,用于衡量網頁“重要性”的一種方式。它是搜索引擎中用于排序網頁的一種基礎算法
一個網頁越是被其他重要網頁鏈接,它就越重要
PageRank 的計算流程
-
初始化:假設總共 N 個網頁,每個網頁初始 PR 值為 1/N。
-
迭代計算:通過 MapReduce 不斷迭代更新 PR 值,直到值趨于穩定。
-
結果輸出:PR 值越大,說明該網頁越重要,排名越靠前
A 0.25 B C D
B 0.25 A D
C 0.25 C
D 0.25 B C
-
第一列:網頁編號(如 A)
-
第二列:初始 PageRank 值(例如 0.25)
-
后續列:該網頁鏈接到的其他網頁
迭代的計算PageRank
值,每次MapReduce 的輸出要和輸入的格式是一樣的,這樣才能使得Mapreduce 的輸出用來作為下一輪MapReduce 的輸入
Map過程
解析輸入行,提取:
-
當前網頁 ID
-
當前網頁的 PR 值
-
當前網頁鏈接的其他網頁列表
計算出要鏈接到的其他網友的個數,然后求出當前網頁對其他網頁的貢獻值。
第一種輸出的< key ,value>
中的key
?表示其他網頁,value
?表示當前網頁對其他網頁的貢獻值
為了區別這兩種輸出
出鏈網頁貢獻值(標記為 @):<出鏈網頁, @貢獻值>
第二種輸出的< key ,value>
中的key
?表示當前網頁,value
?表示所有其他網頁。
網頁鏈接列表(標記為 &):<當前網頁, &鏈接網頁列表>
?
B @0.0833
C @0.0833
D @0.0833
A &B C D
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;/*map過程*/
public class MyMapper extends Mapper<Object,Text,Text,Text>{ private String id;private float pr; private int count;private float average_pr; public void map(Object key,Text value,Context context)throws IOException,InterruptedException{ StringTokenizer str = new StringTokenizer(value.toString());//對value進行解析id =str.nextToken();//id為解析的第一個詞,代表當前網頁pr = Float.parseFloat(str.nextToken());//pr為解析的第二個詞,轉換為float類型,代表PageRank值count = str.countTokens();//count為剩余詞的個數,代表當前網頁的出鏈網頁個數average_pr = pr/count;//求出當前網頁對出鏈網頁的貢獻值String linkids ="&";//下面是輸出的兩類,分別有'@'和'&'區分while(str.hasMoreTokens()){String linkid = str.nextToken();context.write(new Text(linkid),new Text("@"+average_pr));//輸出的是<出鏈網頁,獲得的貢獻值>linkids +=" "+ linkid;} context.write(new Text(id), new Text(linkids));//輸出的是<當前網頁,所有出鏈網頁>}
}
輸入數據格式(value):網頁ID ?PageRank值 ?出鏈網頁1 ?出鏈網頁2 ...
輸出鍵值對:
-
<出鏈網頁ID, "@貢獻值">
(表示這個網頁從別的網頁獲得了多少貢獻) -
<當前網頁ID, "& 出鏈網頁列表">
(保留網頁結構)
String id; ? ? ? ? // 當前網頁ID
float pr; ? ? ? ? ?// 當前網頁的PageRank值
int count; ? ? ? ? // 出鏈網頁的數量
float average_pr; ?// 當前網頁對每個出鏈網頁的平均貢獻值
StringTokenizer str = new StringTokenizer(value.toString());是把整行字符串(比如 "A 1.0 B C D")按照空格分割成一個個小單元(token)
id = str.nextToken(); ?// 第一個token是當前網頁ID------取出第一個單詞(比如 A
),表示當前正在處理的網頁 ID,賦值給 id
pr = Float.parseFloat(str.nextToken()); ? // 第二個token是當前網頁的PageRank值
取出第二個單詞(比如 "1.0"
),將其轉為 float
類型,就是當前網頁的 PageRank 值,賦值給 pr
count = str.countTokens();// 剩下的token是出鏈網頁數量----
統計剩余 token 的數量
average_pr = pr / count; //把當前網頁的 PageRank 值平均分配給所有它鏈接的網頁
貢獻值輸出:
while(str.hasMoreTokens()) {String linkid = str.nextToken(); // B, 然后 C, 然后 Dcontext.write(new Text(linkid), new Text("@" + average_pr));linkids += linkid + " "; // 把 B、C、D 加入 linkids 中
}
str.hasMoreTokens()?只要還有未讀取的 token(即還有出鏈網頁沒處理完),就繼續執行循環體
網頁結構輸出(帶 &
開頭):
String linkids記錄當前網頁的所有出鏈網頁 ID?
context.write(new Text(id), new Text(linkids));
Shuffle 是指 Map 階段輸出的數據按照 key 進行分組,并將具有相同 key 的數據發送到同一個 Reduce 任務中處理的過程?
每個網頁 Map 階段都會:
-
向它出鏈的網頁發 PageRank 貢獻(加@前綴)
-
自己保留一份出鏈結構
Shuffle 階段:按網頁ID歸并聚合
-
對 Map 輸出的 key(網頁 ID)進行排序
-
將相同 key 的所有 value 合并成一個列表
Reducer 接收到的格式為:<網頁ID, [貢獻值, 出鏈結構]>
<網頁ID, 列表[@貢獻1, @貢獻2, ..., &出鏈結構]>
Reduce過程
-
求每個網頁的新 PageRank 值
-
保留該網頁的出鏈結構
-
輸出格式為:
網頁ID 新的PR值 出鏈網頁列表
shuffule
的輸出也即是reduce
的輸入。
reduce
輸入的key
?直接作為輸出的key
對reduce
輸入的value
?進行解析,它是一個列表
a.若列表里的值里包含`@`,就把該值`@`后面的字符串轉化成`float`型加起來
b.若列表里的值里包含`&`,就把該值`&`后面的字符串提取出來
c.把所有貢獻值的加總,和提取的字符串進行連接,作為`reduce`的輸出`value`
public class MyReducer extends Reducer<Text,Text,Text,Text>{
繼承 Hadoop 提供的 Reducer
類,泛型參數說明:
-
Text, Text
:輸入的 key 和 value 類型 -
Text, Text
:輸出的 key 和 value 類型
public void reduce(Text key, Iterable<Text> values, Context context)
? ? ? ? throws IOException, InterruptedException {
為每一個網頁 key
傳入一個 values
列表,里面是 Shuffle 過程收集到的所有值
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** Reduce過程:計算每個網頁的新PageRank值,并保留出鏈網頁結構。* 輸入:<網頁ID, [@貢獻值, @貢獻值, ..., &出鏈網頁列表]>* 輸出:<網頁ID, 新PageRank值 + 出鏈網頁列表>*/
public class MyReducer extends Reducer<Text, Text, Text, Text> {public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {String lianjie = ""; // 用于保存當前網頁的出鏈網頁列表(結構信息)float pr = 0; // 用于累加當前網頁從其他網頁獲得的PageRank貢獻值// 遍歷所有傳入的值:包含兩類信息,分別通過首字符判斷for (Text val : values) {String strVal = val.toString(); // 當前值轉換為字符串if (strVal.substring(0, 1).equals("@")) {// 以@開頭,表示這是從其他網頁傳來的PageRank貢獻值// 取出@后面的數值并累加pr += Float.parseFloat(strVal.substring(1));} else if (strVal.substring(0, 1).equals("&")) {// 以&開頭,表示這是本網頁的出鏈結構信息// 將&后面的網頁列表保留下來lianjie += strVal.substring(1); // 注意可能是多個網頁用空格分隔}}// 平滑處理(加入跳轉因子d = 0.8)// 假設網頁總數為4,(1 - d) / N = 0.2 * 0.25 = 0.05// 新PageRank = d * 貢獻值總和 + (1 - d)/Npr = 0.8f * pr + 0.2f * 0.25f;// 構造輸出字符串:新PR值 + 出鏈網頁列表String result = pr + lianjie;// 輸出結果:<當前網頁ID, 新的PageRank值 + 出鏈網頁列表>context.write(key, new Text(result));}
}
遍歷所有值,分類處理
pr += Float.parseFloat(val.toString().substring(1));
如果是 @
開頭,就從第 1 個字符開始截取字符串(去掉 @
),再把它轉換成浮點數,并累加到 pr
中
lianjie += val.toString().substring(1);
如果是 &
開頭,就把 &
后面的出鏈網頁字符串加到變量 lianjie
中
-
以
@
開頭:表示來自其他網頁的 PageRank 貢獻值,提取并累加。 -
以
&
開頭:表示這是該網頁自身的 出鏈網頁結構,保留下來。
pr = 0.8f * pr + 0.2f * 0.25f;
?PageRank 中的阻尼系數模型:
-
0.8f
:阻尼系數 d(表示 80% 用戶點擊鏈接) -
0.2f
:1 - d,有 20% 用戶會隨機跳轉 -
0.25f
:假設網頁總數是 4 個,隨機跳轉概率均分為 0.25
PR(A) = d × 所有貢獻值之和 + (1 - d) / N
?
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.util.Scanner;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import java.net.URI;public class MyRunner {public static void main(String[] args)throws IOException, ClassNotFoundException, InterruptedException {// 創建 Hadoop 配置對象Configuration conf = new Configuration();// 使用控制臺輸入獲取初始輸入路徑和輸出路徑Scanner sc = new Scanner(System.in);System.out.print("inputPath:");String inputPath = sc.next(); // 第一次輸入的 HDFS 輸入路徑,如:/pagerank/inputSystem.out.print("outputPath:");String outputPath = sc.next(); // 第一次輸出的 HDFS 路徑,如:/pagerank/output// 進行 PageRank 的迭代計算,這里迭代 5 次for (int i = 1; i <= 5; i++) {// 創建新的 MapReduce 作業Job job = Job.getInstance(conf);// 設置 Job 的主類,用于打包 Jarjob.setJarByClass(MyRunner.class);// 設置 Map 和 Reduce 的處理類job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);// 設置 Map 階段輸出鍵值對類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);// 設置 Reduce 階段輸出鍵值對類型(最終輸出)job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// 設置輸入數據路徑(每輪迭代輸入路徑是上一輪的輸出)FileInputFormat.setInputPaths(job, new Path("hdfs://master:9000" + inputPath));// 設置輸出數據路徑(每輪迭代輸出不同路徑)FileOutputFormat.setOutputPath(job, new Path("hdfs://master:9000" + outputPath));// 更新下一輪迭代的輸入輸出路徑inputPath = outputPath; // 當前輸出變為下一輪的輸入outputPath = outputPath + i; // 每次輸出加上數字以區分路徑(如 output1, output2,...)// 提交作業并等待執行完成job.waitForCompletion(true);}// 讀取最終輸出文件內容并打印到控制臺try {// 獲取 Hadoop 文件系統FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());// 拼接最終輸出文件的路徑(最后一輪輸出的 part-r-00000)Path srcPath = new Path(outputPath.substring(0, outputPath.length() - 1) + "/part-r-00000");// 打開輸出文件FSDataInputStream is = fs.open(srcPath);// 打印最終結果到控制臺System.out.println("Results:");while (true) {String line = is.readLine(); // 讀取一行結果if (line == null) break; // 如果到文件末尾,結束循環System.out.println(line); // 打印當前行}is.close(); // 關閉輸入流} catch (Exception e) {e.printStackTrace(); // 如果讀取輸出失敗,打印錯誤}}
}