當(dāng)前位置:
首頁 >
Alink工作原理
發(fā)布時間:2024/8/26
39
生活家
一、Alink結(jié)構(gòu)簡析
Pipeline結(jié)構(gòu)
算法、預(yù)處理、特征工程等組件可加載進pipeline進行訓(xùn)練預(yù)測,組件也可單獨使用
pipeline構(gòu)成如下:
數(shù)據(jù)源
Alink對各種數(shù)據(jù)源的操作均為包裝成Operator,批與流采用不同Operator。同時,Pipeline也支持Table數(shù)據(jù)源的輸入,但其后續(xù)處理也是包裝成TableOp,使用外部源Table時要注意設(shè)置Environment和Pipeline相同
Alink可以對以上數(shù)據(jù)源直接獲取,也可對Flink的DataSet/DataStream包裝為Operator
批式/流式算法通用的串聯(lián)方式
Alink的fit和transform過程是同時支持BatchOperator和StreamOperator的,大部分?jǐn)?shù)據(jù)處理等組件均支持,但根據(jù)實際使用的算法,fit過程對pi與流的支持是不同的。
訓(xùn)練后或保存的model即可預(yù)測批數(shù)據(jù)也可預(yù)測流數(shù)據(jù)
邏輯回歸訓(xùn)練/預(yù)測過程示例
linkFrom內(nèi)部完成各業(yè)務(wù)處理邏輯,同時該部分可繼承EstimatorBase或TransformerBase形成PipelineStage
二、Alink使用介紹
使用概覽
Pipeline pipeline = new Pipeline(
new Imputer()
.setSelectedCols("review")
.setOutputCols("featureText")
.setStrategy("value")
.setFillValue("null"),
new Segment()
.setSelectedCol("featureText"),
new StopWordsRemover()
.setSelectedCol("featureText"),
new DocCountVectorizer()
.setFeatureType("TF")
.setSelectedCol("featureText")
.setOutputCol("featureVector"),
new LogisticRegression()
.setVectorCol("featureVector")
.setLabelCol("label")
.setPredictionCol("pred")
);
//pipeline.add(PipelineStage組件,index)
PipelineModel model = pipeline.fit(source);
model.save(filepath);
PipelineModel model =PipelineModel.load(modelPath);
model.transform(dataOperator);
//可以model.getLocalPredictor("review string").map(row)形式進行本地預(yù)測
Operator.execute();
數(shù)據(jù)獲取/保存
1)hive示例
data = HiveSourceBatchOp()
.setInputTableName("tbl")
.setPartitions("ds=2022/dt=01,ds=2022/dt=02").setHiveVersion("2.0.1")
.setHiveConfDir("hdfs://192.168.99.102:9000/hive-2.0.1/conf")
.setDbName("mydb")
sink = HiveSinkBatchOp()
.setHiveVersion("2.0.1")
.setHiveConfDir("hdfs://192.168.99.102:9000/hive-2.0.1/conf").setDbName("mydb")
.setOutputTableName("tbl_sink")
.setOverwriteSink(True)
2)Kafka
Kafka011SinkStreamOp sink = new Kafka011SinkStreamOp()
.setBootstrapServers("localhost:9092")
.setDataFormat("json")
.setTopic("iris");
3)DataSet
DataSetWrapperBatchOp op = new DataSetWrapperBatchOp(dataSet,filedNames,fieldTypes);
Alink算法與組件
總結(jié)
- 上一篇: Windows中的硬链接和软链接(har
- 下一篇: 浏览器访问http链接自动转htpps访