文章目錄 本地緩存更新方案探索 1 背景 2 方案探索 2.1 初始化 2.2 實時更新 2.2.1 長輪詢 2.2.1.1 client 2.2.2.2 server
本地緩存更新方案探索
1 背景
大家在工作中是否遇到過某些業務數據需要頻繁使用,但是數據量不大的情況,一般就是幾十條甚至幾百條這種。 一般的解決方案就是業務維護數據,然后同步redis緩存,C端使用緩存的數據。但是這里不免會出現大key/熱key的問題,另外還有緩存穿透、緩存擊穿等問題。 那么接下來我們一起探索一下如何解決上述問題吧。
2 方案探索
首先我們評估數據量,發現這類數據一般只有百條左右。那么在技術選型上使用本地緩存無疑是最好的方案。現在應對C端場景基本選型的都是Caffeine。詳見:https://blog.csdn.net/for62/article/details/147494533 我們選擇了本地緩存一方面可以抗大流量,做到無狀態橫向擴容。另一方面可以提高服務穩定性降低tp99。 那么接下來我們就要設計緩存一致性的實現方案了,如何將redis中的數據近實時同步到本地緩存,C端只讀本地緩存,可以降級讀redis。 這里我們參考長輪詢實現配置中心的方案:https://mp.weixin.qq.com/s/YjvL0sUTGHxR3GJFqrP8qg。客戶端長輪詢監聽服務端數據變更,感知到數據變更后更新本地緩存數據。設計圖如下:
2.1 初始化
這里我們先假設刷新本地緩存的方法為:LocalCacheRefresher.refresh();
public void refresh ( ) { Caffeine < String , Object > cacheInfo = getLocalCacheInstance ( ) ; String redisCacheKey = getRedisCacheKey ( ) ; Set < String > keys = redisCache. hKeys ( redisCacheKey) ; for ( String key : keys) { String data = redisCache. hGet ( redisCacheKey, key) ; cacheInfo. put ( key, data) ; } }
@Component
public class LocalCacheInitRunner implements ApplicationRunner { @Override public void run ( ApplicationArguments args) throws Exception { LocalCacheRefresher . refresh ( ) ; } }
2.2 實時更新
2.2.1 長輪詢
這里我們用長輪詢的方案監聽源數據的變更來刷新本地緩存。
2.2.1.1 client
@Slf4j
public class LongPollClient { private CloseableHttpClient httpClient; private RequestConfig requestConfig; public ConfigClient ( ) { this . httpClient = HttpClientBuilder . create ( ) . build ( ) ; this . requestConfig = RequestConfig . custom ( ) . setSocketTimeout ( 6000 ) . build ( ) ; } public void longPolling ( String url, String dataId) { String endpoint = url + "?dataId=" + dataId; HttpGet request = new HttpGet ( endpoint) ; CloseableHttpResponse response = httpClient. execute ( request) ; switch ( response. getStatusLine ( ) . getStatusCode ( ) ) { case 200 : { BufferedReader rd = new BufferedReader ( new InputStreamReader ( response. getEntity ( ) . getContent ( ) ) ) ; StringBuilder result = new StringBuilder ( ) ; String line; while ( ( line = rd. readLine ( ) ) != null ) { result. append ( line) ; } response. close ( ) ; String configInfo = result. toString ( ) ; log. info ( "dataId: [{}] changed, receive configInfo: {}" , dataId, configInfo) ; longPolling ( url, dataId) ; break ; } case 304 : { log. info ( "longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again" , dataId) ; longPolling ( url, dataId) ; break ; } default : { throw new RuntimeException ( "unExcepted HTTP status code" ) ; } } }
}
2.2.2.2 server
@RestController
@Slf4j
@SpringBootApplication
public class LongPollServer { @Data private static class AsyncTask { private AsyncContext asyncContext; private boolean timeout; public AsyncTask ( AsyncContext asyncContext, boolean timeout) { this . asyncContext = asyncContext; this . timeout = timeout; } } private Multimap < String , AsyncTask > dataIdContext = Multimaps . synchronizedSetMultimap ( HashMultimap . create ( ) ) ; private ThreadFactory threadFactory = new ThreadFactoryBuilder ( ) . setNameFormat ( "longPolling-timeout-checker-%d" ) . build ( ) ; private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor ( 1 , threadFactory) ; @RequestMapping ( "/listener" ) public void addListener ( HttpServletRequest request, HttpServletResponse response) { String dataId = request. getParameter ( "dataId" ) ; AsyncContext asyncContext = request. startAsync ( request, response) ; AsyncTask asyncTask = new AsyncTask ( asyncContext, true ) ; dataIdContext. put ( dataId, asyncTask) ; timeoutChecker. schedule ( ( ) -> { if ( asyncTask. isTimeout ( ) ) { dataIdContext. remove ( dataId, asyncTask) ; response. setStatus ( HttpServletResponse . SC_NOT_MODIFIED) ; asyncContext. complete ( ) ; } } , 3000 , TimeUnit . MILLISECONDS) ; } @RequestMapping ( "/publishConfig" ) @SneakyThrows public String publishConfig ( String dataId, String configInfo) { log. info ( "publish configInfo dataId: [{}], configInfo: {}" , dataId, configInfo) ; Collection < AsyncTask > asyncTasks = dataIdContext. removeAll ( dataId) ; for ( AsyncTask asyncTask : asyncTasks) { asyncTask. setTimeout ( false ) ; HttpServletResponse response = ( HttpServletResponse ) asyncTask. getAsyncContext ( ) . getResponse ( ) ; response. setStatus ( HttpServletResponse . SC_OK) ; response. getWriter ( ) . println ( configInfo) ; asyncTask. getAsyncContext ( ) . complete ( ) ; } return "success" ; } public static void main ( String [ ] args) { SpringApplication . run ( ConfigServer . class , args) ; } }