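Caching is a key technique for fast system response. As a complex distributed storage system, Ceph contains many kinds and levels of caches. Caches can be classified by location:
and by deployment model:
The RADOS gateway cache, i.e. the RGW Cache, fits both location categories: as a Ceph client it is a client-side cache, and as the server for upper-layer applications it is a server-side cache. By deployment model it is a distributed cache, because a Ceph cluster usually runs multiple RGW instances, and a distributed cache has to deal with problems such as cache synchronization.
The RGW Cache keeps object-storage metadata in an in-memory cache to improve performance.
As mentioned earlier, Ceph currently has three configuration parameters related to the RGW Cache:
Check the RGW cache hit rate: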
[root@umstor14 build]# bin/ceph daemon out/radosgw.8000.asok perf dump|grep cache
    "cache_hit": 336,
    "cache_miss": 135,
The parameter rgw_cache_enabled is set in ceph.conf.
In rgw_main.cc, main() obtains the RGWRados *store:
int main()
{
  RGWRados *store = RGWStoreManager::get_storage(g_ceph_context,
      g_conf()->rgw_enable_gc_threads,
      g_conf()->rgw_enable_lc_threads,
      g_conf()->rgw_enable_bl_threads,
      g_conf()->rgw_enable_quota_threads,
      g_conf()->rgw_run_sync_thread,
      g_conf().get_val<bool>("rgw_dynamic_resharding"),
      g_conf()->rgw_cache_enabled); // read rgw_cache_enabled to decide whether the cache is turned on
}
The call path is as follows:
RGWRados *RGWStoreManager::get_storage() ==>
RGWRados *RGWStoreManager::init_storage_provider() ==>
int RGWRados::initialize(CephContext *_cct) ==>
int RGWRados::initialize()
/**
 * Initialize the RADOS instance and prepare to do other ops
 * Returns 0 on success, -ERR# on failure.
 */
int RGWRados::initialize()
{
  int ret;

  inject_notify_timeout_probability =
    cct->_conf.get_val<double>("rgw_inject_notify_timeout_probability");
  max_notify_retries = cct->_conf.get_val<uint64_t>("rgw_max_notify_retries");

  ret = init_svc(false); // initialize the RGW services, including svc_sysobj, sysobj_cache, svc_notify
  if (ret < 0) {
    ldout(cct, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
    return ret;
  }

  host_id = svc.zone_utils->gen_host_id();

  ret = init_rados(); // initialize the RADOS-related contexts
  if (ret < 0)
    return ret;

  return init_complete(); // start the gc, lc, reshard and other threads
}
Initializing RGWRados *store also initializes the RGW services:
int RGWRados::init_svc(bool raw) (raw = false) ==>
int RGWServices::init(CephContext *cct, bool have_cache) ==>
int RGWServices::do_init(CephContext *cct, bool have_cache, false) ==>
int RGWServices_Def::init(CephContext *cct, bool have_cache, false)
int RGWServices_Def::init(CephContext *cct, bool have_cache, bool raw)
{
  finisher = std::make_unique<RGWSI_Finisher>(cct);
  notify = std::make_unique<RGWSI_Notify>(cct);
  rados = std::make_unique<RGWSI_RADOS>(cct);
  zone = std::make_unique<RGWSI_Zone>(cct);
  zone_utils = std::make_unique<RGWSI_ZoneUtils>(cct);
  quota = std::make_unique<RGWSI_Quota>(cct);
  sync_modules = std::make_unique<RGWSI_SyncModules>(cct);
  sysobj = std::make_unique<RGWSI_SysObj>(cct);
  sysobj_core = std::make_unique<RGWSI_SysObj_Core>(cct);

  if (have_cache) {
    sysobj_cache = std::make_unique<RGWSI_SysObj_Cache>(cct);
  }
  ...
  // initialize the individual services
  sysobj_core->core_init(rados.get(), zone.get());
  if (have_cache) {
    sysobj_cache->init(rados.get(), zone.get(), notify.get());
    sysobj->init(rados.get(), sysobj_cache.get());
  } else {
    sysobj->init(rados.get(), sysobj_core.get());
  }
  ...
  // start the notify service
  if (!raw) {
    r = notify->start();
    if (r < 0) {
      ldout(cct, 0) << "ERROR: failed to start notify service (" << cpp_strerror(-r) << dendl;
      return r;
    }
  }
  ...
  // start the sysobj_core service
  r = sysobj_core->start();
  if (r < 0) {
    ldout(cct, 0) << "ERROR: failed to start sysobj_core service (" << cpp_strerror(-r) << dendl;
    return r;
  }

  // start the sysobj_cache service only if the configuration enables it
  if (have_cache) {
    r = sysobj_cache->start();
    if (r < 0) {
      ldout(cct, 0) << "ERROR: failed to start sysobj_cache service (" << cpp_strerror(-r) << dendl;
      return r;
    }
  }

  // start the sysobj service
  r = sysobj->start();
  if (r < 0) {
    ldout(cct, 0) << "ERROR: failed to start sysobj service (" << cpp_strerror(-r) << dendl;
    return r;
  }

  /* cache or core services will be started by sysobj */
  return 0;
}
The CachingProvider, RGWSI_SysObj_Cache, inherits from RGWSI_SysObj_Core, which is in turn a subclass of RGWServiceInstance.
Finally, the RGWSI_SysObj_Cache service is started.
int RGWServiceInstance::start() ==>
virtual int RGWServiceInstance::do_start() ==>
int RGWSI_SysObj_Cache::do_start()
In the subclass method RGWSI_SysObj_Cache::do_start():
int RGWSI_SysObj_Cache::do_start()
{
  int r = RGWSI_SysObj_Core::do_start(); // currently does nothing and returns 0
  if (r < 0) {
    return r;
  }

  // start the notify service, needed later to distribute cache changes between instances
  r = notify_svc->start();
  if (r < 0) {
    return r;
  }

  assert(notify_svc->is_started());

  cb.reset(new RGWSI_SysObj_Cache_CB(this)); // create the callback object

  // register the callback object with notify_svc; the watch/notify mechanism of
  // notify_svc then ends up calling int RGWSI_SysObj_Cache::watch_cb()
  notify_svc->register_watch_cb(cb.get());

  return 0;
}
The call path of watch_cb() is:
int RGWSI_Notify::watch_cb() ==>
int RGWSI_SysObj_Cache_CB::watch_cb() ==>
int RGWSI_SysObj_Cache::watch_cb()
A typical cache system involves the following four important concepts:
CachingProvider <>-----> CacheManager <>-----> Cache <>-----> Entry
The RGW Cache is mainly implemented in the following source files:
The class diagram is as follows:
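By the role each part plays:
ObjectCache plays the role of the CacheManager and manages one Cache (a map), namely std::unordered_map<string, ObjectCacheEntry> cache_map.
RGWSI_SysObj_Cache plays the role of the CachingProvider and manages one CacheManager, namely ObjectCache cache.
ObjectCacheEntry plays the role of the Entry.
The CachingProvider, RGWSI_SysObj_Cache: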
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
  //......
  RGWSI_Notify *notify_svc{nullptr};
  ObjectCache cache;
  std::shared_ptr<RGWSI_SysObj_Cache_CB> cb;
};
The Entry, ObjectCacheEntry:
struct ObjectCacheEntry {
  ObjectCacheInfo info; // cached object data, metadata and xattrs
  std::list<string>::iterator lru_iter;
  uint64_t lru_promotion_ts;
  uint64_t gen; // entry version: starts at 0 and is incremented on every update
  std::vector<pair<RGWChainedCache *, string> > chained_entries;

  ObjectCacheEntry() : lru_promotion_ts(0), gen(0) {}
};
Each Entry holds the cached data and related information for its object, plus LRU bookkeeping, a version number, and chained_entries.
struct ObjectCacheInfo {
  int status = 0;
  uint32_t flags = 0; //?
  uint64_t epoch = 0; //?
  bufferlist data;
  map<string, bufferlist> xattrs;
  map<string, bufferlist> rm_xattrs; // xattrs pending removal
  ObjectMetaInfo meta;
  obj_version version = {};
  ceph::coarse_mono_time time_added; // time the entry was added; refreshed whenever the object is re-added to the cache
  ......
};
As shown, the cache holds data, metadata, and xattrs.
As mentioned above, ObjectCache plays the role of the CacheManager, while RGWSI_SysObj_Cache corresponds to the CachingProvider.
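LRU is a common family of cache eviction algorithms; many systems, such as Ehcache and Redis, implement it or a refined variant.
LRU (Least Recently Used) evicts data according to its access history. The core idea is that data accessed recently is likely to be accessed again soon.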
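The idea can be illustrated with a minimal, generic LRU cache. This is only a sketch under stated assumptions: the class name SimpleLru and its interface are hypothetical and are not the RGW implementation. Like the RGW code, it pairs a std::list (recency order) with a std::unordered_map (lookup).

```cpp
#include <cstddef>
#include <iterator>
#include <list>
#include <optional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical minimal LRU cache (not the RGW implementation).
// capacity is assumed to be at least 1.
class SimpleLru {
 public:
  explicit SimpleLru(std::size_t capacity) : capacity_(capacity) {}

  // Returns the value and marks the key as most recently used.
  std::optional<std::string> get(const std::string& key) {
    auto it = index_.find(key);
    if (it == index_.end()) return std::nullopt;
    order_.splice(order_.end(), order_, it->second);  // move to the MRU end
    return it->second->second;
  }

  // Inserts or updates a key; evicts the least recently used key when full.
  void put(const std::string& key, std::string value) {
    auto it = index_.find(key);
    if (it != index_.end()) {
      it->second->second = std::move(value);
      order_.splice(order_.end(), order_, it->second);
      return;
    }
    if (!order_.empty() && order_.size() >= capacity_) {
      index_.erase(order_.front().first);  // front = least recently used
      order_.pop_front();
    }
    order_.emplace_back(key, std::move(value));
    index_[key] = std::prev(order_.end());
  }

  bool contains(const std::string& key) const { return index_.count(key) != 0; }
  std::size_t size() const { return index_.size(); }

 private:
  using Item = std::pair<std::string, std::string>;
  std::size_t capacity_;
  std::list<Item> order_;  // front = least recent, back = most recent
  std::unordered_map<std::string, std::list<Item>::iterator> index_;
};
```

With a capacity of 2, inserting a and b, reading a, and then inserting c evicts b, because a was touched more recently.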
Summary of LRU operations:
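As the CachingProvider, RGWSI_SysObj_Cache is responsible for managing the CacheManager, ObjectCache.
The new system objects service performs its core operations through sysobj_core, so a cache service can be layered on top of the system objects service to implement the object cache. This was introduced in PR 24014.
RGWSI_SysObj_Core is the base abstraction (attributes and methods) for system objects; RGWSI_SysObj_Cache inherits from RGWSI_SysObj_Core and implements the cache service extension.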
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
  //......
  RGWSI_Notify *notify_svc{nullptr};
  ObjectCache cache;
  std::shared_ptr<RGWSI_SysObj_Cache_CB> cb;

protected:
  void init(RGWSI_RADOS *_rados_svc, RGWSI_Zone *_zone_svc, RGWSI_Notify *_notify_svc) {
    core_init(_rados_svc, _zone_svc);
    notify_svc = _notify_svc;
  }

  int do_start() override;

  int raw_stat(const rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch,
               map<string, bufferlist> *attrs, bufferlist *first_chunk,
               RGWObjVersionTracker *objv_tracker) override;

  int read();       // read
  int get_attr();   // get an xattr
  int set_attrs();  // set xattrs
  int remove();     // remove from the cache
  int write();
  int write_data();

  int distribute_cache(); // distribute cache changes: there are usually several RGW instances,
                          // so changes must be synchronized between them to keep data consistent
  int watch_cb();         // watch callback
  void set_enabled(bool status); // watch/notify switch, used for cache synchronization across RGW instances

public:
  // chain cache
  bool chain_cache_entry(std::initializer_list<rgw_cache_entry_info *> cache_info_entries,
                         RGWChainedCache::Entry *chained_entry);
  ......
};
Removing a cache entry, remove():
int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
                               RGWObjVersionTracker *objv_tracker,
                               const rgw_raw_obj& obj)
{
  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);

  string name = normal_name(pool, oid);
  // using the normalized cache name built above, call the CacheManager's
  // bool ObjectCache::remove(const string& name) to drop the entry
  cache.remove(name);

  ObjectCacheInfo info;
  // distribute the cache operation to the other RGW instances in the cluster
  int r = distribute_cache(name, obj, info, REMOVE_OBJ);
  if (r < 0) {
    ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to distribute cache: r=" << r << dendl;
  }

  // remove the object itself through sysobj_core
  return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj);
}
The actual removal from the cache is performed by the CacheManager, ObjectCache:
bool ObjectCache::remove(const string& name)
{
  RWLock::WLocker l(lock); // step one: take the write lock

  if (!enabled) {
    return false;
  }

  // look up the entry in the cache map
  auto iter = cache_map.find(name);
  if (iter == cache_map.end())
    return false;

  ldout(cct, 10) << "removing " << name << " from cache" << dendl;
  ObjectCacheEntry& entry = iter->second;

  // invalidate every chained entry linked to this ObjectCacheEntry
  for (auto& kv : entry.chained_entries) {
    kv.first->invalidate(kv.second);
  }

  remove_lru(name, iter->second.lru_iter); // update the LRU list
  cache_map.erase(iter);                   // drop the entry from the cache map
  return true;
}
Take read(), the most common and most important cache operation, as an example:
int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx,
                             GetObjState& read_state,
                             RGWObjVersionTracker *objv_tracker,
                             const rgw_raw_obj& obj,
                             bufferlist *obl, off_t ofs, off_t end,
                             map<string, bufferlist> *attrs,
                             bool raw_attrs,
                             rgw_cache_entry_info *cache_info,
                             boost::optional<obj_version> refresh_version)
{
  rgw_pool pool;
  string oid;
  // a read that does not start at offset 0 bypasses the cache and goes straight to sysobj_core
  if (ofs != 0) {
    return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
                                   obj, obl, ofs, end, attrs, raw_attrs,
                                   cache_info, refresh_version);
  }

  normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
  string name = normal_name(pool, oid);

  ObjectCacheInfo info;

  uint32_t flags = (end != 0 ? CACHE_FLAG_DATA : 0);
  if (objv_tracker)
    flags |= CACHE_FLAG_OBJV;
  if (attrs)
    flags |= CACHE_FLAG_XATTRS;

  // look up the cache entry with the given name
  if ((cache.get(name, info, flags, cache_info) == 0) &&
      (!refresh_version || !info.version.compare(&(*refresh_version)))) {
    if (info.status < 0)
      return info.status;

    bufferlist& bl = info.data;
    bufferlist::iterator i = bl.begin();
    obl->clear();
    i.copy_all(*obl);
    if (objv_tracker)
      objv_tracker->read_version = info.version;
    if (attrs) {
      if (raw_attrs) {
        *attrs = info.xattrs;
      } else {
        rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
      }
    }
    return obl->length();
  }

  map<string, bufferlist> unfiltered_attrset;
  int r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker,
                                  obj, obl, ofs, end,
                                  (attrs ? &unfiltered_attrset : nullptr),
                                  true, /* cache unfiltered attrs */
                                  cache_info,
                                  refresh_version);
  if (r < 0) {
    // the object does not exist: cache the negative result
    if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors
      info.status = r;
      cache.put(name, info, cache_info);
    }
    return r;
  }

  if (obl->length() == end + 1) {
    /* in this case, most likely object contains more data, we can't cache it */
    flags &= ~CACHE_FLAG_DATA;
  } else {
    bufferptr p(r);
    bufferlist& bl = info.data;
    bl.clear();
    bufferlist::iterator o = obl->begin();
    o.copy_all(bl);
  }

  info.status = 0;
  info.flags = flags;
  if (objv_tracker) {
    info.version = objv_tracker->read_version;
  }
  if (attrs) {
    info.xattrs = std::move(unfiltered_attrset);
    if (raw_attrs) {
      *attrs = info.xattrs;
    } else {
      rgw_filter_attrset(info.xattrs, RGW_ATTR_PREFIX, attrs);
    }
  }
  cache.put(name, info, cache_info);
  return r;
}
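The read path above is a read-through cache that also remembers "not found" results. The following is a simplified sketch of that pattern only, not the RGW code: CachedRead, Backend, and read_through are hypothetical names, a std::string stands in for bufferlist, and flags, versions, and xattrs are left out.

```cpp
#include <cerrno>
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical cached record: a negative status marks a cached "not found".
struct CachedRead {
  int status = 0;
  std::string data;
};

// Hypothetical backend reader: returns the byte count or a negative errno.
using Backend = std::function<int(const std::string&, std::string*)>;

int read_through(std::unordered_map<std::string, CachedRead>& cache,
                 const Backend& backend, const std::string& name,
                 std::string* out) {
  auto it = cache.find(name);
  if (it != cache.end()) {
    if (it->second.status < 0) return it->second.status;  // negative hit
    *out = it->second.data;
    return static_cast<int>(out->size());
  }

  int r = backend(name, out);
  if (r < 0) {
    // Only ENOENT is remembered; other errors are retried on the next call.
    if (r == -ENOENT) {
      CachedRead miss;
      miss.status = r;
      cache[name] = miss;
    }
    return r;
  }

  CachedRead hit;
  hit.data = *out;
  cache[name] = hit;
  return r;
}
```

Caching -ENOENT keeps repeated lookups of a nonexistent object from hitting the backend every time, at the cost of needing invalidation when the object is later created.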
The CacheManager, ObjectCache, manages the individual cache entries: lookup, removal, and LRU bookkeeping.
class ObjectCache {
  std::unordered_map<string, ObjectCacheEntry> cache_map;
  std::list<string> lru;      // LRU list
  unsigned long lru_size;     // current number of entries in the LRU list
  unsigned long lru_counter;  // counter incremented on every touch_lru()
  unsigned long lru_window;   // half of rgw_cache_lru_size
  RWLock lock;
  CephContext *cct;

  vector<RGWChainedCache *> chained_cache;

  bool enabled;               // watch/notify switch
  ceph::timespan expiry;      // cache expiry interval
};
int ObjectCache::get(const string& name, ObjectCacheInfo& info, uint32_t mask,
                     rgw_cache_entry_info *cache_info)
{
  RWLock::RLocker l(lock); // step one: take the read lock

  if (!enabled) {
    return -ENOENT;
  }

  // look up the entry
  auto iter = cache_map.find(name);
  if (iter == cache_map.end()) {
    ldout(cct, 10) << "cache get: name=" << name << " : miss" << dendl;
    if (perfcounter)
      perfcounter->inc(l_rgw_cache_miss);
    return -ENOENT;
  }

  // has the entry expired?
  // an expired entry must be removed from both the cache map and the LRU list
  if (expiry.count() &&
      (ceph::coarse_mono_clock::now() - iter->second.info.time_added) > expiry) {
    ldout(cct, 10) << "cache get: name=" << name << " : expiry miss" << dendl;
    lock.unlock();
    lock.get_write(); // switch from the read lock to the write lock
    // check that wasn't already removed by other thread
    iter = cache_map.find(name);
    if (iter != cache_map.end()) {
      for (auto &kv : iter->second.chained_entries)
        kv.first->invalidate(kv.second);
      remove_lru(name, iter->second.lru_iter);
      cache_map.erase(iter);
    }
    if (perfcounter)
      perfcounter->inc(l_rgw_cache_miss);
    return -ENOENT;
  }

  ObjectCacheEntry *entry = &iter->second;

  // promote the entry only when the distance between its promotion count and the global
  // lru_counter exceeds the LRU window, i.e. the entry has drifted into the older half of the LRU list
  // [lru window](https://github.com/ceph/ceph/commit/a84cf15f64211c00bc6c95687ff4509d16b1f909)
  if (lru_counter - entry->lru_promotion_ts > lru_window) {
    ldout(cct, 20) << "cache get: touching lru, lru_counter=" << lru_counter
                   << " promotion_ts=" << entry->lru_promotion_ts << dendl;
    lock.unlock();
    lock.get_write(); /* promote lock to writer */

    /* need to redo this because entry might have dropped off the cache */
    iter = cache_map.find(name);
    if (iter == cache_map.end()) {
      ldout(cct, 10) << "lost race! cache get: name=" << name << " : miss" << dendl;
      if (perfcounter)
        perfcounter->inc(l_rgw_cache_miss);
      return -ENOENT;
    }

    entry = &iter->second;
    /* check again, we might have lost a race here */
    if (lru_counter - entry->lru_promotion_ts > lru_window) {
      touch_lru(name, *entry, iter->second.lru_iter); // promote the entry in the LRU list
    }
  }

  ObjectCacheInfo& src = iter->second.info;
  if ((src.flags & mask) != mask) {
    ldout(cct, 10) << "cache get: name=" << name << " : type miss (requested=0x"
                   << std::hex << mask << ", cached=0x" << src.flags
                   << std::dec << ")" << dendl;
    if (perfcounter)
      perfcounter->inc(l_rgw_cache_miss);
    return -ENOENT;
  }
  ldout(cct, 10) << "cache get: name=" << name << " : hit (requested=0x"
                 << std::hex << mask << ", cached=0x" << src.flags
                 << std::dec << ")" << dendl;

  info = src;
  if (cache_info) {
    cache_info->cache_locator = name;
    cache_info->gen = entry->gen;
  }
  if (perfcounter)
    perfcounter->inc(l_rgw_cache_hit);

  return 0;
}
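The LRU window check in get() exists so that most hits can stay under the read lock: an entry is promoted, which requires the write lock, only when it has not been promoted for more than lru_window touches. The predicate can be isolated as below. The function name needs_promotion is hypothetical, and the numbers in the usage note are illustrative rather than Ceph defaults.

```cpp
#include <cstdint>

// Hypothetical helper mirroring the condition used in ObjectCache::get():
// promote only if the entry was last promoted more than lru_window touches ago.
bool needs_promotion(std::uint64_t lru_counter, std::uint64_t promotion_ts,
                     std::uint64_t lru_window) {
  return lru_counter - promotion_ts > lru_window;
}
```

For example, with lru_window = 50 and lru_counter = 100, an entry promoted at count 90 is left where it is, while an entry promoted at count 10 is moved back to the most recently used end.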
void ObjectCache::put(const string& name, ObjectCacheInfo& info,
                      rgw_cache_entry_info *cache_info)
{
  RWLock::WLocker l(lock);

  if (!enabled) {
    return;
  }

  ldout(cct, 10) << "cache put: name=" << name << " info.flags=0x"
                 << std::hex << info.flags << std::dec << dendl;

  auto [iter, inserted] = cache_map.emplace(name, ObjectCacheEntry{});
  ObjectCacheEntry& entry = iter->second;
  entry.info.time_added = ceph::coarse_mono_clock::now();
  if (inserted) {
    entry.lru_iter = lru.end();
  }
  ObjectCacheInfo& target = entry.info;

  invalidate_lru(entry);

  entry.chained_entries.clear();
  entry.gen++;

  touch_lru(name, entry, entry.lru_iter);

  target.status = info.status;

  if (info.status < 0) {
    target.flags = 0;
    target.xattrs.clear();
    target.data.clear();
    return;
  }

  if (cache_info) {
    cache_info->cache_locator = name;
    cache_info->gen = entry.gen;
  }

  target.flags |= info.flags;

  if (info.flags & CACHE_FLAG_META)
    target.meta = info.meta;
  else if (!(info.flags & CACHE_FLAG_MODIFY_XATTRS))
    target.flags &= ~CACHE_FLAG_META; // non-meta change should reset meta

  if (info.flags & CACHE_FLAG_XATTRS) {
    target.xattrs = info.xattrs;
    map<string, bufferlist>::iterator iter;
    for (iter = target.xattrs.begin(); iter != target.xattrs.end(); ++iter) {
      ldout(cct, 10) << "updating xattr: name=" << iter->first
                     << " bl.length()=" << iter->second.length() << dendl;
    }
  } else if (info.flags & CACHE_FLAG_MODIFY_XATTRS) {
    map<string, bufferlist>::iterator iter;
    for (iter = info.rm_xattrs.begin(); iter != info.rm_xattrs.end(); ++iter) {
      ldout(cct, 10) << "removing xattr: name=" << iter->first << dendl;
      target.xattrs.erase(iter->first);
    }
    for (iter = info.xattrs.begin(); iter != info.xattrs.end(); ++iter) {
      ldout(cct, 10) << "appending xattr: name=" << iter->first
                     << " bl.length()=" << iter->second.length() << dendl;
      target.xattrs[iter->first] = iter->second;
    }
  }

  if (info.flags & CACHE_FLAG_DATA)
    target.data = info.data;

  if (info.flags & CACHE_FLAG_OBJV)
    target.version = info.version;
}
bool ObjectCache::remove(const string& name)
{
  RWLock::WLocker l(lock); // step one: take the write lock

  if (!enabled) {
    return false;
  }

  auto iter = cache_map.find(name);
  if (iter == cache_map.end())
    return false;

  ldout(cct, 10) << "removing " << name << " from cache" << dendl;
  ObjectCacheEntry& entry = iter->second;

  // invalidate every chained entry linked to this cache entry
  for (auto& kv : entry.chained_entries) {
    kv.first->invalidate(kv.second);
  }

  // remove the object's item from the LRU list
  remove_lru(name, iter->second.lru_iter);
  cache_map.erase(iter);
  return true;
}
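The LRU table is a doubly linked list, std::list<>, which supports insertion at both the head and the tail. In the RGW Cache implementation the LRU head, where the most recently used entries go, is the back of the list, and the LRU tail is the front of the list.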
std::list<string> lru;
Removing from the LRU list:
void ObjectCache::remove_lru(const string& name,
                             std::list<string>::iterator& lru_iter)
{
  if (lru_iter == lru.end()) // not in the LRU list, nothing to do
    return;

  lru.erase(lru_iter);   // remove the item
  lru_size--;            // decrement the current LRU size
  lru_iter = lru.end();  // mark the iterator as invalid
}
touch_lru() inserts an entry into the LRU list or moves it to the most recently used position:
void ObjectCache::touch_lru(const string& name, ObjectCacheEntry& entry,
                            std::list<string>::iterator& lru_iter)
{
  // the LRU size exceeds the configured rgw_cache_lru_size:
  // evict from the LRU tail (the front of the list) first
  while (lru_size > (size_t)cct->_conf->rgw_cache_lru_size) {
    auto iter = lru.begin(); // item at the LRU tail
    if ((*iter).compare(name) == 0) {
      // the entry being touched is itself at the LRU tail: do not evict it now,
      // the list will be shrunk back to rgw_cache_lru_size on a later call
      /*
       * if the entry we're touching happens to be at the lru end, don't remove it,
       * lru shrinking can wait for next time
       */
      break;
    }
    // drop the cached object that corresponds to the LRU tail item
    auto map_iter = cache_map.find(*iter);
    ldout(cct, 10) << "removing entry: name=" << *iter << " from cache LRU" << dendl;
    if (map_iter != cache_map.end()) {
      ObjectCacheEntry& entry = map_iter->second;
      invalidate_lru(entry);
      cache_map.erase(map_iter);
    }
    // remove the LRU tail item and decrement the current LRU size
    lru.pop_front();
    lru_size--;
  }

  if (lru_iter == lru.end()) {
    // not in the LRU list yet: insert at the LRU head (back of the list)
    lru.push_back(name);
    lru_size++;
    lru_iter--;
    ldout(cct, 10) << "adding " << name << " to cache LRU end" << dendl;
  } else {
    // already in the LRU list: move to the LRU head (back of the list)
    ldout(cct, 10) << "moving " << name << " to cache LRU end" << dendl;
    lru.erase(lru_iter);
    lru.push_back(name);
    lru_iter = lru.end();
    --lru_iter;
  }

  lru_counter++;
  entry.lru_promotion_ts = lru_counter;
}
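The RGW Cache is a distributed cache: there are usually multiple RGW instances, cache changes have to be distributed to every instance, and the caches must stay consistent.
As shown in the RGW Cache call path above, the CachingProvider, RGWSI_SysObj_Cache, starts notify_svc in its do_start() and registers the watch_cb function.
The purpose of the notify_svc service is to provide a watch/notify mechanism that keeps the caches consistent.
The watch/notify mechanism is provided by librados. The notify RADOS objects are stored in the default.rgw.control pool.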
[root@umstor14 build]# bin/rados ls -p default.rgw.control
notify.1
notify.6
notify.3
notify.7
notify.2
notify.4
notify.5
notify.0
[root@umstor14 build]# bin/rados -p default.rgw.control stat notify.1
default.rgw.control/notify.1 mtime 2020-01-10 18:59:13.000000, size 0
[root@umstor14 build]# bin/rados -p default.rgw.control stat notify.7
default.rgw.control/notify.7 mtime 2020-01-10 18:59:14.000000, size 0
The notify_svc service starts along a path similar to that of cache_svc:
int RGWServiceInstance::start() ==>
virtual int RGWServiceInstance::do_start() ==>
int RGWSI_Notify::do_start()
do_start() initializes the watches:
int RGWSI_Notify::init_watch()
{
  num_watchers = cct->_conf->rgw_num_control_oids; // set by rgw_num_control_oids, 8 watchers by default

  bool compat_oid = (num_watchers == 0);

  if (num_watchers <= 0)
    num_watchers = 1;

  watchers = new RGWWatcher *[num_watchers];
  ......
}
After a cache operation, the change is distributed with distribute_cache():
int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name,
                                         const rgw_raw_obj& obj,
                                         ObjectCacheInfo& obj_info, int op)
{
  RGWCacheNotifyInfo info;
  info.op = op;
  info.obj_info = obj_info;
  info.obj = obj;
  bufferlist bl;
  encode(info, bl);
  return notify_svc->distribute(normal_name, bl); // distribute through notify_svc
}
The distribution step:
int RGWSI_Notify::distribute(const string& key, bufferlist& bl)
{
  // pick one notify object
  RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);

  ldout(cct, 10) << "distributing notification oid=" << notify_obj.get_ref().obj
                 << " bl.length()=" << bl.length() << dendl;
  // perform the distribution
  return robust_notify(notify_obj, bl);
}
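The body of pick_control_obj() is not shown in this article. One plausible way to spread keys over the notify.N objects listed earlier is to hash the key modulo the number of watchers, as in the sketch below. The function pick_control_oid and the use of std::hash are assumptions made for illustration; the real service may use a different hash function and naming rule.

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Hypothetical stand-in for pick_control_obj(): maps a cache key to the name
// of one of num_watchers control objects. std::hash is an assumption here.
std::string pick_control_oid(const std::string& key, std::size_t num_watchers) {
  if (num_watchers == 0) num_watchers = 1;  // same clamp as in init_watch()
  std::size_t index = std::hash<std::string>{}(key) % num_watchers;
  return "notify." + std::to_string(index);
}
```

The property that matters is that every RGW instance maps the same key to the same control object, so a notification sent on that object reaches all instances watching it.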
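The details of distribution are covered in RGW Services -- Notify Service.
In addition, the watcher's handle_notify() in the notify_svc service calls the registered callback.
When a watcher receives an update notification, it updates the local cache.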
void RGWWatcher::handle_notify()
{
  ......
  // call the callback registered by the cache_svc service
  svc->watch_cb(notify_id, cookie, notifier_id, bl);

  // send an acknowledgement back to the notifier
  bufferlist reply_bl; // empty reply payload
  obj.notify_ack(notify_id, cookie, reply_bl);
  ......
}
Depending on the operation type, the callback uses the CacheManager to update or remove the cache entry:
int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id,
                                 uint64_t cookie,
                                 uint64_t notifier_id,
                                 bufferlist& bl)
{
  RGWCacheNotifyInfo info; // cache notify info: operation, rgw raw object, obj cache info, offset, etc.

  try {
    auto iter = bl.cbegin();
    decode(info, iter);
  } catch (buffer::end_of_buffer& err) {
    ldout(cct, 0) << "ERROR: got bad notification" << dendl;
    return -EIO;
  } catch (buffer::error& err) {
    ldout(cct, 0) << "ERROR: buffer::error" << dendl;
    return -EIO;
  }

  rgw_pool pool;
  string oid;
  normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid);
  string name = normal_name(pool, oid);

  switch (info.op) {
  case UPDATE_OBJ:
    // update the cache through the CacheManager
    cache.put(name, info.obj_info, NULL);
    break;
  case REMOVE_OBJ:
    // remove from the cache through the CacheManager
    cache.remove(name);
    break;
  default:
    ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
    return -EINVAL;
  }

  return 0;
}
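The overall flow (one instance changes its cache, then every instance applies the same operation in its callback) can be modelled in a few lines. This sketch is an in-process simplification with hypothetical names (CacheOp, Notification, LocalCache, apply_notification, distribute); it replaces the librados watch/notify transport with a plain loop and ignores encoding, acknowledgements, and retries.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for UPDATE_OBJ / REMOVE_OBJ.
enum class CacheOp { Update, Remove };

// Hypothetical, simplified counterpart of RGWCacheNotifyInfo.
struct Notification {
  CacheOp op;
  std::string name;
  std::string payload;
};

using LocalCache = std::unordered_map<std::string, std::string>;

// What each instance does in its watch callback.
void apply_notification(LocalCache& cache, const Notification& n) {
  switch (n.op) {
    case CacheOp::Update:
      cache[n.name] = n.payload;
      break;
    case CacheOp::Remove:
      cache.erase(n.name);
      break;
  }
}

// Stand-in for the notify transport: deliver the notification to every peer.
void distribute(const std::vector<LocalCache*>& peers, const Notification& n) {
  for (LocalCache* peer : peers) {
    apply_notification(*peer, n);
  }
}
```

After an Update is distributed, every peer holds the same value for that name; after a Remove, none of them holds it, which is the consistency property the callback is meant to provide.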
The chained cache lets user info and bucket info be cached as well, by linking them to the underlying raw object cache.
Basically chains bucket info and user info caches to the raw metadata object cache.
binfo_cache = new RGWChainedCacheImpl<bucket_info_entry>;
static RGWChainedCacheImpl<user_info_entry> uinfo_cache;
Taking the user cache as an example: once the RGW Cache is enabled, lookups go to the cache first:
void rgw_user_init(RGWRados *store)
{
  uinfo_cache.init(store->svc.cache);

  user_meta_handler = new RGWUserMetadataHandler;
  store->meta_mgr->register_handler(user_meta_handler);
}

int rgw_get_user_info_from_index(RGWRados * const store,
                                 const string& key,
                                 const rgw_pool& pool,
                                 RGWUserInfo& info,
                                 RGWObjVersionTracker * const objv_tracker,
                                 real_time * const pmtime)
{
  // try the cache first
  if (auto e = uinfo_cache.find(key)) {
    info = e->info;
    if (objv_tracker)
      *objv_tracker = e->objv_tracker;
    if (pmtime)
      *pmtime = e->mtime;
    return 0;
  }
  ......
  // cache miss: read directly from the RADOS cluster,
  // then update the uinfo cache with the result
  uinfo_cache.put(store->svc.cache, key, &e, { &cache_info });
  .......
class RGWChainedCache {
public:
  ......
  struct Entry {
    RGWChainedCache *cache; // the chained cache this entry belongs to
    const string& key;      // email/swift_name/access_key/bucket name
    void *data;             // points to a bucket_info_entry or user_info_entry

    Entry(RGWChainedCache *_c, const string& _k, void *_d) : cache(_c), key(_k), data(_d) {}
  };
};
The chained cache is provided through the sysobj_cache_svc service:
chain_entry is added to the chained cache and linked to the ObjectCacheEntry items that cache_info_entries point to.
bool RGWChainedCache::put(RGWSI_SysObj_Cache *svc, const string& key, T *entry,
                          std::initializer_list<rgw_cache_entry_info *> cache_info_entries)
{
  if (!svc) {
    return false;
  }

  Entry chain_entry(this, key, entry);

  /* we need the svc cache to call us under its lock to maintain lock ordering */
  return svc->chain_cache_entry(cache_info_entries, &chain_entry);
}

bool ObjectCache::chain_cache_entry(std::initializer_list<rgw_cache_entry_info*> cache_info_entries,
                                    RGWChainedCache::Entry *chained_entry)
{
  // verify that all referenced ObjectCacheEntry items are still valid
  ......
  // add the new entry to its chained cache
  chained_entry->cache->chain_cb(chained_entry->key, chained_entry->data);

  // link the chained entry to every valid ObjectCacheEntry
  for (auto entry : entries) {
    entry->chained_entries.push_back(make_pair(chained_entry->cache,
                                               chained_entry->key));
  }
  ......
}
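The relationship between the base object cache and a chained cache can be reduced to the sketch below: a derived value is stored only if its base entry is still cached, and removing the base entry invalidates everything chained to it. All names here (DerivedCache, BaseEntry, BaseCache) are hypothetical, and locking and the gen check are omitted.

```cpp
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical derived cache, e.g. parsed user info keyed by access key.
struct DerivedCache {
  std::unordered_map<std::string, std::string> entries;
  void invalidate(const std::string& key) { entries.erase(key); }
};

// Hypothetical base entry: raw data plus links to the derived entries.
struct BaseEntry {
  std::string raw;
  std::vector<std::pair<DerivedCache*, std::string>> chained;
};

struct BaseCache {
  std::unordered_map<std::string, BaseEntry> map;

  // Stores the derived value and links it to the base entry.
  // Fails if the base entry is no longer cached.
  bool chain(const std::string& base, DerivedCache* derived,
             const std::string& key, std::string value) {
    auto it = map.find(base);
    if (it == map.end()) return false;
    derived->entries[key] = std::move(value);
    it->second.chained.emplace_back(derived, key);
    return true;
  }

  // Removing a base entry invalidates every derived entry chained to it.
  void remove(const std::string& base) {
    auto it = map.find(base);
    if (it == map.end()) return;
    for (auto& kv : it->second.chained) kv.first->invalidate(kv.second);
    map.erase(it);
  }
};
```

This is why the derived user or bucket info can never outlive the raw metadata object it was built from: any update or removal of the raw entry drops the derived entry too.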
The chained cache depends on ObjectCache.
The following methods update the ObjectCache member vector<RGWChainedCache *> chained_cache:
void ObjectCache::chain_cache(RGWChainedCache *cache);
void ObjectCache::unchain_cache(RGWChainedCache *cache);
The cache hit rate of the test system shown earlier, with "cache_hit": 336 and "cache_miss": 135, is 336 / (336 + 135) * 100% ≈ 71%.
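The same arithmetic as a small helper, in case the counters are collected programmatically. The function name hit_ratio is hypothetical; the inputs are the cache_hit and cache_miss values from perf dump.

```cpp
// Hypothetical helper: fraction of lookups served from the cache.
// Returns 0.0 when there have been no lookups at all.
double hit_ratio(unsigned long hits, unsigned long misses) {
  unsigned long total = hits + misses;
  if (total == 0) return 0.0;
  return static_cast<double>(hits) / static_cast<double>(total);
}
```

For the counters above, hit_ratio(336, 135) is 336 / 471, roughly 0.713.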
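Caches work best for read-heavy, write-light workloads. To raise the RGW Cache hit rate in such workloads, the following directions are worth considering: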