日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

使用spark ml pipeline进行机器学习

發布時間:2024/1/23 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用spark ml pipeline进行机器学习 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、關于spark ml?pipeline與機器學習
一個典型的機器學習構建包含若干個過程
1、源數據ETL
2、數據預處理
3、特征選取
4、模型訓練與驗證
以上四個步驟可以抽象為一個包括多個步驟的流水線式工作,從數據收集開始至輸出我們需要的最終結果。因此,對以上多個步驟、進行抽象建模,簡化為流水線式工作流程則存在著可行性,對利用spark進行機器學習的用戶來說,流水線式機器學習比單個步驟獨立建模更加高效、易用。
受?scikit-learn?項目的啟發,并且總結了MLlib在處理復雜機器學習問題的弊端(主要為工作繁雜,流程不清晰),旨在向用戶提供基于DataFrame 之上的更加高層次的 API 庫,以更加方便的構建復雜的機器學習工作流式應用。一個pipeline 在結構上會包含一個或多個Stage,每一個 Stage 都會完成一個任務,如數據集處理轉化,模型訓練,參數設置或數據預測等,這樣的Stage 在 ML 里按照處理問題類型的不同都有相應的定義和實現。兩個主要的stage為Transformer和Estimator。Transformer主要是用來操作一個DataFrame 數據并生成另外一個DataFrame 數據,比如svm模型、一個特征提取工具,都可以抽象為一個Transformer。Estimator 則主要是用來做模型擬合用的,用來生成一個Transformer。可能這樣說比較難以理解,下面就以一個完整的機器學習案例來說明spark ml pipeline是怎么構建機器學習工作流的。
二、使用spark ml pipeline構建機器學習工作流
在此以Kaggle數據競賽Display Advertising Challenge的數據集(該數據集為利用用戶特征進行廣告點擊預測)開始,利用spark ml pipeline構建一個完整的機器學習工作流程。
Display Advertising Challenge的這份數據本身就不多做介紹了,主要包括3部分,numerical型特征集、Categorical類型特征集、類標簽。
首先,讀入樣本集,并將樣本集劃分為訓練集與測試集:
???????//使用file標記文件路徑,允許spark讀取本地文件
????????String fileReadPath = "file:\\D:\\dac_sample\\dac_sample.txt";
????????//使用textFile讀入數據
????????SparkContext sc = Contexts.sparkContext;
????????RDD<String> file = sc.textFile(fileReadPath,1);
????????JavaRDD<String> sparkContent = file.toJavaRDD();
????????JavaRDD<Row> sampleRow = sparkContent.map(new Function<String, Row>() {
????????????public Row call(String string) {
????????????????String tempStr = string.replace("\t",",");
????????????????String[] features = tempStr.split(",");
????????????????int intLable= Integer.parseInt(features[0]);
????????????????String intFeature1 ?= features[1];
????????????????String intFeature2 ?= features[2]; ???????????????String CatFeature1 = features[14];
????????????????String CatFeature2 = features[15];
????????????????return RowFactory.create(intLable, intFeature1, intFeature2, CatFeature1, CatFeature2);
????????????}
????????});
?
?
????????double[] weights = {0.8, 0.2};
????????Long seed = 42L;
????????JavaRDD<Row>[] sampleRows = sampleRow.randomSplit(weights,seed);

得到樣本集后,構建出?DataFrame格式的數據供spark ml pipeline使用:
????????List<StructField> fields = new ArrayList<StructField>();
????????fields.add(DataTypes.createStructField("lable", DataTypes.IntegerType, false));
????????fields.add(DataTypes.createStructField("intFeature1", DataTypes.StringType, true));
????????fields.add(DataTypes.createStructField("intFeature2", DataTypes.StringType, true));
????????fields.add(DataTypes.createStructField("CatFeature1", DataTypes.StringType, true));
????????fields.add(DataTypes.createStructField("CatFeature2", DataTypes.StringType, true));
????????//and so on
?
?
????????StructType schema = DataTypes.createStructType(fields);
????????DataFrame dfTrain = Contexts.hiveContext.createDataFrame(sampleRows[0], schema);//訓練數據
????????dfTrain.registerTempTable("tmpTable1");
????????DataFrame dfTest = Contexts.hiveContext.createDataFrame(sampleRows[1], schema);//測試數據
????????dfTest.registerTempTable("tmpTable2");
由于在dfTrain、dfTest中所有的特征目前都為string類型,而機器學習則要求其特征為numerical類型,在此需要對特征做轉換,包括類型轉換和缺失值的處理。
首先,將intFeature由string轉為double,cast()方法將表中指定列string類型轉換為double類型,并生成新列并命名為intFeature1Temp,
之后,需要刪除原來的數據列 并將新列重命名為intFeature1,這樣,就將string類型的特征轉換得到double類型的特征了。
? ? ? ? //Cast integer features from String to Double
? ? ? ?dfTest = dfTest.withColumn("intFeature1Temp",dfTest.col("intFeature1").cast("double"));
? ? ? ?dfTest = dfTest.drop("intFeature1").withColumnRenamed("intFeature1Temp","intFeature1");
如果intFeature特征是年齡或者特征等類型,則需要進行分箱操作,將一個特征按照指定范圍進行劃分:
? ? ? ? /*特征轉換,部分特征需要進行分箱,比如年齡,進行分段成成年未成年等 */
? ? ? ? double[] splitV = {0.0,16.0,Double.MAX_VALUE};
? ? ? ? Bucketizer bucketizer = new Bucketizer().setInputCol("").setOutputCol("").setSplits(splitV);

