当前位置:   article > 正文

tritonserver学习之七:cache管理器

tritonserver学习之七:cache管理器

tritonserver学习之一:triton使用流程

tritonserver学习之二:tritonserver编译 

tritonserver学习之三:tritonserver运行流程

tritonserver学习之四:命令行解析

tritonserver学习之五:backend实现机制

tritonserver学习之六:自定义c++、python custom backend实践

tritonserver学习之八:redis_caches实践

tritonserver学习之九:tritonserver grpc异步模式

1、概念

       triton中cache的设计,主要是用来提升推理性能,实现机制为缓存不同的推理请求,当相同的请求到达triton,则可以直接从缓存中获取结果,而不再经过推理,这样不仅降低了整个过程的耗时,同时也节省了算力。

       缓存中key的生成,是根据模型名称、版本、输入tensor名称及模型输入进行hash而成,唯一的标识了一个推理请求。

       triton支持的cache有两种,一种为内存(local cache),另外一种为redis(redis cache),在启动triton时,通过不同的命令行参数进行设置。

2、cache实现机制

开启cache命令行:

  1. tritonserver --model-repository=/models --log-verbose=1 --cache-config=local,size=1048576 --log-file=1.txt
  2. tritonserver --model-repository=/models --log-verbose=1 --cache-config=redis,host=172.17.0.1 --cache-config redis,port=6379 --cache-config redis,password=“xxx”

triton中,cache的技术方案,和backend类似,在triton与cache之间约定了四个api:

  1. TRITONSERVER_Error*
  2. TRITONCACHE_CacheInitialize(TRITONCACHE_Cache** cache, const char* cache_config)
  3. TRITONSERVER_Error*
  4. TRITONCACHE_CacheFinalize(TRITONCACHE_Cache* cache)
  5. TRITONSERVER_Error*
  6. TRITONCACHE_CacheLookup(
  7. TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
  8. TRITONCACHE_Allocator* allocator)
  9. TRITONSERVER_Error*
  10. TRITONCACHE_CacheInsert(
  11. TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
  12. TRITONCACHE_Allocator* allocator)

 这四个api分别为初始化、析构、查找缓存、插入缓存,每种cache都需要实现这四个api,triton在enable缓存后,会相应的调用这四个api,这种机制的好处是解耦triton与cache的设计,两者之间通过上述4个标准的api进行交互。

3、redis cache

github地址:GitHub - triton-inference-server/redis_cache: TRITONCACHE implementation of a Redis cache

3.1 TRITONCACHE_CacheInitialize

该函数位于:redis_cache/src/cache_api.cc,该函数实现主要功能:根据输入的配置,创建redis_cache类对象。

  1. TRITONSERVER_Error*
  2. TRITONCACHE_CacheInitialize(TRITONCACHE_Cache** cache, const char* cache_config)
  3. {
  4. if (cache == nullptr) {
  5. return TRITONSERVER_ErrorNew(
  6. TRITONSERVER_ERROR_INVALID_ARG, "cache was nullptr");
  7. }
  8. if (cache_config == nullptr) {
  9. return TRITONSERVER_ErrorNew(
  10. TRITONSERVER_ERROR_INVALID_ARG, "cache config was nullptr");
  11. }
  12. std::unique_ptr<RedisCache> rcache;
  13. RETURN_IF_ERROR(RedisCache::Create(cache_config, &rcache));
  14. *cache = reinterpret_cast<TRITONCACHE_Cache*>(rcache.release());
  15. return nullptr; // success
  16. }

