官术网_书友最值得收藏!

4.2 創建RDD

由于Spark一切都是基于RDD的,如何創建RDD就變得非常重要,除了可以直接從父RDD轉換,還支持兩種方式來創建RDD:

1)并行化一個程序中已經存在的集合(例如,數組);

2)引用一個外部文件存儲系統(HDFS、HBase、Tachyon或是任何一個支持Hadoop輸入格式的數據源)中的數據集。

4.2.1 集合(數組)創建RDD

通過并行集合(數組)創建RDD,主要是調用SparkContext的parallelize方法,在Driver(驅動程序)中一個已經存在的集合(數組)上創建,SparkContext對象代表到Spark集群的連接,可以用來創建RDD、廣播變量和累加器??梢詮椭萍系膶ο髣摻ㄒ粋€支持并行操作的分布式數據集(ParallelCollectionRDD)。一旦該RDD創建完成,分布數據集可以支持并行操作,比如在該集合上調用Reduce將數組的元素相加。

parallelize方法的定義如下:

def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T]

其中,第一個參數為對象集合,第二個參數為設定的分片數,默認值為2,返回指定對象類型的RDD。

下面以Scala語言進行操作,展示如何從一個數組創建一個并行集合,并進行數組元素相加操作。

            scala> val data = Array(1, 2, 3, 4, 5)
            data: Array[Int] = Array(1, 2, 3, 4, 5)
            scala> val distData = sc.parallelize(data)
            distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]…
            scala> distData.reduce((a, b) => a + b)
            res2: Int = 15

并行集合創建的一個重要參數是slices的數目,它指定了將數據集切分為幾個分區。在集群模式中,Spark將會在每份slice上運行一個Task。當然,也可以通過parallelize方法的第二個參數進行手動設置(如sc.parallelize(data, 10)),可以為集群中的每個CPU分配2~4個slices(也就是每個CPU分配2~4個Task)。

4.2.2 存儲創建RDD

Spark可以從本地文件創建,也可以由Hadoop支持的文件系統(HDFS、KFS、Amazon S3、Hypertable、HBase等),以及Hadoop支持的輸入格式創建分布式數據集。

Spark支持textFile、SequenceFiles及任何Hadoop支持的輸入格式。

1. 從各種分布式文件系統創建

RDD可以通過SparkContext的textFile(文本文件)方法創建,其定義如下:

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String]

其中,第一個參數指定文件的URI地址(本地文件路徑,或者hdfs://、sdn://、kfs://……),并且以“行”的集合形式讀取,第二個參數和parallelize方法一樣也是用來指定分片數。下面以Scala語言進行操作為例,展示如何從一個數組創建一個并行集合。

          scala> val distFile = sc.textFile(“dfs://data.txt”)
          distFile: org.apache.spark.rdd.RDD[String] =spark.HadoopRDD@1d4cee08

一旦創建了并行集合,distFile變量實質上轉變成新的RDD,可以使用Map和Reduce操作將所有行數的長度相加:

distFile.map(s => s.length).reduce((a, b) => a + b).

注意

如果使用本地文件系統中的路徑,那么該文件在工作節點必須可以被相同的路徑訪問。這可以通過將文件復制到所有的工作節點或使用網絡掛載的共享文件系統實現。所有Spark基于的文件輸入方法(包括textFile方法),都支持路徑、壓縮文件和通配符??梢允褂胻extFile("/path")、textFile("/path/*.txt")和textFile("/path /*.gz")。

HDFS數據塊大小為64的MB的倍數,Spark默認為每一個數據塊創建一個分片。如果需要一個分片包含多個數據塊,可以通過傳入參數來指定更多的分片。

wholeTextFiles方法可以讀取一個包含多個小的文本文件的目錄,并通過鍵-值對<f ilepath,content>(其中key為文件路徑,value為文件內容)的方式返回每一個目錄。而textFile函數為每個文件中的每一行返回一個記錄。

2.從支持Hadoop輸入格式數據源創建

對于其他類型的Hadoop輸入格式,可以使用SparkContext.hadoopRDD方法來加載數據,也可以使用SparkContext.newHadoopRDD來處理這些基于新Mapreduce API的輸入格式。RDD.saveAsObjectFile和SparkContext.objectFile支持以序列化的Java對象組成簡單的格式來保存RDD,并提供了一個簡單的方法來保存任何RDD。

主站蜘蛛池模板: 宜兰市| 常德市| 乐清市| 江源县| 安溪县| 甘孜县| 奎屯市| 宁津县| 周口市| 重庆市| 双柏县| 彩票| 法库县| 汪清县| 尚志市| 德惠市| 蓝山县| 乌拉特前旗| 临湘市| 哈巴河县| 祁连县| 施秉县| 永兴县| 克山县| 南岸区| 介休市| 昭觉县| 根河市| 息烽县| 潢川县| 天祝| 江北区| 阿克陶县| 南川市| 四平市| 黎川县| 苍溪县| 巴楚县| 城步| 武山县| 南靖县|