本文是PySpark銷量預測系列第一篇,后面會陸續通過實戰案例詳細介紹PySpark銷量預測流程,包含特征工程、特征篩選、超參搜索、預測算法。
在零售銷量預測領域,銷售小票數據動輒上千萬條,這個量級在單機版上進行數據分析/挖掘是非常困難的,所以我們需要借助大數據利器--Spark來完成。
Spark作為一個快速通用的分布式計算平臺,可以高效的使用內存,向用戶呈現高級API,這些API將轉換為復雜的并行程序,用戶無需深入底層。
由于數據挖掘或分析人員,大多數熟悉的編程語言是Python,所以本章我們介紹Spark的Python版--PySpark。本節先介紹必要的基礎知識比如DataFrame和ML庫,在后續章節中會給出基于Spark機器學習的特征生成/特征選擇/超參數調優以及機器學習銷量預測算法。
1.Spark.DataFrame與Spark.ML簡介
從Spark 2.0開始,Spark機器學習API是基于DataFrame的Spark.ML ,而之前基于RDD的Spark.MLlib已進入維護模式,不再更新加入新特性。基于DataFrame的Spark.ML是在RDD的基礎上進一步的封裝,也是更加強大方便的機器學習API,同時如果已經習慣了Python機器學習庫如sklearn等,那么你會發現ML用起來很親切。本節主要厘清一些概念為接下來的機器學習做準備,所以可能知識點比較密集且枯燥。
下面我們就開始介紹DataFrame和ML
DataFrame 從屬于 Spark SQL 模塊,適用于結構化/數據庫表以及字典結構的數據,執行數據讀取操作返回的數據格式就是DataFrame,同時熟悉Python的pandas庫或者R語言的同學來說,更是覺得親切,Spark.DataFrame正是借鑒了二者。DataFrame的主要優點是Spark引擎在一開始就為其提供了性能優化,與Java或者Scala相比,Python中的RDD非常慢。每當使用RDD執行PySpark程序時,在PySpark驅動器中,啟動Py4j使用JavaSparkContext的JVM,PySpark將數據分發到多個節點的Python子進程中,此時Python和JVM之間是有很多上下文切換和通信開銷,而DataFrame存在的意義就是優化PySpark的查詢性能。
以上我們交代了Spark.DataFrame的由來,下面介紹其常見操作。
1.1 Spark.DataFrame生成
(1)使用toDF(基于RDD)
from pyspark import SparkConf,SparkContextfrom pyspark.sql import Rowconf = SparkConf().setMaster("local").setAppName("My App")sc = SparkContext(conf = conf)df = sc.parallelize([ \ Row(name='Alice', age=5, height=80), \ Row(name='Alice', age=5, height=80), \ Row(name='Alice', age=10, height=80)]).toDF()#查看數據類型df.dtypes#[('age', 'bigint'), ('height', 'bigint'), ('name', 'string')]查看df類型type(df)#class 'pyspark.sql.dataframe.DataFrame'>
可以將DataFrame視為關系數據表,在其上進行類似于SQL的操作,同時與平時建SQL表需要指定數據類型不同的是,此時數據列的類型是自動推斷,這也是其強大之處。
(2)讀取本地文件
from?pyspark.sql?import?SparkSessionspark = SparkSession.builder \ .master("local") \ .appName("Test Create DataFrame") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()df = spark.read.csv('python/test_spark/ts_dataset.csv')
同理還可以讀取parquet/json文件
df_parquet=spark.read.parquet('....')df_json = spark.read.format('json').load('python/test_spark/ts_dataset.json')
以上兩種方式中,第一種是Spark1.x版本中以RDD作為主要API的方式,第二種的SparkSession是隨著spark2.x引入,封裝了SparkContext、SparkConf、sqlContext等,為用戶提供統一的接口來使用Spark各項功能的更高級抽象的啟動方式。
強調一點是,我們通過會話SparkSession讀取出來的數據類型就是DataFrame,而第一種還需要在RDD的基礎上使用toDF進行轉換。如果當前讀者使用的spark版本是2,那么,推薦使用第二種方式。
(3)讀取HIVE表
from pyspark.sql import SparkSessionspark = SparkSession. \ Builder(). \ config("spark.sql.crossJoin.enabled", "true"). \ config("spark.sql.execution.arrow.enabled", "true"). \ enableHiveSupport(). \ getOrCreate()df=spark.sql("""select regparam,fitIntercept, elasticNetParam from temp.model_best_param""")
這種類型和上文直接讀取本地文件類似,Spark任務在創建時,是默認支持Hive,可以直接訪問現有的 Hive支持的存儲格式。解釋一下,Apache Hive是Hadoop上一種常見的結構化數據源,支持包含HDFS在內的多種存儲系統上的表,由于實際工作中我們使用spark.sql讀取數據操作的機會更多,也是spark最核心組件之一,所以這里重點講解一些Spark.SQL。與Spark其他的組件一樣,在使用的時候是需要提前引入Spark.SQL,但也無需依賴大量的包,如果需要把Spark.SQL連接到一個部署好的Hive上,則需要把hive-site.xml復制到spark的配置文件目錄中,該部分內容參考網絡上其他的教程。以上代碼中enableHiveSupport的調用使得SparkSession支持Hive。如果是Spark 1.x版本,則使用以下方式引用。
from pyspark.sql import HiveContexthiveCtx=HiveContext(sc)data=hiveCtx.sql("select regparam,fitIntercept, elasticNetParam from temp.model_best_para ")
(4)pandas.DataFrame轉換而來
既然使用python進行數據處理,尤其是結構化數據,那么pandas一定繞不開,所以我們經常會有把做過一些處理的pandas.DataFrame數據轉換為Spark.DataFrame的訴求,好在Spark.DataFrame在設計之初就參考并考慮到了這個問題,所以實現方式也相當簡單。
import pandas as pddf = pd.read_csv('python/test_spark/ts_dataset.csv')#將pandas.Dataframe?轉換成-->Spark.DataFrame?spark_df=spark.createDataFrame(df)#將Spark.DataFrame?轉換成-->?pandas.Dataframepd_df = spark_df.toPandas()
以上將Spark.DataFrame 轉換成--> pandas.Dataframe的過程,不建議對超過10G的數據執行該操作。
本節開頭我們也說了Spark.DataFrame是從屬于Spark.sql的,Spark.sql作為Spark最重要的組件,是可以從各種結構化數據格式和數據源讀取和寫入的,所以上面我們也展示了讀取json/csv等本地以及數據庫中的數據。同時spark還允許用戶通過thrift的jdbc遠程訪問數據庫。總的來說 Spark 隱藏了分布式計算的復雜性, Spark SQL 、DataFrame更近一步用統一而簡潔的API接口隱藏了數據分析的復雜性。從開發速度和性能上來說,DataFrame + SQL 無疑是大數據分析的最好選擇。
1.2 Spark.DataFrame操作
以上我們強調了Spark.DataFrame可以靈活的讀取各種數據源,數據讀取加載后就是對其進行處理了,下面介紹讀取DataFrame格式的數據以后執行的一些簡單的操作。
(1)展示DataFrame
spark_df.show()
打印DataFrame的Schema信息
spark_df.printSchema()
顯示前n行
spark_df.head(5)
顯示數據長度與列名
df.count()df.columns
(2)操作DataFrame列
選擇列
ml_dataset=spark_df.select("features", "label")
增加/產生新的一列
from pyspark.sql.functions import *#注意這個*號,這里是導入了sql.functions中所有的函數,所以下文的abs就是由此而來df2 = spark_df.withColumn("abs_age", abs(df2.age))
刪除列
df3= spark_df.drop("age")
篩選
df4= spark_df.where(spark_df["age"]>20)
這里只是簡單的展示了一小部分最為常見的DataFrame操作,更詳盡的內容請查閱官方文檔或者其他參考資料。
1.3 Spark.ML簡介
以上我們介紹了與Spark.ML機器學習密切相關的數據類型和基本操作--Spark.DataFrame
猶如我們通過pandas.DataFrame對數據做加工,下面我們看看用這些清洗過后的制作佳肴的工具包--機器學習建模。
ML包括三個主要的抽象類:轉換器(Transformer)、評估器(Estimator)和管道(Pipeline)。
轉換器,顧名思義就是在原對象的基礎上對DataFrame進行轉換操作,常見的有spark.ml.feature中的對特征做歸一化,分箱,降度,OneHot等數據處理,通過transform()
方法將一個DataFrame轉換成另一個DataFrame。
評估器,評估器是用于機器學習諸如預測或分類等算法,訓練一個DataFrame并生成一個模型。用實現fit()方法來擬合模型。
from pyspark.ml.feature import MinMaxScaler#定義/引入轉換類max_min_scaler = MinMaxScaler(inputCol="age", outputCol="age_scaler")#fit數據max_min_age = max_min_scaler.fit(df)#執行轉換max_min_age_=max_min_age.transform(spark_df)
管道這一概念同樣受Python的Scikit-Learn庫的影響,PySpark ML中的管道指從轉換到評估的端到端的過程,為簡化機器學習過程并使其具備可擴展性,PySpark ML中的Pipelines API,類似于 Python 機器學習庫 Scikit-Learn 中的 Pipeline,采用了一系列 API 定義并標準化的的機器學習工作流,包含數據讀取、預處理、特征加工、特征選擇、模型擬合、模型驗證、模型評估等一系列工作,對DataFrame數據執行計算操作。Spark機器學習部分其他的如特征生成,模型訓練,模型保存,數據集劃分/超參數調優,后面我們會有實際案例進行詳細闡述。另外,隨著spark.3.0的發布,最近的ml簡介可以通過此鏈接了解
http://spark.apache.org/docs/latest/ml-guide.html
最后,順便介紹手頭上幾本密切相關書籍:
1.《Spark快速大數據分析》本書有些舊,主要是spark.1.x為主,少量的spark.2.X介紹,如果想要了解或者不得不使用rdd based ?APIs進行數據分析或者想深入spark更底層學習一點scala等函數式編程入門的還是不錯的選擇,比較全面通俗,豆瓣評分7.9。
2.《PySpark實戰指南》用Python進行Spark數據分析那就不得不提到這本書,倒不見得有多好,只是目前市面上也沒有更好的專門使用Python介紹Spark的中文書籍,本書從rdd到mllib的介紹以及ml包的介紹,可以通過書中提供的API介紹了解使用python進行spark機器學習的過程,當然,機器學習的一些細節是沒有涉及到的,總的來說更多的是展示流程和API的使用。