当前位置:   article > 正文

缓存框架 Caffeine 的可视化探索与实践

缓存框架 Caffeine 的可视化探索与实践

作者:vivo 互联网服务器团队-  Wang Zhi

Caffeine 作为一个高性能的缓存框架而被大量使用。本文基于Caffeine已有的基础进行定制化开发实现可视化功能。

一、背景

Caffeine缓存是一个高性能、可扩展、内存优化的 Java 缓存库,基于 Google 的 Guava Cache演进而来并提供了接近最佳的命中率。

Caffeine 缓存包含以下特点

  1. 高效快速:Caffeine 缓存使用近似算法和并发哈希表等优化技术,使得缓存的访问速度非常快。

  2. 内存友好:Caffeine 缓存使用一种内存优化策略,能够根据需要动态调整缓存的大小,有效地利用内存资源。

  3. 多种缓存策略:Caffeine 缓存支持多种缓存策略,如基于容量、时间、权重、手动移除、定时刷新等,并提供了丰富的配置选项,能够适应不同的应用场景和需求。

  4. 支持异步加载和刷新:Caffeine 缓存支持异步加载和刷新缓存项,可以与 Spring 等框架无缝集成。

  5. 清理策略:Caffeine 使用 Window TinyLFU 清理策略,它提供了接近最佳的命中率。

  6. 支持自动加载和自动过期:Caffeine 缓存可以根据配置自动加载和过期缓存项,无需手动干预。

  7. 统计功能:Caffeine 缓存提供了丰富的统计功能,如缓存命中率、缓存项数量等,方便评估缓存的性能和效果。

正是因为Caffeine具备的上述特性,Caffeine作为项目中本地缓存的不二选择,越来越多的项目集成了Caffeine的功能,进而衍生了一系列的业务视角的需求。

日常使用的需求之一希望能够实时评估Caffeine实例的内存占用情况并能够提供动态调整缓存参数的能力,但是已有的内存分析工具MAT需要基于dump的文件进行分析无法做到实时,这也是整个事情的起因之一。

二、业务的技术视角

  • 能够对项目中的Caffeine的缓存实例能够做到近实时统计,实时查看缓存的实例个数。

  • 能够对Caffeine的每个实例的缓存配置参数、内存占用、缓存命中率做到实时查看,同时能够支持单个实例的缓存过期时间,缓存条目等参数进行动态配置下发。

  • 能够对Caffeine的每个实例的缓存数据做到实时查看,并且能够支持缓存数据的立即失效等功能。

基于上述的需求背景,结合caffeine的已有功能和定制的部分源码开发,整体作为caffeine可视化的技术项目进行推进和落地。

三、可视化能力

Caffeine可视化项目目前已支持功能包括:

  • 项目维度的全局缓存实例的管控。

  • 单缓存实例配置信息可视化、内存占用可视化、命中率可视化。

  • 单缓存实例的数据查询、配置动态变更、缓存数据失效等功能。

3.1 缓存实例的全局管控

图片

说明:

  • 以应用维度+机器维度展示该应用下包含的缓存实例对象,每个实例包含缓存设置中的大小、过期策略、过期时间、内存占用、缓存命中率等信息。

  • 单实例维度的内存占用和缓存命中率支持以趋势图进行展示。

  • 单实例维度支持配置变更操作和缓存查询操作。

3.2 内存占用趋势

图片

说明:

  • 内存占用趋势记录该缓存实例对象近一段时间内存占用的趋势变化。

  • 时间周期目前支持展示近两天的数据。

3.3 命中率趋势

图片

说明:

  • 命中率趋势记录该缓存实例对象近一段时间缓存命中的变化情况。

  • 时间周期目前支持展示近两天的数据。

3.4 配置变更

图片

说明:

  • 配置变更目前支持缓存大小和过期时间的动态设置。

  • 目前暂时支持单实例的设置,后续会支持全量生效功能。

3.5 缓存查询

图片

