kubeedge源碼分析系列之cloudcore

kubeedge源碼分析系列之cloudcore

本系列的源碼分析是在 commit da92692baa660359bb314d89dfa3a80bffb1d26c 之上進行的。node

cloudcore部分的源碼分析是在kubeedge源碼分析系列之總體架構基礎上展開的,若是沒有閱讀過kubeedge源碼分析系列之總體架構,直接閱讀本文,會感受比較突兀。git

本文概述

本文對cloudcore進行了展開,對cloudcore組件中功能模塊共用的消息框架和各功能模塊的具體功能進行深刻剖析,具體以下:github

  1. cloudcore功能模塊之間通訊的消息框架
  2. cloudhub剖析
  3. edgecontroller剖析
  4. devicecontroller剖析

cloudcore功能模塊之間通訊的消息框架

cloudcore組件中各個功能模塊之間是經過Beehive來組織和管理的,Beehive是一個基於go-channels的消息框架,但本文不對Beehive作全面的分析,只會分析cloudcore組件中用到的Beehive的相關功能。web

Beehive的消息框架是在cloudcore的功能模塊啓動的時候一如的,具體以下:api

kubeedge/beehive/pkg/core/core.go緩存

import (
	...
	"github.com/kubeedge/beehive/pkg/core/context"
)

// StartModules starts modules that are registered
func StartModules() {
	coreContext := context.GetContext(context.MsgCtxTypeChannel)

	modules := GetModules()
	for name, module := range modules {
		//Init the module
		coreContext.AddModule(name)
		//Assemble typeChannels for sendToGroup
		coreContext.AddModuleGroup(name, module.Group())
		go module.Start(coreContext)
		klog.Infof("Starting module %v", name)
	}
}
複製代碼

從上面的函數定義中能夠看到,在cloudcore的功能模塊(model)啓動以前,首先實例化了一個beehive的context,而後再得到各個功能模塊,最後用一個for循環逐個啓動功能模塊,並將已實例化的beehive的context做爲參數,傳入每一個功能模塊的啓動函數(Start(...))中。這樣cloudcore中獨立的功能模塊就被beehive的context給組織成了一個統一的總體,至於beehive的context是怎麼作到的,還須要進入beehive的context的實例化函數context.GetContext(...)一探究竟,具體以下:服務器

kubeedge/beehive/pkg/core/context/contex_factory.go微信

// GetContext gets global context instance
func GetContext(contextType string) *Context {
	once.Do(func() {
		context = &Context{}
		switch contextType {
		case MsgCtxTypeChannel:
			channelContext := NewChannelContext()
			context.messageContext = channelContext
			context.moduleContext = channelContext
		default:
			klog.Warningf("Do not support context type:%s", contextType)
		}
	})
	return context
}
複製代碼

上面的函數定義能夠中的第3行context = &Context{}實例化了一個空context,進入該實例化context的定義,探究一下它的具體內容:websocket

kubeedge/beehive/pkg/core/context/context.go架構

// Context is global context object
type Context struct {
	moduleContext  ModuleContext
	messageContext MessageContext
}

//ModuleContext is interface for context module management
type ModuleContext interface {
	AddModule(module string)
	AddModuleGroup(module, group string)
	Cleanup(module string)
}

//MessageContext is interface for message syncing
type MessageContext interface {
	// async mode
	Send(module string, message model.Message)
	Receive(module string) (model.Message, error)
	// sync mode
	SendSync(module string, message model.Message, timeout time.Duration) (		model.Message, error)
	SendResp(message model.Message)
	// group broadcast
	SendToGroup(moduleType string, message model.Message)
	SendToGroupSync(moduleType string, message model.Message, timeout 		time.Duration) error
}
複製代碼

從上面的Context Struct定義能夠看出該Context的兩個屬性moduleContext,messageContext都是interface類型的,因此能夠判定該Context確定不是真身,在函數GetContext(...)(kubeedge/beehive/pkg/core/context/context.go)繼續往下看,在第6行channelContext := NewChannelContext(),又一個context實例化函數NewChannelContext(),進入該函數的定義去看一下,它是否是真身:

kubeedge/beehive/pkg/core/context/context.go

// ChannelContext is object for Context channel
type ChannelContext struct {
	//ConfigFactory goarchaius.ConfigurationFactory
	channels     map[string]chan model.Message
	chsLock      sync.RWMutex
	typeChannels map[string]map[string]chan model.Message
	typeChsLock  sync.RWMutex
	anonChannels map[string]chan model.Message
	anonChsLock  sync.RWMutex
}
複製代碼

在該struct的定義文件中,會看到ChannelContext struct實現了Context struct(kubeedge/beehive/pkg/core/context/context.go)屬性包含的全部interface(ModuleContext,MessageContext),毫無疑問ChannelContext struct就是cloudcore中各功能模塊相互通訊的消息隊列框架的真身了。

