日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
實戰(zhàn)派|Java項目中玩轉(zhuǎn)Redis6.0客戶端緩存!

哈嘍大家好啊,我是Hydra。

在前面的文章中,我們介紹了Redis6.0中的新特性客戶端緩存client-side caching,通過telnet連接模擬客戶端,測試了三種客戶端緩存的工作模式,這篇文章我們就來點硬核實戰(zhàn),看看客戶端緩存在java項目中應該如何落地。

鋪墊

首先介紹一下今天要使用到的工具Lettuce,它是一個可伸縮線程安全的redis客戶端。多個線程可以共享同一個RedisConnection,利用nio框架Netty來高效地管理多個連接。

放眼望向現(xiàn)在常用的redis客戶端開發(fā)工具包,雖然能用的不少,但是目前率先擁抱redis6.0,支持客戶端緩存功能的卻不多,而lettuce就是其中的領跑者。

我們先在項目中引入最新版本的依賴,下面正式開始實戰(zhàn)環(huán)節(jié):


io.lettuce
lettuce-core
6.1.8.RELEASE

實戰(zhàn)

在項目中應用lettuce,開啟并使用客戶端緩存功能,只需要下面這一段代碼:

public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建 RedisClient 連接信息
RedisURI redisURI= RedisURI.builder()
.withHost("127.0.0.1")
.withPort(6379)
.build();
RedisClient client = RedisClient.create(redisURI);
StatefulRedisConnection connect = client.connect();

Map map = new HashMap<>();
CacheFrontend frontend=ClientSideCaching.enable(CacheAccessor.forMap(map),
connect, TrackingArgs.Builder.enabled().noloop());

String key="user";
while (true){
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(10);
}
}

上面的代碼主要完成了幾項工作:

  • 通過RedisURI配置redis連接的標準信息,并建立連接。
  • 創(chuàng)建用于充當本地緩存的Map,開啟客戶端緩存功能,建立一個緩存訪問器CacheFrontend。
  • 在循環(huán)中使用CacheFrontend,不斷查詢同一個key對應的值并打印。

啟動上面的程序,控制臺會不斷的打印user對應的緩存,在啟動一段時間后,我們在其他的客戶端修改user對應的值,運行的結(jié)果如下:

可以看到,在其他客戶端修改了key所對應的值后,打印結(jié)果也發(fā)生了變化。但是到這里,我們也不知道lettuce是不是真的使用了客戶端緩存,雖然結(jié)果正確,但是說不定是它每次都重新執(zhí)行了get命令呢?

所以我們下面來看看源碼,分析一下具體的代碼執(zhí)行流程。

分析

在上面的代碼中,最關鍵的類就是CacheFrontend了,我們再來仔細看一下上面具體實例化時的語句:

CacheFrontend frontend=ClientSideCaching.enable(
CacheAccessor.forMap(map),
connect,
TrackingArgs.Builder.enabled().noloop()
);

首先調(diào)用了ClientSideCaching的enable()方法,我們看一下它的源碼:

解釋一下傳入的3個參數(shù):

  • CacheAccessor:一個定義對客戶端緩存進行訪問接口,上面調(diào)用它的forMap方法返回的是一個MapCacheAccessor,它的底層使用的我們自定義的Map來存放本地緩存,并且提供了get、put、evict等方法操作Map。
  • StatefulRedisConnection:使用到的redis連接。
  • TrackingArgs:客戶端緩存的參數(shù)配置,使用noloop后不會接收當前連接修改key后的通知。

向redis服務端發(fā)送開啟tracking的命令后,繼續(xù)向下調(diào)用create()方法:

這個過程中實例化了一個重要對象,它就是實現(xiàn)了RedisCache接口的DefaultRedisCache對象,實際向redis執(zhí)行查詢時的get請求、寫入的put請求,都是由它來完成。

實例化完成后,繼續(xù)向下調(diào)用同名的create()方法:

在這個方法中,實例化了ClientSideCaching對象,注意一下傳入的兩個參數(shù),通過前面的介紹也很好理解它們的分工:

  • 當本地緩存存在時,直接從CacheAccessor中讀取。
  • 當本地緩存不存在時,使用RedisCache從服務端讀取。

需要額外注意一下的是返回前的兩行代碼,先看第一句(行號114的那行)。

這里向RedisCache添加了一個監(jiān)聽,當監(jiān)聽到類型為invalidate的作廢消息時,拿到要作廢的key,傳遞給消費者。一般情況下,keys中只會有一個元素。