说明:

  • 单实例维度支持缓存数据的查询。

  • 目前支持常见的缓存Key类型包括String类型、Long类型、Int类型。

四、原理实现

4.1 整体设计框架

  • Caffeine框架功能整合

图片

说明:

  • 沿用Caffeine的基础功能包括Caffeine的缓存功能和Caffeine统计功能。

  • 新增Caffeine内存占用预估功能,该功能主要是预估缓存实例对象占用的内存情况。

  • 新增Caffeine实例命名功能,该功能是针对每个实例对象提供命名功能,是全局管控的基础。

  • 新增Caffeine实例全局管控功能,该功能主要维护项目运行中所有的缓存实例。

Caffeine可视化框架

图片

说明:

  • 【项目工程侧】:Caffeine的可视化框架基于Caffeine框架功能整合的基础上增加通信层进行数据数据上报和配置的下发。

  • 【管控平台侧】:负责缓存数据上报的接收展示,配置变更命令的下发。

  • 【通信层支持push和pull两种模式】,push模式主要用于统计数据的实时上报,pull模式主要用于配置下发和缓存数据查询。

4.2 源码实现

业务层-缓存对象的管理

  1. static Cache<String, List<String>> accountWhiteCache = Caffeine.newBuilder()
  2. .expireAfterWrite(VivoConfigManager.getInteger("trade.account.white.list.cache.ttl", 10), TimeUnit.MINUTES)
  3. .recordStats().maximumSize(VivoConfigManager.getInteger("trade.account.white.list.cache.size", 100)).build();
  4. 常规的Caffeine实例的创建方式
  5. static Cache<String, List<String>> accountWhiteCache = Caffeine.newBuilder().applyName("accountWhiteCache")
  6. .expireAfterWrite(VivoConfigManager.getInteger("trade.account.white.list.cache.ttl", 10), TimeUnit.MINUTES)
  7. .recordStats().maximumSize(VivoConfigManager.getInteger("trade.account.white.list.cache.size", 100)).build();
  8. 支持实例命名的Caffeine实例的创建方式

说明:

  • 在Caffeine实例创建的基础上增加了缓存实例的命名功能,通过.applyName("accountWhiteCache")来定义缓存实例的命名。

  1. public final class Caffeine<K, V> {
  2. /**
  3. * caffeine的实例名称
  4. */
  5. String instanceName;
  6. /**
  7. * caffeine的实例维护的Map信息
  8. */
  9. static Map<String, Cache> cacheInstanceMap = new ConcurrentHashMap<>();
  10. @NonNull
  11. public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
  12. requireWeightWithWeigher();
  13. requireNonLoadingCache();
  14. @SuppressWarnings("unchecked")
  15. Caffeine<K1, V1> self = (Caffeine<K1, V1>) this;
  16. Cache localCache = isBounded() ? new BoundedLocalCache.BoundedLocalManualCache<>(self) : new UnboundedLocalCache.UnboundedLocalManualCache<>(self);
  17. if (null != localCache && StringUtils.isNotEmpty(localCache.getInstanceName())) {
  18. cacheInstanceMap.put(localCache.getInstanceName(), localCache);
  19. }
  20. return localCache;
  21. }
  22. }

说明:

  • 每个Caffeine都有一个实例名称instanceName。

  • 全局通过cacheInstanceMap来维护Caffeine实例对象的名称和实例的映射关系。

  • 通过维护映射关系能够通过实例的名称查询到缓存实例对象并对缓存实例对象进行各类的操作。

  • Caffeine实例的命名功能是其他功能整合的基石。

业务层-内存占用的预估

  1. import jdk.nashorn.internal.ir.debug.ObjectSizeCalculator;
  2. public abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
  3. implements LocalCache<K, V> {
  4. final ConcurrentHashMap<Object, Node<K, V>> data;
  5. @Override
  6. public long getMemoryUsed() {
  7. // 预估内存占用
  8. return ObjectSizeCalculator.getObjectSize(data);
  9. }
  10. }