至於ChannelContext struct(kubeedge/beehive/pkg/core/context/context.go)具體是經過什麼手段來實現cloudcore中各功能模塊相互通訊的,感興趣的同窗能夠根據本文的梳理本身去研究一下,若是想讓咱們繼續深刻分析,能夠在評論區留言。

cloudhub剖析

從「cloudcore功能模塊之間通訊的消息框架」已經分析到各功能模塊的啓動了,cloudhub功能模塊啓動函數的具體內容以下:

kubeedge/cloud/edge/pkg/cloudhub/cloudhub.go

func (a *cloudHub) Start(c *beehiveContext.Context) {
	var ctx context.Context
	a.context = c
	ctx, a.cancel = context.WithCancel(context.Background())

	initHubConfig()

	messageq := channelq.NewChannelMessageQueue(c)

	// start dispatch message from the cloud to edge node
	go messageq.DispatchMessage(ctx)

	// start the cloudhub server
	if util.HubConfig.ProtocolWebsocket {
		go servers.StartCloudHub(servers.ProtocolWebsocket, messageq, c)
	}

	if util.HubConfig.ProtocolQuic {
		go servers.StartCloudHub(servers.ProtocolQuic, messageq, c)
	}

	if util.HubConfig.ProtocolUDS {
		// The uds server is only used to communicate with csi driver from kubeedge on cloud.
		// It is not used to communicate between cloud and edge.
		go udsserver.StartServer(util.HubConfig, c)
	}

}
複製代碼

從以上cloudhub的啓動函數Start(...)定義中,能夠清晰地看出cloudhub在啓動時主要作了以下幾件事:

  1. 接受beehiveContext.Context的通訊框架實例,並保存(a.context = c);
  2. 初始化cloudhub的配置(initHubConfig());
  3. 將接受到的beehiveContext.Context的通訊框架實例進行修飾(messageq := channelq.NewChannelMessageQueue(c)),在原用通訊框架實例的基礎上加入了緩存功能;
  4. 啓動一個消息分發go routine(go messageq.DispatchMessage(ctx)),監聽雲端的事件下發到edge端去;
  5. 若是設置了websocket啓動,就啓動websocket服務器的go routine(go servers.StartCloudHub(servers.ProtocolWebsocket, messageq, c) );
  6. 若是設置了quic啓動,就啓動quic服務器的go routine(go servers.StartCloudHub(servers.ProtocolQuic, messageq, c));
  7. 若是設置了unix domain socket啓動,就啓動unix domain socket服務器的go routine(go udsserver.StartServer(util.HubConfig, c));

須要對以上另外說明的是:

  1. websocket server和quic server的功能是相同,也就是說二者能夠選其一,若是條件容許的話,建議選quic server,速度更快一些;
  2. unix domain socket是用來與kubeedge的csi(container storage interface) 通訊的;

以上就是cloudcore中cloudhub功能模塊的剖析,若是有同窗對於cloudhub具體都作了那些事,是怎麼作的感興趣,能夠在本文的基礎上自行剖析,若是想讓筆者分析,請評論區留言。

edgecontroller剖析

從「cloudcore功能模塊之間通訊的消息框架」已經分析到各功能模塊的啓動了,edgecontroller功能模塊啓動函數的具體內容以下:

kubeedge/cloud/pkg/edgecontroller/controller.go

// Start controller
func (ctl *Controller) Start(c *beehiveContext.Context) {
	var ctx context.Context

	config.Context = c
	ctx, ctl.cancel = context.WithCancel(context.Background())

	initConfig()

	upstream, err := controller.NewUpstreamController()
	if err != nil {
		klog.Errorf("new upstream controller failed with error: %s", err)
		os.Exit(1)
	}
	upstream.Start(ctx)

	downstream, err := controller.NewDownstreamController()
	if err != nil {
		klog.Warningf("new downstream controller failed with error: %s", err)
		os.Exit(1)
	}
	downstream.Start(ctx)

}
複製代碼

從以上edgecontroller的啓動函數Start(...)定義中,能夠清晰地看出cloudhub在啓動時主要作了以下幾件事:

  1. 接受beehiveContext.Context的通訊框架實例,並保存(config.Context = c);

  2. 初始化edgecontroller的配置(initHubConfig());

  3. 實例化並啓動UpstreamController

    upstream, err := controller.NewUpstreamController()
     if err != nil {
     	klog.Errorf("new upstream controller failed with error: %s", err)
     	os.Exit(1)
     }
     upstream.Start(ctx)
    複製代碼
  4. 實例化並啓動DownstreamController

    downstream, err := controller.NewDownstreamController()
     if err != nil {
     	klog.Warningf("new downstream controller failed with error: %s", err)
     	os.Exit(1)
     }
     downstream.Start(ctx)	
    複製代碼

