
Data ingestion from a NoSQL database

Data can also come from a NoSQL database. In this section, we are going to explore the code needed to consume data from a MongoDB (https://www.mongodb.com/) database.

The sparkexample collection of the sparkmdb database contains the same data as in the Data ingestion through DataVec and transformation through Spark and Data ingestion from a relational database sections, but in the form of BSON documents; for example:

/* 1 */
{
    "_id" : ObjectId("5ae39eed144dfae14837c625"),
    "DateTimeString" : "2016-01-01 17:00:00.000",
    "CustomerID" : "830a7u3",
    "MerchantID" : "u323fy8902",
    "NumItemsInTransaction" : 1,
    "MerchantCountryCode" : "USA",
    "TransactionAmountUSD" : 100.0,
    "FraudLabel" : "Legit"
}

/* 2 */
{
    "_id" : ObjectId("5ae3a15d144dfae14837c671"),
    "DateTimeString" : "2016-01-01 18:03:01.256",
    "CustomerID" : "830a7u3",
    "MerchantID" : "9732498oeu",
    "NumItemsInTransaction" : 3,
    "MerchantCountryCode" : "FR",
    "TransactionAmountUSD" : 73.0,
    "FraudLabel" : "Legit"
}
...

The dependencies to add to the Scala Spark project are the following:

  • Apache Spark 2.2.1
  • Apache Spark SQL 2.2.1
  • The MongoDB connector for Spark 2.2.0
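
In an sbt build, for example, these dependencies could be declared as follows (a sketch, assuming the usual Maven coordinates and Scala 2.11 artifacts resolved through %%):

libraryDependencies ++= Seq(
  // Spark core and Spark SQL, as listed above
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql" % "2.2.1",
  // The MongoDB connector for Spark
  "org.mongodb.spark" %% "mongo-spark-connector" % "2.2.0"
)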

We need to create a Spark Session, as follows:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .master("local")
  .appName("MongoSparkConnectorIntro")
  .config("spark.mongodb.input.uri", "mongodb://mdbhost:27017/sparkmdb.sparkexample")
  .config("spark.mongodb.output.uri", "mongodb://mdbhost:27017/sparkmdb.sparkexample")
  .getOrCreate()

The two config properties specify the connection to the database for reading and writing. After the session has been created, it is possible to use it to load data from the sparkexample collection through the com.mongodb.spark.MongoSpark class, as follows:

import com.mongodb.spark.MongoSpark

val df = MongoSpark.load(sparkSession)
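
The load above reads from the collection configured through spark.mongodb.input.uri. Assuming the connector's com.mongodb.spark.config.ReadConfig class, the input settings can also be overridden for a single read without touching the session configuration; the following is just a sketch (the option values are illustrative):

import com.mongodb.spark.config.ReadConfig

// Override the session defaults for this read only
val readConfig = ReadConfig(
  Map("collection" -> "sparkexample", "readPreference.name" -> "primary"),
  Some(ReadConfig(sparkSession)))
val dfCustom = MongoSpark.load(sparkSession, readConfig)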

In both cases, the returned DataFrame has the same structure as the sparkexample collection. To verify this, use the following instruction:

df.printSchema()

It prints the schema inferred by the connector, which, given the documents above, should look like the following:

root
 |-- CustomerID: string (nullable = true)
 |-- DateTimeString: string (nullable = true)
 |-- FraudLabel: string (nullable = true)
 |-- MerchantCountryCode: string (nullable = true)
 |-- MerchantID: string (nullable = true)
 |-- NumItemsInTransaction: integer (nullable = true)
 |-- TransactionAmountUSD: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)

The retrieved data is, of course, the content of the DB collection; it can be printed as follows:

df.collect.foreach { println }

It returns the following:

[830a7u3,2016-01-01 17:00:00.000,Legit,USA,u323fy8902,1,100.0,[5ae39eed144dfae14837c625]]
[830a7u3,2016-01-01 18:03:01.256,Legit,FR,9732498oeu,3,73.0,[5ae3a15d144dfae14837c671]]
...
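
Note that the Mongo _id field has been carried along as an extra struct column (the last one in the output above). If it is not relevant for the analysis, it can be removed through the standard DataFrame API (dfNoId is just an illustrative name):

// Drop the MongoDB ObjectId column
val dfNoId = df.drop("_id")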

It is also possible to run SQL queries on the DataFrame. To do so, we first need to create a case class that defines the schema for the DataFrame, as follows:

case class Transaction(CustomerID: String,
                       MerchantID: String,
                       MerchantCountryCode: String,
                       DateTimeString: String,
                       NumItemsInTransaction: Int,
                       TransactionAmountUSD: Double,
                       FraudLabel: String)

Then we load the data, as follows:

val transactions = MongoSpark.load[Transaction](sparkSession)
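
Please note that MongoSpark.load[Transaction] still returns a DataFrame, with its schema derived from the case class. If a strongly typed Dataset is preferred, it can be obtained through the standard Spark API, as in the following sketch (typedTransactions is an illustrative name):

// The encoders for case classes come from the session's implicits
import sparkSession.implicits._

val typedTransactions = transactions.as[Transaction]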

We must register a temporary view for the DataFrame, as follows:

transactions.createOrReplaceTempView("transactions")

With the view in place, we can execute an SQL statement; for example:

val filteredTransactions = sparkSession.sql("SELECT CustomerID, MerchantID FROM transactions WHERE TransactionAmountUSD = 100")

To display the result of the query, use the following instruction:

filteredTransactions.show

It returns the following:

+----------+----------+
|CustomerID|MerchantID|
+----------+----------+
|   830a7u3|u323fy8902|
+----------+----------+
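
The same query can also be expressed through the DataFrame API rather than SQL. The following sketch (standard Spark API; filteredByApi is an illustrative name) is equivalent to the SELECT statement above:

import org.apache.spark.sql.functions.col

// Equivalent of the SQL SELECT ... WHERE statement above
val filteredByApi = transactions
  .filter(col("TransactionAmountUSD") === 100)
  .select("CustomerID", "MerchantID")

Finally, since the Spark Session also sets spark.mongodb.output.uri, a DataFrame can be written back to MongoDB through the connector's MongoSpark.save method; in the following sketch, the target collection name, filteredresults, is purely illustrative:

// Write the filtered result to a separate (hypothetical) collection
MongoSpark.save(
  filteredByApi.write.option("collection", "filteredresults").mode("overwrite"))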