- Spark核心技術與高級應用
- 于俊等
- 122字
- 2019-01-01 01:24:39
4.2 創建RDD
由于Spark一切都是基于RDD的,如何創建RDD就變得非常重要,除了可以直接從父RDD轉換,還支持兩種方式來創建RDD:
1)并行化一個程序中已經存在的集合(例如,數組);
2)引用一個外部文件存儲系統(HDFS、HBase、Tachyon或是任何一個支持Hadoop輸入格式的數據源)中的數據集。
4.2.1 集合(數組)創建RDD
通過并行集合(數組)創建RDD,主要是調用SparkContext的parallelize方法,在Driver(驅動程序)中一個已經存在的集合(數組)上創建,SparkContext對象代表到Spark集群的連接,可以用來創建RDD、廣播變量和累加器??梢詮椭萍系膶ο髣摻ㄒ粋€支持并行操作的分布式數據集(ParallelCollectionRDD)。一旦該RDD創建完成,分布數據集可以支持并行操作,比如在該集合上調用Reduce將數組的元素相加。
parallelize方法的定義如下:
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]
其中,第一個參數為對象集合,第二個參數為設定的分片數,默認值為2,返回指定對象類型的RDD。
下面以Scala語言進行操作,展示如何從一個數組創建一個并行集合,并進行數組元素相加操作。
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]…
scala> distData.reduce((a, b) => a + b)
res2: Int = 15
并行集合創建的一個重要參數是slices的數目,它指定了將數據集切分為幾個分區。在集群模式中,Spark將會在每份slice上運行一個Task。當然,也可以通過parallelize方法的第二個參數進行手動設置(如sc.parallelize(data, 10)),可以為集群中的每個CPU分配2~4個slices(也就是每個CPU分配2~4個Task)。
4.2.2 存儲創建RDD
Spark可以從本地文件創建,也可以由Hadoop支持的文件系統(HDFS、KFS、Amazon S3、Hypertable、HBase等),以及Hadoop支持的輸入格式創建分布式數據集。
Spark支持textFile、SequenceFiles及任何Hadoop支持的輸入格式。
1. 從各種分布式文件系統創建
RDD可以通過SparkContext的textFile(文本文件)方法創建,其定義如下:
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]
其中,第一個參數指定文件的URI地址(本地文件路徑,或者hdfs://、sdn://、kfs://……),并且以“行”的集合形式讀取,第二個參數和parallelize方法一樣也是用來指定分片數。下面以Scala語言進行操作為例,展示如何從一個數組創建一個并行集合。
scala> val distFile = sc.textFile(“dfs://data.txt”)
distFile: org.apache.spark.rdd.RDD[String] =spark.HadoopRDD@1d4cee08
一旦創建了并行集合,distFile變量實質上轉變成新的RDD,可以使用Map和Reduce操作將所有行數的長度相加:
distFile.map(s => s.length).reduce((a, b) => a + b).
注意
如果使用本地文件系統中的路徑,那么該文件在工作節點必須可以被相同的路徑訪問。這可以通過將文件復制到所有的工作節點或使用網絡掛載的共享文件系統實現。所有Spark基于的文件輸入方法(包括textFile方法),都支持路徑、壓縮文件和通配符??梢允褂胻extFile("/path")、textFile("/path/*.txt")和textFile("/path /*.gz")。
HDFS數據塊大小為64的MB的倍數,Spark默認為每一個數據塊創建一個分片。如果需要一個分片包含多個數據塊,可以通過傳入參數來指定更多的分片。
wholeTextFiles方法可以讀取一個包含多個小的文本文件的目錄,并通過鍵-值對<f ilepath,content>(其中key為文件路徑,value為文件內容)的方式返回每一個目錄。而textFile函數為每個文件中的每一行返回一個記錄。
2.從支持Hadoop輸入格式數據源創建
對于其他類型的Hadoop輸入格式,可以使用SparkContext.hadoopRDD方法來加載數據,也可以使用SparkContext.newHadoopRDD來處理這些基于新Mapreduce API的輸入格式。RDD.saveAsObjectFile和SparkContext.objectFile支持以序列化的Java對象組成簡單的格式來保存RDD,并提供了一個簡單的方法來保存任何RDD。
- 數據挖掘原理與實踐
- 分布式數據庫系統:大數據時代新型數據庫技術(第3版)
- 卷積神經網絡的Python實現
- 基于Apache CXF構建SOA應用
- 大數據架構和算法實現之路:電商系統的技術實戰
- 數據挖掘原理與SPSS Clementine應用寶典
- 深入淺出 Hyperscan:高性能正則表達式算法原理與設計
- 科研統計思維與方法:SPSS實戰
- Instant Autodesk AutoCAD 2014 Customization with .NET
- 數據庫技術及應用
- 大數據與機器學習:實踐方法與行業案例
- 數據賦能
- Artificial Intelligence for Big Data
- 量化投資:交易模型開發與數據挖掘
- AutoCAD基礎與應用精品教程(2008版)