- 《架構世界》2020微服務刊:微服務分布式事務實戰
- 普元信息
- 1450字
- 2020-09-03 11:23:34
.實現細節
動態路由管理
作為所有請求流量的入口,在實際生產環境中為了保證高可靠和高可用,盡量避免重啟,需要實現 動態路由配置。實現動態路由其實很簡單,重點在于 這個接口. 這個接口繼承自兩個接口,其中 是用來加載路由的.它有很多實現類,其中的 就用來實現從 中加載路由. 另一個 用來實現路由的添加與刪除. 通過查看 的源碼可以發現,在 . . . . . 中這么一段:
@Bean
@ConditionalOnMissingBean(RouteDefinitionRepository.class)
public InMemoryRouteDefinitionRepositoryinMemoryRouteDefinitionRepository() {
return new InMemoryRouteDefinitionRepository();
}
可以看出,網關中如果沒有
的 ,就會采用 做為實現.這個 有一個問題,就是數據沒有持久化,網關重啟之后,原來通過接口設置的路由就會丟失了.這當然是不可接受的,所以我們需要實現自已的
,來提供路由配置信息. 如使用 做為存儲,來實現路由的存儲.具體實現請參考文章: :// . . / / / /除此以外,每當路由更改之后,還需要通知網關刷新路由.這需要發送
來通知網關. 如下列示例:@Component
public classRouteDynamicServiceimplementsApplicationEventPublisherAware{
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher= publisher;
}
/**
*
刷新路由表
*/
() {
. ( ( ));
}
}
刷新可以通過消息通知機制來觸發,當然,也可以對外接供
接口,手動觸發.數據存儲

如上述類圖所示,
為治理數據統一存儲接口. 為實現的它的抽像類,它需要依賴兩個,一個是 ,用來實現 數據的存儲.另一個為 ,用來為各治數對象生成對應的 . 下面則為各個治理數據存儲的實現類.使用 做為持久存儲時,需要注意以下幾點:)為對象生成 時,建議為 添加一個命名空間(就是加一段有意義的前綴)
)在 中進行模糊搜索時,提供給 的 ,不能是一個正則的通配,它支持三種通配 *(多個),?(單個),
)如果數據量比較大,不建議使用 進行模糊查詢,應該使用 方式
數據緩存
我們提供了內部緩存,它處于使用者與持久存儲之間,緩存數據以提升性能. 緩存的實現主要有如下幾點:
)實現了 以實現在網關啟動時,自動加載數據
)內部使用了 ,保證寫時的線程同步,又保證了 時的高效( 整個過程不需要加鎖)
)從緩存中取數據時,如果需要懶加載,當從持久存儲中加載不到數據時,建議使用空數據,或空集合占位,避免每次都去持久存儲中查詢.
代碼示例如下:
/**
*根據appCode獲取流量策略
*
*@param appCode
*@return
*/
public Set<ApplicationTrafficPolicy>getAppTrafficPolicies(StringappCode) {
//從緩存加載
Map<String, ApplicationTrafficPolicy> map = policyMap.get(appCode);
//緩存中沒有
if(map ==null) {
//嘗試從持久存儲中加載所有此網關的流量策略
Set<ApplicationTrafficPolicy> policies = trafficPolicyRepository.fuzzyQuery();
//持久存儲中沒有任何流量策略,占個位置,防止緩存重復去加載
if(policies ==null|| policies.size() ==0) {
map =new ConcurrentHashMap<>();
policyMap.put(appCode, map);
}else{
//持久存儲中有流量策略,放入緩存
for(ApplicationTrafficPolicy policy : policies) {
setTrafficPolicy(policy);
}
//重新從緩存中加載一次
map = policyMap.get(appCode);
//如果還是沒有,使用空map占位子
if(map ==null) {
map=new ConcurrentHashMap<>();
policyMap.put(appCode, map);
}
}
}
returnmap.values().stream().collect(Collectors.toSet());
}
事件通知
事件通知,這里我們使用的是
的發布與訂閱能力. 默認是不發送事件的,要讓它發布事件,需要先修改它的配置文件 . ,添加一個配置:notify-keyspace-events "K$g"
上面的配置將使得
中發生數據的添加,修改或刪除時,發送 或 事件.然后,我們需要配置一個
,用來訂閱我們感興趣的事件.@Bean
RedisMessageListenerContainercontainer(MessageListenerAdapter listenerAdapter) {
StringgtwReidsPattern ="__keyspace@*__:"+ GTW + keyGenerator.getGatewayCode() +"]*";
StringcofRedisPattern ="__keyspace@*__:"+ COF + cacheKey.getKeyNameSpace() + USER_NAME +"*";
log.info("Add gateway redis message listener, patternTopic is {}", gtwReidsPattern);
log.info("Add coframe redis message listener, patternTopic is {}", cofRedisPattern);
RedisMessageListenerContainer container=new RedisMessageListenerContainer();
container.setConnectionFactory(redisTemplate.getConnectionFactory());
// PatternTopic參考: http://redisdoc.com/topic/notification.html
container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic(PatternUtil.fmt(gtwReidsPattern)),new PatternTopic(PatternUtil.fmt(cofRedisPattern))));
return container;
}
當
事件訂閱好了之后,每次其中我們關心的數據有變更,都會發送 或 事件.我們需要定義一個
,來接收事件:@Service(value = RedisMessageListener.REDIS_LISTENER_NAME)
public classRedisMessageListenerimplementsMessageListener {
@Override
public void onMessage(Message message,byte[] pattern) {
Stringops =new String(message.getBody());
Stringchannel =new String(message.getChannel());
Stringkey = channel.split(":")[1];
if("set".equals(ops)) {
Stringvalue = redisTemplate.opsForValue().get(key);
handleSet(key, value);
}else if("del".equals(ops)) {
handleDel(key);
}
}
...
}
接收到事件后,會調用相應的內部緩存,更新內部緩存中的數據,以實現治理數據變更的及時生效。
推薦閱讀

關于作者:將曉漁,現任普元信息云計算架構師。曾在 ,云計算,數據備份,移動互聯相關領域公司工作,十年以上 工作經驗。國內 云計算的早期嘗鮮者,容器技術專家,微服務架構的踐行者。