k8s目前主要支持CPU和內存兩種資源,爲了支持用戶須要按需分配的其餘硬件類型的資源的調度分配,k8s實現了設備插件框架(device plugin framework)來用於其餘硬件類型的資源集成,好比如今機器學習要使用GPU等資源,今天來看下其內部的關鍵實現windows
當咱們要集成本地硬件的資源的時候,咱們能夠在當前節點上經過DaemonSet來運行一個GRPC服務,經過這個服務來進行本地硬件資源的上報與分配api
當提供硬件服務須要與kubelet進行通訊的時候,則首先須要進行註冊,註冊的方式,則是經過最原始的底層的socket文件,而且經過Linux文件系統的inotify機制,來實現服務的註冊微信
Watcher主要是負責感知當前節點上註冊的服務,當發現新的要註冊的插件服務,則會產生對應的事件,註冊到當前的kubelet中網絡
這裏的狀態主要是指的是否須要註冊,由於kubelet與對應的插件服務是經過網絡進行通訊的,當網絡出現問題、或者對應的插件服務故障,則可能會致使服務註冊失敗,但此時對應的服務的socket還依舊存在,即對應的插件服務依舊存在數據結構
此時就會有兩種狀態:指望狀態與實際狀態, 由於socket存在因此服務的指望狀態實際上是須要註冊這個插件服務,可是實際上由於某些緣由,這個插件服務並無完成註冊,後續會不斷的經過指望狀態,調整實際狀態,從而達到一致框架
協調器則就是完成上述兩種狀態之間操做的核心,其經過調用對應插件的回調函數,其實就是調用對應的grpc接口,來完成指望狀態與實際狀態的一致性機器學習
針對每種類型的插件,都會有對應的控制器,其實也就是實現對應設備註冊和反註冊而且完成底層資源的分配(Allocate)和收集(ListWatch)操做socket
type Watcher struct {
// 感知插件服務註冊的socket的路徑
path string
fs utilfs.Filesystem
// inotify監測插件服務socket變化
fsWatcher *fsnotify.Watcher
stopped chan struct{}
// 存儲指望狀態
desiredStateOfWorld cache.DesiredStateOfWorld
}複製代碼
初始化其實就是建立對應的目錄ide
func (w *Watcher) init() error {
klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)
if err := w.fs.MkdirAll(w.path, 0755); err != nil {
return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
}
return nil
}複製代碼
go func(fsWatcher *fsnotify.Watcher) {
defer close(w.stopped)
for {
select {
case event := <-fsWatcher.Events:
//若是發現對應目錄的文件的變化,則會觸發對應的事件
if event.Op&fsnotify.Create == fsnotify.Create {
err := w.handleCreateEvent(event)
if err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
w.handleDeleteEvent(event)
}
continue
case err := <-fsWatcher.Errors:
if err != nil {
klog.Errorf("fsWatcher received error: %v", err)
}
continue
case <-stopCh:
// In case of plugin watcher being stopped by plugin manager, stop
// probing the creation/deletion of plugin sockets.
// Also give all pending go routines a chance to complete
select {
case <-w.stopped:
case <-time.After(11 * time.Second):
klog.Errorf("timeout on stopping watcher")
}
w.fsWatcher.Close()
return
}
}
}(fsWatcher)複製代碼
其實補償機制主要是在從新啓動kubelet的時候,須要將以前已經存在的socket從新註冊到當前的kubelet中函數
func (w *Watcher) traversePluginDir(dir string) error {
return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if path == dir {
return fmt.Errorf("error accessing path: %s error: %v", path, err)
}
klog.Errorf("error accessing path: %s error: %v", path, err)
return nil
}
switch mode := info.Mode(); {
case mode.IsDir():
if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
case mode&os.ModeSocket != 0:
event := fsnotify.Event{
Name: path,
Op: fsnotify.Create,
}
//TODO: Handle errors by taking corrective measures
if err := w.handleCreateEvent(event); err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
default:
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
}
return nil
})
}複製代碼
註冊其實就只須要感知到的socket文件路徑傳遞給指望狀態進行管理
func (w *Watcher) handlePluginRegistration(socketPath string) error {
if runtime.GOOS == "windows" {
socketPath = util.NormalizePath(socketPath)
}
// 調用指望狀態進行更新
klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
if err != nil {
return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
}
return nil
}複製代碼
註冊其實就只須要感知到的socket文件路徑傳遞給指望狀態進行管理
func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
klog.V(6).Infof("Handling delete event: %v", event)
socketPath := event.Name
klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
w.desiredStateOfWorld.RemovePlugin(socketPath)
}複製代碼
插件信息其實只是存儲了對應socket的路徑和最近更新的時間
type PluginInfo struct {
SocketPath string
Timestamp time.Time
}複製代碼
指望狀態與實際狀態在數據結構上都是同樣的,由於本質上只是爲了存儲插件的當前的狀態信息,即更新時間,這裏不在贅述
type desiredStateOfWorld struct {
socketFileToInfo map[string]PluginInfo
sync.RWMutex
}複製代碼
type actualStateOfWorld struct {
socketFileToInfo map[string]PluginInfo
sync.RWMutex
}複製代碼
目前k8s中支持兩大類的插件的管理一類是DevicePlugin即咱們本文說的這些都是這種概念,一類是CSIPlugin,其中針對每一類DRiver的處理其實內部都是不同的,那其實在操做以前就要先感知到當前的Driver是那種類型的
OperationExecutor主要就是作這件事的,其根據不一樣的plugin類型,生成不一樣的要執行的操做,即對應的Plugin類型獲取對應的handler,就生成了一個要執行的操做
registerPluginFunc := func() error {
client, conn, err := dial(socketPath, dialTimeoutDuration)
if err != nil {
return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
infoResp, err := client.GetInfo(ctx, ®isterapi.InfoRequest{})
if err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
}複製代碼
handler, ok := pluginHandlers[infoResp.Type]
if !ok {
if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
}
return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
}
if infoResp.Endpoint == "" {
infoResp.Endpoint = socketPath
}
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
}
return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
}複製代碼
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath,
Timestamp: timestamp,
})
if err != nil {
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
}
// 調用插件的註冊回調函數
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}
複製代碼
if err := og.notifyPlugin(client, true, ""); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
}複製代碼
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
}),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
}
return registerapi.NewRegistrationClient(c), c, nil
}複製代碼
今天就先到這裏,下一章會繼續介紹如何組合上述組件以及默認的回調管理機制的實現,進探究到這裏謝謝你們,感謝分享點贊,反轉又不花錢
k8s源碼閱讀電子書地址: www.yuque.com/baxiaoshi/t…
微信號:baxiaoshi2020
關注公告號閱讀更多源碼分析文章
更多文章關注 www.sreguide.com
本文由博客一文多發平臺 OpenWrite 發佈