官术网_书友最值得收藏!

11.4 Spark與Redis整合原理與實戰

本節通過生產環境實戰案例講解Spark與Redis整合原理及Spark與Redis整合實戰。

11.4.1 Spark與Redis整合原理

Redis是一個開源項目(BSD許可)。Redis以內存數據結構存儲,用作數據庫、緩存和消息代理。它支持數據結構,如字符串、散列、列表、集合、具有范圍查詢的排序集、位圖、超文本和具有半徑查詢的地理空間索引等。Redis內置復制、Lua腳本、LRU eviction、事務和不同級別的磁盤持久性,通過Redis Sentinel提供高可用性,并通過Redis Cluster進行自動分區。

Redis可以對這些類型運行原子操作,如附加到字符串、在哈希中增加值、將元素推送到列表中、計算集交集、聯合與差異于一體;或者在排序集中獲得最高排名的成員。

為了實現其卓越的性能,Redis使用內存中的數據集。根據業務用例,可以通過將數據集一次性轉儲到磁盤中,或通過將每個命令附加到日志來持久化。如果只需要功能豐富的網絡內存緩存,則可以選擇禁用持久性。

Redis還支持簡單的主從異步復制,第一次同步非阻塞的速度非常快,網絡切分傳輸時可自動重連同步。

Redis的其他功能包括:

 事務Transactions。

 發布/訂閱。

 Lua腳本。

 Keys with a limited time-to-live。

 LRU eviction of keys。

 自動故障切換。

大多數編程語言可以使用Redis。

Redis以ANSI C編寫,適用于大多數POSIX系統,如Linux、* BSD、OS X,無需外部依賴。Linux和OS X是Redis開發和測試的兩個操作系統,建議使用Linux進行部署。Redis可能在諸如SmartOS的Solaris衍生系統中工作,但支持是盡力而為的。沒有官方支持Windows版本,但是Microsoft開發并維護了Redis的Win-64端口。

11.4.2 Spark與Redis整合實戰

本節以生產環境中Spark與Redis整合實戰案例來講解。在通信運營商的Spark大數據項目中,Spark每分鐘實時讀取Hdfs中的話單數據,經過業務邏輯代碼分析轉換后,將清洗以后的數據轉換成一個List字符串列表,然后遍歷List字符串列表,將每條記錄拼接成一個Key-Value字符串放入Redis隊列。

Spark與Redis整合實戰案例實現步驟如下。

(1)通過Maven方式下載Redis的jedis 2.6.0 Jar包。

pom.xml文件增加以下內容:

1.  <dependency>
2.        <groupId>redis.clients</groupId>
3.        <artifactId>jedis</artifactId>
4.        <version>2.6.0</version>
5.    </dependency>

在Spark中導入Redis的Jar包。

1.  import redis.clients.jedis.Jedis;
2.  import redis.clients.jedis.JedisPool;
3.  import redis.clients.jedis.JedisPoolConfig;

(2)在項目config.properties配置文件中增加Redis的主機地址、端口、密碼,以及Redis連接池分配的連接數、等待時間等信息。

1.  ## REDIS
2.  redis.ip=100.*.*.100
3.  redis.port=6379
4.  redis.password=password
5.  ......
6.  ## redis
7.  #最大分配的對象數
8.  redis.pool.maxTotal=1024
9.  #最大能夠保持idle狀態的對象數
10. redis.pool.maxIdle=200
11. #當池內沒有返回對象時,最大等待時間
12. redis.pool.maxWait=1000
13. #當調用borrow Object方法時,是否進行有效性檢查
14. redis.pool.testOnBorrow=true
15. #當調用return Object方法時,是否進行有效性檢查
16. redis.pool.testOnReturn=true
17.

(3)編寫RedisServiceImpl實現類,從配置文件中獲取Redis的主機地址、端口、密碼等信息;編寫getFromPool方法從redis訪問池中獲取redis實例;編寫getSingle方法獲取redis實例。

