文章目录 本地缓存更新方案探索 1 背景 2 方案探索 2.1 初始化 2.2 实时更新 2.2.1 长轮询 2.2.1.1 client 2.2.2.2 server
本地缓存更新方案探索
1 背景
大家在工作中是否遇到过某些业务数据需要频繁使用,但是数据量不大的情况,一般就是几十条甚至几百条这种。 一般的解决方案就是业务维护数据,然后同步redis缓存,C端使用缓存的数据。但是这里不免会出现大key/热key的问题,另外还有缓存穿透、缓存击穿等问题。 那么接下来我们一起探索一下如何解决上述问题吧。
2 方案探索
首先我们评估数据量,发现这类数据一般只有百条左右。那么在技术选型上使用本地缓存无疑是最好的方案。现在应对C端场景基本选型的都是Caffeine。详见:https://blog.csdn.net/for62/article/details/147494533 我们选择了本地缓存一方面可以抗大流量,做到无状态横向扩容。另一方面可以提高服务稳定性降低tp99。 那么接下来我们就要设计缓存一致性的实现方案了,如何将redis中的数据近实时同步到本地缓存,C端只读本地缓存,可以降级读redis。 这里我们参考长轮询实现配置中心的方案:https://mp.weixin.qq.com/s/YjvL0sUTGHxR3GJFqrP8qg。客户端长轮询监听服务端数据变更,感知到数据变更后更新本地缓存数据。设计图如下:
2.1 初始化
这里我们先假设刷新本地缓存的方法为:LocalCacheRefresher.refresh();
public void refresh ( ) { Caffeine < String , Object > cacheInfo = getLocalCacheInstance ( ) ; String redisCacheKey = getRedisCacheKey ( ) ; Set < String > keys = redisCache. hKeys ( redisCacheKey) ; for ( String key : keys) { String data = redisCache. hGet ( redisCacheKey, key) ; cacheInfo. put ( key, data) ; } }
@Component
public class LocalCacheInitRunner implements ApplicationRunner { @Override public void run ( ApplicationArguments args) throws Exception { LocalCacheRefresher . refresh ( ) ; } }
2.2 实时更新
2.2.1 长轮询
这里我们用长轮询的方案监听源数据的变更来刷新本地缓存。
2.2.1.1 client
@Slf4j
public class LongPollClient { private CloseableHttpClient httpClient; private RequestConfig requestConfig; public ConfigClient ( ) { this . httpClient = HttpClientBuilder . create ( ) . build ( ) ; this . requestConfig = RequestConfig . custom ( ) . setSocketTimeout ( 6000 ) . build ( ) ; } public void longPolling ( String url, String dataId) { String endpoint = url + "?dataId=" + dataId; HttpGet request = new HttpGet ( endpoint) ; CloseableHttpResponse response = httpClient. execute ( request) ; switch ( response. getStatusLine ( ) . getStatusCode ( ) ) { case 200 : { BufferedReader rd = new BufferedReader ( new InputStreamReader ( response. getEntity ( ) . getContent ( ) ) ) ; StringBuilder result = new StringBuilder ( ) ; String line; while ( ( line = rd. readLine ( ) ) != null ) { result. append ( line) ; } response. close ( ) ; String configInfo = result. toString ( ) ; log. info ( "dataId: [{}] changed, receive configInfo: {}" , dataId, configInfo) ; longPolling ( url, dataId) ; break ; } case 304 : { log. info ( "longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again" , dataId) ; longPolling ( url, dataId) ; break ; } default : { throw new RuntimeException ( "unExcepted HTTP status code" ) ; } } }
}
2.2.2.2 server
@RestController
@Slf4j
@SpringBootApplication
public class LongPollServer { @Data private static class AsyncTask { private AsyncContext asyncContext; private boolean timeout; public AsyncTask ( AsyncContext asyncContext, boolean timeout) { this . asyncContext = asyncContext; this . timeout = timeout; } } private Multimap < String , AsyncTask > dataIdContext = Multimaps . synchronizedSetMultimap ( HashMultimap . create ( ) ) ; private ThreadFactory threadFactory = new ThreadFactoryBuilder ( ) . setNameFormat ( "longPolling-timeout-checker-%d" ) . build ( ) ; private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor ( 1 , threadFactory) ; @RequestMapping ( "/listener" ) public void addListener ( HttpServletRequest request, HttpServletResponse response) { String dataId = request. getParameter ( "dataId" ) ; AsyncContext asyncContext = request. startAsync ( request, response) ; AsyncTask asyncTask = new AsyncTask ( asyncContext, true ) ; dataIdContext. put ( dataId, asyncTask) ; timeoutChecker. schedule ( ( ) -> { if ( asyncTask. isTimeout ( ) ) { dataIdContext. remove ( dataId, asyncTask) ; response. setStatus ( HttpServletResponse . SC_NOT_MODIFIED) ; asyncContext. complete ( ) ; } } , 3000 , TimeUnit . MILLISECONDS) ; } @RequestMapping ( "/publishConfig" ) @SneakyThrows public String publishConfig ( String dataId, String configInfo) { log. info ( "publish configInfo dataId: [{}], configInfo: {}" , dataId, configInfo) ; Collection < AsyncTask > asyncTasks = dataIdContext. removeAll ( dataId) ; for ( AsyncTask asyncTask : asyncTasks) { asyncTask. setTimeout ( false ) ; HttpServletResponse response = ( HttpServletResponse ) asyncTask. getAsyncContext ( ) . getResponse ( ) ; response. setStatus ( HttpServletResponse . SC_OK) ; response. getWriter ( ) . println ( configInfo) ; asyncTask. getAsyncContext ( ) . complete ( ) ; } return "success" ; } public static void main ( String [ ] args) { SpringApplication . run ( ConfigServer . class , args) ; } }