说明:

  • 通过ObjectSizeCalculator.getObjectSize预估内存的缓存值。

  • data值是Caffeine实例用来保存真实数据的对象。

业务层-数据上报机制

  1. public static StatsData getCacheStats(String instanceName) {
  2. Cache cache = Caffeine.getCacheByInstanceName(instanceName);
  3. CacheStats cacheStats = cache.stats();
  4. StatsData statsData = new StatsData();
  5. statsData.setInstanceName(instanceName);
  6. statsData.setTimeStamp(System.currentTimeMillis()/1000);
  7. statsData.setMemoryUsed(String.valueOf(cache.getMemoryUsed()));
  8. statsData.setEstimatedSize(String.valueOf(cache.estimatedSize()));
  9. statsData.setRequestCount(String.valueOf(cacheStats.requestCount()));
  10. statsData.setHitCount(String.valueOf(cacheStats.hitCount()));
  11. statsData.setHitRate(String.valueOf(cacheStats.hitRate()));
  12. statsData.setMissCount(String.valueOf(cacheStats.missCount()));
  13. statsData.setMissRate(String.valueOf(cacheStats.missRate()));
  14. statsData.setLoadCount(String.valueOf(cacheStats.loadCount()));
  15. statsData.setLoadSuccessCount(String.valueOf(cacheStats.loadSuccessCount()));
  16. statsData.setLoadFailureCount(String.valueOf(cacheStats.loadFailureCount()));
  17. statsData.setLoadFailureRate(String.valueOf(cacheStats.loadFailureRate()));
  18. Optional<Eviction> optionalEviction = cache.policy().eviction();
  19. optionalEviction.ifPresent(eviction -> statsData.setMaximumSize(String.valueOf(eviction.getMaximum())));
  20. Optional<Expiration> optionalExpiration = cache.policy().expireAfterWrite();
  21. optionalExpiration.ifPresent(expiration -> statsData.setExpireAfterWrite(String.valueOf(expiration.getExpiresAfter(TimeUnit.SECONDS))));
  22. optionalExpiration = cache.policy().expireAfterAccess();
  23. optionalExpiration.ifPresent(expiration -> statsData.setExpireAfterAccess(String.valueOf(expiration.getExpiresAfter(TimeUnit.SECONDS))));
  24. optionalExpiration = cache.policy().refreshAfterWrite();
  25. optionalExpiration.ifPresent(expiration -> statsData.setRefreshAfterWrite(String.valueOf(expiration.getExpiresAfter(TimeUnit.SECONDS))));
  26. return statsData;
  27. }

说明:

  • 通过Caffeine自带的统计接口来统计相关数值。

  • 统计数据实例维度进行统计。

  1. public static void sendReportData() {
  2. try {
  3. if (!VivoConfigManager.getBoolean("memory.caffeine.data.report.switch", true)) {
  4. return;
  5. }
  6. // 1、获取所有的cache实例对象
  7. Method listCacheInstanceMethod = HANDLER_MANAGER_CLASS.getMethod("listCacheInstance", null);
  8. List<String> instanceNames = (List)listCacheInstanceMethod.invoke(null, null);
  9. if (CollectionUtils.isEmpty(instanceNames)) {
  10. return;
  11. }
  12. String appName = System.getProperty("app.name");
  13. String localIp = getLocalIp();
  14. String localPort = String.valueOf(NetPortUtils.getWorkPort());
  15. ReportData reportData = new ReportData();
  16. InstanceData instanceData = new InstanceData();
  17. instanceData.setAppName(appName);
  18. instanceData.setIp(localIp);
  19. instanceData.setPort(localPort);
  20. // 2、遍历cache实例对象获取缓存监控数据
  21. Method getCacheStatsMethod = HANDLER_MANAGER_CLASS.getMethod("getCacheStats", String.class);
  22. Map<String, StatsData> statsDataMap = new HashMap<>();
  23. instanceNames.stream().forEach(instanceName -> {
  24. try {
  25. StatsData statsData = (StatsData)getCacheStatsMethod.invoke(null, instanceName);
  26. statsDataMap.put(instanceName, statsData);
  27. } catch (Exception e) {
  28. }
  29. });
  30. // 3、构建上报对象
  31. reportData.setInstanceData(instanceData);
  32. reportData.setStatsDataMap(statsDataMap);
  33. // 4、发送Http的POST请求
  34. HttpPost httpPost = new HttpPost(getReportDataUrl());
  35. httpPost.setConfig(requestConfig);
  36. StringEntity stringEntity = new StringEntity(JSON.toJSONString(reportData));
  37. stringEntity.setContentType("application/json");
  38. httpPost.setEntity(stringEntity);
  39. HttpResponse response = httpClient.execute(httpPost);
  40. String result = EntityUtils.toString(response.getEntity(),"UTF-8");
  41. EntityUtils.consume(response.getEntity());
  42. logger.info("Caffeine 数据上报成功 URL {} 参数 {} 结果 {}", getReportDataUrl(), JSON.toJSONString(reportData), result);
  43. } catch (Throwable throwable) {
  44. logger.error("Caffeine 数据上报失败 URL {} ", getReportDataUrl(), throwable);
  45. }
  46. }

