官术网_书友最值得收藏!

2.2 讓輪子轉起來

2.2.1 本書約定

(1)本書中的例子以Scala語言的編程進行講解,Flink的API也只講述Scala語言形式。本書中的例子不會過多地運用 Scala 編程技巧,因此讀者只需要具備基本的Scala語言知識即可。

(2)例子的開發環境為Java 8(1.8.0_73)、Maven(3.0.4)、SBT(1.2.6)和Scala(IDEA Scala plugin 2.11.12)。

(3)IDE選用IntelliJ IDEA,使用社區版(Community Edition 2018.2.6 x64)。由于存在版本兼容性問題,作者不推薦使用Eclipse。

(4)開發環境為Windows 7,Flink部署環境為Linux(CentOS)。

(5)Flink的版本為1.6.1。

2.2.2 搭建單機版環境

1.搭建一個單機版的運行環境

(1)下載不帶Hadoop組件的Flink程序包:flink-1.6.1-bin-scala_2.11.tgz。

(2)部署在Linux服務器上,然后啟動單機版Flink:

為了訪問方便,設置開發環境機器hosts文件,以以下域名映射Linux服務器IP地址:

(3)啟動成功后,在瀏覽器地址欄中輸入以下地址,訪問 Flink 的 Web Dashboard:

Web Dashboard展示當前Job Manager和Task Manager的狀態,如圖2-1所示。

圖2-1 Flink的Web Dashboard

2.運行SocketWindowWordCount程序

(1)啟動一個端口號為9000的Socket server:

(2)運行SocketWindowWordCount應用程序:

(3)在Socket server端手動輸入單詞,如果一行有多個單詞,就在兩個單詞之間輸入空格。輸入及對應的聚合結果如圖2-2所示。

圖2-2 輸入及對應的聚合結果

圖2-2中同一種顏色的輸入和輸出是對應的,其中“:1”是Socket server端換行的聚合結果。

SocketWindowWordCount 應用程序根據處理時間開滾動窗口,每秒計算一次窗口接收單詞的次數,代碼如下:

以上代碼從socket(9000端口)按行讀入字符,切割成單詞(w=> w.split ("\\s"))后轉換成 case 對象(WordWithCount),該對象有兩個屬性,其中 String類型屬性代表單詞本身;Long類型屬性代表單詞出現的次數。

其中 timeWindow 為開窗機制,如果應用程序的時間特征為事件時間,則開長度為5秒的事件時間窗口,否則開長度為1秒的處理時間窗口。Flink流處理環境(StreamExecutionEnvironment)默認的時間特征為處理時間,因此本例中的開窗機制為長度為1秒的處理時間窗口。

2.2.3 配置IDEA

使用 Maven從 Flink官網下載應用程序工程模板。為了避免輸入錯誤,我們設置 Maven為 Batch模式,在命令行中設定 groupId、artifactId和 version,需要注意版本號的值用雙引號包裹起來,代碼如下:

應用程序模板Maven的構建過程,如圖2-3所示。

圖2-3 應用程序模板Maven的構建過程

然后,將下載的應用程序工程導入 IDEA。該工程有兩個樣例程序,分別為批處理應用程序(BatchJob)和流處理應用程序(StreamingJob)。該工程的pom.xml文件的主要內容如下:

Flink應用程序模板如圖2-4所示。

圖2-4 Flink應用程序模板

此外,為了讓開發工具自動檢查代碼規范,IDEA 開啟了 Scala 語言對應的Checkstyle功能。

主站蜘蛛池模板: 普安县| 巨鹿县| 绥棱县| 广西| 克什克腾旗| 宁远县| 武山县| 龙游县| 太仆寺旗| 肃南| 融水| 开平市| 合肥市| 阿拉善盟| 庄河市| 兴海县| 北海市| 漠河县| 镇原县| 公安县| 澄城县| 长葛市| 平谷区| 黔西县| 永城市| 砀山县| 墨玉县| 怀柔区| 西华县| 车险| 滕州市| 芷江| 绍兴市| 舟曲县| 乐亭县| 棋牌| 延吉市| 石屏县| 勃利县| 满洲里市| 揭阳市|