下面深刻分析UpstreamController和DownstreamController都具體作了哪些事:

UpstreamController

順着UpstreamController的實例化函數找到UpstreamController struct定義:

kubeedge/cloud/pkg/edgecontroller/upstream.go

// UpstreamController subscribe messages from edge and sync to k8s api server
type UpstreamController struct {
	kubeClient   *kubernetes.Clientset
	messageLayer messagelayer.MessageLayer

	// message channel
	nodeStatusChan            chan model.Message
	podStatusChan             chan model.Message
	secretChan                chan model.Message
	configMapChan             chan model.Message
	serviceChan               chan model.Message
	endpointsChan             chan model.Message
	persistentVolumeChan      chan model.Message
	persistentVolumeClaimChan chan model.Message
	volumeAttachmentChan      chan model.Message
	queryNodeChan             chan model.Message
	updateNodeChan            chan model.Message
}	
複製代碼

看到上面UpstreamController的定義,估計正在閱讀本文的同窗已經在猜,UpstreamController是否是負責處理edge節點上報的nodeStatus、podStatus、secret、configMap、service、endpoints、persistentVolume、persistentVolumeClaim、volumeAttachment等資源的信息的,恭喜你猜對了,UpstreamController就是幹這個事的。

以上就是UpstreamController功能模塊的剖析,若是有同窗對於UpstreamController具體怎麼處理edge節點上報的nodeStatus、podStatus、secret、configMap、service、endpoints、persistentVolume、persistentVolumeClaim、volumeAttachment等資源的信息的的感興趣,能夠在本文的基礎上自行剖析,若是想讓筆者分析,請評論區留言。

DownstreamController

順着DownstreamController的實例化函數找到DownstreamController struct定義:

kubeedge/cloud/pkg/edgecontroller/downstream.go

// DownstreamController watch kubernetes api server and send change to edge
type DownstreamController struct {
	kubeClient   *kubernetes.Clientset
	messageLayer messagelayer.MessageLayer

	podManager *manager.PodManager

	configmapManager *manager.ConfigMapManager

	secretManager *manager.SecretManager

	nodeManager *manager.NodesManager

	serviceManager *manager.ServiceManager

	endpointsManager *manager.EndpointsManager

	lc *manager.LocationCache
}
複製代碼

看到上面DownstreamController的定義,和UpstreamController的功能,同窗們應該也能夠對DownstreamController的具體功能猜到幾分了,對DownstreamController就是監聽cloud端pod、configmap、secret、node、service和endpoints等資源的事件,並下發到edge節點的。

以上就是DownstreamController功能模塊的剖析,若是有同窗對於DownstreamController具體怎麼處理edge節點上報的pod、configmap、secret、node、service和endpoints等資源的信息的的感興趣,能夠在本文的基礎上自行剖析,若是想讓筆者分析,請評論區留言。

devicecontroller剖析

從「cloudcore功能模塊之間通訊的消息框架」已經分析到各功能模塊的啓動了,devicecontroller功能模塊啓動函數的具體內容以下:

kubeedge/cloud/pkg/devicecontroller/module.go

// Start controller
func (dctl *DeviceController) Start(c *beehiveContext.Context) {
	var ctx context.Context
	config.Context = c

	ctx, dctl.cancel = context.WithCancel(context.Background())

	initConfig()

	downstream, err := controller.NewDownstreamController()
	if err != nil {
		klog.Errorf("New downstream controller failed with error: %s", err)
		os.Exit(1)
	}
	upstream, err := controller.NewUpstreamController(downstream)
	if err != nil {
		klog.Errorf("new upstream controller failed with error: %s", err)
		os.Exit(1)
	}

	downstream.Start(ctx)
	// wait for downstream controller to start and load deviceModels and devices
	// TODO think about sync
	time.Sleep(1 * time.Second)
	upstream.Start(ctx)
}
複製代碼

看上面devicecontroller的啓動函數是否似曾相識相識,那是由於它和edgecontroller的啓動函數的邏輯基本相同,因此對於devicecontroller的剖析,同窗能夠參考「edgecontrller剖析「。

到此「kubeedge源碼分析系列之cloudcore」就所有結束了,你們在根據本文去閱讀kubeedge的源碼時,必定要時刻提醒本身,cloudcore中的個model之間時能夠經過「beehive的context消息通訊框架」進行相互通訊的,不然,會產生一些困惑。

本文是「之江實驗室端邊雲操做系統團隊」 kubeedge源碼分析系列的第二篇,接下來會對各組件的源碼進行系統分析。若是有機會咱們團隊也會積極解決kubeedge的issue和實現新的feature。

這是咱們「之江實驗室端邊雲操做系統團隊」維護的「之江實驗室kubeedge源碼分析羣「微信羣,歡迎你們的參與!!!

之江實驗室kubeedge源碼分析羣二維碼入口
相關文章
相關標籤/搜索