
Data ingestion from a relational database

Suppose the data is stored in a table called sparkexample in a MySQL (https://dev.mysql.com/) schema with the name sparkdb. This is the structure of that table:

mysql> DESCRIBE sparkexample;
+-----------------------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------------------+-------------+------+-----+---------+-------+
| DateTimeString | varchar(23) | YES | | NULL | |
| CustomerID | varchar(10) | YES | | NULL | |
| MerchantID | varchar(10) | YES | | NULL | |
| NumItemsInTransaction | int(11) | YES | | NULL | |
| MerchantCountryCode | varchar(3) | YES | | NULL | |
| TransactionAmountUSD | float | YES | | NULL | |
| FraudLabel | varchar(5) | YES | | NULL | |
+-----------------------+-------------+------+-----+---------+-------+
7 rows in set (0.00 sec)

It contains the same data as the example in the Training data ingestion through Spark section, as follows:

mysql> select * from sparkexample;
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| DateTimeString | CustomerID | MerchantID | NumItemsInTransaction | MerchantCountryCode | TransactionAmountUSD | FraudLabel |
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+
| 2016-01-01 17:00:00.000 | 830a7u3 | u323fy8902 | 1 | USA | 100 | Legit |
| 2016-01-01 18:03:01.256 | 830a7u3 | 9732498oeu | 3 | FR | 73.2 | Legit |
|... | | | | | | |
+-------------------------+------------+------------+-----------------------+---------------------+----------------------+------------+

The dependencies to add to the Scala Spark project are the following (a sample sbt configuration is sketched after this list):

  • Apache Spark 2.2.1
  • Apache Spark SQL 2.2.1
  • The specific JDBC driver for the MySQL database release used
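
Assuming the project is built with sbt, the dependencies could be declared as in the following sketch; the MySQL Connector/J version shown is a hypothetical example and should match the actual database release used:

// build.sbt (sketch) - the connector version below is an assumption
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.1",
  "org.apache.spark" %% "spark-sql" % "2.2.1",
  "mysql" % "mysql-connector-java" % "5.1.45" // hypothetical MySQL 5.x driver
)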

Let's now implement the Spark application in Scala. In order to connect to the database, we need to provide the connection parameters. Spark SQL includes a data source that can read from other databases using JDBC, so the required properties are the same as for a traditional JDBC connection; for example:

val jdbcUsername = "root"
val jdbcPassword = "secretpw"

val jdbcHostname = "mysqlhost"
val jdbcPort = 3306
val jdbcDatabase = "sparkdb"
val jdbcUrl = s"jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}"

We need to check that the JDBC driver for the MySQL database is available on the classpath, as follows:

Class.forName("com.mysql.jdbc.Driver")
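
Note that com.mysql.jdbc.Driver is the class name of the Connector/J 5.x series; in Connector/J 8.x it was renamed to com.mysql.cj.jdbc.Driver. Alternatively, the driver class can be passed to the Spark reader through its driver option instead of calling Class.forName explicitly.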

We can now create a SparkSession, as follows:

val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark MySQL basic example")
.getOrCreate()

Import the implicit conversions, as follows:

import spark.implicits._

You can finally connect to the database and load the data from the sparkexample table to a DataFrame, as follows:

val jdbcDF = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"${jdbcDatabase}.sparkexample")
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.load()
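
As a quick sanity check, you can display the first rows of the DataFrame, as follows:

jdbcDF.show(5)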

Spark automatically reads the schema from the database table and maps its types to Spark SQL types. Execute the following method on the DataFrame:

jdbcDF.printSchema()

It prints the schema inferred from the sparkexample table, with the MySQL types mapped to their Spark SQL counterparts (varchar to string, int to integer, float to double):

root
|-- DateTimeString: string (nullable = true)
|-- CustomerID: string (nullable = true)
|-- MerchantID: string (nullable = true)
|-- NumItemsInTransaction: integer (nullable = true)
|-- MerchantCountryCode: string (nullable = true)
|-- TransactionAmountUSD: double (nullable = true)
|-- FraudLabel: string (nullable = true)

Once the data is loaded into the DataFrame, it is possible to run queries against it using the DataFrame-specific DSL, as shown in the following example:

jdbcDF.select("MerchantCountryCode", "TransactionAmountUSD")
.groupBy("MerchantCountryCode")
.avg("TransactionAmountUSD")
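
Bear in mind that select, groupBy, and avg are lazy transformations: the statement above only builds a query plan. To actually execute the query and display the result, terminate the chain with an action such as show(), as in this minimal sketch:

jdbcDF.select("MerchantCountryCode", "TransactionAmountUSD")
.groupBy("MerchantCountryCode")
.avg("TransactionAmountUSD")
.show()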

It is possible to increase the parallelism of reads through the JDBC interface by providing split boundaries based on the values of a numeric table column. Four options (partitionColumn, lowerBound, upperBound, and numPartitions) control the read parallelism. They are optional, but if any of them is specified, they must all be specified; for example:

val jdbcDF = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", s"${jdbcDatabase}.employees")
.option("user", jdbcUsername)
.option("password", jdbcPassword)
.option("columnName", "employeeID")
.option("lowerBound", 1L)
.option("upperBound", 100000L)
.option("numPartitions", 100)
.load()
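
Under the hood, Spark runs numPartitions parallel queries, each with a WHERE clause restricting partitionColumn to one stripe of the [lowerBound, upperBound] range. Note that the bounds only determine the stripe width; they do not filter rows, so values outside the range end up in the first or last partition.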

While the examples in this section refer to a MySQL database, they apply in the same way to any commercial or open source RDBMS for which a JDBC driver is available.
