官术网_书友最值得收藏!

  • Learning Spark SQL
  • Aurobindo Sarkar
  • 1034字
  • 2021-07-02 18:23:43

Using Spark with relational databases

There is a huge debate on whether relational databases fit into big data processing scenarios. However, it's undeniable that vast quantities of structured data in enterprises live in such databases, and organizations rely heavily on the existing RDBMSs for their critical business transactions.

A vast majority of developers are most comfortable working with relational databases and the rich set of tools available from leading vendors. Increasingly, cloud service providers, such as Amazon AWS, have made administration, replication, and scaling simple enough for many organizations to transition their large relational databases to the cloud.

Some good big data use cases for relational databases include the following:

  • Complex OLTP transactions
  • Applications or features that need ACID compliance
  • Support for standard SQL
  • Real-time ad hoc query functionality
  • Systems implementing many complex relationships
For an excellent coverage of NoSQL and relational use cases, refer to the blog titled What the heck are you actually using NoSQL for? at http://highscalability.com/blog/2010/12/6/what-the-heck-are-you-actually-using-nosql-for.html.

In Spark, it is easy to work with relational data and combine it with other data sources in different forms and formats:

As an example that illustrates using Spark with a MySQL database, we will implement a use-case in which we split the data between HDFS and MySQL. The MySQL database will be targeted to support interactive queries from concurrent users, while the data on HDFS will be targeted for batch processing, running machine learning applications, and for making the data available to BI tools. In this example, we assume that the interactive queries are against the current month's data only. Hence, we will retain only the current month's data in MySQL and write out the rest of data to HDFS (in JSON format).

The implementation steps, we will follow are:

  1. Create MySQL database.
  2. Define a table.
  3. Create a user ID and grant privileges.
  4. Start Spark shell with MySQL JDBC driver.
  5. Create a RDD from input data file, separate the header, define a schema, and create a DataFrame.
  6. Create a new column for timestamps.
  7. Separate the data into two DataFrames based on the timestamp value (data for the current month and rest of data previous months).
  8. Drop the original invoiceDate column and then rename the timestamp column to invoiceDate.
  9. Write out the DataFrame containing current month data to the MySQL table.
  10. Write out the DataFrame containing data (other than current month data) to HDFS (in JSON format).
If you do not have MySQL already installed and available, you can download it from https://www.mysql.com/downloads/. Follow the installation instructions for your specific OS to install the database. Also, download the JDBC connector available on the same website.

After you have your MySQL database server up and running, fire up the MySQL shell. In the following steps, we will create a new database and define a transactions table. We use a transnational dataset that contains all the transactions occurring between 01/12/2010 and 09/12/2011 for a UK-based and registered nonstore online retail. The dataset has been contributed by Dr Daqing Chen, Director: Public Analytics group, School of Engineering, London South Bank University and is available at https://archive.ics.uci.edu/ml/datasets/Online+Retail.

You should see a screen similar to the following when you start the MySQL shell:

  1. Create a new database called retailDB to store our customer transactions data:
      mysql> create database retailDB;
Connect to retailDB as follows:
mysql> use retailDB;
  1. Here, we define a transactions table with transactionID as the primary key. In a production scenario, you would also create indexes on other fields, such as CustomerID, to support queries more efficiently:
      mysql>create table transactions(transactionID integer not null 
auto_increment, invoiceNovarchar(20), stockCodevarchar(20),
description varchar(255), quantity integer, unitPrice double,
customerIDvarchar(20), country varchar(100), invoiceDate
Timestamp, primary key(transactionID));

Next, we verify the transactions table schema using the describe command to ensure that it is exactly how we want it:

mysql> describe transactions;
  1. Create a user ID retaildbuser and grant all privileges to it. We will use this user from our Spark shell for connecting and executing our queries.
      mysql> CREATE USER 'retaildbuser'@'localhost' IDENTIFIED BY 
'mypass';
mysql> GRANT ALL ON retailDB.* TO 'retaildbuser'@'localhost';
  1. Start the Spark shell with the classpath containing the path to the MySQL JDBC driver, as follows:
      SPARK_CLASSPATH=/Users/aurobindosarkar/Downloads/mysql-connector-
