2.1 數據源介紹
訪問時間 用戶id []里面是用戶輸入搜索內容 url結果排名 用戶點擊頁面排序 用戶點擊URL
字段與字段之間的分隔符號為 \t和空格 (制表符號)
2.2 需求分析
需求一: 統計每個 關鍵詞 出現了多少次,最終展示top10數據關鍵詞示例: ['.','+','的',360', '安全衛士', '哄搶', '救災物資', '75810', '部隊' ...]注意:'.','+','的'都需要過濾 ? 需求二: 統計每個用戶每個 搜索內容 點擊的次數,最終展示top5數據 ? 需求三: 統計每個分鐘頁面點擊次數,最終展示top5數據(課后作業)
2.3jieba分詞器
說明: 發現在數據中,并沒有直接的關鍵詞,關鍵詞數據是包含在搜索詞中,而且一個搜索詞中包含了多個關鍵詞,所有如何想基于關鍵詞進行統計, 首先需求先拆分搜索詞,獲取關鍵詞,思考:如何做呢? ? 借助第三方的分詞工具實現中文分詞Java語言:IK分詞器Python語言:jieba(結巴)分詞器 ? 如何使用jieba分詞器呢? 1- 需要在系統中安裝jieba分詞庫: local模式只需要安裝在node1即可 如果集群模式運行 需要各個節點都要安裝安裝命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba
安裝成功的截圖:
測試
# 導包 import jieba ? # main程序的入口 if __name__ == '__main__':# 精確模式r1 = list(jieba.cut('我要去黑馬找斌哥學習大數據技術'))print(r1)# 全模式r2 = list(jieba.cut('我要去黑馬找斌哥學習大數據技術',cut_all=True))print(r2)# 搜索引擎模式r3 = list(jieba.cut_for_search('我要去黑馬找斌哥學習大數據技術'))print(r3)
2.4 代碼實現
2.3.1 數據準備
-
1- 將數據文件拷貝到data目錄下:
-
2- 數據清洗_分析
# 導包 import os ? import jieba from pyspark import SparkConf, SparkContext ? # 綁定指定的python解釋器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' ? ? def get_topN_keyword(etlRDD, n):r1 = etlRDD.flatMap(lambda line_list: list(jieba.cut(line_list[2]))) \.filter(lambda word: word not in ('.', '+', '的')) \.map(lambda word: (word, 1)) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r1) ? ? def get_topN_search(etlRDD,n):r2 = etlRDD.map(lambda line_list: ((line_list[1], line_list[2]),1) ) \.reduceByKey(lambda agg, curr: agg + curr) \.top(n, lambda t: t[1])print(r2) ? ? # 創建main函數 if __name__ == '__main__':# 1.創建SparkContext對象conf = SparkConf().setAppName('pyspark_demo').setMaster('local[*]')sc = SparkContext(conf=conf) ?# 2.數據輸入textRDD = sc.textFile('file:///export/data/spark_project/spark_core/data/SogouQ.sample')print(textRDD.count()) ?# 測試原始數據量10000# 3.數據處理(切分,轉換,分組聚合)etlRDD = textRDD.filter(lambda line: line.strip() != '').map(lambda line: line.split()).filter(lambda line_list: len(line_list) >= 6)# 去除搜索內容兩端的 [ ]etlRDD = etlRDD.map(lambda line_list:[line_list[0],line_list[1],line_list[2][1:-1],line_list[3],line_list[4],line_list[5]])# 4.數據輸出# 需求一: 統計每個 關鍵詞 出現了多少次, 最終展示top10數據 注意:'.', '+', '的' 都需要過濾# 偽SQL:select 關鍵詞 ,count(*) from 搜狗表 group by 關鍵詞get_topN_keyword(etlRDD, 10)# 需求二: 統計每個用戶 每個 搜索內容 點擊的次數, 最終展示top5數據# 偽SQL:select 用戶,搜索內容,count(*) from 搜狗表 group by 用戶,搜索內容get_topN_search(etlRDD,5)# 5.關閉資源sc.stop()
可能遇到的錯誤:
原因: reduceByKey只能接收元組中有2個元素的情況
3、數據核對
大數據開發人員/數據分析人員,必須要對自己統計的指標結果負責!!! ? 結果數據的核對方式: 1- (不常用)在離線文件中直接ctrl+F搜索關鍵內容核對 2- (常用)一般原始數據會存放在MySQL/Hive中一份,可以編寫和代碼邏輯完全一樣的SQL來進行核對。可以通過如下方式來提高核對效率:2.1- 如果是分區表,挑選幾個分區進行核對即可2.2- 如果是分桶表, 可以抽樣查詢幾個桶核對2.3- 也可以使用tablesample函數對抽樣查詢2.4- 可以在SQL的where語句中,添加數據過濾條件,例如:時間范圍過濾條件、用戶編號過濾條件等,將核對的數據量縮小,提高核對效率 3- (不常用,難度比較大)可以對重點指標的同比、環比進行監控。還可以結合算法的方式對指標統計結果進行檢測。 4- 個人經驗或者同事互相幫忙查看測試