赞
踩
tritonserver学习之二:tritonserver编译
tritonserver学习之三:tritonserver运行流程
tritonserver学习之六:自定义c++、python custom backend实践
tritonserver学习之八:redis_caches实践
tritonserver学习之九:tritonserver grpc异步模式
triton中cache的设计,主要是用来提升推理性能,实现机制为缓存不同的推理请求,当相同的请求到达triton,则可以直接从缓存中获取结果,而不再经过推理,这样不仅降低了整个过程的耗时,同时也节省了算力。
缓存中key的生成,是根据模型名称、版本、输入tensor名称及模型输入进行hash而成,唯一的标识了一个推理请求。
triton支持的cache有两种,一种为内存(local cache),另外一种为redis(redis cache),在启动triton时,通过不同的命令行参数进行设置。
开启cache命令行:
- tritonserver --model-repository=/models --log-verbose=1 --cache-config=local,size=1048576 --log-file=1.txt
-
- tritonserver --model-repository=/models --log-verbose=1 --cache-config=redis,host=172.17.0.1 --cache-config redis,port=6379 --cache-config redis,password=“xxx”
triton中,cache的技术方案,和backend类似,在triton与cache之间约定了四个api:
- TRITONSERVER_Error*
- TRITONCACHE_CacheInitialize(TRITONCACHE_Cache** cache, const char* cache_config)
-
- TRITONSERVER_Error*
- TRITONCACHE_CacheFinalize(TRITONCACHE_Cache* cache)
-
- TRITONSERVER_Error*
- TRITONCACHE_CacheLookup(
- TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
- TRITONCACHE_Allocator* allocator)
-
- TRITONSERVER_Error*
- TRITONCACHE_CacheInsert(
- TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
- TRITONCACHE_Allocator* allocator)
这四个api分别为初始化、析构、查找缓存、插入缓存,每种cache都需要实现这四个api,triton在enable缓存后,会相应的调用这四个api,这种机制的好处是解耦triton与cache的设计,两者之间通过上述4个标准的api进行交互。
github地址:GitHub - triton-inference-server/redis_cache: TRITONCACHE implementation of a Redis cache
该函数位于:redis_cache/src/cache_api.cc,该函数实现主要功能:根据输入的配置,创建redis_cache类对象。
- TRITONSERVER_Error*
- TRITONCACHE_CacheInitialize(TRITONCACHE_Cache** cache, const char* cache_config)
- {
- if (cache == nullptr) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INVALID_ARG, "cache was nullptr");
- }
- if (cache_config == nullptr) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INVALID_ARG, "cache config was nullptr");
- }
-
- std::unique_ptr<RedisCache> rcache;
- RETURN_IF_ERROR(RedisCache::Create(cache_config, &rcache));
- *cache = reinterpret_cast<TRITONCACHE_Cache*>(rcache.release());
- return nullptr; // success
- }

