【PySpark入门】手把手实现PySpark机器学习项目-回归算法
摘要??
?PySpark作為工業(yè)界常用于處理大數(shù)據(jù)以及分布式計(jì)算的工具,特別是在算法建模時(shí)起到了非常大的作用。PySpark如何建模呢?這篇文章手把手帶你入門PySpark,提前感受工業(yè)界的建模過程!
任務(wù)簡介??
在電商中,了解用戶在不同品類的各個(gè)產(chǎn)品的購買力是非常重要的!這將有助于他們?yōu)椴煌a(chǎn)品的客戶創(chuàng)建個(gè)性化的產(chǎn)品。在這篇文章中,筆者在真實(shí)的數(shù)據(jù)集中手把手實(shí)現(xiàn)如何預(yù)測用戶在不同品類的各個(gè)產(chǎn)品的購買行為。
如果有興趣和筆者一步步實(shí)現(xiàn)項(xiàng)目,可以先根據(jù)上一篇文章的介紹中安裝PySpark,并在網(wǎng)站中下載數(shù)據(jù)。
https://datahack.analyticsvidhya.com/contest/black-friday/
數(shù)據(jù)集簡介??
某零售公司想要了解針對不同類別的各種產(chǎn)品的顧客購買行為(購買量)。他們?yōu)樯蟼€(gè)月選定的大批量產(chǎn)品分享了各種客戶的購買匯總。該數(shù)據(jù)集還包含客戶人口統(tǒng)計(jì)信息(age, gender, marital status, city_type, stay_in_current_city),產(chǎn)品詳細(xì)信息(product_id and product category)以及上個(gè)月的purchase_amount總數(shù)。現(xiàn)在,他們希望建立一個(gè)模型來預(yù)測客戶對各種產(chǎn)品的購買量,這將有助于他們?yōu)椴煌a(chǎn)品的客戶創(chuàng)建個(gè)性化的產(chǎn)品。
手把手實(shí)戰(zhàn)項(xiàng)目??
1. 導(dǎo)入數(shù)據(jù)
這里我們使用PySpark的讀數(shù)據(jù)接口read.csv讀取數(shù)據(jù),和pandas讀取數(shù)據(jù)接口迷之相似。
from pyspark.sql import SparkSessionspark = SparkSession \.builder \.appName("test") \.config("spark.some.config.option", "setting") \.getOrCreate()train = spark.read.csv('./BlackFriday/train.csv', header=True, inferSchema=True) test = spark.read.csv('./BlackFriday/test.csv', header=True, inferSchema=True2. 分析數(shù)據(jù)的類型
要查看Dataframe中列的類型,可以使用printSchema()方法。讓我們在train上應(yīng)用printSchema(),它將以樹格式打印模式。
train.printSchema() """ root|-- User_ID: integer (nullable = true)|-- Product_ID: string (nullable = true)|-- Gender: string (nullable = true)|-- Age: string (nullable = true)|-- Occupation: integer (nullable = true)|-- City_Category: string (nullable = true)|-- Stay_In_Current_City_Years: string (nullable = true)|-- Marital_Status: integer (nullable = true)|-- Product_Category_1: integer (nullable = true)|-- Product_Category_2: integer (nullable = true)|-- Product_Category_3: integer (nullable = true)|-- Purchase: integer (nullable = true) """3.?預(yù)覽數(shù)據(jù)集
在PySpark中,我們使用head()方法預(yù)覽數(shù)據(jù)集以查看Dataframe的前n行,就像python中的pandas一樣。我們需要在head方法中提供一個(gè)參數(shù)(行數(shù))。讓我們看一下train的前5行。
train.head(5) """ [Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370), Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200), Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422), Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057), Row(User_ID=1000002,?Product_ID='P00285442',?Gender='M',?Age='55+',?Occupation=16,?City_Category='C',?Stay_In_Current_City_Years='4+',?Marital_Status=0,?Product_Category_1=8,?Product_Category_2=None,?Product_Category_3=None,?Purchase=7969)] """要查看數(shù)據(jù)框架中的行數(shù),我們需要調(diào)用方法count()。讓我們核對一下train上的行數(shù)。Pandas和Spark的count方法是不同的。
4.?插補(bǔ)缺失值
通過調(diào)用drop()方法,可以檢查train上非空數(shù)值的個(gè)數(shù),并進(jìn)行測試。默認(rèn)情況下,drop()方法將刪除包含任何空值的行。我們還可以通過設(shè)置參數(shù)“all”,當(dāng)且僅當(dāng)該行所有參數(shù)都為null時(shí)以刪除該行。這與pandas上的drop方法類似。
train.na.drop('any').count(),test.na.drop('any').count() """ (166821, 71037) """在這里,為了填充簡單,我使用-1來填充train和test的null值。雖然這不是一個(gè)很好的填充方法,你可以選擇其他的填充方式。
train = train.fillna(-1) test = test.fillna(-1)5.?分析數(shù)值特征
我們還可以使用describe()方法查看Dataframe列的各種匯總統(tǒng)計(jì)信息,它顯示了數(shù)字變量的統(tǒng)計(jì)信息。要顯示結(jié)果,我們需要調(diào)用show()方法。
train.describe().show() """ +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+ |summary| User_ID|Product_ID|Gender| Age| Occupation|City_Category|Stay_In_Current_City_Years| Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3| Purchase| +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+ | count| 550068| 550068|550068|550068| 550068| 550068| 550068| 550068| 550068| 550068| 550068| 550068| | mean|1003028.8424013031| null| null| null| 8.076706879876669| null| 1.468494139793958|0.40965298835780306| 5.404270017525106| 6.419769919355425| 3.145214773446192|9263.968712959126| | stddev| 1727.591585530871| null| null| null|6.5226604873418115| null| 0.989086680757309| 0.4917701263173259| 3.936211369201324| 6.565109781181374| 6.681038828257864|5023.065393820593| | min| 1000001| P00000142| F| 0-17| 0| A| 0| 0| 1| -1| -1| 12| | max| 1006040| P0099942| M| 55+| 20| C| 4+| 1| 20| 18| 18| 23961| +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+ """上面看起來好像比較亂,這里我們選擇某一列來看看
讓我們從一個(gè)列中選擇一個(gè)名為“User_ID”的列,我們需要調(diào)用一個(gè)方法select并傳遞我們想要選擇的列名。select方法將顯示所選列的結(jié)果。我們還可以通過提供用逗號分隔的列名,從數(shù)據(jù)框架中選擇多個(gè)列。
train.select('User_ID','Age').show(5) """ +-------+----+ |User_ID| Age| +-------+----+ |1000001|0-17| |1000001|0-17| |1000001|0-17| |1000001|0-17| |1000002| 55+| +-------+----+ only showing top 5 rows """ 6.?分析categorical特征為了建立一個(gè)模型,我們需要在“train”和“test”中看到分類特征的分布。這里我只對Product_ID顯示這個(gè),但是我們也可以對任何分類特性執(zhí)行相同的操作。讓我們看看在“train”和“test”中Product_ID的不同類別的數(shù)量。這可以通過應(yīng)用distinct()和count()方法來實(shí)現(xiàn)。
train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count() """ (3631, 3491) """在計(jì)算“train”和“test”的不同值的數(shù)量后,我們可以看到“train”和“test”有更多的類別。讓我們使用相減方法檢查Product_ID的類別,這些類別正在"test"中,但不在“train”中。我們也可以對所有的分類特征做同樣的處理。
diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))diff_cat_in_train_test.distinct().count() """ (46, None) """diff_cat_in_train_test.distinct().show(5) """ +----------+ |Product_ID| +----------+ | P00322642| | P00300142| | P00077642| | P00249942| | P00294942| +----------+ only?showing?top?5?rows """以上你可以看到46個(gè)不同的類別是在"test"中,而不在"train"中。在這種情況下,我們要么收集更多關(guān)于它們的數(shù)據(jù),要么跳過那些類別(無效類別)的“test”。
7.?將分類變量轉(zhuǎn)換為標(biāo)簽我們還需要通過在Product_ID上應(yīng)用StringIndexer轉(zhuǎn)換將分類列轉(zhuǎn)換為標(biāo)簽,該轉(zhuǎn)換將標(biāo)簽的Product_ID列編碼為標(biāo)簽索引的列。
from pyspark.ml.feature import StringIndexer plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_id_trans') labeller = plan_indexer.fit(train)在上面,我們將fit()方法應(yīng)用于“train”數(shù)據(jù)框架上,構(gòu)建了一個(gè)標(biāo)簽。稍后我們將使用這個(gè)標(biāo)簽來轉(zhuǎn)換我們的"train"和“test”。讓我們在labeller的幫助下轉(zhuǎn)換我們的train和test的Dataframe。我們需要調(diào)用transform方法。我們將把轉(zhuǎn)換結(jié)果存儲在Train1和Test1中.
Train1 = labeller.transform(train) Test1 = labeller.transform(test) Train1.show(2) """ +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+ |User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+ |1000001| P00069042| F|0-17| 10| A| 2| 0| 3| -1| -1| 8370| 766.0| |1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200| 183.0| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+ only?showing?top?2?rows """Train1.select('product_id_trans').show(2) """ +----------------+ |product_id_trans| +----------------+ | 766.0| | 183.0| +----------------+ only showing top 2 rows """上面已經(jīng)顯示了我們在以前的"train" Dataframe中成功的添加了一個(gè)轉(zhuǎn)化后的列“product_id_trans”,("Train1" Dataframe)。
8.?選擇特征來構(gòu)建機(jī)器學(xué)習(xí)模型
首先,我們需要從pyspark.ml.feature導(dǎo)入RFormula;然后,我們需要在這個(gè)公式中指定依賴和獨(dú)立的列;我們還必須為為features列和label列指定名稱。
from pyspark.ml.feature import RFormula formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",featuresCol="features",labelCol="label")在創(chuàng)建了這個(gè)公式之后,我們需要將這個(gè)公式應(yīng)用到我們的Train1上,并通過這個(gè)公式轉(zhuǎn)換Train1,Test1。讓我們看看如何做到這一點(diǎn),在擬合變換train1之后,
t1 = formula.fit(Train1) train1 = t1.transform(Train1) test1 = t1.transform(Test1) train1.show(2) """ +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+ |User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans| features| label| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+ |1000001| P00069042| F|0-17| 10| A| 2| 0| 3| -1| -1| 8370| 766.0|(16,[6,10,13,14],...| 8370.0| |1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200| 183.0|(16,[6,10,13,14],...|15200.0| +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+ only showing top 2 rows """在應(yīng)用了這個(gè)公式之后,我們可以看到train1和test1有兩個(gè)額外的列,稱為features和label,并對我們在公式中指定的列進(jìn)行標(biāo)記(featuresCol= features和labelCol= label)。直觀上,train1和test1中的features列中的所有分類變量都被轉(zhuǎn)換為數(shù)值,數(shù)值變量與之前應(yīng)用ML時(shí)相同。我們還可以查看train1和test1中的列特性和標(biāo)簽。
train1.select('features').show(2) """ +--------------------+ | features| +--------------------+ |(16,[6,10,13,14],...| |(16,[6,10,13,14],...| +--------------------+ only showing top 2 rows """train1.select('label').show(2) """ +-------+ | label| +-------+ | 8370.0| |15200.0| +-------+ only showing top 2 rows """9.?建立機(jī)器學(xué)習(xí)模型
在應(yīng)用RFormula和轉(zhuǎn)換Dataframe之后,我們現(xiàn)在需要根據(jù)這些數(shù)據(jù)開發(fā)機(jī)器學(xué)習(xí)模型。我想為這個(gè)任務(wù)應(yīng)用一個(gè)隨機(jī)森林回歸。讓我們導(dǎo)入一個(gè)在pyspark.ml中定義的隨機(jī)森林回歸器。然后建立一個(gè)叫做rf的模型。我將使用隨機(jī)森林算法的默認(rèn)參數(shù)。
from pyspark.ml.regression import RandomForestRegressor rf = RandomForestRegressor()在創(chuàng)建一個(gè)模型rf之后,我們需要將train1數(shù)據(jù)劃分為train_cv和test_cv進(jìn)行交叉驗(yàn)證。這里,我們將train1數(shù)據(jù)區(qū)域劃分為train_cv的70%和test_cv的30%。
(train_cv, test_cv) = train1.randomSplit([0.7, 0.3])在train_cv上建立模型,在test_cv上進(jìn)行預(yù)測。結(jié)果將保存在predictions中。
model1 = rf.fit(train_cv) predictions = model1.transform(test_cv)10. 模型效果評估
讓我們評估對test_cv的預(yù)測,看看rmse和mse是多少。
為了評估模型,我們需要從pyspark.ml.evaluation中導(dǎo)入RegressionEvaluator。我們必須為此創(chuàng)建一個(gè)對象。有一種方法叫 evaluate for evaluator ,它對模型求值。我們需要為此指定度量標(biāo)準(zhǔn)。
from pyspark.ml.evaluation import RegressionEvaluator evaluator = RegressionEvaluator() mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" }) import numpy as np np.sqrt(mse), mse """ (3832.4796474051345, 14687900.247774584) """經(jīng)過計(jì)算,我們可以看到我們的rmse是3827.767295494888。
現(xiàn)在,我們將在所有的train1數(shù)據(jù)集上再次訓(xùn)練一個(gè)模型。
model = rf.fit(train1) predictions1 = model.transform(test1)預(yù)測之后,我們得到測試集預(yù)測結(jié)果,并將其保存成csv文件。
df = predictions1.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase') df.toPandas().to_csv('./BlackFriday/submission.csv')寫入csv文件后(submission.csv)。我們可以上傳我們的第一個(gè)解決方案來查看分?jǐn)?shù),我得到的分?jǐn)?shù)是3844.20920145983。
總結(jié)??
在本文中,我以一個(gè)真實(shí)案例介紹了PySpark建模流程。這只是本系列文章的開始。在接下來的幾周,我將繼續(xù)分享PySpark使用的教程。同時(shí),如果你有任何問題,或者你想對我要講的內(nèi)容提出任何建議,歡迎留言。
個(gè)人微信:加時(shí)請注明 (昵稱+公司/學(xué)校+方向)
? 歷史精品文章推薦???
知否?知否?一文看懂深度文本分類之DPCNN原理與代碼
機(jī)器學(xué)習(xí)入門方法和資料合集
撩一發(fā)深度文本分類之RNN via Attention
15分鐘帶你入門sklearn與機(jī)器學(xué)習(xí)——分類算法篇
如何為你的回歸問題選擇最合適的機(jī)器學(xué)習(xí)方法?
PySpark 安裝、配置之使用初體驗(yàn)
總結(jié)
以上是生活随笔為你收集整理的【PySpark入门】手把手实现PySpark机器学习项目-回归算法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 方程组的几何解释 [MIT线代第一课pd
- 下一篇: 如何科学的打开 Leetcode