This blog series focuses on demystifying core big data and container cloud technologies, and offers full-stack big data + cloud-native platform consulting. Please keep following the series, and feel free to reach out for technical or academic exchange. For more content, follow the 《數據雲技術社區》 WeChat official account, or email 1120746959@qq.com.
iptables -t nat -nvL KUBE-SERVICES
iptables -t nat -nvL KUBE-SVC-YREYKMMDZGMSMDZU
iptables -t nat -nvL KUBE-SEP-*
func main() {
    command := app.NewProxyCommand()
    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}
// Run runs the specified ProxyServer.
func (o *Options) Run() error {
    defer close(o.errCh)
    if len(o.WriteConfigTo) > 0 {
        return o.writeConfigFile()
    }

    proxyServer, err := NewProxyServer(o)
    if err != nil {
        return err
    }

    if o.CleanupAndExit {
        return proxyServer.CleanupAndExit()
    }

    o.proxyServer = proxyServer
    return o.runLoop()
}
const (
    ProxyModeUserspace   ProxyMode = "userspace"
    ProxyModeIPTables    ProxyMode = "iptables"
    ProxyModeIPVS        ProxyMode = "ipvs"
    ProxyModeKernelspace ProxyMode = "kernelspace"
)

func newProxyServer(
    config *proxyconfigapi.KubeProxyConfiguration,
    cleanupAndExit bool,
    master string) (*ProxyServer, error) {
    if config == nil {
        return nil, errors.New("config is required")
    }

    if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
        c.Set(config)
    } else {
        return nil, fmt.Errorf("unable to register configz: %s", err)
    }

    protocol := utiliptables.ProtocolIpv4
    if net.ParseIP(config.BindAddress).To4() == nil {
        klog.V(0).Infof("IPv6 bind address (%s), assume IPv6 operation", config.BindAddress)
        protocol = utiliptables.ProtocolIpv6
    }

    var iptInterface utiliptables.Interface
    var ipvsInterface utilipvs.Interface
    var kernelHandler ipvs.KernelHandler
    var ipsetInterface utilipset.Interface
    var dbus utildbus.Interface

    // Create a iptables utils.
    execer := exec.New()
    dbus = utildbus.New()
    iptInterface = utiliptables.New(execer, dbus, protocol)
    kernelHandler = ipvs.NewLinuxKernelHandler()
    ipsetInterface = utilipset.New(execer)
    canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
    if canUseIPVS {
        ipvsInterface = utilipvs.New(execer)
    }

    // We omit creation of pretty much everything if we run in cleanup mode
    if cleanupAndExit {
        return &ProxyServer{
            execer:         execer,
            IptInterface:   iptInterface,
            IpvsInterface:  ipvsInterface,
            IpsetInterface: ipsetInterface,
        }, nil
    }

    client, eventClient, err := createClients(config.ClientConnection, master)
    if err != nil {
        return nil, err
    }

    // Create event recorder
    hostname, err := utilnode.GetHostname(config.HostnameOverride)
    if err != nil {
        return nil, err
    }
    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})

    nodeRef := &v1.ObjectReference{
        Kind:      "Node",
        Name:      hostname,
        UID:       types.UID(hostname),
        Namespace: "",
    }

    var healthzServer *healthcheck.HealthzServer
    var healthzUpdater healthcheck.HealthzUpdater
    if len(config.HealthzBindAddress) > 0 {
        healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
        healthzUpdater = healthzServer
    }

    var proxier proxy.Provider

    proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
    nodeIP := net.ParseIP(config.BindAddress)
    if nodeIP.IsUnspecified() {
        nodeIP = utilnode.GetNodeIP(client, hostname)
        if nodeIP == nil {
            return nil, fmt.Errorf("unable to get node IP for hostname %s", hostname)
        }
    }
    if proxyMode == proxyModeIPTables {
        klog.V(0).Info("Using iptables Proxier.")
        if config.IPTables.MasqueradeBit == nil {
            // MasqueradeBit must be specified or defaulted.
            return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
        }

        // TODO this has side effects that should only happen when Run() is invoked.
        proxier, err = iptables.NewProxier(
            iptInterface, utilsysctl.New(), execer,
            config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration,
            config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit),
            config.ClusterCIDR, hostname, nodeIP, recorder, healthzUpdater,
            config.NodePortAddresses,
        )
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
        metrics.RegisterMetrics()
    } else if proxyMode == proxyModeIPVS {
        klog.V(0).Info("Using ipvs Proxier.")
        proxier, err = ipvs.NewProxier(
            iptInterface, ipvsInterface, ipsetInterface, utilsysctl.New(), execer,
            config.IPVS.SyncPeriod.Duration, config.IPVS.MinSyncPeriod.Duration,
            config.IPVS.ExcludeCIDRs, config.IPVS.StrictARP,
            config.IPTables.MasqueradeAll, int(*config.IPTables.MasqueradeBit),
            config.ClusterCIDR, hostname, nodeIP, recorder, healthzServer,
            config.IPVS.Scheduler, config.NodePortAddresses,
        )
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
        metrics.RegisterMetrics()
    } else {
        klog.V(0).Info("Using userspace Proxier.")
        // TODO this has side effects that should only happen when Run() is invoked.
        proxier, err = userspace.NewProxier(
            userspace.NewLoadBalancerRR(),
            net.ParseIP(config.BindAddress),
            iptInterface, execer,
            *utilnet.ParsePortRangeOrDie(config.PortRange),
            config.IPTables.SyncPeriod.Duration, config.IPTables.MinSyncPeriod.Duration,
            config.UDPIdleTimeout.Duration,
            config.NodePortAddresses,
        )
        if err != nil {
            return nil, fmt.Errorf("unable to create proxier: %v", err)
        }
    }

    iptInterface.AddReloadFunc(proxier.Sync)

    return &ProxyServer{
        Client:                 client,
        EventClient:            eventClient,
        IptInterface:           iptInterface,
        IpvsInterface:          ipvsInterface,
        IpsetInterface:         ipsetInterface,
        execer:                 execer,
        Proxier:                proxier, // the proxier for whichever mode was selected (iptables / ipvs / userspace)
        Broadcaster:            eventBroadcaster,
        Recorder:               recorder,
        ConntrackConfiguration: config.Conntrack,
        Conntracker:            &realConntracker{},
        ProxyMode:              proxyMode,
        NodeRef:                nodeRef,
        MetricsBindAddress:     config.MetricsBindAddress,
        EnableProfiling:        config.EnableProfiling,
        OOMScoreAdj:            config.OOMScoreAdj,
        ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
        HealthzServer:          healthzServer,
    }, nil
}
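newProxyServer does not hard-fail when the requested mode is unavailable; getProxyMode (not reproduced above) falls back to a mode the kernel can actually support. The following is a simplified, self-contained sketch of that fallback behaviour, for illustration only; the real logic lives in tryIPVSProxy/tryIPTablesProxy in cmd/kube-proxy/app/server_others.go and checks kernel modules, the ipset version, and iptables compatibility.

package main

import "fmt"

// chooseMode is a simplified sketch of getProxyMode's fallback behaviour:
// prefer the configured mode, but degrade to iptables (and finally userspace)
// when the kernel prerequisites are missing. It is not the real implementation.
func chooseMode(configured string, canUseIPVS, kernelSupportsIPTables bool) string {
    switch configured {
    case "userspace":
        return "userspace"
    case "ipvs":
        if canUseIPVS {
            return "ipvs"
        }
        // otherwise fall back to the iptables check below
    }
    // "iptables", an empty or unknown value, or an IPVS fallback all land here.
    if kernelSupportsIPTables {
        return "iptables"
    }
    return "userspace"
}

func main() {
    fmt.Println(chooseMode("ipvs", false, true)) // iptables
    fmt.Println(chooseMode("ipvs", true, true))  // ipvs
}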
// NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
// Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
// An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails.
func NewProxier(ipt utiliptables.Interface,
    ipvs utilipvs.Interface,
    ipset utilipset.Interface,
    sysctl utilsysctl.Interface,
    exec utilexec.Interface,
    syncPeriod time.Duration,
    minSyncPeriod time.Duration,
    excludeCIDRs []string,
    strictARP bool,
    masqueradeAll bool,
    masqueradeBit int,
    clusterCIDR string,
    hostname string,
    nodeIP net.IP,
    recorder record.EventRecorder,
    healthzServer healthcheck.HealthzUpdater,
    scheduler string,
    nodePortAddresses []string,
) (*Proxier, error) {
    // Set the route_localnet sysctl we need for
    if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
        if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
            return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
        }
    }

    // Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
    // are connected to a Linux bridge (but not SDN bridges). Until most
    // plugins handle this, log when config is missing
    if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
        klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
    }

    // Set the conntrack sysctl we need for
    if val, _ := sysctl.GetSysctl(sysctlVSConnTrack); val != 1 {
        if err := sysctl.SetSysctl(sysctlVSConnTrack, 1); err != nil {
            return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlVSConnTrack, err)
        }
    }

    // Set the connection reuse mode
    if val, _ := sysctl.GetSysctl(sysctlConnReuse); val != 0 {
        if err := sysctl.SetSysctl(sysctlConnReuse, 0); err != nil {
            return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlConnReuse, err)
        }
    }

    // Set the expire_nodest_conn sysctl we need for
    if val, _ := sysctl.GetSysctl(sysctlExpireNoDestConn); val != 1 {
        if err := sysctl.SetSysctl(sysctlExpireNoDestConn, 1); err != nil {
            return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireNoDestConn, err)
        }
    }

    // Set the expire_quiescent_template sysctl we need for
    if val, _ := sysctl.GetSysctl(sysctlExpireQuiescentTemplate); val != 1 {
        if err := sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1); err != nil {
            return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireQuiescentTemplate, err)
        }
    }

    // Set the ip_forward sysctl we need for
    if val, _ := sysctl.GetSysctl(sysctlForward); val != 1 {
        if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
            return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlForward, err)
        }
    }

    if strictARP {
        // Set the arp_ignore sysctl we need for
        if val, _ := sysctl.GetSysctl(sysctlArpIgnore); val != 1 {
            if err := sysctl.SetSysctl(sysctlArpIgnore, 1); err != nil {
                return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpIgnore, err)
            }
        }

        // Set the arp_announce sysctl we need for
        if val, _ := sysctl.GetSysctl(sysctlArpAnnounce); val != 2 {
            if err := sysctl.SetSysctl(sysctlArpAnnounce, 2); err != nil {
                return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpAnnounce, err)
            }
        }
    }

    // Generate the masquerade mark to use for SNAT rules.
    masqueradeValue := 1 << uint(masqueradeBit)
    masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)

    if nodeIP == nil {
        klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
        nodeIP = net.ParseIP("127.0.0.1")
    }

    isIPv6 := utilnet.IsIPv6(nodeIP)

    klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)

    if len(clusterCIDR) == 0 {
        klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
    } else if utilnet.IsIPv6CIDRString(clusterCIDR) != isIPv6 {
        return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, isIPv6)
    }

    if len(scheduler) == 0 {
        klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
        scheduler = DefaultScheduler
    }

    healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps

    proxier := &Proxier{
        portsMap:              make(map[utilproxy.LocalPort]utilproxy.Closeable),
        serviceMap:            make(proxy.ServiceMap),
        serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
        endpointsMap:          make(proxy.EndpointsMap),
        endpointsChanges:      proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
        syncPeriod:            syncPeriod,
        minSyncPeriod:         minSyncPeriod,
        excludeCIDRs:          parseExcludedCIDRs(excludeCIDRs),
        iptables:              ipt,
        masqueradeAll:         masqueradeAll,
        masqueradeMark:        masqueradeMark,
        exec:                  exec,
        clusterCIDR:           clusterCIDR,
        hostname:              hostname,
        nodeIP:                nodeIP,
        portMapper:            &listenPortOpener{},
        recorder:              recorder,
        healthChecker:         healthChecker,
        healthzServer:         healthzServer,
        ipvs:                  ipvs,
        ipvsScheduler:         scheduler,
        ipGetter:              &realIPGetter{nl: NewNetLinkHandle(isIPv6)},
        iptablesData:          bytes.NewBuffer(nil),
        filterChainsData:      bytes.NewBuffer(nil),
        natChains:             bytes.NewBuffer(nil),
        natRules:              bytes.NewBuffer(nil),
        filterChains:          bytes.NewBuffer(nil),
        filterRules:           bytes.NewBuffer(nil),
        netlinkHandle:         NewNetLinkHandle(isIPv6),
        ipset:                 ipset,
        nodePortAddresses:     nodePortAddresses,
        networkInterfacer:     utilproxy.RealNetwork{},
        gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
    }

    // initialize ipsetList with all sets we needed
    proxier.ipsetList = make(map[string]*IPSet)
    for _, is := range ipsetInfo {
        proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
    }

    burstSyncs := 2
    klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
    // This is where the proxier's syncRunner field is defined, i.e. the proxier's concrete run logic.
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
    proxier.gracefuldeleteManager.Run()
    return proxier, nil
}
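The sysctl constants used in the first half of this constructor are not defined in the excerpt. As a rough reference, they are generally understood to map to the kernel parameters below, with the values the IPVS proxier tries to enforce; treat the list as a sketch and verify it against the kube-proxy version you actually run.

package main

import "fmt"

// Illustrative only: the kernel parameters the sysctl constants above commonly
// correspond to, and the values the IPVS proxier wants. The bridge and arp_*
// entries are special cases: bridge-nf-call-iptables is only warned about, and
// the arp settings are touched only when strictARP is enabled.
var ipvsSysctls = map[string]int{
    "net.ipv4.conf.all.route_localnet":      1, // sysctlRouteLocalnet
    "net.bridge.bridge-nf-call-iptables":    1, // sysctlBridgeCallIPTables (warning only)
    "net.ipv4.vs.conntrack":                 1, // sysctlVSConnTrack
    "net.ipv4.vs.conn_reuse_mode":           0, // sysctlConnReuse
    "net.ipv4.vs.expire_nodest_conn":        1, // sysctlExpireNoDestConn
    "net.ipv4.vs.expire_quiescent_template": 1, // sysctlExpireQuiescentTemplate
    "net.ipv4.ip_forward":                   1, // sysctlForward
    "net.ipv4.conf.all.arp_ignore":          1, // sysctlArpIgnore (strictARP only)
    "net.ipv4.conf.all.arp_announce":        2, // sysctlArpAnnounce (strictARP only)
}

func main() {
    for name, want := range ipvsSysctls {
        fmt.Printf("%s = %d\n", name, want)
    }
}

The first two lines above also derive the fwmark string used by the masquerade/SNAT rules. A tiny worked example, assuming kube-proxy's default masquerade bit of 14 (configurable via --iptables-masquerade-bit):

package main

import "fmt"

func main() {
    // With the default masquerade bit of 14, the rendered mark/mask string is
    // 0x00004000/0x00004000; iptables-save shows it normalized as 0x4000/0x4000.
    masqueradeBit := 14
    masqueradeValue := 1 << uint(masqueradeBit)
    fmt.Printf("%#08x/%#08x\n", masqueradeValue, masqueradeValue)
}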
// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
    if o.watcher != nil {
        o.watcher.Run()
    }

    // run the proxy in goroutine
    go func() {
        err := o.proxyServer.Run()
        o.errCh <- err
    }()

    for {
        err := <-o.errCh
        if err != nil {
            return err
        }
    }
}
// Run runs the specified ProxyServer. This should never exit (unless CleanupAndExit is set).
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error {
    // To help debugging, immediately log version
    klog.Infof("Version: %+v", version.Get())

    // TODO(vmarmol): Use container config for this.
    var oomAdjuster *oom.OOMAdjuster
    if s.OOMScoreAdj != nil {
        oomAdjuster = oom.NewOOMAdjuster()
        if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
            klog.V(2).Info(err)
        }
    }

    if s.Broadcaster != nil && s.EventClient != nil {
        s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
    }

    // Start up a healthz server if requested
    if s.HealthzServer != nil {
        s.HealthzServer.Run()
    }

    // Start up a metrics server if requested
    if len(s.MetricsBindAddress) > 0 {
        proxyMux := mux.NewPathRecorderMux("kube-proxy")
        healthz.InstallHandler(proxyMux)
        proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
            fmt.Fprintf(w, "%s", s.ProxyMode)
        })
        proxyMux.Handle("/metrics", prometheus.Handler())
        if s.EnableProfiling {
            routes.Profiling{}.Install(proxyMux)
        }
        configz.InstallHandler(proxyMux)
        go wait.Until(func() {
            err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
            }
        }, 5*time.Second, wait.NeverStop)
    }

    // Tune conntrack, if requested
    // Conntracker is always nil for windows
    if s.Conntracker != nil {
        max, err := getConntrackMax(s.ConntrackConfiguration)
        if err != nil {
            return err
        }
        if max > 0 {
            err := s.Conntracker.SetMax(max)
            if err != nil {
                if err != errReadOnlySysFS {
                    return err
                }
                // errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
                // the only remediation we know is to restart the docker daemon.
                // Here we'll send an node event with specific reason and message, the
                // administrator should decide whether and how to handle this issue,
                // whether to drain the node and restart docker. Occurs in other container runtimes
                // as well.
                // TODO(random-liu): Remove this when the docker bug is fixed.
                const message = "CRI error: /sys is read-only: " +
                    "cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
                s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
            }
        }

        if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
            timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
            if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
                return err
            }
        }

        if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
            timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
            if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
                return err
            }
        }
    }

    informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
        informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
            options.LabelSelector = "!" + apis.LabelServiceProxyName
        }))

    // Create configs (i.e. Watches for Services and Endpoints)
    // Note: RegisterHandler() calls need to happen before creation of Sources because sources
    // only notify on changes, and the initial update (on process start) may be lost if no handlers
    // are registered yet.
    // ServiceConfig is the kube-proxy component that watches for Service changes; it is essentially an informer, as the NewServiceConfig method below shows.
    serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
    // The event handler logic is provided by the Proxier.
    serviceConfig.RegisterEventHandler(s.Proxier)
    go serviceConfig.Run(wait.NeverStop)

    // EndpointsConfig is the kube-proxy component that watches for Endpoints changes; it is essentially an informer as well.
    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    // The event handler logic is provided by the Proxier.
    endpointsConfig.RegisterEventHandler(s.Proxier)
    go endpointsConfig.Run(wait.NeverStop)

    // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
    // functions must configure their shared informer event handlers first.
    informerFactory.Start(wait.NeverStop)

    // Birth Cry after the birth is successful
    s.birthCry()

    // Just loop forever for now...
    s.Proxier.SyncLoop()
    return nil
}
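Run() also exposes a small debug endpoint, /proxyMode, on the metrics mux, which makes it easy to confirm which mode a running kube-proxy actually chose. The sketch below assumes the default --metrics-bind-address of 127.0.0.1:10249 and must be run on the node itself; adjust the address if your cluster overrides it.

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    // Query the /proxyMode handler registered in ProxyServer.Run above.
    resp, err := http.Get("http://127.0.0.1:10249/proxyMode")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        panic(err)
    }
    fmt.Printf("kube-proxy mode: %s\n", body) // e.g. "iptables" or "ipvs"
}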
// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
    result := &ServiceConfig{
        listerSynced: serviceInformer.Informer().HasSynced,
    }

    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddService,
            UpdateFunc: result.handleUpdateService,
            DeleteFunc: result.handleDeleteService,
        },
        resyncPeriod,
    )

    return result
}

// result.handleUpdateService
func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
    oldService, ok := oldObj.(*v1.Service)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
        return
    }
    service, ok := newObj.(*v1.Service)
    if !ok {
        utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
        return
    }
    for i := range c.eventHandlers {
        klog.V(4).Info("Calling handler.OnServiceUpdate")
        c.eventHandlers[i].OnServiceUpdate(oldService, service)
    }
}
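NewServiceConfig confirms the point above: ServiceConfig is a thin wrapper around a shared informer. For readers less familiar with client-go, here is a minimal, standalone sketch of the same pattern; the kubeconfig path is a placeholder and the handlers just print, so treat it as illustrative rather than kube-proxy code.

package main

import (
    "fmt"
    "time"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Illustration only: load a kubeconfig from a hypothetical path.
    cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        panic(err)
    }

    factory := informers.NewSharedInformerFactory(client, 15*time.Minute)
    serviceInformer := factory.Core().V1().Services().Informer()

    // Same shape as ServiceConfig: register handlers first, then start the informer.
    serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            svc := obj.(*v1.Service)
            fmt.Printf("service added: %s/%s\n", svc.Namespace, svc.Name)
        },
        UpdateFunc: func(oldObj, newObj interface{}) {
            svc := newObj.(*v1.Service)
            fmt.Printf("service updated: %s/%s\n", svc.Namespace, svc.Name)
        },
        DeleteFunc: func(obj interface{}) {
            fmt.Println("service deleted")
        },
    })

    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)
    cache.WaitForCacheSync(stopCh, serviceInformer.HasSynced)

    select {} // block forever, like SyncLoop
}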
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}

// The update above only records the change into the change-tracker map; the actual rule sync is driven by the runner below.

// Run the function as soon as possible. If this is called while Loop is not
// running, the call may be deferred indefinitely.
// If there is already a queued request to call the underlying function, it
// may be dropped - it is just guaranteed that we will try calling the
// underlying function as soon as possible starting from now.
func (bfr *BoundedFrequencyRunner) Run() {
    // If it takes a lot of time to run the underlying function, noone is really
    // processing elements from <run> channel. So to avoid blocking here on the
    // putting element to it, we simply skip it if there is already an element
    // in it.
    select {
    case bfr.run <- struct{}{}:
    default:
    }
}
pkg/proxy/iptables/proxier.go

func NewProxier(...) (*Proxier, error) {
    ......
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
    return proxier, nil
}
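BoundedFrequencyRunner itself lives in k8s.io/kubernetes/pkg/util/async and additionally applies a token-bucket burst limit (burstSyncs) that is omitted here. The sketch below is a simplified stand-in, not the real implementation, showing the two properties this walkthrough relies on: Run() calls coalesce into a single pending signal, and the wrapped function runs no more often than minInterval and no less often than maxInterval.

package main

import (
    "fmt"
    "time"
)

// miniRunner coalesces Run() signals and invokes fn at most once per minInterval
// and at least once per maxInterval. A simplified sketch of the idea behind
// BoundedFrequencyRunner.
type miniRunner struct {
    fn          func()
    minInterval time.Duration
    maxInterval time.Duration
    run         chan struct{}
}

func newMiniRunner(fn func(), minInterval, maxInterval time.Duration) *miniRunner {
    return &miniRunner{fn: fn, minInterval: minInterval, maxInterval: maxInterval, run: make(chan struct{}, 1)}
}

// Run signals that a sync is wanted; extra signals while one is pending are dropped.
func (r *miniRunner) Run() {
    select {
    case r.run <- struct{}{}:
    default:
    }
}

// Loop drives fn: it fires on a signal (rate-limited by minInterval) or when
// maxInterval elapses without any signal.
func (r *miniRunner) Loop(stop <-chan struct{}) {
    lastRun := time.Time{}
    timer := time.NewTimer(r.maxInterval)
    defer timer.Stop()
    for {
        select {
        case <-stop:
            return
        case <-r.run:
            if wait := r.minInterval - time.Since(lastRun); wait > 0 {
                time.Sleep(wait) // enforce the minimum spacing between runs
            }
        case <-timer.C:
        }
        r.fn()
        lastRun = time.Now()
        // Rearm the max-interval timer, draining it first if it already fired.
        if !timer.Stop() {
            select {
            case <-timer.C:
            default:
            }
        }
        timer.Reset(r.maxInterval)
    }
}

func main() {
    runner := newMiniRunner(func() { fmt.Println("syncProxyRules would run here") }, time.Second, 10*time.Second)
    go runner.Loop(make(chan struct{}))
    runner.Run() // an OnServiceUpdate-style event just signals the runner
    runner.Run() // coalesced with the previous signal if it is still pending
    time.Sleep(2 * time.Second)
}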
for _, chain := range iptablesJumpChains {
    if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
        klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
        return
    }
    args := append(chain.extraArgs,
        "-m", "comment", "--comment", chain.comment,
        "-j", string(chain.chain),
    )
    if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
        klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
        return
    }
}
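The iptablesJumpChains table is not reproduced above. For the iptables proxier, the loop typically ends up ensuring jump rules like the ones sketched below; the chain names are real kube-proxy chains, but the exact set, ordering and comments vary between versions, so treat this as illustrative.

package main

import "fmt"

// Illustrative only: the kind of hook-chain jumps the loop above ensures so
// that all traffic is steered into kube-proxy's own chains.
var jumpRules = []string{
    "-t nat    -I PREROUTING  -m comment --comment \"kubernetes service portals\" -j KUBE-SERVICES",
    "-t nat    -I OUTPUT      -m comment --comment \"kubernetes service portals\" -j KUBE-SERVICES",
    "-t nat    -I POSTROUTING -m comment --comment \"kubernetes postrouting rules\" -j KUBE-POSTROUTING",
    "-t filter -I FORWARD     -m comment --comment \"kubernetes forwarding rules\" -j KUBE-FORWARD",
}

func main() {
    for _, r := range jumpRules {
        fmt.Println("iptables " + r)
    }
}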
svcInfo, ok := svc.(*serviceInfo)
if !ok {
    klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
    continue
}
isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
protocol := strings.ToLower(string(svcInfo.Protocol))
svcNameString := svcInfo.serviceNameString
hasEndpoints := len(proxier.endpointsMap[svcName]) > 0

svcChain := svcInfo.servicePortChainName
if hasEndpoints {
    // Create the per-service chain, retaining counters if possible.
    if chain, ok := existingNATChains[svcChain]; ok {
        writeBytesLine(proxier.natChains, chain)
    } else {
        writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
    }
    activeNATChains[svcChain] = true
}

... // Capture the clusterIP.
if hasEndpoints {
    args = append(args[:0],
        "-A", string(kubeServicesChain),
        "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
        "-m", protocol, "-p", protocol,
        "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
        "--dport", strconv.Itoa(svcInfo.Port),
    )
    ...
} else {
    writeLine(proxier.filterRules,
        "-A", string(kubeServicesChain),
        "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
        "-m", protocol, "-p", protocol,
        "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
        "--dport", strconv.Itoa(svcInfo.Port),
        "-j", "REJECT",
    )
}

// Capture externalIPs.
for _, externalIP := range svcInfo.ExternalIPs {
    ...
}
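Putting those fragments together: each Service with endpoints gets its own KUBE-SVC-* chain, which load-balances across per-endpoint KUBE-SEP-* chains via the statistic match, and each KUBE-SEP-* chain finally DNATs to a pod IP and port. Below is a hypothetical rendering for a ClusterIP Service with two endpoints; every IP, port and chain hash is invented for illustration, and real rules carry a few extra matches (such as the KUBE-MARK-MASQ jumps) that are omitted here.

package main

import "fmt"

// Hypothetical rules for a Service "default/web" with ClusterIP 10.96.0.100:80
// and two endpoints, 10.244.1.5:8080 and 10.244.2.7:8080. Chain suffixes are
// invented; real chains are named from a hash of the service and endpoint.
const renderedRules = `
-A KUBE-SERVICES -d 10.96.0.100/32 -p tcp -m comment --comment "default/web: cluster IP" -m tcp --dport 80 -j KUBE-SVC-EXAMPLEHASH
-A KUBE-SVC-EXAMPLEHASH -m statistic --mode random --probability 0.5 -j KUBE-SEP-ENDPOINT1
-A KUBE-SVC-EXAMPLEHASH -j KUBE-SEP-ENDPOINT2
-A KUBE-SEP-ENDPOINT1 -p tcp -m tcp -j DNAT --to-destination 10.244.1.5:8080
-A KUBE-SEP-ENDPOINT2 -p tcp -m tcp -j DNAT --to-destination 10.244.2.7:8080
`

func main() {
    fmt.Print(renderedRules)
}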
proxier.iptablesData.Reset()
proxier.iptablesData.Write(proxier.filterChains.Bytes())
proxier.iptablesData.Write(proxier.filterRules.Bytes())
proxier.iptablesData.Write(proxier.natChains.Bytes())
proxier.iptablesData.Write(proxier.natRules.Bytes())

klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
if err != nil {
    klog.Errorf("Failed to execute iptables-restore: %v", err)
    // Revert new local ports.
    klog.V(2).Infof("Closing local ports after iptables-restore failure")
    utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
    return
}
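All of the writeLine calls earlier only fill in-memory buffers; nothing touches the kernel until this single RestoreAll at the end. Concatenated in the order shown, the buffers form a normal iptables-restore payload (a *table header, :CHAIN declarations, rules, COMMIT), and because RestoreAll runs with the no-flush option, chains kube-proxy does not mention are left untouched. The sketch below builds a toy payload with the same shape; the rule contents are illustrative only.

package main

import (
    "bytes"
    "fmt"
)

func main() {
    // Mirrors the concatenation order above: filter chains, filter rules,
    // nat chains, nat rules. Chain declarations must precede the rules that
    // reference them, which is why each chains buffer comes first.
    filterChains := bytes.NewBufferString("*filter\n:KUBE-SERVICES - [0:0]\n:KUBE-FORWARD - [0:0]\n")
    filterRules := bytes.NewBufferString("-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP\nCOMMIT\n")
    natChains := bytes.NewBufferString("*nat\n:KUBE-SERVICES - [0:0]\n:KUBE-POSTROUTING - [0:0]\n")
    natRules := bytes.NewBufferString("-A KUBE-POSTROUTING -m mark --mark 0x00004000/0x00004000 -j MASQUERADE\nCOMMIT\n")

    payload := bytes.NewBuffer(nil)
    payload.Write(filterChains.Bytes())
    payload.Write(filterRules.Bytes())
    payload.Write(natChains.Bytes())
    payload.Write(natRules.Bytes())

    // This is the kind of text that gets piped to `iptables-restore --noflush --counters`.
    fmt.Print(payload.String())
}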
In the end it is the proxier and its syncProxyRules that hold the real power; this is where the core of kube-proxy lives.