说明:

  • 通过获取项目中运行的所有Caffeine实例并依次遍历收集统计数据。

  • 通过http协议负责上报对应的统计数据,采用固定间隔周期进行上报。

业务层-配置动态下发

  1. public static ExecutionResponse dispose(ExecutionRequest request) {
  2. ExecutionResponse executionResponse = new ExecutionResponse();
  3. executionResponse.setCmdType(CmdTypeEnum.INSTANCE_CONFIGURE.getCmd());
  4. executionResponse.setInstanceName(request.getInstanceName());
  5. String instanceName = request.getInstanceName();
  6. Cache cache = Caffeine.getCacheByInstanceName(instanceName);
  7. // 设置缓存的最大条目
  8. if (null != request.getMaximumSize() && request.getMaximumSize() > 0) {
  9. Optional<Eviction> optionalEviction = cache.policy().eviction();
  10. optionalEviction.ifPresent(eviction ->eviction.setMaximum(request.getMaximumSize()));
  11. }
  12. // 设置写后过期的过期时间
  13. if (null != request.getExpireAfterWrite() && request.getExpireAfterWrite() > 0) {
  14. Optional<Expiration> optionalExpiration = cache.policy().expireAfterWrite();
  15. optionalExpiration.ifPresent(expiration -> expiration.setExpiresAfter(request.getExpireAfterWrite(), TimeUnit.SECONDS));
  16. }
  17. // 设置访问过期的过期时间
  18. if (null != request.getExpireAfterAccess() && request.getExpireAfterAccess() > 0) {
  19. Optional<Expiration> optionalExpiration = cache.policy().expireAfterAccess();
  20. optionalExpiration.ifPresent(expiration -> expiration.setExpiresAfter(request.getExpireAfterAccess(), TimeUnit.SECONDS));
  21. }
  22. // 设置写更新的过期时间
  23. if (null != request.getRefreshAfterWrite() && request.getRefreshAfterWrite() > 0) {
  24. Optional<Expiration> optionalExpiration = cache.policy().refreshAfterWrite();
  25. optionalExpiration.ifPresent(expiration -> expiration.setExpiresAfter(request.getRefreshAfterWrite(), TimeUnit.SECONDS));
  26. }
  27. executionResponse.setCode(0);
  28. executionResponse.setMsg("success");
  29. return executionResponse;
  30. }

说明:

  • 通过Caffeine自带接口进行缓存配置的相关设置。