1.   public class RedisServiceImpl {
2.      private static final Map<String, String> REDIS_CONFIG = Config.
        getInstance().getRedisParams();
3.      private static final String REDIS_IP = REDIS_CONFIG.get("redis.ip");
4.      private static final String REDIS_PORT = REDIS_CONFIG.get("redis.port");
5.      private static final String REDIS_PASSWORD = REDIS_CONFIG.get("redis.
        password");
6.      private static final int REDIS_TIMEOUT = 2000;
7.      private static Logger log = LoggerFactory.getLogger(RedisServiceImpl.class);
8.      private static JedisPool pool;
9.      /**
          * 從redis訪問池中獲取redis實例
          *
          * @return redis實例
10.       */
11.     public static Jedis getFromPool() {
12.         if (pool == null) {
13.             JedisPoolConfig config = new JedisPoolConfig();
14.             config.setMaxTotal(Integer.valueOf(REDIS_CONFIG.get("redis.
                 pool.maxTotal")));
15.             config.setMaxIdle(Integer.valueOf(REDIS_CONFIG.get("redis.
                 pool.maxIdle")));
16.             config.setMaxWaitMillis(Integer.valueOf(REDIS_CONFIG.get("redis.
                 pool.maxWait")));
17.             config.setTestOnBorrow(Boolean.valueOf(REDIS_CONFIG.get("redis.
                 pool.testOnBorrow")));
18.             config.setTestOnReturn(Boolean.valueOf(REDIS_CONFIG.get("redis.
                 pool.testOnReturn")));
19.             pool = new JedisPool(config, REDIS_IP, Integer.valueOf(REDIS_
                PORT), REDIS_TIMEOUT, REDIS_PASSWORD);
20.         }
21.         return pool.getResource();
22.     }
23.
24.     public static void returnResource(Jedis redis) {
25.         pool.returnResource(redis);
26.     }
27.
28.     /**
          * 獲取redis實例
          *
          * @return redis實例
29.       */
30.
31.     public static Jedis getSingle() {
32.         Jedis redis = new Jedis(REDIS_IP, Integer.valueOf(REDIS_PORT),
            REDIS_TIMEOUT);
33.         redis.auth(REDIS_PASSWORD);
34.         return redis;
35.     }
36. }

(4)編寫項目的業務代碼,RedisBean類數據結構用于要存放Redis的數據。

1.  public class RedisBean {
2.      public void setKey(String key) {
3.         this.key = key;
4.     }
5.
6.     public void setValue(String value) {
7.         this.value = value;
8.     }
9.
10.    public String getKey() {
11.        return key;
12.    }
13.
14.    public String getValue() {
15.        return value;
16.    }
17.
18.    protected String key;
19.    protected String value;
20.
21. }

根據項目的業務需求,將Spark中提取轉換后的每條記錄存入業務數據結構RedisBean的key、value中;然后加入到RedisBeanList列表中。RedisBeanList是List<RedisBean> RedisBeanList類型。

1.  if ("RedisTestQUALINFO".equals(keyType)) {
2.         RedisBean.key = "RedisTestQUALINFO";
3.         RedisBean.value = RedisTestReslut.toString();
4.         RedisBeanList.add(RedisBean);
5.     }

(5)最終調用業務方法addToRedis,通過RedisServiceImpl.getSingle()獲取Redis實例,然后循環遍歷List<RedisBean>的每個元素,調用redis.lpush方法分別將Key值、Value值lpush到Redis中。lPush完成以后,通過redis.close關閉連接。

1.     public static void addToRedis(List<RedisBean> redisBeanList) {
2.     Jedis redis = RedisServiceImpl.getSingle();
3.     for (RedisBean redisData : redisBeanList) {
4.         redis.lpush(redisData.getKey(), redisData.getValue());
5.     }
6.     redis.close();
7.  }

(6)Redis業務驗證:可以登錄到Redis系統中,查詢數據已持久化至Redis。

1.   redis-cli –h 100.*.*.100  -p 6379 -a 'password'
2.  select 0  ---切換到0庫
3.  keys *    ---列出所有的key
4.  lrange CDNNODEQUALINFO 0 -1 查看所有的記錄
主站蜘蛛池模板: 鞍山市| 肇源县| 双柏县| 安龙县| 治县。| 武威市| 成安县| 昌图县| 东辽县| 宝应县| 岳西县| 双城市| 格尔木市| 阿克苏市| 策勒县| 定安县| 东山县| 宜君县| 武定县| 周宁县| 英吉沙县| 同心县| 兰溪市| 泸定县| 萝北县| 南乐县| 延边| 茂名市| 哈尔滨市| 密云县| 普安县| 土默特右旗| 陇西县| 永康市| 宣城市| 天门市| 定西市| 尼木县| 临澧县| 丽江市| 宽甸|