其中,Redis::Create函数创建RedisCache对象,为静态函数,这种写法为triton的常用写法,类中设计静态函数:Create,创建类对象,这样代码更易读,是一个很好的设计,该函数如下:

  1. TRITONSERVER_Error*
  2. RedisCache::Create(
  3. const std::string& cache_config, std::unique_ptr<RedisCache>* cache)
  4. {
  5. rapidjson::Document document;
  6. document.Parse(cache_config.c_str());
  7. if (!document.HasMember("host") || !document.HasMember("port")) {
  8. return TRITONSERVER_ErrorNew(
  9. TRITONSERVER_ERROR_INVALID_ARG,
  10. "Failed to initialize RedisCache, didn't specify address. Must at a "
  11. "minimum specify 'host' and 'port' in the configuration - e.g. "
  12. "tritonserver --cache-config redis,host=redis --cache-config "
  13. "redis,port=6379 --model-repository=/models ...");
  14. }
  15. sw::redis::ConnectionOptions options;
  16. sw::redis::ConnectionPoolOptions poolOptions;
  17. // try pulling user/password from environment fist
  18. // override if present in the config
  19. setOptionFromEnv(USERNAME_ENV_VAR_NAME, options.user);
  20. setOptionFromEnv(PASSWORD_ENV_VAR_NAME, options.password);
  21. setOption("host", options.host, document);
  22. setOption("port", options.port, document);
  23. setOption("user", options.user, document);
  24. setOption("password", options.password, document);
  25. setOption("db", options.db, document);
  26. setOption("connect_timeout", options.connect_timeout, document);
  27. setOption("socket_timeout", options.socket_timeout, document);
  28. setOption("pool_size", poolOptions.size, document);
  29. setOption("wait_timeout", poolOptions.wait_timeout, document);
  30. if (!document.HasMember("wait_timeout")) {
  31. poolOptions.wait_timeout = std::chrono::milliseconds(1000);
  32. }
  33. // tls options
  34. if (document.HasMember("tls_enabled")) {
  35. options.tls.enabled =
  36. strcmp(document["tls_enabled"].GetString(), "true") == 0;
  37. setOption("cert", options.tls.cert, document);
  38. setOption("key", options.tls.key, document);
  39. setOption("cacert", options.tls.cacert, document);
  40. setOption("cacert_dir", options.tls.cacertdir, document);
  41. setOption("sni", options.tls.sni, document);
  42. }
  43. try {
  44. cache->reset(new RedisCache(options, poolOptions));
  45. }
  46. catch (const std::exception& ex) {
  47. return TRITONSERVER_ErrorNew(
  48. TRITONSERVER_ERROR_INTERNAL,
  49. ("Failed to initialize RedisCache: " + std::string(ex.what())).c_str());
  50. }
  51. return nullptr; // success
  52. }

这个函数最核心的功能只有几行代码:

  1. try {
  2. cache->reset(new RedisCache(options, poolOptions));
  3. }
  4. catch (const std::exception& ex) {
  5. return TRITONSERVER_ErrorNew(
  6. TRITONSERVER_ERROR_INTERNAL,
  7. ("Failed to initialize RedisCache: " + std::string(ex.what())).c_str());
  8. }

解析输入的配置json串,创建RedisCache对象,该类的构造函数非常简单,使用redis++库,创建redis连接客户端,并调用成员函数:ping(),确认redis是否连接成功。

核心函数:

  1. std::unique_ptr<sw::redis::Redis>
  2. init_client(
  3. const sw::redis::ConnectionOptions& connectionOptions,
  4. sw::redis::ConnectionPoolOptions poolOptions)
  5. {
  6. std::unique_ptr<sw::redis::Redis> redis =
  7. std::make_unique<sw::redis::Redis>(connectionOptions, poolOptions);
  8. const auto msg = "Triton RedisCache client connected";
  9. if (redis->ping(msg) != msg) {
  10. throw std::runtime_error("Failed to ping Redis server.");
  11. }
  12. LOG_VERBOSE(1) << "Successfully connected to Redis";
  13. return redis;
  14. }

3.2 TRITONCACHE_CacheFinalize

该函数类似类的析构函数,在退出时调用,删除相关的资源。

  1. TRITONSERVER_Error*
  2. TRITONCACHE_CacheFinalize(TRITONCACHE_Cache* cache)
  3. {
  4. if (cache == nullptr) {
  5. return TRITONSERVER_ErrorNew(
  6. TRITONSERVER_ERROR_INVALID_ARG, "cache was nullptr");
  7. }
  8. delete reinterpret_cast<RedisCache*>(cache);
  9. return nullptr; // success
  10. }

3.3 TRITONCACHE_CacheInsert