业务层-缓存数据清空

  1. /**
  2. * 失效缓存的值
  3. * @param request
  4. * @return
  5. */
  6. public static ExecutionResponse invalidate(ExecutionRequest request) {
  7. ExecutionResponse executionResponse = new ExecutionResponse();
  8. executionResponse.setCmdType(CmdTypeEnum.INSTANCE_INVALIDATE.getCmd());
  9. executionResponse.setInstanceName(request.getInstanceName());
  10. try {
  11. // 查找对应的cache实例
  12. String instanceName = request.getInstanceName();
  13. Cache cache = Caffeine.getCacheByInstanceName(instanceName);
  14. // 处理清空指定实例的所有缓存 或 指定实例的key对应的缓存
  15. Object cacheKeyObj = request.getCacheKey();
  16. // 清除所有缓存
  17. if (Objects.isNull(cacheKeyObj)) {
  18. cache.invalidateAll();
  19. } else {
  20. // 清除指定key对应的缓存
  21. if (Objects.equals(request.getCacheKeyType(), 2)) {
  22. cache.invalidate(Long.valueOf(request.getCacheKey().toString()));
  23. } else if (Objects.equals(request.getCacheKeyType(), 3)) {
  24. cache.invalidate(Integer.valueOf(request.getCacheKey().toString()));
  25. } else {
  26. cache.invalidate(request.getCacheKey().toString());
  27. }
  28. }
  29. executionResponse.setCode(0);
  30. executionResponse.setMsg("success");
  31. } catch (Exception e) {
  32. executionResponse.setCode(-1);
  33. executionResponse.setMsg("fail");
  34. }
  35. return executionResponse;
  36. }
  37. }

业务层-缓存数据查询

  1. public static ExecutionResponse inspect(ExecutionRequest request) {
  2. ExecutionResponse executionResponse = new ExecutionResponse();
  3. executionResponse.setCmdType(CmdTypeEnum.INSTANCE_INSPECT.getCmd());
  4. executionResponse.setInstanceName(request.getInstanceName());
  5. String instanceName = request.getInstanceName();
  6. Cache cache = Caffeine.getCacheByInstanceName(instanceName);
  7. Object cacheValue = cache.getIfPresent(request.getCacheKey());
  8. if (Objects.equals(request.getCacheKeyType(), 2)) {
  9. cacheValue = cache.getIfPresent(Long.valueOf(request.getCacheKey().toString()));
  10. } else if (Objects.equals(request.getCacheKeyType(), 3)) {
  11. cacheValue = cache.getIfPresent(Integer.valueOf(request.getCacheKey().toString()));
  12. } else {
  13. cacheValue = cache.getIfPresent(request.getCacheKey().toString());
  14. }
  15. if (Objects.isNull(cacheValue)) {
  16. executionResponse.setData("");
  17. } else {
  18. executionResponse.setData(JSON.toJSONString(cacheValue));
  19. }
  20. return executionResponse;
  21. }

说明:

  • 通过Caffeine自带接口进行缓存信息查询。

通信层-监听服务

  1. public class ServerManager {
  2. private Server jetty;
  3. /**
  4. * 创建jetty对象
  5. * @throws Exception
  6. */
  7. public ServerManager() throws Exception {
  8. int port = NetPortUtils.getAvailablePort();
  9. jetty = new Server(port);
  10. ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
  11. context.setContextPath("/");
  12. context.addServlet(ClientServlet.class, "/caffeine");
  13. jetty.setHandler(context);
  14. }
  15. /**
  16. * 启动jetty对象
  17. * @throws Exception
  18. */
  19. public void start() throws Exception {
  20. jetty.start();
  21. }
  22. }
  23. public class ClientServlet extends HttpServlet {
  24. private static final Logger logger = LoggerFactory.getLogger(ClientServlet.class);
  25. @Override
  26. protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
  27. super.doGet(req, resp);
  28. }
  29. @Override
  30. protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
  31. ExecutionResponse executionResponse = null;
  32. String requestJson = null;
  33. try {
  34. // 获取请求的相关的参数
  35. String contextPath = req.getContextPath();
  36. String servletPath = req.getServletPath();
  37. String requestUri = req.getRequestURI();
  38. requestJson = IOUtils.toString(req.getInputStream(), StandardCharsets.UTF_8);
  39. // 处理不同的命令
  40. ExecutionRequest executionRequest = JSON.parseObject(requestJson, ExecutionRequest.class);
  41. // 通过反射来来处理类依赖问题
  42. executionResponse = DisposeCenter.dispatch(executionRequest);
  43. } catch (Exception e) {
  44. logger.error("vivo-memory 处理请求异常 {} ", requestJson, e);
  45. }
  46. if (null == executionResponse) {
  47. executionResponse = new ExecutionResponse();
  48. executionResponse.setCode(-1);
  49. executionResponse.setMsg("处理异常");
  50. }
  51. // 组装相应报文
  52. resp.setContentType("application/json; charset=utf-8");
  53. PrintWriter out = resp.getWriter();
  54. out.println(JSON.toJSONString(executionResponse));
  55. out.flush();
  56. }
  57. }