其中,Redis::Create函数创建RedisCache对象,为静态函数,这种写法为triton的常用写法,类中设计静态函数:Create,创建类对象,这样代码更易读,是一个很好的设计,该函数如下:
- TRITONSERVER_Error*
- RedisCache::Create(
- const std::string& cache_config, std::unique_ptr<RedisCache>* cache)
- {
- rapidjson::Document document;
-
- document.Parse(cache_config.c_str());
- if (!document.HasMember("host") || !document.HasMember("port")) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INVALID_ARG,
- "Failed to initialize RedisCache, didn't specify address. Must at a "
- "minimum specify 'host' and 'port' in the configuration - e.g. "
- "tritonserver --cache-config redis,host=redis --cache-config "
- "redis,port=6379 --model-repository=/models ...");
- }
-
- sw::redis::ConnectionOptions options;
- sw::redis::ConnectionPoolOptions poolOptions;
-
- // try pulling user/password from environment fist
- // override if present in the config
- setOptionFromEnv(USERNAME_ENV_VAR_NAME, options.user);
- setOptionFromEnv(PASSWORD_ENV_VAR_NAME, options.password);
-
- setOption("host", options.host, document);
- setOption("port", options.port, document);
- setOption("user", options.user, document);
- setOption("password", options.password, document);
- setOption("db", options.db, document);
- setOption("connect_timeout", options.connect_timeout, document);
- setOption("socket_timeout", options.socket_timeout, document);
- setOption("pool_size", poolOptions.size, document);
- setOption("wait_timeout", poolOptions.wait_timeout, document);
- if (!document.HasMember("wait_timeout")) {
- poolOptions.wait_timeout = std::chrono::milliseconds(1000);
- }
-
- // tls options
- if (document.HasMember("tls_enabled")) {
- options.tls.enabled =
- strcmp(document["tls_enabled"].GetString(), "true") == 0;
- setOption("cert", options.tls.cert, document);
- setOption("key", options.tls.key, document);
- setOption("cacert", options.tls.cacert, document);
- setOption("cacert_dir", options.tls.cacertdir, document);
- setOption("sni", options.tls.sni, document);
- }
-
- try {
- cache->reset(new RedisCache(options, poolOptions));
- }
- catch (const std::exception& ex) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INTERNAL,
- ("Failed to initialize RedisCache: " + std::string(ex.what())).c_str());
- }
- return nullptr; // success
- }