这个函数是实现cache的核心函数,将要缓存的内容保存到redis中,函数原型:

  1. TRITONSERVER_Error*
  2. TRITONCACHE_CacheInsert(
  3. TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
  4. TRITONCACHE_Allocator* allocator)
  5. {
  6. RETURN_IF_ERROR(CheckArgs(cache, key, entry, allocator));
  7. const auto redis_cache = reinterpret_cast<RedisCache*>(cache);
  8. CacheEntry redis_entry;
  9. size_t numBuffers = 0;
  10. RETURN_IF_ERROR(TRITONCACHE_CacheEntryBufferCount(entry, &numBuffers));
  11. std::vector<std::shared_ptr<char[]>> managedBuffers;
  12. for (size_t i = 0; i < numBuffers; i++) {
  13. TRITONSERVER_BufferAttributes* attrs = nullptr;
  14. RETURN_IF_ERROR(TRITONSERVER_BufferAttributesNew(&attrs));
  15. std::shared_ptr<TRITONSERVER_BufferAttributes> managed_attrs(
  16. attrs, TRITONSERVER_BufferAttributesDelete);
  17. void* base = nullptr;
  18. size_t byteSize = 0;
  19. int64_t memoryTypeId;
  20. TRITONSERVER_MemoryType memoryType;
  21. RETURN_IF_ERROR(TRITONCACHE_CacheEntryGetBuffer(entry, i, &base, attrs));
  22. RETURN_IF_ERROR(TRITONSERVER_BufferAttributesByteSize(attrs, &byteSize));
  23. RETURN_IF_ERROR(
  24. TRITONSERVER_BufferAttributesMemoryType(attrs, &memoryType));
  25. RETURN_IF_ERROR(
  26. TRITONSERVER_BufferAttributesMemoryTypeId(attrs, &memoryTypeId));
  27. if (!byteSize) {
  28. return TRITONSERVER_ErrorNew(
  29. TRITONSERVER_ERROR_INTERNAL, "Buffer size was zero");
  30. }
  31. // DLIS-2673: Add better memory_type support - SL - keeping this in place,
  32. // presumably we're going to have to pull out the other bits that are
  33. // important some day.
  34. if (memoryType != TRITONSERVER_MEMORY_CPU &&
  35. memoryType != TRITONSERVER_MEMORY_CPU_PINNED) {
  36. return TRITONSERVER_ErrorNew(
  37. TRITONSERVER_ERROR_INVALID_ARG,
  38. "Only input buffers in CPU memory are allowed in cache currently");
  39. }
  40. std::shared_ptr<char[]> managedBuffer(new char[byteSize]);
  41. // Overwrite entry buffer with cache-allocated buffer.
  42. // No need to set new buffer attrs for now, will reuse the one we got above.
  43. TRITONCACHE_CacheEntrySetBuffer(
  44. entry, i, static_cast<void*>(managedBuffer.get()), nullptr /* attrs */);
  45. managedBuffers.push_back(managedBuffer);
  46. redis_entry.items.insert(std::make_pair(
  47. getFieldName(i, fieldType::bufferSize), std::to_string(byteSize)));
  48. redis_entry.items.insert(std::make_pair(
  49. getFieldName(i, fieldType::memoryType), std::to_string(memoryType)));
  50. redis_entry.items.insert(std::make_pair(
  51. getFieldName(i, fieldType::memoryTypeId),
  52. std::to_string(memoryTypeId)));
  53. }
  54. // Callback to copy directly from Triton buffers to RedisCache managedBuffers
  55. TRITONCACHE_Copy(allocator, entry);
  56. for (size_t i = 0; i < numBuffers; i++) {
  57. auto bytesToCopy =
  58. std::stoi(redis_entry.items.at(getFieldName(i, fieldType::bufferSize)));
  59. redis_entry.items.insert(std::make_pair(
  60. getFieldName(i, fieldType::buffer),
  61. std::string(managedBuffers.at(i).get(), bytesToCopy)));
  62. }
  63. // sanity check to make sure we are inserting items into the cache that are
  64. // comprised of the right number of fields to allow us to marshal
  65. // the buffer back from Redis later on.
  66. if (redis_entry.items.size() % FIELDS_PER_BUFFER != 0) {
  67. return TRITONSERVER_ErrorNew(
  68. TRITONSERVER_ERROR_INVALID_ARG,
  69. "Attempted to add incomplete entry to cache");
  70. }
  71. RETURN_IF_ERROR(redis_cache->Insert(key, redis_entry));
  72. return nullptr; // success
  73. }

在这个函数中,需要注意entry这个参数,其实际类型为:class CacheEntry对象(定义位于cache_entry.cc),是triton对缓存内容的组织形式,这个概念是区别于cache的内容管理的,可以简单的这样理解,当tritonserver拿到要缓存的内容后,需要将内容进行统一的管理,最后的结果就是一个CacheEntry,而TRITONCACHE_CacheInsert函数的功能就是解析CacheEntry,将要缓存的内容解析到redis cache中的CacheEntry中,在redis cache中,CacheEntry的定义如下:

  1. struct CacheEntry {
  2. size_t numBuffers = 0;
  3. std::unordered_map<std::string, std::string> items;
  4. };

该函数的主流程如下:

中间还有一步:

TRITONCACHE_Copy(allocator, entry);

将缓存内容,从triton的缓存中拷贝到cache_manager类定义的缓存中。

3.4 TRITONCACHE_CacheLookup

该函数主要实现从redis hash表中查找并读取数据,同时将缓存数据拷贝到triton的cache_entry以及cache_manager的allocator中。