说明:

  • 通信层通过jetty启动http服务进行监听,安全考虑端口不对外开放。

  • 通过定义ClientServlet来处理相关的请求包括配置下发和缓存查询等功能。

通信层-心跳设计

  1. /**
  2. * 发送心跳数据
  3. */
  4. public static void sendHeartBeatData() {
  5. try {
  6. if (!VivoConfigManager.getBoolean("memory.caffeine.heart.report.switch", true)) {
  7. return;
  8. }
  9. // 1、构建心跳数据
  10. String appName = System.getProperty("app.name");
  11. String localIp = getLocalIp();
  12. String localPort = String.valueOf(NetPortUtils.getWorkPort());
  13. HeartBeatData heartBeatData = new HeartBeatData();
  14. heartBeatData.setAppName(appName);
  15. heartBeatData.setIp(localIp);
  16. heartBeatData.setPort(localPort);
  17. heartBeatData.setTimeStamp(System.currentTimeMillis()/1000);
  18. // 2、发送Http的POST请求
  19. HttpPost httpPost = new HttpPost(getHeartBeatUrl());
  20. httpPost.setConfig(requestConfig);
  21. StringEntity stringEntity = new StringEntity(JSON.toJSONString(heartBeatData));
  22. stringEntity.setContentType("application/json");
  23. httpPost.setEntity(stringEntity);
  24. HttpResponse response = httpClient.execute(httpPost);
  25. String result = EntityUtils.toString(response.getEntity(),"UTF-8");
  26. EntityUtils.consume(response.getEntity());
  27. logger.info("Caffeine 心跳上报成功 URL {} 参数 {} 结果 {}", getHeartBeatUrl(), JSON.toJSONString(heartBeatData), result);
  28. } catch (Throwable throwable) {
  29. logger.error("Caffeine 心跳上报失败 URL {} ", getHeartBeatUrl(), throwable);
  30. }
  31. }

说明:

  • 心跳功能上报项目实例的ip和端口用来通信,携带时间戳用来记录上报时间戳。

  • 实际项目中因为机器的回收等场景需要通过上报时间戳定时清理下线的服务。

五、总结

vivo技术团队在Caffeine的使用经验上曾有过多次分享,可参考公众号文章《如何把 Caffeine Cache 用得如丝般顺滑》,此篇文章在使用的基础上基于使用痛点进行进一步的定制。

目前Caffeine可视化的项目已经在相关核心业务场景中落地并发挥作用,整体运行平稳。使用较多的功能包括项目维度的caffeine实例的全局管控,单实例维度的内存占用评估和缓存命中趋势评估。

如通过单实例的内存占用评估功能能够合理评估缓存条目设置和内存占用之间的关系;通过分析缓存命中率的整体趋势评估缓存的参数设置合理性。

期待此篇文章能够给业界缓存使用和监控带来一些新思路。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/987170
推荐阅读
相关标签
  

闽ICP备14008679号