消費時會遍歷當前ClientSideCaching的消費者列表invalidationListeners:

而這個列表中的所有,就是在上面的第二行代碼中(行號115的那行)添加的,看一下方法的定義:

而實際傳入的方法引用則是下面MapCacheAccessor的evict()方法,也就是說,當收到key作廢的消息后,會移除掉本地緩存Map中緩存的這個數(shù)據(jù)。

客戶端緩存的作廢邏輯我們梳理清楚了,再來看看它是何時寫入的,直接看ClientSideCaching的get()方法:

可以看到,get方法會先從本地緩存MapCacheAccessor中嘗試獲取,如果取到則直接返回,如果沒有再使用RedisCache讀取redis中的緩存,并將返回的結(jié)果存入到MapCacheAccessor中。

圖解

源碼看到這里,是不是基本邏輯就串聯(lián)起來了,我們再畫兩張圖來梳理一下這個流程。先看get的過程:

再來看一下通知客戶端緩存失效的過程:

怎么樣,配合這兩張圖再理解一下,是不是很完美?

其實也不是…回憶一下我們之前使用兩級緩存Caffeine+Redis時,當時使用的通知機制,會在修改redis緩存后通知所有主機修改本地緩存,修改成為最新的值。目前的lettuce看來,顯然不滿足這一功能,只能做到作廢刪除緩存但是不會主動更新。

擴展

那么,如果想實現(xiàn)本地客戶端緩存的實時更新,我們應該如何在現(xiàn)在的基礎上進行擴展呢?仔細想一下的話,思路也很簡單:

  • 首先,移除掉lettuce的客戶端緩存本身自帶的作廢消息監(jiān)聽器
  • 然后,添加我們自己的作廢消息監(jiān)聽器

回顧一下上面源碼分析的圖,在調(diào)用DefaultRedisCache的addInvalidationListener()方法時,其實是調(diào)用的是StatefulRedisConnection的addListener()方法,也就是說,這個監(jiān)聽器其實是添加在redis連接上的。

如果我們再看一下這個方法源碼的話,就會發(fā)現(xiàn),在它的附近還有一個對應的removeListener()方法,一看就是我們要找的東西,準備用它來移除消息監(jiān)聽。

不過再仔細看看,這個方法是要傳參數(shù)的啊,我們明顯不知道現(xiàn)在里面已經(jīng)存在的PushListener有什么,所以沒法直接使用,那么無奈只能再接著往下看看這個pushHandler是什么玩意…

通過注釋可以知道,這個PushHandler就是一個用來操作PushListener的處理工具,雖然我們不知道具體要移除的PushListener是哪一個,但是驚喜的是,它提供了一個getPushListeners()方法,可以獲取當前所有的監(jiān)聽器。

這樣一來就簡單了,我上來直接清除掉這個集合中的所有監(jiān)聽器,問題就迎刃而解了~

不過,在StatefulRedisConnectionImpl中的pushHandler是一個私有對象,也沒有對外進行暴露,想要操作起來還是需要費上一點功夫的。下面,我們就在分析的結(jié)果上進行代碼的修改。

魔改

首先,我們需要自定義一個工具類,它的主要功能是操作監(jiān)聽器,所以就命名為ListenerChanger好了。它要完成的功能主要有三個:

  • 移除原有的全部消息監(jiān)聽。
  • 添加新的自定義消息監(jiān)聽。
  • 更新本地緩存MapCacheAccessor中的數(shù)據(jù)。

首先定義構(gòu)造方法,需要傳入StatefulRedisConnection和CacheAccessor作為參數(shù),在后面的方法中會用到,并且創(chuàng)建一個RedisCommands,用于后面向redis服務端發(fā)送get命令請求。

public class ListenerChanger {
private StatefulRedisConnection connection;
private CacheAccessor mapCacheAccessor;
private RedisCommands command;
public ListenerChanger(StatefulRedisConnection connection,
CacheAccessor mapCacheAccessor) {
this.connection = connection;
this.mapCacheAccessor = mapCacheAccessor;
this.command = connection.sync();
}

//其他方法先省略……
}

移除監(jiān)聽

前面說過,pushHandler是一個私有對象,我們無法直接獲取和操作,所以只能先使用反射獲得。PushHandler中的監(jiān)聽器列表存儲在一個CopyOnWriteArrayList中,我們直接使用迭代器移除掉所有內(nèi)容即可。

