
Pipeline

A Pipeline represents a sequence of stages, where each stage is either a transformer or an estimator. The stages run in order, and the input dataset is altered as it passes through each stage. Transformer stages apply their transform() method to the dataset, while estimator stages call fit() to produce a transformer.

The DataFrame output by one stage is the input to the next. The pipeline itself is also an estimator: once its fit() method is run, it produces a PipelineModel, which is a transformer. The PipelineModel contains the same number of stages as the original pipeline. Pipelines and PipelineModels ensure that the training and test data pass through identical feature-processing steps. For instance, consider a pipeline with three stages: Tokenizer, which splits each sentence into words using Tokenizer.transform(); HashingTF, which converts those words into a feature vector using HashingTF.transform(), since ML algorithms understand only vectors and not strings; and NaiveBayes, an estimator used for prediction.

We can save the fitted model to an HDFS location using the save() method, so that in the future we can load it with the load() method and use it for prediction on a new dataset. The loaded model will operate on the features column of newDataset, which also passes through all the stages of the pipeline, and a prediction column is appended to the result:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.NaiveBayes

// Training data: (text, label) pairs
val df = spark.createDataFrame(Seq(
  ("This is the Transformer", 1.0),
  ("Transformer is pipeline component", 0.0)
)).toDF("text", "label")

// Stage 1: split each sentence into words
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")

// Stage 2: hash the words into a fixed-length feature vector
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")

// Stage 3: the estimator that learns to predict the label
val nb = new NaiveBayes().setModelType("multinomial")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, nb))
val model = pipeline.fit(df)

// Persist the fitted PipelineModel and load it back for reuse
model.save("/HDFSlocation/Path/")
val loadedModel = PipelineModel.load("/HDFSlocation/Path/")

val predictedData = loadedModel.transform(newDataset)
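To see what the loaded model actually returns, we can inspect the output DataFrame. This is a minimal sketch, assuming newDataset is a DataFrame with a text column like the training data; the prediction and probability columns shown are the standard output columns appended by Spark's NaiveBayes model:

```scala
// newDataset is assumed to contain a "text" column, e.g.:
val newDataset = spark.createDataFrame(Seq(
  Tuple1("Transformer component")
)).toDF("text")

val predictedData = loadedModel.transform(newDataset)

// The pipeline appends the intermediate and output columns it produced:
// words (from Tokenizer), features (from HashingTF),
// and rawPrediction, probability, prediction (from NaiveBayesModel)
predictedData.select("text", "probability", "prediction").show(truncate = false)
```

Because the same PipelineModel reapplies every stage, the new text is tokenized and hashed exactly as the training data was before the classifier scores it.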