tritonserver学习之一:triton使用流程
tritonserver学习之二:tritonserver编译
tritonserver学习之三:tritonserver运行流程
tritonserver学习之四:命令行解析
tritonserver学习之五:backend实现机制
tritonserver学习之六:自定义c++、python custom backend实践
1、概念
triton中cache的设计,主要是用来提升推理性能,实现机制为缓存不同的推理请求,当相同的请求到达triton,则可以直接从缓存中获取结果,而不再经过推理,这样不仅降低了整个过程的耗时,同时也节省了算力。
缓存中key的生成,是根据模型名称、版本、输入tensor名称及模型输入进行hash而成,唯一的标识了一个推理请求。
triton支持的cache有两种,一种为内存(local cache),另外一种为redis(redis cache),在启动triton时,通过不同的命令行参数进行设置。
2、cache实现机制
开启cache命令行:
tritonserver --model-repository=/models --log-verbose=1 --cache-config=local,size=1048576 --log-file=1.txt
tritonserver --model-repository=/models --log-verbose=1 --cache-config=redis,host=172.17.0.1 --cache-config redis,port=6379 --cache-config redis,password=“xxx”
triton中,cache的技术方案,和backend类似,在triton与cache之间约定了四个api:
TRITONSERVER_Error*
TRITONCACHE_CacheInitialize(TRITONCACHE_Cache** cache, const char* cache_config)
TRITONSERVER_Error*
TRITONCACHE_CacheFinalize(TRITONCACHE_Cache* cache)
TRITONSERVER_Error*
TRITONCACHE_CacheLookup(
TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
TRITONCACHE_Allocator* allocator)
TRITONSERVER_Error*
TRITONCACHE_CacheInsert(
TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
TRITONCACHE_Allocator* allocator)
这四个api分别为初始化、析构、查找缓存、插入缓存,每种cache都需要实现这四个api,triton在enable缓存后,会相应的调用这四个api,这种机制的好处是解耦triton与cache的设计,两者之间通过上述4个标准的api进行交互。
3、redis cache
github地址:GitHub - triton-inference-server/redis_cache: TRITONCACHE implementation of a Redis cache
3.1 TRITONCACHE_CacheInitialize
该函数位于:redis_cache/src/cache_api.cc,该函数实现主要功能:根据输入的配置,创建redis_cache类对象。
TRITONSERVER_Error*
TRITONCACHE_CacheInitialize(TRITONCACHE_Cache** cache, const char* cache_config)
{
if (cache == nullptr) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INVALID_ARG, "cache was nullptr");
}
if (cache_config == nullptr) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INVALID_ARG, "cache config was nullptr");
}
std::unique_ptr<RedisCache> rcache;
RETURN_IF_ERROR(RedisCache::Create(cache_config, &rcache));
*cache = reinterpret_cast<TRITONCACHE_Cache*>(rcache.release());
return nullptr; // success
}
其中,Redis::Create函数创建RedisCache对象,为静态函数,这种写法为triton的常用写法,类中设计静态函数:Create,创建类对象,这样代码更易读,是一个很好的设计,该函数如下:
TRITONSERVER_Error*
RedisCache::Create(
const std::string& cache_config, std::unique_ptr<RedisCache>* cache)
{
rapidjson::Document document;
document.Parse(cache_config.c_str());
if (!document.HasMember("host") || !document.HasMember("port")) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INVALID_ARG,
"Failed to initialize RedisCache, didn't specify address. Must at a "
"minimum specify 'host' and 'port' in the configuration - e.g. "
"tritonserver --cache-config redis,host=redis --cache-config "
"redis,port=6379 --model-repository=/models ...");
}
sw::redis::ConnectionOptions options;
sw::redis::ConnectionPoolOptions poolOptions;
// try pulling user/password from environment fist
// override if present in the config
setOptionFromEnv(USERNAME_ENV_VAR_NAME, options.user);
setOptionFromEnv(PASSWORD_ENV_VAR_NAME, options.password);
setOption("host", options.host, document);
setOption("port", options.port, document);
setOption("user", options.user, document);
setOption("password", options.password, document);
setOption("db", options.db, document);
setOption("connect_timeout", options.connect_timeout, document);
setOption("socket_timeout", options.socket_timeout, document);
setOption("pool_size", poolOptions.size, document);
setOption("wait_timeout", poolOptions.wait_timeout, document);
if (!document.HasMember("wait_timeout")) {
poolOptions.wait_timeout = std::chrono::milliseconds(1000);
}
// tls options
if (document.HasMember("tls_enabled")) {
options.tls.enabled =
strcmp(document["tls_enabled"].GetString(), "true") == 0;
setOption("cert", options.tls.cert, document);
setOption("key", options.tls.key, document);
setOption("cacert", options.tls.cacert, document);
setOption("cacert_dir", options.tls.cacertdir, document);
setOption("sni", options.tls.sni, document);
}
try {
cache->reset(new RedisCache(options, poolOptions));
}
catch (const std::exception& ex) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
("Failed to initialize RedisCache: " + std::string(ex.what())).c_str());
}
return nullptr; // success
}
这个函数最核心的功能只有几行代码:
try {
cache->reset(new RedisCache(options, poolOptions));
}
catch (const std::exception& ex) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL,
("Failed to initialize RedisCache: " + std::string(ex.what())).c_str());
}
解析输入的配置json串,创建RedisCache对象,该类的构造函数非常简单,使用redis++库,创建redis连接客户端,并调用成员函数:ping(),确认redis是否连接成功。
核心函数:
std::unique_ptr<sw::redis::Redis>
init_client(
const sw::redis::ConnectionOptions& connectionOptions,
sw::redis::ConnectionPoolOptions poolOptions)
{
std::unique_ptr<sw::redis::Redis> redis =
std::make_unique<sw::redis::Redis>(connectionOptions, poolOptions);
const auto msg = "Triton RedisCache client connected";
if (redis->ping(msg) != msg) {
throw std::runtime_error("Failed to ping Redis server.");
}
LOG_VERBOSE(1) << "Successfully connected to Redis";
return redis;
}
3.2 TRITONCACHE_CacheFinalize
该函数类似类的析构函数,在退出时调用,删除相关的资源。
TRITONSERVER_Error*
TRITONCACHE_CacheFinalize(TRITONCACHE_Cache* cache)
{
if (cache == nullptr) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INVALID_ARG, "cache was nullptr");
}
delete reinterpret_cast<RedisCache*>(cache);
return nullptr; // success
}
3.3 TRITONCACHE_CacheInsert
这个函数是实现cache的核心函数,将要缓存的内容保存到redis中,函数原型:
TRITONSERVER_Error*
TRITONCACHE_CacheInsert(
TRITONCACHE_Cache* cache, const char* key, TRITONCACHE_CacheEntry* entry,
TRITONCACHE_Allocator* allocator)
{
RETURN_IF_ERROR(CheckArgs(cache, key, entry, allocator));
const auto redis_cache = reinterpret_cast<RedisCache*>(cache);
CacheEntry redis_entry;
size_t numBuffers = 0;
RETURN_IF_ERROR(TRITONCACHE_CacheEntryBufferCount(entry, &numBuffers));
std::vector<std::shared_ptr<char[]>> managedBuffers;
for (size_t i = 0; i < numBuffers; i++) {
TRITONSERVER_BufferAttributes* attrs = nullptr;
RETURN_IF_ERROR(TRITONSERVER_BufferAttributesNew(&attrs));
std::shared_ptr<TRITONSERVER_BufferAttributes> managed_attrs(
attrs, TRITONSERVER_BufferAttributesDelete);
void* base = nullptr;
size_t byteSize = 0;
int64_t memoryTypeId;
TRITONSERVER_MemoryType memoryType;
RETURN_IF_ERROR(TRITONCACHE_CacheEntryGetBuffer(entry, i, &base, attrs));
RETURN_IF_ERROR(TRITONSERVER_BufferAttributesByteSize(attrs, &byteSize));
RETURN_IF_ERROR(
TRITONSERVER_BufferAttributesMemoryType(attrs, &memoryType));
RETURN_IF_ERROR(
TRITONSERVER_BufferAttributesMemoryTypeId(attrs, &memoryTypeId));
if (!byteSize) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INTERNAL, "Buffer size was zero");
}
// DLIS-2673: Add better memory_type support - SL - keeping this in place,
// presumably we're going to have to pull out the other bits that are
// important some day.
if (memoryType != TRITONSERVER_MEMORY_CPU &&
memoryType != TRITONSERVER_MEMORY_CPU_PINNED) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INVALID_ARG,
"Only input buffers in CPU memory are allowed in cache currently");
}
std::shared_ptr<char[]> managedBuffer(new char[byteSize]);
// Overwrite entry buffer with cache-allocated buffer.
// No need to set new buffer attrs for now, will reuse the one we got above.
TRITONCACHE_CacheEntrySetBuffer(
entry, i, static_cast<void*>(managedBuffer.get()), nullptr /* attrs */);
managedBuffers.push_back(managedBuffer);
redis_entry.items.insert(std::make_pair(
getFieldName(i, fieldType::bufferSize), std::to_string(byteSize)));
redis_entry.items.insert(std::make_pair(
getFieldName(i, fieldType::memoryType), std::to_string(memoryType)));
redis_entry.items.insert(std::make_pair(
getFieldName(i, fieldType::memoryTypeId),
std::to_string(memoryTypeId)));
}
// Callback to copy directly from Triton buffers to RedisCache managedBuffers
TRITONCACHE_Copy(allocator, entry);
for (size_t i = 0; i < numBuffers; i++) {
auto bytesToCopy =
std::stoi(redis_entry.items.at(getFieldName(i, fieldType::bufferSize)));
redis_entry.items.insert(std::make_pair(
getFieldName(i, fieldType::buffer),
std::string(managedBuffers.at(i).get(), bytesToCopy)));
}
// sanity check to make sure we are inserting items into the cache that are
// comprised of the right number of fields to allow us to marshal
// the buffer back from Redis later on.
if (redis_entry.items.size() % FIELDS_PER_BUFFER != 0) {
return TRITONSERVER_ErrorNew(
TRITONSERVER_ERROR_INVALID_ARG,
"Attempted to add incomplete entry to cache");
}
RETURN_IF_ERROR(redis_cache->Insert(key, redis_entry));
return nullptr; // success
}
在这个函数中,需要注意entry这个参数,其实际类型为:class CacheEntry对象(定义位于cache_entry.cc),是triton对缓存内容的组织形式,这个概念是区别于cache的内容管理的,可以简单的这样理解,当tritonserver拿到要缓存的内容后,需要将内容进行统一的管理,最后的结果就是一个CacheEntry,而TRITONCACHE_CacheInsert函数的功能就是解析CacheEntry,将要缓存的内容解析到redis cache中的CacheEntry中,在redis cache中,CacheEntry的定义如下:
struct CacheEntry {
size_t numBuffers = 0;
std::unordered_map<std::string, std::string> items;
};
该函数的主流程如下:
中间还有一步:
TRITONCACHE_Copy(allocator, entry);
将缓存内容,从triton的缓存中拷贝到cache_manager类定义的缓存中。
3.4 TRITONCACHE_CacheLookup
该函数主要实现从redis hash表中查找并读取数据,同时将缓存数据拷贝到triton的cache_entry以及cache_manager的allocator中。
4、redis_cache实现
redis_cache的实现相对local_cache来说比较简单,对内容的缓存,使用了redis中hash表数据结构,这部分代码主要三个模块:初始化、insert、lookup。
4.1 redis初始化
对redis的操作使用了redis++库,初始化部分,主要实现在redis_cache类的构造函数中:
std::unique_ptr<sw::redis::Redis>
init_client(
const sw::redis::ConnectionOptions& connectionOptions,
sw::redis::ConnectionPoolOptions poolOptions)
{
std::unique_ptr<sw::redis::Redis> redis =
std::make_unique<sw::redis::Redis>(connectionOptions, poolOptions);
const auto msg = "Triton RedisCache client connected";
if (redis->ping(msg) != msg) {
throw std::runtime_error("Failed to ping Redis server.");
}
LOG_VERBOSE(1) << "Successfully connected to Redis";
return redis;
}
还是老样子,triton中每个类,都会设计一个静态的create函数,用于创建本类的对象,reids_cache也一样,create函数完成对象的创建并赋值给入参【cache】,同时建立与redis的链接。
static TRITONSERVER_Error* Create(
const std::string& cache_config, std::unique_ptr<RedisCache>* cache);
4.2 insert
insert的核心即为将缓存内容插入到redis的hash表中,代码最重要的也就一行:
TRITONSERVER_Error*
RedisCache::Insert(const std::string& key, CacheEntry& entry)
{
try {
_client->hmset(key, entry.items.begin(), entry.items.end());
}
catch (const sw::redis::TimeoutError& e) {
return handleError("Timeout inserting key: ", key, e.what());
}
catch (const sw::redis::IoError& e) {
return handleError("Failed to insert key: ", key, e.what());
}
catch (const std::exception& e) {
return handleError("Failed to insert key: ", key, e.what());
}
catch (...) {
return handleError("Failed to insert key: ", key, "Unknown error.");
}
return nullptr; // success
}
_client->hmset(key, entry.items.begin(), entry.items.end());这行代码的含义是,将entry结构中items这个map中的内容全部插入到hash表中。
4.3 lookup
lookup的功能可以简单的理解为redis的命令hgetall,通过该命令将redis hash表中某个key的所有内容放入entry结构的items字段中。
std::pair<TRITONSERVER_Error*, CacheEntry>
RedisCache::Lookup(const std::string& key)
{
// CacheEntry结构体,成员map+int
CacheEntry entry;
try {
// 获取 hash 表的所有字段和值
this->_client->hgetall(
key, std::inserter(entry.items, entry.items.begin()));
// determine the number of buffers by dividing the size by the number of
// fields per buffer
entry.numBuffers = entry.items.size() / FIELDS_PER_BUFFER;
return {nullptr, entry};
}
catch (const sw::redis::TimeoutError& e) {
return {handleError("Timeout retrieving key: ", key, e.what()), {}};
}
catch (const sw::redis::IoError& e) {
return {handleError("Failed to retrieve key: ", key, e.what()), {}};
}
catch (const std::exception& e) {
return {handleError("Failed to retrieve key: ", key, e.what()), {}};
}
catch (...) {
return {handleError("Failed to retrieve key: ", key, "Unknown error."), {}};
}
}
5、问题求助
在使用triton的过程中,我尝试使用一下cache,但是一直没有看到推理的结果缓存到redis中,不知道是什么原因,我在两个地方使能了cache功能:
第一个,启动triton时增加使能cache功能:
tritonserver --model-repository=/models --log-verbose=1 --cache-config=redis,host=172.17.0.1 --cache-config redis,port=6379 --cache-config redis,password="xxxx" --log-file=1.txt
第二,在模型配置文件中,使能response cache:
response_cache {
enable: true
}
之后通过client请求模型进行推理,但是在redis中一直看不到缓存,至今没有找到原因,如果有同学使用过这个功能,欢迎留言指教,非常感谢。
也欢迎大家关注公众号交流: