
Partitions and performance

Earlier in this recipe, if we had run sc.textFile() without specifying minPartitions for this dataset, we would have had only two partitions:

myRDD = (
    sc
    .textFile('/databricks-datasets/flights/airport-codes-na.txt')
    .map(lambda element: element.split("\t"))
)

myRDD.getNumPartitions()

# Output
Out[2]: 2
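Why two? When minPartitions is omitted, textFile() falls back to sc.defaultMinPartitions, which Spark defines as the smaller of sc.defaultParallelism and 2. The snippet below is a plain-Python sketch of that rule; the function name default_min_partitions is hypothetical. On a live cluster you can compare it against sc.defaultMinPartitions and sc.defaultParallelism directly. Keep in mind that this value is only a lower bound handed to the underlying Hadoop input format, not a guaranteed partition count.

```python
def default_min_partitions(default_parallelism: int) -> int:
    """Hypothetical mirror of Spark's defaultMinPartitions rule:
    textFile() uses min(defaultParallelism, 2) when minPartitions is not given."""
    return min(default_parallelism, 2)


# Even a cluster with many cores only asks for (at most) 2 input splits by default.
for cores in (1, 2, 8, 64):
    print(f"defaultParallelism={cores:>2} -> defaultMinPartitions={default_min_partitions(cores)}")
```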

But, as noted, if you specify the minPartitions parameter, you get at least the requested number of partitions, in this case four (or more):

myRDD = (
    sc
    .textFile(
        '/databricks-datasets/flights/airport-codes-na.txt',
        minPartitions=4
    )
    .map(lambda element: element.split("\t"))
)

myRDD.getNumPartitions()

# Output
Out[6]: 4
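The "(or more)" matters because minPartitions is a hint rather than an exact count. For a splittable text file, Spark passes it to Hadoop's FileInputFormat, which derives a target split size from the file length and the requested count, caps that size at the block size, and lets the final split be up to 10% larger (a "slop" factor of 1.1). The sketch below is a simplified single-file model of that logic, assuming an uncompressed (splittable) file and ignoring any configured minimum split size; the name estimate_num_splits is hypothetical, and the real implementation also handles multiple files and data locality.

```python
SPLIT_SLOP = 1.1  # Hadoop lets the final split be up to 10% larger than split_size

MiB = 1024 * 1024


def estimate_num_splits(file_length: int, min_partitions: int, block_size: int,
                        min_size: int = 1) -> int:
    """Simplified single-file model of Hadoop's (old-API) FileInputFormat.getSplits()."""
    goal_size = file_length // max(min_partitions, 1)        # bytes per split we aim for
    split_size = max(min_size, min(goal_size, block_size))   # never larger than one block
    num_splits, remaining = 0, file_length
    while remaining / split_size > SPLIT_SLOP:               # carve off full-size splits
        num_splits += 1
        remaining -= split_size
    if remaining != 0:                                       # leftover bytes form the last split
        num_splits += 1
    return num_splits


print(estimate_num_splits(100 * MiB, 4, 128 * MiB))  # 4: exactly the requested count
print(estimate_num_splits(103, 4, 128 * MiB))        # 5: integer rounding leaves an extra sliver
print(estimate_num_splits(300 * MiB, 2, 128 * MiB))  # 3: the block size caps each split
```

The last case shows why a large file can yield more partitions than requested even without minPartitions: no split may exceed one block.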

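A key aspect of partitions for your RDD is that the more partitions you have, the higher the potential parallelism: each partition is processed by its own task, so more partitions let Spark keep more executor cores busy at once, up to the number of cores available. Having more partitions can therefore improve your query performance. For this portion of the recipe, let's use a slightly larger file, departuredelays.csv: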

# Read the `departuredelays.csv` file and count number of rows
myRDD = (
    sc
    .textFile('/data/flights/departuredelays.csv')
    .map(lambda element: element.split(","))
)

myRDD.count()

# Output Duration: 3.33s
Out[17]: 1391579

# Get the number of partitions
myRDD.getNumPartitions()

# Output:
Out[20]: 2

As the preceding code snippet shows, by default Spark creates two partitions and takes 3.33 seconds (on my small cluster) to count the 1.39 million rows in the departure delays CSV file.
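With two partitions, at most two tasks can count rows at the same time, however many cores the cluster has. Conceptually, count() counts the rows of every partition independently and then sums the partial results. The following is a local, plain-Python analogue of that pattern; the helpers split_into_partitions and count_rows are hypothetical and not Spark APIs. It uses a thread pool only to show the structure: because of CPython's global interpreter lock, it will not actually speed up this CPU-bound counting. On the real RDD, myRDD.glom().map(len).collect() returns the per-partition row counts.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(lines, num_partitions):
    """Hypothetical helper: slice `lines` into num_partitions contiguous chunks."""
    size, extra = divmod(len(lines), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)  # spread the remainder over the first chunks
        partitions.append(lines[start:end])
        start = end
    return partitions


def count_rows(lines, num_partitions):
    """Count each partition in its own task, then sum the partial counts."""
    partitions = split_into_partitions(lines, num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        partial_counts = list(pool.map(lambda part: sum(1 for _ in part), partitions))
    return sum(partial_counts), partial_counts


rows = [f"row{i}" for i in range(10)]
print(count_rows(rows, 2))  # (10, [5, 5])
print(count_rows(rows, 4))  # (10, [3, 3, 2, 2])
```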

Executing the same command, but also specifying minPartitions (in this case, eight partitions), you will notice that the count() method completes in 2.96 seconds (instead of 3.33 seconds with two partitions). These values will vary with your machine's configuration, but the key takeaway is that changing the number of partitions can improve performance through greater parallelism. Check out the following code:

# Read the `departuredelays.csv` file and count number of rows
myRDD = (
    sc
    .textFile('/data/flights/departuredelays.csv', minPartitions=8)
    .map(lambda element: element.split(","))
)

myRDD.count()

# Output Duration: 2.96s
Out[17]: 1391579

# Get the number of partitions
myRDD.getNumPartitions()

# Output:
Out[20]: 8
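Why did eight partitions save only about 0.4 seconds rather than cutting the time by a factor of four? A back-of-the-envelope model helps: each partition runs as one task, tasks run in waves of at most one task per available core, and every task pays a fixed scheduling and startup overhead. The sketch below encodes that toy model. The function name, the 8-core cluster, the 6 seconds of total work, and the 0.05-second per-task overhead are all invented for illustration and are not measurements from the run above. Real jobs also pay for file I/O, Python worker startup, and driver coordination, which this model ignores and which shrink the measured gain.

```python
import math


def estimated_stage_time(total_work_s: float, num_partitions: int, num_cores: int,
                         per_task_overhead_s: float = 0.05) -> float:
    """Toy model (assumption): equal-sized partitions, one task per partition,
    tasks executed in waves of num_cores, and a fixed overhead per task."""
    task_time = total_work_s / num_partitions + per_task_overhead_s
    waves = math.ceil(num_partitions / num_cores)
    return waves * task_time


# Hypothetical 8-core cluster with 6 seconds of total row-processing work
for partitions in (2, 8, 64, 1024):
    print(partitions, round(estimated_stage_time(6.0, partitions, 8), 3))
```

With these made-up numbers, going from 2 to 8 partitions helps because idle cores start working, while thousands of tiny partitions make things slower again as per-task overhead dominates. The model assumes perfectly even partitions; with uneven (skewed) data, a few tasks per core usually balances load better, which is in line with the Spark tuning guide's general suggestion of roughly 2-3 tasks per CPU core.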