java-5.1.38/mysql-connector-java-5.1.38-bin.jar bin/spark-shell
  1. Create a RDD containing all the rows from our downloaded Dataset:
      scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.Row
scala> import java.util.Properties

scala>val inFileRDD =
sc.textFile("file:///Users/aurobindosarkar/Downloads/UCI Online
Retail.txt")
  1. Separate the header from the rest of the data:
      scala>val allRowsRDD = inFileRDD.map(line 
=>line.split("\t").map(_.trim))
scala>val header = allRowsRDD.first
scala>val data = allRowsRDD.filter(_(0) != header(0))
  1. Define the fields and define a schema for our data records, as follows:
      scala>val fields = Seq(
| StructField("invoiceNo", StringType, true),
| StructField("stockCode", StringType, true),
| StructField("description", StringType, true),
| StructField("quantity", IntegerType, true),
| StructField("invoiceDate", StringType, true),
| StructField("unitPrice", DoubleType, true),
| StructField("customerID", StringType, true),
| StructField("country", StringType, true)
| )
scala>val schema = StructType(fields)
  1. Create an RDD of Row objects, create a DataFrame using the previously created schema:
      scala>val rowRDD = data.map(attributes => Row(attributes(0), 
attributes(1), attributes(2), attributes(3).toInt, attributes(4),
attributes(5).toDouble, attributes(6), attributes(7)))

scala>val r1DF = spark.createDataFrame(rowRDD, schema)
  1. Add a column called ts (a timestamp column) to the DataFrame, as follows:
      scala>val ts = 
unix_timestamp($"invoiceDate","dd/MM/yyHH:mm").cast("timestamp")
scala>val r2DF = r1DF.withColumn("ts", ts)
scala>r2DF.show()
  1. Create a table object and execute appropriate SQLs to separate the table data into two DataFrames based on the timestamps:
      scala> r2DF.createOrReplaceTempView("retailTable")
scala>val r3DF = spark.sql("select * from retailTable where ts<
'2011-12-01'")
scala>val r4DF = spark.sql("select * from retailTable where ts>=
'2011-12-01'")
  1. Drop the invoiceDate column in our new DataFrame.
      scala>val selectData = r4DF.select("invoiceNo", "stockCode", 
"description", "quantity", "unitPrice", "customerID", "country",
"ts")
  1. Rename the ts column to invoiceDate, as follows:
      scala>val writeData = selectData.withColumnRenamed("ts", 
"invoiceDate")
scala>writeData.show()
  1. Create a variable to point to the database URL. Additionally, create a Properties object to hold the user ID and password required for connecting to retailDB. Next, connect to the MySQL database and insert the records from the "current month" into the transactions table:
      scala>val dbUrl = "jdbc:mysql://localhost:3306/retailDB"
scala>val prop = new Properties()
scala>prop.setProperty("user", "retaildbuser")
scala>prop.setProperty("password", "mypass")
scala>writeData.write.mode("append").jdbc(dbUrl, "transactions",
prop)
  1. Select the columns of interest from the DataFrame (containing data other than for the current month), and write them out to the HDFS filesystem in JSON format:
      scala>val selectData = r3DF.select("invoiceNo", "stockCode", 
"description", "quantity", "unitPrice", "customerID", "country",
"ts")

scala>val writeData = selectData.withColumnRenamed("ts",
"invoiceDate")
scala>writeData.select("*").write.format("json")
.save("hdfs://localhost:9000/Users/r3DF")
主站蜘蛛池模板: 五家渠市| 西华县| 宜宾市| 荃湾区| 凤山县| 华阴市| 温宿县| 富源县| 济宁市| 台北市| 西畴县| 临朐县| 株洲县| 平湖市| 祥云县| 正宁县| 弋阳县| 寿阳县| 高密市| 兰西县| 从江县| 阳新县| 抚松县| 阳谷县| 长岭县| 花垣县| 新安县| 南投市| 丰镇市| 长兴县| 沁源县| 巨野县| 锦屏县| 芜湖县| 固安县| 屏山县| 西青区| 会同县| 滁州市| 甘泉县| 蒙山县|