How to do it...

In this section, we will run the same GROUP BY query twice: once against an RDD using reduceByKey(), and once against a DataFrame using Spark SQL GROUP BY. The query sums the time delays grouped by originating city and sorts the result by originating city:

# RDD: Sum delays, group by and order by originating city
flights.map(lambda c: (c[3], int(c[1]))).reduceByKey(lambda x, y: x + y).sortByKey().take(50)

# Output (truncated)
# Duration: 11.08 seconds
[(u'ABE', 5113),
(u'ABI', 5128),
(u'ABQ', 64422),
(u'ABY', 1554),
(u'ACT', 392),
... ]
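
To make the three RDD steps concrete, here is a minimal plain-Python sketch of what map(), reduceByKey(), and sortByKey() compute together. It runs on a local list instead of a cluster and assumes each record is already split into a list of fields in the order date, delay, distance, origin, destination. The sample rows and the function name sum_delay_by_origin are hypothetical, made up for illustration:

```python
from collections import defaultdict

# Hypothetical sample rows in the layout date,delay,distance,origin,destination
# (made up for illustration; not taken from departuredelays.csv).
sample_rows = [
    ["01011245", "6", "602", "ABE", "ATL"],
    ["01020600", "-8", "369", "ABE", "DTW"],
    ["01021245", "-2", "602", "ABQ", "DFW"],
    ["01020605", "15", "602", "ABE", "ATL"],
]


def sum_delay_by_origin(rows):
    """Mimic map -> reduceByKey -> sortByKey on a local list."""
    totals = defaultdict(int)
    for c in rows:
        # map step: the key is the origin (c[3]), the value is the delay as an int (c[1])
        origin, delay = c[3], int(c[1])
        # reduceByKey step: combine values that share a key using x + y
        totals[origin] += delay
    # sortByKey step: order the (key, value) pairs by key
    return sorted(totals.items())


result = sum_delay_by_origin(sample_rows)
print(result)
```

Unlike this local loop, Spark applies the reduce function within each partition first and then merges the partial sums across partitions, which is why the function passed to reduceByKey() should be associative and commutative.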

For this particular configuration, the RDD query took 11.08 seconds to extract the columns, execute reduceByKey() to summarize the data, execute sortByKey() to order it, and then return the values to the driver. The same query against the DataFrame, using Spark SQL, looks like this:

# DataFrame: Sum delays, group by and order by originating city
spark.sql("select origin, sum(delay) as TotalDelay from flightsDF group by origin order by origin").show(50)

# Output (truncated)
# Duration: 4.76 seconds
+------+----------+
|origin|TotalDelay|
+------+----------+
|   ABE|      5113|
|   ABI|      5128|
|   ABQ|     64422|
|   ABY|      1554|
|   ACT|       392|
...
+------+----------+

Spark DataFrames have many advantages, including, but not limited to, the following:

  • You can execute Spark SQL statements, rather than being limited to the Spark DataFrame API
  • There is a schema associated with your data, so you can refer to a column by name instead of by position
  • In this configuration and example, the DataFrame query completes in 4.76 seconds, while the RDD query takes 11.08 seconds
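
The first two points can be seen together: the SQL query shown earlier also has a DataFrame API form that refers to columns by name. The following is a minimal sketch, not the book's own code. It assumes the DataFrame has columns named origin and delay, as in the SQL query. The function name total_delay_by_origin is hypothetical, and the explicit cast to int is an assumption about how the delay column was loaded:

```python
def total_delay_by_origin(flights_df):
    """Return a DataFrame of total delay per origin, ordered by origin.

    flights_df is expected to be a PySpark DataFrame with 'origin' and
    'delay' columns (an assumption based on the SQL query in the text).
    """
    return (
        flights_df
        # Columns are addressed by name; the cast guards against a string-typed delay
        .selectExpr("origin", "cast(delay as int) as delay")
        .groupBy("origin")
        # GroupedData.sum names its output column "sum(delay)"
        .sum("delay")
        .withColumnRenamed("sum(delay)", "TotalDelay")
        .orderBy("origin")
    )
```

In a Spark session, total_delay_by_origin(flightsDF).show(50) should produce the same kind of table as the SQL version.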

It is possible to improve your RDD query by specifying minPartitions within sc.textFile() when originally loading the data, which increases the number of partitions:

flights = sc.textFile('/databricks-datasets/flights/departuredelays.csv', minPartitions=8), ...

For this configuration, the same query returned in 6.63 seconds. While this approach is faster than the original 11.08 seconds, it's still slower than the DataFrame query (4.76 seconds); in general, DataFrames are faster out of the box with the default configuration.
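
The text does not say how the durations were measured. One simple way to reproduce such a comparison is to wrap the action in a wall-clock timer. The sketch below is an assumption rather than the method used for the figures above. The helper name timed is hypothetical, and the workload is a stand-in so that the example runs without a cluster:

```python
import time


def timed(action):
    """Run a zero-argument callable and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in workload so the sketch runs anywhere; in a Spark session the
# callable would wrap an action such as take(50) or show(50).
result, elapsed = timed(lambda: sum(range(1_000_000)))
print(f"Duration: {elapsed:.2f} seconds")
```

Because Spark evaluates transformations lazily, the timer has to wrap an action such as take(50) or show(50); timing only map() or reduceByKey() would measure almost nothing.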