public void removeAllListeners() {
try {
Class connectionClass = StatefulRedisConnectionImpl.class;
Field pushHandlerField = connectionClass.getDeclaredField("pushHandler");
pushHandlerField.setAccessible(true);
PushHandler pushHandler = (PushHandler) pushHandlerField.get(this.connection);

CopyOnWriteArrayList pushListeners
= (CopyOnWriteArrayList) pushHandler.getPushListeners();
Iterator it = pushListeners.iterator();
while (it.hasNext()) {
PushListener listener = it.next();
pushListeners.remove(listener);
}
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}

添加監(jiān)聽

這里我們模仿DefaultRedisCache中addInvalidationListener()方法的寫法,添加一個監(jiān)聽器,除了最后處理的代碼基本一致。對于監(jiān)聽到的要作廢的keys集合,另外啟動一個線程更新本地數(shù)據(jù)。

public void addNewListener() {
this.connection.addListener(new PushListener() {
@Override
public void onPushMessage(PushMessage message) {
if (message.getType().equals("invalidate")) {
List content = message.getContent(StringCodec.UTF8::decodeKey);
List keys = (List) content.get(1);
System.out.println("modifyKeys:"+keys);

// start a new thread to update cacheAccessor
new Thread(()-> updateMap(keys)).start();
}
}
});
}

本地更新

使用RedisCommands重新從redis服務端獲取最新的數(shù)據(jù),并更新本地緩存mapCacheAccessor中的數(shù)據(jù)。

private void updateMap(List keys){
for (K key : keys) {
V newValue = this.command.get(key);
System.out.println("newValue:"+newValue);
mapCacheAccessor.put(key, newValue);
}
}

至于為什么執(zhí)行這個方法時額外啟動了一個新線程,是因為我在測試中發(fā)現(xiàn),當在PushListener的onPushMessage方法中執(zhí)行RedisCommands的get()方法時,會一直取不到值,但是像這樣新啟動一個線程就沒有問題。

測試

下面,我們來寫一段測試代碼,來測試上面的改動。

public static void main(String[] args) throws InterruptedException {
// 省略之前創(chuàng)建連接代碼……

Map map = new HashMap<>();
CacheAccessor mapCacheAccessor = CacheAccessor.forMap(map);
CacheFrontend frontend = ClientSideCaching.enable(mapCacheAccessor,
connect,
TrackingArgs.Builder.enabled().noloop());

ListenerChanger listenerChanger
= new ListenerChanger<>(connect, mapCacheAccessor);
// 移除原有的listeners
listenerChanger.removeAllListeners();
// 添加新的監(jiān)聽器
listenerChanger.addNewListener();

String key = "user";
while (true) {
String value = frontend.get(key);
System.out.println(value);
TimeUnit.SECONDS.sleep(30);
}
}

可以看到,代碼基本上在之前的基礎上沒有做什么改動,只是在創(chuàng)建完ClientSideCaching后,執(zhí)行了我們自己實現(xiàn)的ListenerChanger的兩個方法。先移除所有監(jiān)聽器、再添加新的監(jiān)聽器。下面我們以debug模式啟動測試代碼,簡單看一下代碼的執(zhí)行邏輯。

首先,在未執(zhí)行移除操作前,pushHandler中的監(jiān)聽器列表中有一個監(jiān)聽器:

移除后,監(jiān)聽器列表為空:

在添加完自定義監(jiān)聽器、并且執(zhí)行完第一次查詢操作后,在另外一個redis客戶端中修改user的值,這時PushListener會收到作廢類型的消息監(jiān)聽:

啟動一個新線程,查詢redis中user對應的最新值,并放入cacheAccessor中:

當循環(huán)中CacheFrontend的get()方法再被執(zhí)行時,會直接從cacheAccessor中取到刷新后的值,不需要再次去訪問redis服務端了:

總結(jié)

到這里,我們基于lettuce的客戶端緩存的基本使用、以及在這個基礎上進行的魔改就基本完成了。可以看到,lettuce客戶端已經(jīng)在底層封裝了一套比較成熟的API,能讓我們在將redis升級到6.0以后,開箱即用式地使用客戶端緩存這一新特性。在使用中,不需要我們關注底層原理,也不用做什么業(yè)務邏輯的改造,總的來說,使用起來還是挺香的。


名稱欄目:實戰(zhàn)派|Java項目中玩轉(zhuǎn)Redis6.0客戶端緩存!
當前網(wǎng)址:http://www.dlmjj.cn/article/dppdhje.html