再次,需要將categorical 類型的特征轉換為numerical類型。主要包括兩個步驟,缺失值處理和編碼轉換。
缺失值處理方面,可以使用全局的NA來統一標記缺失值:
? ? ? ? /*將categoricalb類型的變量的缺失值使用NA值填充*/
? ? ? ? String[] strCols = {"CatFeature1","CatFeature2"};
? ? ? ? dfTrain = dfTrain.na().fill("NA",strCols);
? ? ? ? dfTest = dfTest.na().fill("NA",strCols);

缺失值處理完成之后,就可以正式的對categorical類型的特征進行numerical轉換了。在spark ml中,可以借助StringIndexer和oneHotEncoder完成
這一任務:
? ? ? ? // StringIndexer ?oneHotEncoder 將 categorical變量轉換為 numerical 變量
? ? ? ? // 如某列特征為星期幾、天氣等等特征,則轉換為七個0-1特征
? ? ? ? StringIndexer cat1Index = new StringIndexer().setInputCol("CatFeature1").setOutputCol("indexedCat1").setHandleInvalid("skip");
? ? ? ? OneHotEncoder cat1Encoder = new OneHotEncoder().setInputCol(cat1Index.getOutputCol()).setOutputCol("CatVector1");
? ? ? ? StringIndexer cat2Index = new StringIndexer().setInputCol("CatFeature2").setOutputCol("indexedCat2");
? ? ? ? OneHotEncoder cat2Encoder = new OneHotEncoder().setInputCol(cat2Index.getOutputCol()).setOutputCol("CatVector2");

至此,特征預處理步驟基本完成了。由于上述特征都是處于單獨的列并且列名獨立,為方便后續模型進行特征輸入,需要將其轉換為特征向量,并統一命名,
可以使用VectorAssembler類完成這一任務:
? ? ? ? /*轉換為特征向量*/
? ? ? ? String[] vectorAsCols = {"intFeature1","intFeature2","CatVector1","CatVector2"};
? ? ? ? VectorAssembler vectorAssembler = new VectorAssembler().setInputCols(vectorAsCols).setOutputCol("vectorFeature");
通常,預處理之后獲得的特征有成千上萬維,出于去除冗余特征、消除維數災難、提高模型質量的考慮,需要進行選擇。在此,使用卡方檢驗方法,
利用特征與類標簽之間的相關性,進行特征選取:
? ? ? ? /*特征較多時,使用卡方檢驗進行特征選擇,主要是考察特征與類標簽的相關性*/
? ? ? ? ChiSqSelector chiSqSelector = new ChiSqSelector().setFeaturesCol("vectorFeature").setLabelCol("label").setNumTopFeatures(10)
? ? ? ? ? ? ? ? .setOutputCol("selectedFeature");

在特征預處理和特征選取完成之后,就可以定義模型及其參數了。簡單期間,在此使用LogisticRegression模型,并設定最大迭代次數、正則化項:
? ? ? ? /* 設置最大迭代次數和正則化參數 setElasticNetParam=0.0 為L2正則化 setElasticNetParam=1.0為L1正則化*/
? ? ? ? /*設置特征向量的列名,標簽的列名*/
? ? ? ? LogisticRegression logModel = new LogisticRegression().setMaxIter(100).setRegParam(0.1).setElasticNetParam(0.0)
? ? ? ? ? ? ? ? .setFeaturesCol("selectedFeature").setLabelCol("lable");

在上述準備步驟完成之后,就可以開始定義pipeline并進行模型的學習了:
? ? ? ? /*將特征轉換,特征聚合,模型等組成一個管道,并調用它的fit方法擬合出模型*/
? ? ? ? PipelineStage[] pipelineStage = {cat1Index,cat2Index,cat1Encoder,cat2Encoder,vectorAssembler,logModel};
? ? ? ? Pipeline pipline = new Pipeline().setStages(pipelineStage);
? ? ? ? PipelineModel pModle = pipline.fit(dfTrain);
上面pipeline的fit方法得到的是一個Transformer,我們可以使它作用于訓練集得到模型在訓練集上的預測結果:
? ? ? ? //擬合得到模型的transform方法進行預測
? ? ? ? DataFrame output = pModle.transform(dfTest).select("selectedFeature", "label", "prediction", "rawPrediction", "probability");
? ? ? ? DataFrame prediction = output.select("label", "prediction");
? ? ? ? prediction.show();

分析計算,得到模型在訓練集上的準確率,看看模型的效果怎么樣:
? ? ? ? /*測試集合上的準確率*/
? ? ? ? long correct = prediction.filter(prediction.col("label").equalTo(prediction.col("'prediction"))).count();
? ? ? ? long total = prediction.count();
? ? ? ? double accuracy = correct / (double)total;
?
? ? ? ? System.out.println(accuracy);

最后,可以將模型保存下來,下次直接使用就可以了:
? ? ? ? String pModlePath = ""file:\\D:\\dac_sample\\";
? ? ? ? pModle.save(pModlePath);
三,梳理和總結:
上述,借助代碼實現了基于spark ml pipeline的機器學習,包括數據轉換、特征生成、特征選取、模型定義及模型學習等多個stage,得到的pipeline
模型后,就可以在新的數據集上進行預測,總結為兩部分并用流程圖表示如下:
訓練階段:

預測階段:


借助于Pepeline,在spark上進行機器學習的數據流向更加清晰,同時每一stage的任務也更加明了,因此,無論是在模型的預測使用上、還是
模型后續的改進優化上,都變得更加容易。
————————————————
版權聲明:本文為CSDN博主「大愚若智_」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/zbc1090549839/article/details/50935274

總結

以上是生活随笔為你收集整理的使用spark ml pipeline进行机器学习的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。