二 根據用戶行為數據創建ALS模型并召回商品
2.0 用戶行為數據拆分
-
方便練習可以對數據做拆分處理
- pandas的數據分批讀取 chunk 厚厚的一塊 相當大的數量或部分
import pandas as pd reader = pd.read_csv('behavior_log.csv',chunksize=100,iterator=True) count = 0; for chunk in reader:count += 1if count ==1:chunk.to_csv('test4.csv',index = False)elif count>1 and count<1000:chunk.to_csv('test4.csv',index = False, mode = 'a',header = False)else:break pd.read_csv('test4.csv')
2.1 預處理behavior_log數據集
- 創建spark session
import os
# 配置spark driver和pyspark運行時,所使用的python解釋器路徑
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
# 當存在多個版本時,不指定很可能會導致出錯
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
# spark配置信息
from pyspark import SparkConf
from pyspark.sql import SparkSessionSPARK_APP_NAME = "preprocessingBehaviorLog"
SPARK_URL = "spark://192.168.199.188:7077"conf = SparkConf() # 創建spark config對象
config = (("spark.app.name", SPARK_APP_NAME), # 設置啟動的spark的app名稱,沒有提供,將隨機產生一個名稱("spark.executor.memory", "6g"), # 設置該app啟動時占用的內存用量,默認1g("spark.master", SPARK_URL), # spark master的地址("spark.executor.cores", "4"), # 設置spark executor使用的CPU核心數# 以下三項配置,可以控制執行器數量
# ("spark.dynamicAllocation.enabled", True),
# ("spark.dynamicAllocation.initialExecutors", 1), # 1個執行器
# ("spark.shuffle.service.enabled", True)
# ('spark.sql.pivotMaxValues', '99999'), # 當需要pivot DF,且值很多時,需要修改,默認是10000
)
# 查看更詳細配置及說明:https://spark.apache.org/docs/latest/configuration.htmlconf.setAll(config)# 利用config對象,創建spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
- 從hdfs中加載csv文件為DataFrame
# 從hdfs加載CSV文件為DataFrame
df = spark.read.csv("hdfs://localhost:9000/datasets/behavior_log.csv", header=True)
df.show() # 查看dataframe,默認顯示前20條
# 大致查看一下數據類型
df.printSchema() # 打印當前dataframe的結構
顯示結果:
+------+----------+----+-----+------+
| user|time_stamp|btag| cate| brand|
+------+----------+----+-----+------+
|558157|1493741625| pv| 6250| 91286|
|558157|1493741626| pv| 6250| 91286|
|558157|1493741627| pv| 6250| 91286|
|728690|1493776998| pv|11800| 62353|
|332634|1493809895| pv| 1101|365477|
|857237|1493816945| pv| 1043|110616|
|619381|1493774638| pv| 385|428950|
|467042|1493772641| pv| 8237|301299|
|467042|1493772644| pv| 8237|301299|
|991528|1493780710| pv| 7270|274795|
|991528|1493780712| pv| 7270|274795|
|991528|1493780712| pv| 7270|274795|
|991528|1493780712| pv| 7270|274795|
|991528|1493780714| pv| 7270|274795|
|991528|1493780765| pv| 7270|274795|
|991528|1493780714| pv| 7270|274795|
|991528|1493780765| pv| 7270|274795|
|991528|1493780764| pv| 7270|274795|
|991528|1493780633| pv| 7270|274795|
|991528|1493780764| pv| 7270|274795|
+------+----------+----+-----+------+
only showing top 20 rowsroot|-- user: string (nullable = true)|-- time_stamp: string (nullable = true)|-- btag: string (nullable = true)|-- cate: string (nullable = true)|-- brand: string (nullable = true)
- 從hdfs加載數據為dataframe,并設置結構
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
# 構建結構對象
schema = StructType([StructField("userId", IntegerType()),StructField("timestamp", LongType()),StructField("btag", StringType()),StructField("cateId", IntegerType()),StructField("brandId", IntegerType())
])
# 從hdfs加載數據為dataframe,并設置結構
behavior_log_df = spark.read.csv("hdfs://localhost:8020/datasets/behavior_log.csv", header=True, schema=schema)
behavior_log_df.show()
behavior_log_df.count()
顯示結果:
+------+----------+----+------+-------+
|userId| timestamp|btag|cateId|brandId|
+------+----------+----+------+-------+
|558157|1493741625| pv| 6250| 91286|
|558157|1493741626| pv| 6250| 91286|
|558157|1493741627| pv| 6250| 91286|
|728690|1493776998| pv| 11800| 62353|
|332634|1493809895| pv| 1101| 365477|
|857237|1493816945| pv| 1043| 110616|
|619381|1493774638| pv| 385| 428950|
|467042|1493772641| pv| 8237| 301299|
|467042|1493772644| pv| 8237| 301299|
|991528|1493780710| pv| 7270| 274795|
|991528|1493780712| pv| 7270| 274795|
|991528|1493780712| pv| 7270| 274795|
|991528|1493780712| pv| 7270| 274795|
|991528|1493780714| pv| 7270| 274795|
|991528|1493780765| pv| 7270| 274795|
|991528|1493780714| pv| 7270| 274795|
|991528|1493780765| pv| 7270| 274795|
|991528|1493780764| pv| 7270| 274795|
|991528|1493780633| pv| 7270| 274795|
|991528|1493780764| pv| 7270| 274795|
+------+----------+----+------+-------+
only showing top 20 rowsroot|-- userId: integer (nullable = true)|-- timestamp: long (nullable = true)|-- btag: string (nullable = true)|-- cateId: integer (nullable = true)|-- brandId: integer (nullable = true)
- 分析數據集字段的類型和格式
- 查看是否有空值
- 查看每列數據的類型
- 查看每列數據的類別情況
print("查看userId的數據情況:", behavior_log_df.groupBy("userId").count().count())
# 約113w用戶
#注意:behavior_log_df.groupBy("userId").count() 返回的是一個dataframe,這里的count計算的是每一個分組的個數,但當前還沒有進行計算
# 當調用df.count()時才開始進行計算,這里的count計算的是dataframe的條目數,也就是共有多少個分組
查看user的數據情況: 1136340
print("查看btag的數據情況:", behavior_log_df.groupBy("btag").count().collect()) # collect會把計算結果全部加載到內存,謹慎使用
# 只有四種類型數據:pv、fav、cart、buy
# 這里由于類型只有四個,所以直接使用collect,把數據全部加載出來
查看btag的數據情況: [Row(btag='buy', count=9115919), Row(btag='fav', count=9301837), Row(btag='cart', count=15946033), Row(btag='pv', count=688904345)]
print("查看cateId的數據情況:", behavior_log_df.groupBy("cateId").count().count())
# 約12968類別id
查看cateId的數據情況: 12968
print("查看brandId的數據情況:", behavior_log_df.groupBy("brandId").count().count())
# 約460561品牌id
查看brandId的數據情況: 460561
print("判斷數據是否有空值:", behavior_log_df.count(), behavior_log_df.dropna().count())
# 約7億條目723268134 723268134
# 本數據集無空值條目,可放心處理
判斷數據是否有空值: 723268134 723268134
- pivot透視操作,把某列里的字段值轉換成行并進行聚合運算(pyspark.sql.GroupedData.pivot)
- 如果透視的字段中的不同屬性值超過10000個,則需要設置spark.sql.pivotMaxValues,否則計算過程中會出現錯誤。文檔介紹。
# 統計每個用戶對各類商品的pv、fav、cart、buy數量
cate_count_df = behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.cateId).pivot("btag",["pv","fav","cart","buy"]).count()
cate_count_df.printSchema() # 此時還沒有開始計算
顯示效果:
root|-- userId: integer (nullable = true)|-- cateId: integer (nullable = true)|-- pv: long (nullable = true)|-- fav: long (nullable = true)|-- cart: long (nullable = true)|-- buy: long (nullable = true)
- 統計每個用戶對各個品牌的pv、fav、cart、buy數量并保存結果
# 統計每個用戶對各個品牌的pv、fav、cart、buy數量
brand_count_df = behavior_log_df.groupBy(behavior_log_df.userId, behavior_log_df.brandId).pivot("btag",["pv","fav","cart","buy"]).count()
# brand_count_df.show() # 同上
# 113w * 46w
# 由于運算時間比較長,所以這里先將結果存儲起來,供后續其他操作使用
# 寫入數據時才開始計算
cate_count_df.write.csv("hdfs://localhost:9000/preprocessing_dataset/cate_count.csv", header=True)
brand_count_df.write.csv("hdfs://localhost:9000/preprocessing_dataset/brand_count.csv", header=True)
2.2 根據用戶對類目偏好打分訓練ALS模型
- 根據您統計的次數 + 打分規則 ==> 偏好打分數據集 ==> ALS模型
# spark ml的模型訓練是基于內存的,如果數據過大,內存空間小,迭代次數過多的化,可能會造成內存溢出,報錯
# 設置Checkpoint的話,會把所有數據落盤,這樣如果異常退出,下次重啟后,可以接著上次的訓練節點繼續運行
# 但該方法其實指標不治本,因為無法防止內存溢出,所以還是會報錯
# 如果數據量大,應考慮的是增加內存、或限制迭代次數和訓練數據量級等
spark.sparkContext.setCheckpointDir("hdfs://localhost:8020/checkPoint/")
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType# 構建結構對象
schema = StructType([StructField("userId", IntegerType()),StructField("cateId", IntegerType()),StructField("pv", IntegerType()),StructField("fav", IntegerType()),StructField("cart", IntegerType()),StructField("buy", IntegerType())
])# 從hdfs加載CSV文件
cate_count_df = spark.read.csv("hdfs://localhost:9000/preprocessing_dataset/cate_count.csv", header=True, schema=schema)
cate_count_df.printSchema()
cate_count_df.first() # 第一行數據
顯示結果:
root|-- userId: integer (nullable = true)|-- cateId: integer (nullable = true)|-- pv: integer (nullable = true)|-- fav: integer (nullable = true)|-- cart: integer (nullable = true)|-- buy: integer (nullable = true)Row(userId=1061650, cateId=4520, pv=2326, fav=None, cart=53, buy=None)
- 處理每一行數據:r表示row對象
def process_row(r):# 處理每一行數據:r表示row對象# 偏好評分規則:# m: 用戶對應的行為次數# 該偏好權重比例,次數上限僅供參考,具體數值應根據產品業務場景權衡# pv: if m<=20: score=0.2*m; else score=4# fav: if m<=20: score=0.4*m; else score=8# cart: if m<=20: score=0.6*m; else score=12# buy: if m<=20: score=1*m; else score=20# 注意這里要全部設為浮點數,spark運算時對類型比較敏感,要保持數據類型都一致pv_count = r.pv if r.pv else 0.0fav_count = r.fav if r.fav else 0.0cart_count = r.cart if r.cart else 0.0buy_count = r.buy if r.buy else 0.0pv_score = 0.2*pv_count if pv_count<=20 else 4.0fav_score = 0.4*fav_count if fav_count<=20 else 8.0cart_score = 0.6*cart_count if cart_count<=20 else 12.0buy_score = 1.0*buy_count if buy_count<=20 else 20.0rating = pv_score + fav_score + cart_score + buy_score# 返回用戶ID、分類ID、用戶對分類的偏好打分return r.userId, r.cateId, rating
- 返回一個PythonRDD類型
# 返回一個PythonRDD類型,此時還沒開始計算
cate_count_df.rdd.map(process_row).toDF(["userId", "cateId", "rating"])
顯示結果:
DataFrame[userId: bigint, cateId: bigint, rating: double]
- 用戶對商品類別的打分數據
# 用戶對商品類別的打分數據
# map返回的結果是rdd類型,需要調用toDF方法轉換為Dataframe
cate_rating_df = cate_count_df.rdd.map(process_row).toDF(["userId", "cateId", "rating"])
# 注意:toDF不是每個rdd都有的方法,僅局限于此處的rdd
# 可通過該方法獲得 user-cate-matrix
# 但由于cateId字段過多,這里運算量比很大,機器內存要求很高才能執行,否則無法完成任務
# 請謹慎使用# 但好在我們訓練ALS模型時,不需要轉換為user-cate-matrix,所以這里可以不用運行
# cate_rating_df.groupBy("userId").povit("cateId").min("rating")
# 用戶對類別的偏好打分數據
cate_rating_df
顯示結果:
DataFrame[userId: bigint, cateId: bigint, rating: double]
- 通常如果USER-ITEM打分數據應該是通過一下方式進行處理轉換為USER-ITEM-MATRIX
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-bYZwXB3C-1691901742059)(/img/CF%E4%BB%8B%E7%BB%8D.png)]
但這里我們將使用的Spark的ALS模型進行CF推薦,因此注意這里數據輸入不需要提前轉換為矩陣,直接是 USER-ITEM-RATE的數據
-
基于Spark的ALS隱因子模型進行CF評分預測
-
ALS的意思是交替最小二乘法(Alternating Least Squares),是Spark2.*中加入的進行基于模型的協同過濾(model-based CF)的推薦系統算法。
同SVD,它也是一種矩陣分解技術,對數據進行降維處理。
-
詳細使用方法:pyspark.ml.recommendation.ALS
-
注意:由于數據量巨大,因此這里也不考慮基于內存的CF算法
參考:為什么Spark中只有ALS
-
# 使用pyspark中的ALS矩陣分解方法實現CF評分預測
# 文檔地址:https://spark.apache.org/docs/2.2.2/api/python/pyspark.ml.html?highlight=vectors#module-pyspark.ml.recommendation
from pyspark.ml.recommendation import ALS # ml:dataframe, mllib:rdd# 利用打分數據,訓練ALS模型
als = ALS(userCol='userId', itemCol='cateId', ratingCol='rating', checkpointInterval=5)# 此處訓練時間較長
model = als.fit(cate_rating_df)
- 模型訓練好后,調用方法進行使用,具體API查看
# model.recommendForAllUsers(N) 給所有用戶推薦TOP-N個物品
ret = model.recommendForAllUsers(3)
# 由于是給所有用戶進行推薦,此處運算時間也較長
ret.show()
# 推薦結果存放在recommendations列中,
ret.select("recommendations").show()
顯示結果:
+------+--------------------+
|userId| recommendations|
+------+--------------------+
| 148|[[3347, 12.547271...|
| 463|[[1610, 9.250818]...|
| 471|[[1610, 10.246621...|
| 496|[[1610, 5.162216]...|
| 833|[[5607, 9.065482]...|
| 1088|[[104, 6.886987],...|
| 1238|[[5631, 14.51981]...|
| 1342|[[5720, 10.89842]...|
| 1580|[[5731, 8.466453]...|
| 1591|[[1610, 12.835257...|
| 1645|[[1610, 11.968531...|
| 1829|[[1610, 17.576496...|
| 1959|[[1610, 8.353473]...|
| 2122|[[1610, 12.652732...|
| 2142|[[1610, 12.48068]...|
| 2366|[[1610, 11.904813...|
| 2659|[[5607, 11.699315...|
| 2866|[[1610, 7.752719]...|
| 3175|[[3347, 2.3429515...|
| 3749|[[1610, 3.641833]...|
+------+--------------------+
only showing top 20 rows+--------------------+
| recommendations|
+--------------------+
|[[3347, 12.547271...|
|[[1610, 9.250818]...|
|[[1610, 10.246621...|
|[[1610, 5.162216]...|
|[[5607, 9.065482]...|
|[[104, 6.886987],...|
|[[5631, 14.51981]...|
|[[5720, 10.89842]...|
|[[5731, 8.466453]...|
|[[1610, 12.835257...|
|[[1610, 11.968531...|
|[[1610, 17.576496...|
|[[1610, 8.353473]...|
|[[1610, 12.652732...|
|[[1610, 12.48068]...|
|[[1610, 11.904813...|
|[[5607, 11.699315...|
|[[1610, 7.752719]...|
|[[3347, 2.3429515...|
|[[1610, 3.641833]...|
+--------------------+
only showing top 20 rows
- model.recommendForUserSubset 給部分用戶推薦TOP-N個物品
# 注意:recommendForUserSubset API,2.2.2版本中無法使用
dataset = spark.createDataFrame([[1],[2],[3]])
dataset = dataset.withColumnRenamed("_1", "userId")
ret = model.recommendForUserSubset(dataset, 3)# 只給部分用推薦,運算時間短
ret.show()
ret.collect() # 注意: collect會將所有數據加載到內存,慎用
顯示結果:
+------+--------------------+
|userId| recommendations|
+------+--------------------+
| 1|[[1610, 25.4989],...|
| 3|[[5607, 13.665942...|
| 2|[[5579, 5.9051886...|
+------+--------------------+[Row(userId=1, recommendations=[Row(cateId=1610, rating=25.498899459838867), Row(cateId=5737, rating=24.901548385620117), Row(cateId=3347, rating=20.736785888671875)]),Row(userId=3, recommendations=[Row(cateId=5607, rating=13.665942192077637), Row(cateId=1610, rating=11.770171165466309), Row(cateId=3347, rating=10.35690689086914)]),Row(userId=2, recommendations=[Row(cateId=5579, rating=5.90518856048584), Row(cateId=2447, rating=5.624575138092041), Row(cateId=5690, rating=5.2555742263793945)])]
- transform中提供userId和cateId可以對打分進行預測,利用打分結果排序后
# transform中提供userId和cateId可以對打分進行預測,利用打分結果排序后,同樣可以實現TOP-N的推薦
model.transform
# 將模型進行存儲
model.save("hdfs://localhost:8020/models/userCateRatingALSModel.obj")
# 測試存儲的模型
from pyspark.ml.recommendation import ALSModel
# 從hdfs加載之前存儲的模型
als_model = ALSModel.load("hdfs://localhost:8020/models/userCateRatingALSModel.obj")
# model.recommendForAllUsers(N) 給用戶推薦TOP-N個物品
result = als_model.recommendForAllUsers(3)
result.show()
顯示結果:
+------+--------------------+
|userId| recommendations|
+------+--------------------+
| 148|[[3347, 12.547271...|
| 463|[[1610, 9.250818]...|
| 471|[[1610, 10.246621...|
| 496|[[1610, 5.162216]...|
| 833|[[5607, 9.065482]...|
| 1088|[[104, 6.886987],...|
| 1238|[[5631, 14.51981]...|
| 1342|[[5720, 10.89842]...|
| 1580|[[5731, 8.466453]...|
| 1591|[[1610, 12.835257...|
| 1645|[[1610, 11.968531...|
| 1829|[[1610, 17.576496...|
| 1959|[[1610, 8.353473]...|
| 2122|[[1610, 12.652732...|
| 2142|[[1610, 12.48068]...|
| 2366|[[1610, 11.904813...|
| 2659|[[5607, 11.699315...|
| 2866|[[1610, 7.752719]...|
| 3175|[[3347, 2.3429515...|
| 3749|[[1610, 3.641833]...|
+------+--------------------+
only showing top 20 rows
- 召回到redis
import redis
host = "192.168.19.8"
port = 6379
# 召回到redis
def recall_cate_by_cf(partition):# 建立redis 連接池pool = redis.ConnectionPool(host=host, port=port)# 建立redis客戶端client = redis.Redis(connection_pool=pool)for row in partition:client.hset("recall_cate", row.userId, [i.cateId for i in row.recommendations])
# 對每個分片的數據進行處理 #mapPartition Transformation map
# foreachPartition Action操作 foreachRDD
result.foreachPartition(recall_cate_by_cf)# 注意:這里這是召回的是用戶最感興趣的n個類別
# 總的條目數,查看redis中總的條目數是否一致
result.count()
顯示結果:
1136340
2.3 根據用戶對品牌偏好打分訓練ALS模型
from pyspark.sql.types import StructType, StructField, StringType, IntegerTypeschema = StructType([StructField("userId", IntegerType()),StructField("brandId", IntegerType()),StructField("pv", IntegerType()),StructField("fav", IntegerType()),StructField("cart", IntegerType()),StructField("buy", IntegerType())
])
# 從hdfs加載預處理好的品牌的統計數據
brand_count_df = spark.read.csv("hdfs://localhost:8020/preprocessing_dataset/brand_count.csv", header=True, schema=schema)
# brand_count_df.show()
def process_row(r):# 處理每一行數據:r表示row對象# 偏好評分規則:# m: 用戶對應的行為次數# 該偏好權重比例,次數上限僅供參考,具體數值應根據產品業務場景權衡# pv: if m<=20: score=0.2*m; else score=4# fav: if m<=20: score=0.4*m; else score=8# cart: if m<=20: score=0.6*m; else score=12# buy: if m<=20: score=1*m; else score=20# 注意這里要全部設為浮點數,spark運算時對類型比較敏感,要保持數據類型都一致pv_count = r.pv if r.pv else 0.0fav_count = r.fav if r.fav else 0.0cart_count = r.cart if r.cart else 0.0buy_count = r.buy if r.buy else 0.0pv_score = 0.2*pv_count if pv_count<=20 else 4.0fav_score = 0.4*fav_count if fav_count<=20 else 8.0cart_score = 0.6*cart_count if cart_count<=20 else 12.0buy_score = 1.0*buy_count if buy_count<=20 else 20.0rating = pv_score + fav_score + cart_score + buy_score# 返回用戶ID、品牌ID、用戶對品牌的偏好打分return r.userId, r.brandId, rating
# 用戶對品牌的打分數據
brand_rating_df = brand_count_df.rdd.map(process_row).toDF(["userId", "brandId", "rating"])
# brand_rating_df.show()
-
基于Spark的ALS隱因子模型進行CF評分預測
-
ALS的意思是交替最小二乘法(Alternating Least Squares),是Spark中進行基于模型的協同過濾(model-based CF)的推薦系統算法,也是目前Spark內唯一一個推薦算法。
同SVD,它也是一種矩陣分解技術,但理論上,ALS在海量數據的處理上要優于SVD。
更多了解:pyspark.ml.recommendation.ALS
注意:由于數據量巨大,因此這里不考慮基于內存的CF算法
參考:為什么Spark中只有ALS
-
-
使用pyspark中的ALS矩陣分解方法實現CF評分預測
# 使用pyspark中的ALS矩陣分解方法實現CF評分預測
# 文檔地址:https://spark.apache.org/docs/latest/api/python/pyspark.ml.html?highlight=vectors#module-pyspark.ml.recommendation
from pyspark.ml.recommendation import ALSals = ALS(userCol='userId', itemCol='brandId', ratingCol='rating', checkpointInterval=2)
# 利用打分數據,訓練ALS模型
# 此處訓練時間較長
model = als.fit(brand_rating_df)
# model.recommendForAllUsers(N) 給用戶推薦TOP-N個物品
model.recommendForAllUsers(3).show()
# 將模型進行存儲
model.save("hdfs://localhost:9000/models/userBrandRatingModel.obj")
# 測試存儲的模型
from pyspark.ml.recommendation import ALSModel
# 從hdfs加載模型
my_model = ALSModel.load("hdfs://localhost:9000/models/userBrandRatingModel.obj")
my_model
# model.recommendForAllUsers(N) 給用戶推薦TOP-N個物品
my_model.recommendForAllUsers(3).first()