这个函数最核心的功能只有几行代码:
- try {
- cache->reset(new RedisCache(options, poolOptions));
- }
- catch (const std::exception& ex) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INTERNAL,
- ("Failed to initialize RedisCache: " + std::string(ex.what())).c_str());
- }
解析输入的配置json串,创建RedisCache对象,该类的构造函数非常简单,使用redis++库,创建redis连接客户端,并调用成员函数:ping(),确认redis是否连接成功。
核心函数:
- std::unique_ptr<sw::redis::Redis>
- init_client(
- const sw::redis::ConnectionOptions& connectionOptions,
- sw::redis::ConnectionPoolOptions poolOptions)
- {
- std::unique_ptr<sw::redis::Redis> redis =
- std::make_unique<sw::redis::Redis>(connectionOptions, poolOptions);
- const auto msg = "Triton RedisCache client connected";
- if (redis->ping(msg) != msg) {
- throw std::runtime_error("Failed to ping Redis server.");
- }
-
- LOG_VERBOSE(1) << "Successfully connected to Redis";
- return redis;
- }
该函数类似类的析构函数,在退出时调用,删除相关的资源。
- TRITONSERVER_Error*
- TRITONCACHE_CacheFinalize(TRITONCACHE_Cache* cache)
- {
- if (cache == nullptr) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INVALID_ARG, "cache was nullptr");
- }
-
- delete reinterpret_cast<RedisCache*>(cache);
- return nullptr; // success
- }
这个函数是实现cache的核心函数,将要缓存的内容保存到redis中,函数原型:
- TRITONSERVER_Error*
- TRITONCACHE_CacheInsert(
- TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
- TRITONCACHE_Allocator* allocator)
- {
- RETURN_IF_ERROR(CheckArgs(cache, key, entry, allocator));
- const auto redis_cache = reinterpret_cast<RedisCache*>(cache);
- CacheEntry redis_entry;
- size_t numBuffers = 0;
- RETURN_IF_ERROR(TRITONCACHE_CacheEntryBufferCount(entry, &numBuffers));
- std::vector<std::shared_ptr<char[]>> managedBuffers;
- for (size_t i = 0; i < numBuffers; i++) {
- TRITONSERVER_BufferAttributes* attrs = nullptr;
- RETURN_IF_ERROR(TRITONSERVER_BufferAttributesNew(&attrs));
- std::shared_ptr<TRITONSERVER_BufferAttributes> managed_attrs(
- attrs, TRITONSERVER_BufferAttributesDelete);
- void* base = nullptr;
- size_t byteSize = 0;
- int64_t memoryTypeId;
- TRITONSERVER_MemoryType memoryType;
- RETURN_IF_ERROR(TRITONCACHE_CacheEntryGetBuffer(entry, i, &base, attrs));
- RETURN_IF_ERROR(TRITONSERVER_BufferAttributesByteSize(attrs, &byteSize));
- RETURN_IF_ERROR(
- TRITONSERVER_BufferAttributesMemoryType(attrs, &memoryType));
- RETURN_IF_ERROR(
- TRITONSERVER_BufferAttributesMemoryTypeId(attrs, &memoryTypeId));
-
- if (!byteSize) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INTERNAL, "Buffer size was zero");
- }
- // DLIS-2673: Add better memory_type support - SL - keeping this in place,
- // presumably we're going to have to pull out the other bits that are
- // important some day.
- if (memoryType != TRITONSERVER_MEMORY_CPU &&
- memoryType != TRITONSERVER_MEMORY_CPU_PINNED) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INVALID_ARG,
- "Only input buffers in CPU memory are allowed in cache currently");
- }
-
- std::shared_ptr<char[]> managedBuffer(new char[byteSize]);
-
- // Overwrite entry buffer with cache-allocated buffer.
- // No need to set new buffer attrs for now, will reuse the one we got above.
- TRITONCACHE_CacheEntrySetBuffer(
- entry, i, static_cast<void*>(managedBuffer.get()), nullptr /* attrs */);
-
- managedBuffers.push_back(managedBuffer);
- redis_entry.items.insert(std::make_pair(
- getFieldName(i, fieldType::bufferSize), std::to_string(byteSize)));
- redis_entry.items.insert(std::make_pair(
- getFieldName(i, fieldType::memoryType), std::to_string(memoryType)));
- redis_entry.items.insert(std::make_pair(
- getFieldName(i, fieldType::memoryTypeId),
- std::to_string(memoryTypeId)));
- }
-
- // Callback to copy directly from Triton buffers to RedisCache managedBuffers
- TRITONCACHE_Copy(allocator, entry);
- for (size_t i = 0; i < numBuffers; i++) {
- auto bytesToCopy =
- std::stoi(redis_entry.items.at(getFieldName(i, fieldType::bufferSize)));
- redis_entry.items.insert(std::make_pair(
- getFieldName(i, fieldType::buffer),
- std::string(managedBuffers.at(i).get(), bytesToCopy)));
- }
-
- // sanity check to make sure we are inserting items into the cache that are
- // comprised of the right number of fields to allow us to marshal
- // the buffer back from Redis later on.
- if (redis_entry.items.size() % FIELDS_PER_BUFFER != 0) {
- return TRITONSERVER_ErrorNew(
- TRITONSERVER_ERROR_INVALID_ARG,
- "Attempted to add incomplete entry to cache");
- }
-
- RETURN_IF_ERROR(redis_cache->Insert(key, redis_entry));
- return nullptr; // success
- }