4、redis_cache实现

redis_cache的实现相对local_cache来说比较简单,对内容的缓存,使用了redis中hash表数据结构,这部分代码主要三个模块:初始化、insert、lookup。

4.1 redis初始化

对redis的操作使用了redis++库,初始化部分,主要实现在redis_cache类的构造函数中:

  1. std::unique_ptr<sw::redis::Redis>
  2. init_client(
  3. const sw::redis::ConnectionOptions& connectionOptions,
  4. sw::redis::ConnectionPoolOptions poolOptions)
  5. {
  6. std::unique_ptr<sw::redis::Redis> redis =
  7. std::make_unique<sw::redis::Redis>(connectionOptions, poolOptions);
  8. const auto msg = "Triton RedisCache client connected";
  9. if (redis->ping(msg) != msg) {
  10. throw std::runtime_error("Failed to ping Redis server.");
  11. }
  12. LOG_VERBOSE(1) << "Successfully connected to Redis";
  13. return redis;
  14. }

还是老样子,triton中每个类,都会设计一个静态的create函数,用于创建本类的对象,reids_cache也一样,create函数完成对象的创建并赋值给入参【cache】,同时建立与redis的链接。

  1. static TRITONSERVER_Error* Create(
  2. const std::string& cache_config, std::unique_ptr<RedisCache>* cache);

 4.2 insert

insert的核心即为将缓存内容插入到redis的hash表中,代码最重要的也就一行:

  1. TRITONSERVER_Error*
  2. RedisCache::Insert(const std::string& key, CacheEntry& entry)
  3. {
  4. try {
  5. _client->hmset(key, entry.items.begin(), entry.items.end());
  6. }
  7. catch (const sw::redis::TimeoutError& e) {
  8. return handleError("Timeout inserting key: ", key, e.what());
  9. }
  10. catch (const sw::redis::IoError& e) {
  11. return handleError("Failed to insert key: ", key, e.what());
  12. }
  13. catch (const std::exception& e) {
  14. return handleError("Failed to insert key: ", key, e.what());
  15. }
  16. catch (...) {
  17. return handleError("Failed to insert key: ", key, "Unknown error.");
  18. }
  19. return nullptr; // success
  20. }

_client->hmset(key, entry.items.begin(), entry.items.end());这行代码的含义是,将entry结构中items这个map中的内容全部插入到hash表中。

4.3 lookup

lookup的功能可以简单的理解为redis的命令hgetall,通过该命令将redis hash表中某个key的所有内容放入entry结构的items字段中。

  1. std::pair<TRITONSERVER_Error*, CacheEntry>
  2. RedisCache::Lookup(const std::string& key)
  3. {
  4. // CacheEntry结构体,成员map+int
  5. CacheEntry entry;
  6. try {
  7. // 获取 hash 表的所有字段和值
  8. this->_client->hgetall(
  9. key, std::inserter(entry.items, entry.items.begin()));
  10. // determine the number of buffers by dividing the size by the number of
  11. // fields per buffer
  12. entry.numBuffers = entry.items.size() / FIELDS_PER_BUFFER;
  13. return {nullptr, entry};
  14. }
  15. catch (const sw::redis::TimeoutError& e) {
  16. return {handleError("Timeout retrieving key: ", key, e.what()), {}};
  17. }
  18. catch (const sw::redis::IoError& e) {
  19. return {handleError("Failed to retrieve key: ", key, e.what()), {}};
  20. }
  21. catch (const std::exception& e) {
  22. return {handleError("Failed to retrieve key: ", key, e.what()), {}};
  23. }
  24. catch (...) {
  25. return {handleError("Failed to retrieve key: ", key, "Unknown error."), {}};
  26. }
  27. }

5、问题求助

在使用triton的过程中,我尝试使用一下cache,但是一直没有看到推理的结果缓存到redis中,不知道是什么原因,我在两个地方使能了cache功能:

第一个,启动triton时增加使能cache功能:

tritonserver --model-repository=/models --log-verbose=1 --cache-config=redis,host=172.17.0.1 --cache-config redis,port=6379 --cache-config redis,password="xxxx" --log-file=1.txt

第二,在模型配置文件中,使能response cache:

  1. response_cache {
  2. enable: true
  3. }

之后通过client请求模型进行推理,但是在redis中一直看不到缓存,至今没有找到原因,如果有同学使用过这个功能,欢迎留言指教,非常感谢

也欢迎大家关注公众号交流:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/351812
推荐阅读
相关标签
  

闽ICP备14008679号