Naïve Bayes model on SageMaker notebooks using Apache Spark

In the previous section, Classifying text with language models, we saw how you can train a model with scikit-learn on a SageMaker notebook instance. This is feasible for datasets as small as the ones we collected from Twitter. What if, instead, we had hundreds of terabytes' worth of tweet data? For starters, we would not be able to store the data on a single machine. Even if we could, it would probably take too long to train on such a large dataset. Apache Spark solves this problem for us by implementing ML algorithms that can read data from distributed storage (such as AWS S3) and distribute the computation across many machines. AWS provides a product called Elastic MapReduce (EMR) that is capable of launching and managing clusters on which we can perform ML at scale.

Many ML algorithms require several passes over the data (although this is not the case for Naive Bayes). Apache Spark provides a way to cache datasets in memory so that we can efficiently run algorithms that require several passes over the data (such as logistic regression or decision trees, which we will see in the following chapters). We will show how to launch EMR clusters in Chapter 4, Predicting User Behavior with Tree-Based Methods; however, in this section, we will show how similar working with Apache Spark is to working with scikit-learn. In fact, many of the interfaces in Apache Spark (such as pipelines, Transformers, and Estimators) were inspired by scikit-learn.
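
For example, caching is a single call on a DataFrame; the following minimal sketch assumes a DataFrame named corpus_df like the one we will build later in this section:

# Mark the DataFrame for in-memory caching; the data is only materialized
# the first time an action (such as count) touches it
corpus_df.cache()
corpus_df.count()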

Apache Spark supports four main languages: R, Python, Scala, and Java. In this book, we will use the Python flavor, also called PySpark. Even though our Spark code will run on a single machine (that is, on our SageMaker notebook instance), it could run on multiple machines without any code changes if our data were larger and we had a Spark cluster (in Chapter 4, Predicting User Behavior with Tree-Based Methods, we will dive into creating Spark clusters with EMR).

In Spark, the first thing we need to do is to create a Spark session. We do this by first creating a Spark context, and then creating a session for SQL-like manipulation of data:

from pyspark.context import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local', 'test')
sql = SQLContext(sc)

Since we will run Spark locally (on a single machine), we specify local. However, if we were to run this on a cluster, we would need to specify the cluster's master address instead. Spark works with abstractions called DataFrames that allow us to manipulate huge tables of data using SQL-like operations.
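
For example, pointing the context at a hypothetical standalone cluster (the master URL below is just a placeholder) only changes the first argument:

# Hypothetical cluster master URL; the rest of the code stays the same
sc = SparkContext('spark://spark-master.example.com:7077', 'test')
sql = SQLContext(sc)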

Our first task will be to define DataFrames for our raw data:

from pyspark.sql.functions import lit

dems_df = sql.read.text("file://" + SRC_PATH + 'dem.txt')
gop_df = sql.read.text("file://" + SRC_PATH + 'gop.txt')
corpus_df = dems_df.select("value", lit(1).alias("label")).union(gop_df.select("value", lit(0).alias("label")))

In the first two lines, we create DataFrames out of our raw tweets. We also create corpus_df, which contains both sources of tweets, and add the label by creating a column with a literal of 1 for Democrats and 0 for Republicans:

>>> corpus_df.select("*").limit(2).show()

+--------------------+-----+
|               value|label|
+--------------------+-----+
|This ruling is th...|    1|
|No president shou...|    1|
+--------------------+-----+

Spark works in a lazy fashion so, even though we have defined and unioned the DataFrames, no actual processing happens until we perform the first operation on the data. In our case, this will be splitting the DataFrame into training and testing sets:

train_df, test_df = corpus_df.randomSplit([0.75, 0.25])
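
As a quick illustration, calling an action such as count forces Spark to actually read, union, and split the data (the exact counts will depend on your tweet files):

# count is an action, so it triggers the reading and splitting defined above
print(train_df.count(), test_df.count())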

Now, we are ready to train our model. Spark supports the same pipeline concept that we used with scikit-learn. We will build a pipeline with the necessary transformations for our model. It's very similar to our previous example, except that Spark has two separate stages for tokenization and stopword removal:

from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, Tokenizer, StopWordsRemover

# Split the raw tweet text into words, drop stopwords, and build bag-of-words vectors
tokenizer = Tokenizer(inputCol="value", outputCol="words")
stop_words_remover = StopWordsRemover(inputCol="words", outputCol="words_cleaned")
vectorizer = CountVectorizer(inputCol="words_cleaned", outputCol="features")

cleaning_pipeline = Pipeline(stages=[tokenizer, stop_words_remover, vectorizer])
cleaning_pipeline_model = cleaning_pipeline.fit(corpus_df)
cleaned_training_df = cleaning_pipeline_model.transform(train_df)
cleaned_testing_df = cleaning_pipeline_model.transform(test_df)

A Spark ML pipeline consists of a series of stages. Each stage can be a Transformer or an Estimator. Transformers apply a well-defined transformation to a dataset, while Estimators have the added capability of producing models by traversing the dataset. NaiveBayes and CountVectorizer are examples of Estimators, while Tokenizer and StopWordsRemover are examples of Transformers. Models, in turn, are Transformers, because they can provide predictions for all elements in a dataset as a transformation.

As you can see in the preceding code, we defined a pipeline with all the necessary stages to clean the data. Each stage transforms the original DataFrame (which has only two columns: value, the raw tweet text, and label) and adds more columns.
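
For instance, the fitted cleaning_pipeline_model keeps a reference to each fitted stage, so we can pull out the CountVectorizerModel produced by the CountVectorizer stage and peek at the vocabulary it learned (a quick sanity check, assuming the pipeline was fit as shown above):

# The last stage of the fitted pipeline is the CountVectorizerModel;
# its vocabulary maps feature indices back to tokens
count_vectorizer_model = cleaning_pipeline_model.stages[-1]
print(count_vectorizer_model.vocabulary[:10])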

In the following output, the relevant columns used at training time are features (a sparse vector representing the BoW, exactly as in our scikit-learn example) and label:

>>> cleaned_training_df.show(n=3)

+--------------------+-----+--------------------+--------------------+--------------------+
|               value|label|               words|       words_cleaned|            features|
+--------------------+-----+--------------------+--------------------+--------------------+
|         #Tuesday...|    1|        [#tuesday...|        [#tuesday...|(3025,[63,1398,18...|
|         #WorldAI...|    1|        [#worlda....|        [#worldai...|(3025,[37,75,155,...|
|         @Tony4W....|    1|        [.@tony4w...|        [.@tony4w...|(3025,[41,131,160...|
+--------------------+-----+--------------------+--------------------+--------------------+
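
Each entry in the features column is a SparseVector: the first number is the vocabulary size, and the two lists hold the indices and counts of the words present in that tweet. The following toy example (with made-up indices and counts) builds the same kind of vector directly:

from pyspark.ml.linalg import Vectors

# A sparse vector over a 3025-word vocabulary with made-up counts at two indices
example_features = Vectors.sparse(3025, [63, 1398], [1.0, 2.0])
print(example_features)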

By specifying these columns to the NaiveBayes classifier, we can train a model:

from pyspark.ml.classification import NaiveBayes
naive_bayes = NaiveBayes(featuresCol="features", labelCol="label")
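
By default, Spark's NaiveBayes fits a multinomial model with Laplace smoothing of 1.0; both can be set explicitly if you want to experiment (shown here with the default values):

# Equivalent to the defaults; adjust smoothing or modelType to experiment
naive_bayes = NaiveBayes(featuresCol="features", labelCol="label",
                         smoothing=1.0, modelType="multinomial")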

The model is a transformer that can provide predictions for each row in our training DataFrame:

naive_bayes_model = naive_bayes.fit(cleaned_training_df)
predictions_df = naive_bayes_model.transform(cleaned_testing_df)

>>> predictions_df.select("features", "label", "prediction").limit(3).show()

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|(3025,[1303,1858,...|    1|       1.0|
|(3025,[1,20,91,13...|    1|       1.0|
|(3025,[16,145,157...|    1|       1.0|
+--------------------+-----+----------+

Similar to our previous example, we can evaluate the accuracy of our model. Using the MulticlassClassificationEvaluator class and specifying the actual and predicted label columns, we can obtain the accuracy:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
evaluator.evaluate(predictions_df)

The output is 0.93, which is similar to the result we obtained with scikit-learn.
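
The same evaluator class can report other metrics by changing metricName (for example, f1, weightedPrecision, or weightedRecall); a minimal sketch:

f1_evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1")
f1_evaluator.evaluate(predictions_df)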