在这个函数中,需要注意entry这个参数,其实际类型为:class CacheEntry对象(定义位于cache_entry.cc),是triton对缓存内容的组织形式,这个概念是区别于cache的内容管理的,可以简单的这样理解,当tritonserver拿到要缓存的内容后,需要将内容进行统一的管理,最后的结果就是一个CacheEntry,而TRITONCACHE_CacheInsert函数的功能就是解析CacheEntry,将要缓存的内容解析到redis cache中的CacheEntry中,在redis cache中,CacheEntry的定义如下:
- struct CacheEntry {
- size_t numBuffers = 0;
- std::unordered_map<std::string, std::string> items;
- };
该函数的主流程如下:
中间还有一步:
TRITONCACHE_Copy(allocator, entry);
将缓存内容,从triton的缓存中拷贝到cache_manager类定义的缓存中。
该函数主要实现从redis hash表中查找并读取数据,同时将缓存数据拷贝到triton的cache_entry以及cache_manager的allocator中。
redis_cache的实现相对local_cache来说比较简单,对内容的缓存,使用了redis中hash表数据结构,这部分代码主要三个模块:初始化、insert、lookup。
对redis的操作使用了redis++库,初始化部分,主要实现在redis_cache类的构造函数中:
- std::unique_ptr<sw::redis::Redis>
- init_client(
- const sw::redis::ConnectionOptions& connectionOptions,
- sw::redis::ConnectionPoolOptions poolOptions)
- {
- std::unique_ptr<sw::redis::Redis> redis =
- std::make_unique<sw::redis::Redis>(connectionOptions, poolOptions);
- const auto msg = "Triton RedisCache client connected";
- if (redis->ping(msg) != msg) {
- throw std::runtime_error("Failed to ping Redis server.");
- }
-
- LOG_VERBOSE(1) << "Successfully connected to Redis";
- return redis;
- }
还是老样子,triton中每个类,都会设计一个静态的create函数,用于创建本类的对象,reids_cache也一样,create函数完成对象的创建并赋值给入参【cache】,同时建立与redis的链接。
- static TRITONSERVER_Error* Create(
- const std::string& cache_config, std::unique_ptr<RedisCache>* cache);
insert的核心即为将缓存内容插入到redis的hash表中,代码最重要的也就一行:
- TRITONSERVER_Error*
- RedisCache::Insert(const std::string& key, CacheEntry& entry)
- {
- try {
- _client->hmset(key, entry.items.begin(), entry.items.end());
- }
- catch (const sw::redis::TimeoutError& e) {
- return handleError("Timeout inserting key: ", key, e.what());
- }
- catch (const sw::redis::IoError& e) {
- return handleError("Failed to insert key: ", key, e.what());
- }
- catch (const std::exception& e) {
- return handleError("Failed to insert key: ", key, e.what());
- }
- catch (...) {
- return handleError("Failed to insert key: ", key, "Unknown error.");
- }
-
- return nullptr; // success
- }

_client->hmset(key, entry.items.begin(), entry.items.end());这行代码的含义是,将entry结构中items这个map中的内容全部插入到hash表中。
lookup的功能可以简单的理解为redis的命令hgetall,通过该命令将redis hash表中某个key的所有内容放入entry结构的items字段中。
- std::pair<TRITONSERVER_Error*, CacheEntry>
- RedisCache::Lookup(const std::string& key)
- {
- // CacheEntry结构体,成员map+int
- CacheEntry entry;
-
- try {
- // 获取 hash 表的所有字段和值
- this->_client->hgetall(
- key, std::inserter(entry.items, entry.items.begin()));
-
- // determine the number of buffers by dividing the size by the number of
- // fields per buffer
- entry.numBuffers = entry.items.size() / FIELDS_PER_BUFFER;
- return {nullptr, entry};
- }
- catch (const sw::redis::TimeoutError& e) {
- return {handleError("Timeout retrieving key: ", key, e.what()), {}};
- }
- catch (const sw::redis::IoError& e) {
- return {handleError("Failed to retrieve key: ", key, e.what()), {}};
- }
- catch (const std::exception& e) {
- return {handleError("Failed to retrieve key: ", key, e.what()), {}};
- }
- catch (...) {
- return {handleError("Failed to retrieve key: ", key, "Unknown error."), {}};
- }
- }

在使用triton的过程中,我尝试使用一下cache,但是一直没有看到推理的结果缓存到redis中,不知道是什么原因,我在两个地方使能了cache功能:
第一个,启动triton时增加使能cache功能:
tritonserver --model-repository=/models --log-verbose=1 --cache-config=redis,host=172.17.0.1 --cache-config redis,port=6379 --cache-config redis,password="xxxx" --log-file=1.txt
第二,在模型配置文件中,使能response cache:
- response_cache {
- enable: true
- }
之后通过client请求模型进行推理,但是在redis中一直看不到缓存,至今没有找到原因,如果有同学使用过这个功能,欢迎留言指教,非常感谢。
也欢迎大家关注公众号交流:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。