官术网_书友最值得收藏!

Understanding Resilient Distributed Datasets (RDDs)

RDDs are Spark's primary distributed Dataset abstraction. It is a collection of data that is immutable, distributed, lazily evaluated, type inferred, and cacheable. Prior to execution, the developer code (using higher-level constructs such as SQL, DataFrames, and Dataset APIs) is converted to a DAG of RDDs (ready for execution).

You can create RDDs by parallelizing an existing collection of data or accessing a Dataset residing in an external storage system, such as the file system or various Hadoop-based data sources. The parallelized collections form a distributed Dataset that enable parallel operations on them.

You can create a RDD from the input file with number of partitions specified, as shown:

scala> val cancerRDD = sc.textFile("file:///Users/aurobindosarkar/Downloads/breast-cancer-wisconsin.data", 4)

scala> cancerRDD.partitions.size
res37: Int = 4

You can implicitly convert the RDD to a DataFrame by importing the spark.implicits package and using the toDF() method:

scala> import spark.implicits._scala> 
val cancerDF = cancerRDD.toDF()

To create a DataFrame with a specific schema, we define a Row object for the rows contained in the DataFrame. Additionally, we split the comma-separated data, convert it to a list of fields, and then map it to the Row object. Finally, we use the createDataFrame() to create the DataFrame with a specified schema:

def row(line: List[String]): Row = { Row(line(0).toLong, line(1).toInt, line(2).toInt, line(3).toInt, line(4).toInt, line(5).toInt, line(6).toInt, line(7).toInt, line(8).toInt, line(9).toInt, line(10).toInt) }
val data = cancerRDD.map(_.split(",").to[List]).map(row)
val cancerDF = spark.createDataFrame(data, recordSchema)

Further, we can easily convert the preceding DataFrame to a Dataset using the case class defined earlier:

scala> val cancerDS = cancerDF.as[CancerClass]

RDD data is logically divided into a set of partitions; additionally, all input, intermediate, and output data is also represented as partitions. The number of RDD partitions defines the level of data fragmentation. These partitions are also the basic units of parallelism. Spark execution jobs are split into multiple stages, and as each stage operates on one partition at a time, it is very important to tune the number of partitions. Fewer partitions than active stages means your cluster could be under-utilized, while an excessive number of partitions could impact the performance due to higher disk and network I/O.

The programming interface to RDDs support two types of operations: transformations and actions. The transformations create a new Dataset from an existing one, while the actions return a value or result of a computation. All transformations are evaluated lazily--the actual execution occurs only when an action is executed to compute a result. The transformations form a lineage graph instead of actually replicating data across multiple machines. This graph-based approach enables an efficient fault tolerance model. For example, if an RDD partition is lost, then it can be recomputed based on the lineage graph. 

You can control data persistence (for example, caching) and specify placement preferences for RDD partitions and then use specific operators for manipulating them. By default, Spark persists RDDs in memory, but it can spill them to disk if sufficient RAM isn't available. Caching improves performance by several orders of magnitude; however, it is often memory intensive. Other persistence options include storing RDDs to disk and replicating them across the nodes in your cluster. The in-memory storage of persistent RDDs can be in the form of deserialized or serialized Java objects. The deserialized option is faster, while the serialized option is more memory-efficient (but slower). Unused RDDs are automatically removed from the cache but, depending on your requirements; if a specific RDD is no longer required, then you can also explicitly release it.

主站蜘蛛池模板: 东光县| 申扎县| 饶阳县| 西畴县| 岳阳市| 华容县| 含山县| 同心县| 茌平县| 阿城市| 镇原县| 盱眙县| 瑞丽市| 常山县| 开江县| 杭锦后旗| 舟曲县| 卢氏县| 若羌县| 乌兰浩特市| 明星| 隆昌县| 新巴尔虎左旗| 广宁县| 沧州市| 冀州市| 团风县| 沙坪坝区| 奎屯市| 永寿县| 西青区| 邢台市| 怀远县| 扬中市| 金乡县| 云龙县| 黑水县| 宜州市| 自贡市| 从江县| 灵山县|