書名: Spark大數據商業實戰三部曲:內核解密|商業案例|性能調優作者名: 王家林本章字數: 1496字更新時間: 2019-12-12 17:30:06
11.4 Spark與Redis整合原理與實戰
本節通過生產環境實戰案例講解Spark與Redis整合原理及Spark與Redis整合實戰。
11.4.1 Spark與Redis整合原理
Redis是一個開源項目(BSD許可)。Redis以內存數據結構存儲,用作數據庫、緩存和消息代理。它支持數據結構,如字符串、散列、列表、集合、具有范圍查詢的排序集、位圖、超文本和具有半徑查詢的地理空間索引等。Redis內置復制、Lua腳本、LRU eviction、事務和不同級別的磁盤持久性,通過Redis Sentinel提供高可用性,并通過Redis Cluster進行自動分區。
Redis可以對這些類型運行原子操作,如附加到字符串、在哈希中增加值、將元素推送到列表中、計算集交集、聯合與差異于一體;或者在排序集中獲得最高排名的成員。
為了實現其卓越的性能,Redis使用內存中的數據集。根據業務用例,可以通過將數據集一次性轉儲到磁盤中,或通過將每個命令附加到日志來持久化。如果只需要功能豐富的網絡內存緩存,則可以選擇禁用持久性。
Redis還支持簡單的主從異步復制,第一次同步非阻塞的速度非常快,網絡切分傳輸時可自動重連同步。
Redis的其他功能包括:
事務Transactions。
發布/訂閱。
Lua腳本。
Keys with a limited time-to-live。
LRU eviction of keys。
自動故障切換。
大多數編程語言可以使用Redis。
Redis以ANSI C編寫,適用于大多數POSIX系統,如Linux、* BSD、OS X,無需外部依賴。Linux和OS X是Redis開發和測試的兩個操作系統,建議使用Linux進行部署。Redis可能在諸如SmartOS的Solaris衍生系統中工作,但支持是盡力而為的。沒有官方支持Windows版本,但是Microsoft開發并維護了Redis的Win-64端口。
11.4.2 Spark與Redis整合實戰
本節以生產環境中Spark與Redis整合實戰案例來講解。在通信運營商的Spark大數據項目中,Spark每分鐘實時讀取Hdfs中的話單數據,經過業務邏輯代碼分析轉換后,將清洗以后的數據轉換成一個List字符串列表,然后遍歷List字符串列表,將每條記錄拼接成一個Key-Value字符串放入Redis隊列。
Spark與Redis整合實戰案例實現步驟如下。
(1)通過Maven方式下載Redis的jedis 2.6.0 Jar包。
pom.xml文件增加以下內容:
1. <dependency> 2. <groupId>redis.clients</groupId> 3. <artifactId>jedis</artifactId> 4. <version>2.6.0</version> 5. </dependency>
在Spark中導入Redis的Jar包。
1. import redis.clients.jedis.Jedis; 2. import redis.clients.jedis.JedisPool; 3. import redis.clients.jedis.JedisPoolConfig;
(2)在項目config.properties配置文件中增加Redis的主機地址、端口、密碼,以及Redis連接池分配的連接數、等待時間等信息。
1. ## REDIS 2. redis.ip=100.*.*.100 3. redis.port=6379 4. redis.password=password 5. ...... 6. ## redis 7. #最大分配的對象數 8. redis.pool.maxTotal=1024 9. #最大能夠保持idle狀態的對象數 10. redis.pool.maxIdle=200 11. #當池內沒有返回對象時,最大等待時間 12. redis.pool.maxWait=1000 13. #當調用borrow Object方法時,是否進行有效性檢查 14. redis.pool.testOnBorrow=true 15. #當調用return Object方法時,是否進行有效性檢查 16. redis.pool.testOnReturn=true 17.
(3)編寫RedisServiceImpl實現類,從配置文件中獲取Redis的主機地址、端口、密碼等信息;編寫getFromPool方法從redis訪問池中獲取redis實例;編寫getSingle方法獲取redis實例。
1. public class RedisServiceImpl { 2. private static final Map<String, String> REDIS_CONFIG = Config. getInstance().getRedisParams(); 3. private static final String REDIS_IP = REDIS_CONFIG.get("redis.ip"); 4. private static final String REDIS_PORT = REDIS_CONFIG.get("redis.port"); 5. private static final String REDIS_PASSWORD = REDIS_CONFIG.get("redis. password"); 6. private static final int REDIS_TIMEOUT = 2000; 7. private static Logger log = LoggerFactory.getLogger(RedisServiceImpl.class); 8. private static JedisPool pool; 9. /** * 從redis訪問池中獲取redis實例 * * @return redis實例 10. */ 11. public static Jedis getFromPool() { 12. if (pool == null) { 13. JedisPoolConfig config = new JedisPoolConfig(); 14. config.setMaxTotal(Integer.valueOf(REDIS_CONFIG.get("redis. pool.maxTotal"))); 15. config.setMaxIdle(Integer.valueOf(REDIS_CONFIG.get("redis. pool.maxIdle"))); 16. config.setMaxWaitMillis(Integer.valueOf(REDIS_CONFIG.get("redis. pool.maxWait"))); 17. config.setTestOnBorrow(Boolean.valueOf(REDIS_CONFIG.get("redis. pool.testOnBorrow"))); 18. config.setTestOnReturn(Boolean.valueOf(REDIS_CONFIG.get("redis. pool.testOnReturn"))); 19. pool = new JedisPool(config, REDIS_IP, Integer.valueOf(REDIS_ PORT), REDIS_TIMEOUT, REDIS_PASSWORD); 20. } 21. return pool.getResource(); 22. } 23. 24. public static void returnResource(Jedis redis) { 25. pool.returnResource(redis); 26. } 27. 28. /** * 獲取redis實例 * * @return redis實例 29. */ 30. 31. public static Jedis getSingle() { 32. Jedis redis = new Jedis(REDIS_IP, Integer.valueOf(REDIS_PORT), REDIS_TIMEOUT); 33. redis.auth(REDIS_PASSWORD); 34. return redis; 35. } 36. }
(4)編寫項目的業務代碼,RedisBean類數據結構用于要存放Redis的數據。
1. public class RedisBean { 2. public void setKey(String key) { 3. this.key = key; 4. } 5. 6. public void setValue(String value) { 7. this.value = value; 8. } 9. 10. public String getKey() { 11. return key; 12. } 13. 14. public String getValue() { 15. return value; 16. } 17. 18. protected String key; 19. protected String value; 20. 21. }
根據項目的業務需求,將Spark中提取轉換后的每條記錄存入業務數據結構RedisBean的key、value中;然后加入到RedisBeanList列表中。RedisBeanList是List<RedisBean> RedisBeanList類型。
1. if ("RedisTestQUALINFO".equals(keyType)) { 2. RedisBean.key = "RedisTestQUALINFO"; 3. RedisBean.value = RedisTestReslut.toString(); 4. RedisBeanList.add(RedisBean); 5. }
(5)最終調用業務方法addToRedis,通過RedisServiceImpl.getSingle()獲取Redis實例,然后循環遍歷List<RedisBean>的每個元素,調用redis.lpush方法分別將Key值、Value值lpush到Redis中。lPush完成以后,通過redis.close關閉連接。
1. public static void addToRedis(List<RedisBean> redisBeanList) { 2. Jedis redis = RedisServiceImpl.getSingle(); 3. for (RedisBean redisData : redisBeanList) { 4. redis.lpush(redisData.getKey(), redisData.getValue()); 5. } 6. redis.close(); 7. }
(6)Redis業務驗證:可以登錄到Redis系統中,查詢數據已持久化至Redis。
1. redis-cli –h 100.*.*.100 -p 6379 -a 'password' 2. select 0 ---切換到0庫 3. keys * ---列出所有的key 4. lrange CDNNODEQUALINFO 0 -1 查看所有的記錄
- UTM(統一威脅管理)技術概論
- VMware Performance and Capacity Management(Second Edition)
- CompTIA Network+ Certification Guide
- 高維聚類知識發現關鍵技術研究及應用
- Enterprise PowerShell Scripting Bootcamp
- 學會VBA,菜鳥也高飛!
- 精通數據科學:從線性回歸到深度學習
- 和機器人一起進化
- 手把手教你學Flash CS3
- Visual Basic項目開發案例精粹
- 分布式Java應用
- Wireshark Revealed:Essential Skills for IT Professionals
- CPLD/FPGA技術應用
- Flash CS3動畫制作
- Python Data Mining Quick Start Guide