Using Spark with relational databases
There is considerable debate about whether relational databases fit into big data processing scenarios. However, it is undeniable that vast quantities of structured data in enterprises live in such databases, and organizations rely heavily on their existing RDBMSs for critical business transactions.
A vast majority of developers are most comfortable working with relational databases and the rich set of tools available from leading vendors. Increasingly, cloud service providers, such as Amazon AWS, have made administration, replication, and scaling simple enough for many organizations to transition their large relational databases to the cloud.
Some good big data use cases for relational databases include the following:
- Complex OLTP transactions
- Applications or features that need ACID compliance
- Support for standard SQL
- Real-time ad hoc query functionality
- Systems implementing many complex relationships
In Spark, it is easy to work with relational data and combine it with other data sources in different forms and formats:
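For example, a table read over JDBC can be joined directly with data loaded from files in the same job. The following sketch illustrates the idea; the database, table, path, and credentials used here are hypothetical:
scala> val props = new java.util.Properties()
scala> props.setProperty("user", "dbuser")          // hypothetical credentials
scala> props.setProperty("password", "dbpass")
scala> val customersDF = spark.read.jdbc("jdbc:mysql://localhost:3306/salesDB", "customers", props)   // hypothetical database and table
scala> val ordersDF = spark.read.json("hdfs://localhost:9000/data/orders")   // hypothetical JSON path
scala> customersDF.join(ordersDF, "customerID").groupBy("country").count().show()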

As an example that illustrates using Spark with a MySQL database, we will implement a use case in which we split the data between HDFS and MySQL. The MySQL database will be targeted at supporting interactive queries from concurrent users, while the data on HDFS will be targeted at batch processing, running machine learning applications, and making the data available to BI tools. In this example, we assume that the interactive queries are against the current month's data only. Hence, we will retain only the current month's data in MySQL and write out the rest of the data to HDFS (in JSON format).
The implementation steps we will follow are:
- Create a MySQL database.
- Define a table.
- Create a user ID and grant privileges.
- Start Spark shell with MySQL JDBC driver.
- Create an RDD from the input data file, separate the header, define a schema, and create a DataFrame.
- Create a new column for timestamps.
- Separate the data into two DataFrames based on the timestamp value (data for the current month and data for the previous months).
- Drop the original invoiceDate column and then rename the timestamp column to invoiceDate.
- Write out the DataFrame containing current month data to the MySQL table.
- Write out the DataFrame containing data (other than current month data) to HDFS (in JSON format).
After you have your MySQL database server up and running, fire up the MySQL shell. In the following steps, we will create a new database and define a transactions table. We use a transactional dataset that contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based and registered non-store online retailer. The dataset has been contributed by Dr Daqing Chen, Director: Public Analytics group, School of Engineering, London South Bank University, and is available at https://archive.ics.uci.edu/ml/datasets/Online+Retail.
You should see a screen similar to the following when you start the MySQL shell:

- Create a new database called retailDB to store our customer transactions data:
mysql> create database retailDB;
Connect to retailDB as follows:
mysql> use retailDB;
- Here, we define a transactions table with transactionID as the primary key. In a production scenario, you would also create indexes on other fields, such as customerID, to support queries more efficiently (an example follows the describe step below):
mysql> create table transactions(transactionID integer not null auto_increment,
invoiceNo varchar(20), stockCode varchar(20), description varchar(255),
quantity integer, unitPrice double, customerID varchar(20),
country varchar(100), invoiceDate timestamp, primary key(transactionID));
Next, we verify the transactions table schema using the describe command to ensure that it is exactly how we want it:
mysql> describe transactions;
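As noted when defining the table, a production deployment would typically add secondary indexes to speed up frequent lookups. For example, an index on customerID could be created as follows (shown for illustration only; it is not required for this walkthrough):
mysql> create index idx_customer_id on transactions(customerID);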

- Create a user ID retaildbuser and grant all privileges to it. We will use this user from our Spark shell for connecting and executing our queries.
mysql> CREATE USER 'retaildbuser'@'localhost' IDENTIFIED BY 'mypass';
mysql> GRANT ALL ON retailDB.* TO 'retaildbuser'@'localhost';
- Start the Spark shell with the classpath containing the path to the MySQL JDBC driver, as follows:
SPARK_CLASSPATH=/Users/aurobindosarkar/Downloads/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar bin/spark-shell
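Note that SPARK_CLASSPATH is deprecated in more recent Spark releases; on such versions, the same driver JAR (same local path assumed) can be supplied with the --jars option instead:
bin/spark-shell --jars /Users/aurobindosarkar/Downloads/mysql-connector-java-5.1.38/mysql-connector-java-5.1.38-bin.jar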
- Create an RDD containing all the rows from our downloaded dataset:
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> import java.util.Properties
scala> val inFileRDD = sc.textFile("file:///Users/aurobindosarkar/Downloads/UCI Online Retail.txt")
- Separate the header from the rest of the data:
scala> val allRowsRDD = inFileRDD.map(line => line.split("\t").map(_.trim))
scala> val header = allRowsRDD.first
scala> val data = allRowsRDD.filter(_(0) != header(0))
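As an aside, the same tab-separated file could also be loaded directly with the DataFrame reader, which handles the header row and column splitting for us. This is only an alternative sketch; column names then come from the file's header, and the types still need to be cast or inferred:
scala> val rawDF = spark.read.option("header", "true").option("sep", "\t").option("inferSchema", "true").csv("file:///Users/aurobindosarkar/Downloads/UCI Online Retail.txt")
We continue with the RDD-based approach here to keep full control over parsing and the schema.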
- Define the fields and the schema for our data records, as follows:
scala> val fields = Seq(
| StructField("invoiceNo", StringType, true),
| StructField("stockCode", StringType, true),
| StructField("description", StringType, true),
| StructField("quantity", IntegerType, true),
| StructField("invoiceDate", StringType, true),
| StructField("unitPrice", DoubleType, true),
| StructField("customerID", StringType, true),
| StructField("country", StringType, true)
| )
scala> val schema = StructType(fields)
- Create an RDD of Row objects and create a DataFrame using the previously defined schema:
scala> val rowRDD = data.map(attributes => Row(attributes(0), attributes(1),
attributes(2), attributes(3).toInt, attributes(4), attributes(5).toDouble,
attributes(6), attributes(7)))
scala> val r1DF = spark.createDataFrame(rowRDD, schema)
- Add a column called ts (a timestamp column) to the DataFrame, as follows:
scala> val ts = unix_timestamp($"invoiceDate", "dd/MM/yy HH:mm").cast("timestamp")
scala> val r2DF = r1DF.withColumn("ts", ts)
scala> r2DF.show()

- Create a temporary view over the DataFrame and execute appropriate SQL queries to separate the data into two DataFrames based on the timestamps:
scala> r2DF.createOrReplaceTempView("retailTable")
scala> val r3DF = spark.sql("select * from retailTable where ts < '2011-12-01'")
scala> val r4DF = spark.sql("select * from retailTable where ts >= '2011-12-01'")
- Drop the original invoiceDate column from our new DataFrame by selecting the remaining columns:
scala> val selectData = r4DF.select("invoiceNo", "stockCode", "description",
"quantity", "unitPrice", "customerID", "country", "ts")
- Rename the ts column to invoiceDate, as follows:
scala> val writeData = selectData.withColumnRenamed("ts", "invoiceDate")
scala> writeData.show()

- Create a variable to point to the database URL. Additionally, create a Properties object to hold the user ID and password required for connecting to retailDB. Next, connect to the MySQL database and insert the records from the "current month" into the transactions table:
scala> val dbUrl = "jdbc:mysql://localhost:3306/retailDB"
scala> val prop = new Properties()
scala> prop.setProperty("user", "retaildbuser")
scala> prop.setProperty("password", "mypass")
scala> writeData.write.mode("append").jdbc(dbUrl, "transactions", prop)
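To verify the load, the table can be read back over the same JDBC connection and counted, for example:
scala> spark.read.jdbc(dbUrl, "transactions", prop).count()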
- Select the columns of interest from the DataFrame (containing data other than for the current month), and write them out to the HDFS filesystem in JSON format:
scala> val selectData = r3DF.select("invoiceNo", "stockCode", "description",
"quantity", "unitPrice", "customerID", "country", "ts")
scala> val writeData = selectData.withColumnRenamed("ts", "invoiceDate")
scala> writeData.select("*").write.format("json").save("hdfs://localhost:9000/Users/r3DF")