
Kubernetes Tutorial (17) --- Kubernetes Extension: Aggregated API

This article shows how to use the Kubernetes Aggregated API and walks through the source code to explain roughly how it is implemented.

What is the Kubernetes Aggregated API, in one sentence?

Aggregator for Kubernetes-style API servers: dynamic registration, discovery summarization, secure proxy.

Design document: design-proposals#aggregated-api-servers.md

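1. Basic usage

We can register our own service with kube-apiserver dynamically by creating an APIService object, as follows.

First, create an APIService object in the cluster to tell Kubernetes about the service being registered. The full YAML is: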

apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
  name: v1alpha1.custom-metrics.metrics.k8s.io
spec:
  insecureSkipTLSVerify: true
  group: custom-metrics.metrics.k8s.io
  groupPriorityMinimum: 1000
  versionPriority: 15
  service:
    name: api
    namespace: custom-metrics
  version: v1alpha1

The fields to focus on are spec.group, spec.version, and spec.service.

Once the object is created, the service is reachable at a path built by the following rule:

proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version

The backend itself must be exposed as a Service.

For the example above, the resulting URL is /apis/custom-metrics.metrics.k8s.io/v1alpha1

Requests to that path are forwarded to the Service named api in the custom-metrics namespace.
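The path rule can be tried out in isolation. The following is a minimal sketch, not the upstream code: apiServiceSpec is a hypothetical stand-in that carries only the two routing fields of the real APIService spec.

```go
package main

import "fmt"

// apiServiceSpec is a hypothetical stand-in for the real APIService spec.
// It keeps only the two fields that determine the proxy path.
type apiServiceSpec struct {
	Group   string
	Version string
}

// proxyPath applies the rule quoted above: /apis/<group>/<version>.
func proxyPath(spec apiServiceSpec) string {
	return "/apis/" + spec.Group + "/" + spec.Version
}

func main() {
	spec := apiServiceSpec{
		Group:   "custom-metrics.metrics.k8s.io",
		Version: "v1alpha1",
	}
	fmt.Println(proxyPath(spec))
}
```

Running it prints the same URL as the example above, /apis/custom-metrics.metrics.k8s.io/v1alpha1.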

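2. How it works

Before diving into the source, here is a rough outline of the implementation to keep in mind while reading the code.

kube-apiserver

kube-apiserver actually contains three API servers:

  • AggregatorServer: handles requests for APIService resources in the apiregistration.k8s.io group, and intercepts user requests and forwards them to an Aggregated APIServer (AA);
  • KubeAPIServer: handles the generic parts of request processing, including authentication, authorization, and the REST services for the built-in resources (pod, deployment, service);
  • ApiExtensionsServer: registers the apiResources and apiVersions of CustomResourceDefinitions (CRDs), and serves REST requests for CRDs and their CustomResources (CRs), returning 404 if the CR cannot be handled. It is also the last link in the apiserver delegation chain.

kube-apiserver links these three servers together in a chain of responsibility, as shown below: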

../../../img/kubernetes/aggregateapi/kube-apiserver-flow.png

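A request is handled first by the APIAggregator; if nothing matches, it goes to the APIServer, and only then to the CRDServer.

APIAggregator

With the structure of kube-apiserver in mind, let's return to the APIAggregator: why can we send requests right after creating an APIService object, and how is dynamic registration achieved?

Internally, a controller continuously watches APIService objects. As soon as we create one, the controller notices it, reads its details, builds the access path according to the rule above, looks up the target Service from spec.service, and builds a handler that forwards matching requests to that Service.

If you are familiar with HTTP frameworks, you know that most of them ultimately keep a map from path to handler, so dynamic registration here can be understood as adding an entry to that map.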
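To make the "map of path to handler" idea concrete, here is a small sketch using only the Go standard library. dynamicMux, okHandler, and status are hypothetical names invented for this illustration; the real mux in kube-apiserver is more elaborate and is covered in the source analysis.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// dynamicMux is a hypothetical, simplified mux: a map from exact path to
// handler that can be modified while requests are being served.
type dynamicMux struct {
	mu       sync.RWMutex
	handlers map[string]http.Handler
}

func newDynamicMux() *dynamicMux {
	return &dynamicMux{handlers: map[string]http.Handler{}}
}

// Register adds or replaces the handler for path.
func (m *dynamicMux) Register(path string, h http.Handler) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.handlers[path] = h
}

// Unregister removes the handler for path, if there is one.
func (m *dynamicMux) Unregister(path string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.handlers, path)
}

// ServeHTTP looks the path up in the map and answers 404 when nothing matches.
func (m *dynamicMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	m.mu.RLock()
	h, ok := m.handlers[r.URL.Path]
	m.mu.RUnlock()
	if !ok {
		http.NotFound(w, r)
		return
	}
	h.ServeHTTP(w, r)
}

// okHandler returns a handler that always answers 200.
func okHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
}

// status sends a GET for path through the mux and returns the status code.
func status(m *dynamicMux, path string) int {
	rec := httptest.NewRecorder()
	m.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code
}

func main() {
	mux := newDynamicMux()
	path := "/apis/custom-metrics.metrics.k8s.io/v1alpha1"

	fmt.Println(status(mux, path)) // nothing registered yet
	mux.Register(path, okHandler())
	fmt.Println(status(mux, path)) // registered
	mux.Unregister(path)
	fmt.Println(status(mux, path)) // removed again
}
```

The three calls print 404, 200, and 404: registering and unregistering take effect immediately, without restarting the server.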

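3. Source code analysis

Based on Kubernetes 1.24.

The code lives under staging/src/k8s.io/kube-aggregator/pkg/apiserver/.

APIServiceRegistrationController

This is the controller mentioned earlier that keeps watching APIService objects.

Like other Kubernetes controllers, it watches the resource for changes and dispatches them to add, update, and delete methods.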

// staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go#L59

// NewAPIServiceRegistrationController returns a new APIServiceRegistrationController.
func NewAPIServiceRegistrationController(apiServiceInformer informers.APIServiceInformer, apiHandlerManager APIHandlerManager) *APIServiceRegistrationController {
   c := &APIServiceRegistrationController{
      apiHandlerManager: apiHandlerManager,
      apiServiceLister:  apiServiceInformer.Lister(),
      apiServiceSynced:  apiServiceInformer.Informer().HasSynced,
      queue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "APIServiceRegistrationController"),
   }

   apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    c.addAPIService,
      UpdateFunc: c.updateAPIService,
      DeleteFunc: c.deleteAPIService,
   })

   c.syncFn = c.sync // the core logic lives in this method

   return c
}

The core logic is in the c.sync method:

// staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go#L79
func (c *APIServiceRegistrationController) sync(key string) error {
   apiService, err := c.apiServiceLister.Get(key)
   if apierrors.IsNotFound(err) {
      c.apiHandlerManager.RemoveAPIService(key)
      return nil
   }
   if err != nil {
      return err
   }

   return c.apiHandlerManager.AddAPIService(apiService)
}

If the object no longer exists, RemoveAPIService is called to unregister the APIService; otherwise AddAPIService is called to add (or update) it.
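The add-or-remove decision can be exercised without a cluster. The sketch below is an illustration only: fakeLister, fakeManager, errNotFound, and syncKey are hypothetical stand-ins for the informer-backed lister, the APIHandlerManager, the apierrors.IsNotFound check, and the sync method.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a hypothetical stand-in for a "not found" API error.
var errNotFound = errors.New("apiservice not found")

// fakeLister plays the role of the APIService lister.
// It maps an APIService name to a description of its backend Service.
type fakeLister map[string]string

func (l fakeLister) Get(name string) (string, error) {
	svc, ok := l[name]
	if !ok {
		return "", errNotFound
	}
	return svc, nil
}

// fakeManager plays the role of APIHandlerManager and records what is registered.
type fakeManager struct {
	registered map[string]string
}

func (m *fakeManager) AddAPIService(name, svc string) { m.registered[name] = svc }
func (m *fakeManager) RemoveAPIService(name string)   { delete(m.registered, name) }

// syncKey reconciles a single key: remove the handler when the object is
// gone, otherwise add or update it.
func syncKey(l fakeLister, m *fakeManager, key string) error {
	svc, err := l.Get(key)
	if errors.Is(err, errNotFound) {
		m.RemoveAPIService(key)
		return nil
	}
	if err != nil {
		return err
	}
	m.AddAPIService(key, svc)
	return nil
}

func main() {
	key := "v1alpha1.custom-metrics.metrics.k8s.io"
	lister := fakeLister{key: "custom-metrics/api"}
	manager := &fakeManager{registered: map[string]string{}}

	if err := syncKey(lister, manager, key); err != nil {
		fmt.Println("sync failed:", err)
	}
	fmt.Println("registered after create:", len(manager.registered))

	delete(lister, key) // simulate deleting the APIService object
	if err := syncKey(lister, manager, key); err != nil {
		fmt.Println("sync failed:", err)
	}
	fmt.Println("registered after delete:", len(manager.registered))
}
```

Because the function only looks at the current state behind the key, the same code path handles add, update, and delete events.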

Next, let's follow the implementations of AddAPIService and RemoveAPIService.

AddAPIService & RemoveAPIService

The code is as follows:

// staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L419
// AddAPIService adds an API service.  It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so its ok to run the controller on a single thread
func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
   // if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
   // since they are wired against listers because they require multiple resources to respond
   if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
      proxyHandler.updateAPIService(apiService)
      if s.openAPIAggregationController != nil {
         s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
      }
      if s.openAPIV3AggregationController != nil {
         s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService)
      }
      return nil
   }
   // this is the URL construction rule mentioned earlier
   proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
   // v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
   if apiService.Name == legacyAPIServiceName {
      proxyPath = "/api"
   }
   // build the handler
   // register the proxy handler
   proxyHandler := &proxyHandler{
      localDelegate:              s.delegateHandler,
      proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
      proxyTransport:             s.proxyTransport,
      serviceResolver:            s.serviceResolver,
      egressSelector:             s.egressSelector,
   }
   proxyHandler.updateAPIService(apiService)
   if s.openAPIAggregationController != nil {
      s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
   }
   if s.openAPIV3AggregationController != nil {
      s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService)
   }
  
   s.proxyHandlers[apiService.Name] = proxyHandler
   // register the handler; note that both the exact path (no trailing slash) and the prefix with a trailing slash are registered
   s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
   s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

   // if we're dealing with the legacy group, we're done here
   if apiService.Name == legacyAPIServiceName {
      return nil
   }

   // if we've already registered the path with the handler, we don't want to do it again.
   if s.handledGroups.Has(apiService.Spec.Group) {
      return nil
   }

   // it's time to register the group aggregation endpoint
   groupPath := "/apis/" + apiService.Spec.Group
   groupDiscoveryHandler := &apiGroupHandler{
      codecs:    aggregatorscheme.Codecs,
      groupName: apiService.Spec.Group,
      lister:    s.lister,
      delegate:  s.delegateHandler,
   }
   // aggregation is protected
   s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
   s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
   s.handledGroups.Insert(apiService.Spec.Group)
   return nil
}

// RemoveAPIService removes the APIService from being handled.  It is not thread-safe, so only call it on one thread at a time please.
// It's a slow moving API, so it's ok to run the controller on a single thread.
func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
   version := v1helper.APIServiceNameToGroupVersion(apiServiceName)

   proxyPath := "/apis/" + version.Group + "/" + version.Version
   // v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
   if apiServiceName == legacyAPIServiceName {
      proxyPath = "/api"
   }
   // removal calls Unregister
   s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath)
   s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/")
   if s.openAPIAggregationController != nil {
      s.openAPIAggregationController.RemoveAPIService(apiServiceName)
   }
   if s.openAPIV3AggregationController != nil {
      s.openAPIAggregationController.RemoveAPIService(apiServiceName)
   }
   delete(s.proxyHandlers, apiServiceName)

   // TODO unregister group level discovery when there are no more versions for the group
   // We don't need this right away because the handler properly delegates when no versions are present
}
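RemoveAPIService receives only the APIService name, so the group and version have to be recovered from it. An APIService is named `<version>.<group>`, as in the YAML example (v1alpha1.custom-metrics.metrics.k8s.io). The sketch below shows one way such a split can be done with the standard library; splitAPIServiceName and removalPath are hypothetical names, and the body is an assumption about the behavior of v1helper.APIServiceNameToGroupVersion rather than a copy of it.

```go
package main

import (
	"fmt"
	"strings"
)

// legacyName is the special APIService name for the core /api group,
// as noted in the comments of the quoted source ("v1. is a special case").
const legacyName = "v1."

// splitAPIServiceName splits "<version>.<group>" at the first dot.
// A name without a dot is treated as a bare version with an empty group.
func splitAPIServiceName(name string) (group, version string) {
	tokens := strings.SplitN(name, ".", 2)
	if len(tokens) < 2 {
		return "", tokens[0]
	}
	return tokens[1], tokens[0]
}

// removalPath rebuilds the proxy path that has to be unregistered.
func removalPath(name string) string {
	if name == legacyName {
		return "/api"
	}
	group, version := splitAPIServiceName(name)
	return "/apis/" + group + "/" + version
}

func main() {
	fmt.Println(removalPath("v1alpha1.custom-metrics.metrics.k8s.io"))
	fmt.Println(removalPath(legacyName))
}
```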

The path rule mentioned earlier is defined in AddAPIService:

proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version

And the handler that serves the requests is:

  proxyHandler := &proxyHandler{
      localDelegate:              s.delegateHandler,
      proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
      proxyTransport:             s.proxyTransport,
      serviceResolver:            s.serviceResolver,
      egressSelector:             s.egressSelector,
   }
   proxyHandler.updateAPIService(apiService)

updateAPIService

The proxyHandler.updateAPIService(apiService) call above updates the backend Service that this proxy points to.

// staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go#L227

func (r *proxyHandler) updateAPIService(apiService *apiregistrationv1api.APIService) {
   if apiService.Spec.Service == nil {
      r.handlingInfo.Store(proxyHandlingInfo{local: true})
      return
   }

   proxyClientCert, proxyClientKey := r.proxyCurrentCertKeyContent()

   clientConfig := &restclient.Config{
      TLSClientConfig: restclient.TLSClientConfig{
         Insecure:   apiService.Spec.InsecureSkipTLSVerify,
         // build the Service's DNS name
         ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
         CertData:   proxyClientCert,
         KeyData:    proxyClientKey,
         CAData:     apiService.Spec.CABundle,
      },
   }
   clientConfig.Wrap(x509metrics.NewDeprecatedCertificateRoundTripperWrapperConstructor(
      x509MissingSANCounter,
      x509InsecureSHA1Counter,
   ))

   newInfo := proxyHandlingInfo{
      name:             apiService.Name,
      restConfig:       clientConfig,
      serviceName:      apiService.Spec.Service.Name,
      serviceNamespace: apiService.Spec.Service.Namespace,
      servicePort:      *apiService.Spec.Service.Port,
      serviceAvailable: apiregistrationv1apihelper.IsAPIServiceConditionTrue(apiService, apiregistrationv1api.Available),
   }
   if r.egressSelector != nil {
      networkContext := egressselector.Cluster.AsNetworkContext()
      var egressDialer utilnet.DialFunc
      egressDialer, err := r.egressSelector.Lookup(networkContext)
      if err != nil {
         klog.Warning(err.Error())
      } else {
         newInfo.restConfig.Dial = egressDialer
      }
   } else if r.proxyTransport != nil && r.proxyTransport.DialContext != nil {
      newInfo.restConfig.Dial = r.proxyTransport.DialContext
   }
   newInfo.proxyRoundTripper, newInfo.transportBuildingError = restclient.TransportFor(newInfo.restConfig)
   if newInfo.transportBuildingError != nil {
      klog.Warning(newInfo.transportBuildingError.Error())
   }
   r.handlingInfo.Store(newInfo)
}

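The key part is below: the Service name and namespace are joined into the Service's DNS name, and together with the TLS certificates and related settings this forms a REST client config that is used to reach the backend Service.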

   clientConfig := &restclient.Config{
      TLSClientConfig: restclient.TLSClientConfig{
         Insecure:   apiService.Spec.InsecureSkipTLSVerify,
         ServerName: apiService.Spec.Service.Name + "." + apiService.Spec.Service.Namespace + ".svc",
         CertData:   proxyClientCert,
         KeyData:    proxyClientKey,
         CAData:     apiService.Spec.CABundle,
      },
   }
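As a rough standard-library analogue of the config above, the sketch below builds a crypto/tls configuration with the same server name. serviceServerName and backendTLSConfig are hypothetical helpers for illustration; the real code goes through restclient.Config and also sets the client certificate, key, and CA bundle, which are omitted here.

```go
package main

import (
	"crypto/tls"
	"fmt"
)

// serviceServerName joins name and namespace into "<name>.<namespace>.svc",
// the name the backend's serving certificate is checked against.
func serviceServerName(name, namespace string) string {
	return name + "." + namespace + ".svc"
}

// backendTLSConfig is a simplified sketch: it sets only the server name and
// the insecure flag taken from spec.insecureSkipTLSVerify.
func backendTLSConfig(name, namespace string, insecure bool) *tls.Config {
	return &tls.Config{
		ServerName:         serviceServerName(name, namespace),
		InsecureSkipVerify: insecure,
	}
}

func main() {
	cfg := backendTLSConfig("api", "custom-metrics", true)
	fmt.Println(cfg.ServerName, cfg.InsecureSkipVerify)
}
```

With insecureSkipTLSVerify: true, as in the YAML example, certificate verification is skipped entirely; supplying spec.caBundle instead lets the aggregator verify the backend.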

AddAPIService

Finally, the proxyHandler is stored:

s.proxyHandlers[apiService.Name] = proxyHandler
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)

The method that ultimately adds it is:

// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (m *PathRecorderMux) Handle(path string, handler http.Handler) {
   m.lock.Lock()
   defer m.lock.Unlock()
   m.trackCallers(path)

   m.exposedPaths = append(m.exposedPaths, path)
   m.pathToHandler[path] = handler
   m.refreshMuxLocked()
}

As you can see, the handler ends up in the pathToHandler map, just as in an ordinary HTTP framework.

PathRecorderMux is defined as follows:

// PathRecorderMux wraps a mux object and records the registered exposedPaths.
type PathRecorderMux struct {
   // name is used for logging so you can trace requests through
   name string

   lock            sync.Mutex
   notFoundHandler http.Handler
   pathToHandler   map[string]http.Handler
   prefixToHandler map[string]http.Handler

   // mux stores a pathHandler and is used to handle the actual serving.
   // Turns out, we want to accept trailing slashes, BUT we don't care about handling
   // everything under them.  This does exactly matches only unless its explicitly requested to
   // do something different
   mux atomic.Value

   // exposedPaths is the list of paths that should be shown at /
   exposedPaths []string

   // pathStacks holds the stacks of all registered paths.  This allows us to show a more helpful message
   // before the "http: multiple registrations for %s" panic.
   pathStacks map[string]string
}

pathHandler.ServeHTTP

pathHandler also implements ServeHTTP, so it can serve HTTP requests directly: it looks up the handler that matches the request's URL path and lets it handle the request.

// ServeHTTP makes it an http.Handler
func (m *PathRecorderMux) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   m.mux.Load().(*pathHandler).ServeHTTP(w, r)
}

// ServeHTTP makes it an http.Handler
func (h *pathHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   // first try an exact match, i.e. the path without a trailing slash
   if exactHandler, ok := h.pathToHandler[r.URL.Path]; ok {
      klog.V(5).Infof("%v: %q satisfied by exact match", h.muxName, r.URL.Path)
      exactHandler.ServeHTTP(w, r)
      return
   }

   for _, prefixHandler := range h.prefixHandlers {
      // then try a prefix match
      if strings.HasPrefix(r.URL.Path, prefixHandler.prefix) {
         klog.V(5).Infof("%v: %q satisfied by prefix %v", h.muxName, r.URL.Path, prefixHandler.prefix)
         prefixHandler.handler.ServeHTTP(w, r)
         return
      }
   }

   klog.V(5).Infof("%v: %q satisfied by NotFoundHandler", h.muxName, r.URL.Path)
   h.notFoundHandler.ServeHTTP(w, r)
}
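The exact-then-prefix lookup can be reproduced in a few lines. In this sketch, router and match are hypothetical names, and handlers are represented by plain strings to keep the example short. Trying longer prefixes first is an assumption made here so that the most specific prefix wins; the upstream mux has its own ordering rule.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// router is a hypothetical miniature of pathHandler. Handlers are
// represented by their names instead of http.Handler values.
type router struct {
	exact    map[string]string
	prefixes map[string]string
}

// match returns the name of the handler that would serve path.
func (r router) match(path string) string {
	// 1. Exact match on the full path.
	if name, ok := r.exact[path]; ok {
		return name
	}
	// 2. Prefix match, longest prefix first.
	keys := make([]string, 0, len(r.prefixes))
	for p := range r.prefixes {
		keys = append(keys, p)
	}
	sort.Slice(keys, func(i, j int) bool { return len(keys[i]) > len(keys[j]) })
	for _, p := range keys {
		if strings.HasPrefix(path, p) {
			return r.prefixes[p]
		}
	}
	// 3. Nothing matched: fall through to the not-found handler.
	return "not-found"
}

func main() {
	r := router{
		exact: map[string]string{
			"/apis":                                        "apis-discovery",
			"/apis/custom-metrics.metrics.k8s.io/v1alpha1": "proxy",
		},
		prefixes: map[string]string{
			"/apis/custom-metrics.metrics.k8s.io/v1alpha1/": "proxy",
		},
	}
	fmt.Println(r.match("/apis/custom-metrics.metrics.k8s.io/v1alpha1"))
	fmt.Println(r.match("/apis/custom-metrics.metrics.k8s.io/v1alpha1/pods"))
	fmt.Println(r.match("/healthz"))
}
```

In the real server, the not-found handler is where delegation to the next API server in the chain happens.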

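That completes the analysis of how dynamic registration works; the overall flow is fairly simple.

Creating the APIAggregator

As mentioned, kube-apiserver is actually made up of three API servers, and the APIAggregator is only one of them. Let's see how it is wired into kube-apiserver.

The three API servers are linked through delegation. During kube-apiserver initialization:

  • First the APIExtensionsServer is created; its delegationTarget is an empty delegate that does nothing
    • The APIExtensionsServer is already the last link in the chain, so if it cannot handle a request, nothing can
  • Then the APIExtensionsServer's GenericAPIServer is passed as the delegationTarget to create the KubeAPIServer
    • Whatever the KubeAPIServer cannot handle is delegated to the APIExtensionsServer
  • Finally, the KubeAPIServer's GenericAPIServer is passed as the delegationTarget to create the AggregatorServer
    • Whatever the AggregatorServer cannot handle is delegated to the KubeAPIServer

Because of this delegation relationship, the three servers are created in exactly the reverse of the order in which they are called.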

../../../img/kubernetes/aggregateapi/kube-apiserver-delegation.png
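The reverse creation order falls out naturally once each server takes its delegate as a constructor argument. The following sketch models this with plain http.Handler values; the server type, the paths, and the helper names are all hypothetical and only illustrate the delegation pattern.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// server is a hypothetical miniature of an API server in the chain: it
// serves the paths it owns and hands everything else to its delegate.
type server struct {
	name     string
	paths    map[string]bool
	delegate http.Handler
}

func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if s.paths[r.URL.Path] {
		fmt.Fprint(w, s.name)
		return
	}
	s.delegate.ServeHTTP(w, r)
}

// buildChain creates the servers last-to-first, because each one needs its
// delegate to exist already. Requests then flow first-to-last.
func buildChain() http.Handler {
	empty := http.NotFoundHandler() // the empty delegate at the end of the chain
	ext := &server{
		name:     "apiextensions",
		paths:    map[string]bool{"/apis/example.com/v1/widgets": true},
		delegate: empty,
	}
	kube := &server{
		name:     "kube-apiserver",
		paths:    map[string]bool{"/api/v1/pods": true},
		delegate: ext,
	}
	agg := &server{
		name:     "aggregator",
		paths:    map[string]bool{"/apis/custom-metrics.metrics.k8s.io/v1alpha1": true},
		delegate: kube,
	}
	return agg
}

// whoServes returns the name of the server that answered, or "404".
func whoServes(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	if rec.Code == http.StatusNotFound {
		return "404"
	}
	return rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{
		"/apis/custom-metrics.metrics.k8s.io/v1alpha1",
		"/api/v1/pods",
		"/apis/example.com/v1/widgets",
		"/nope",
	} {
		fmt.Println(p, "->", whoServes(chain, p))
	}
}
```

Every request enters at the aggregator and falls through until some server owns the path; a path nobody owns reaches the empty delegate and gets a 404.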

The aggregator is created by the method below, which builds an APIAggregator object; the delegationTarget parameter is the KubeAPIServer.

Note that the discovery handler is registered on the /apis path.

// NewWithDelegate returns a new instance of APIAggregator from the given config.
func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
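   // delegationTarget is the delegation target. As mentioned, the three API servers in kube-apiserver
   // form a chain of responsibility: whatever one cannot handle is delegated to the next.
   // For the APIAggregator, the delegation target is the KubeAPIServer.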
   genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
   if err != nil {
      return nil, err
   }

   apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
   if err != nil {
      return nil, err
   }
   informerFactory := informers.NewSharedInformerFactory(
      apiregistrationClient,
      5*time.Minute, // this is effectively used as a refresh interval right now.  Might want to do something nicer later on.
   )

   // apiServiceRegistrationControllerInitiated is closed when APIServiceRegistrationController has finished "installing" all known APIServices.
   // At this point we know that the proxy handler knows about APIServices and can handle client requests.
   // Before it might have resulted in a 404 response which could have serious consequences for some controllers like  GC and NS
   //
   // Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work.
   apiServiceRegistrationControllerInitiated := make(chan struct{})
   if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {
      return nil, err
   }

   s := &APIAggregator{
      GenericAPIServer:           genericServer,
      delegateHandler:            delegationTarget.UnprotectedHandler(),
      proxyTransport:             c.ExtraConfig.ProxyTransport,
      proxyHandlers:              map[string]*proxyHandler{},
      handledGroups:              sets.String{},
      lister:                     informerFactory.Apiregistration().V1().APIServices().Lister(),
      APIRegistrationInformers:   informerFactory,
      serviceResolver:            c.ExtraConfig.ServiceResolver,
      openAPIConfig:              c.GenericConfig.OpenAPIConfig,
      openAPIV3Config:            c.GenericConfig.OpenAPIV3Config,
      egressSelector:             c.GenericConfig.EgressSelector,
      proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
   }

   // used later  to filter the served resource by those that have expired.
   resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*c.GenericConfig.Version)
   if err != nil {
      return nil, err
   }

   apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))
   if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
      return nil, err
   }

   enabledVersions := sets.NewString()
   for v := range apiGroupInfo.VersionedResourcesStorageMap {
      enabledVersions.Insert(v)
   }
   if !enabledVersions.Has(v1.SchemeGroupVersion.Version) {
      return nil, fmt.Errorf("API group/version %s must be enabled", v1.SchemeGroupVersion.String())
   }

   apisHandler := &apisHandler{
      codecs:         aggregatorscheme.Codecs,
      lister:         s.lister,
      discoveryGroup: discoveryGroup(enabledVersions),
   }
   s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
   s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)

   apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
   if len(c.ExtraConfig.ProxyClientCertFile) > 0 && len(c.ExtraConfig.ProxyClientKeyFile) > 0 {
      aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", c.ExtraConfig.ProxyClientCertFile, c.ExtraConfig.ProxyClientKeyFile)
      if err != nil {
         return nil, err
      }
      // We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the
      // context is not used at all. So passing a empty context shouldn't be a problem
      ctx := context.TODO()
      if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {
         return nil, err
      }
      aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
      s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent

      s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
         // generate a context  from stopCh. This is to avoid modifying files which are relying on apiserver
         // TODO: See if we can pass ctx to the current method
         ctx, cancel := context.WithCancel(context.Background())
         go func() {
            select {
            case <-postStartHookContext.StopCh:
               cancel() // stopCh closed, so cancel our context
            case <-ctx.Done():
            }
         }()
         go aggregatorProxyCerts.Run(ctx, 1)
         return nil
      })
   }

   availableController, err := statuscontrollers.NewAvailableConditionController(
      informerFactory.Apiregistration().V1().APIServices(),
      c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
      c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
      apiregistrationClient.ApiregistrationV1(),
      c.ExtraConfig.ProxyTransport,
      (func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
      s.serviceResolver,
      c.GenericConfig.EgressSelector,
   )
   if err != nil {
      return nil, err
   }

   s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
      informerFactory.Start(context.StopCh)
      c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
      return nil
   })
   s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
      go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
      select {
      case <-context.StopCh:
      case <-apiServiceRegistrationControllerInitiated:
      }

      return nil
   })
   s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
      // if we end up blocking for long periods of time, we may need to increase workers.
      go availableController.Run(5, context.StopCh)
      return nil
   })

   if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
      utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
      // Spawn a goroutine in aggregator apiserver to update storage version for
      // all built-in resources
      s.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
         // Wait for apiserver-identity to exist first before updating storage
         // versions, to avoid storage version GC accidentally garbage-collecting
         // storage versions.
         kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
         if err != nil {
            return err
         }
         if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
            _, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
               context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
            if apierrors.IsNotFound(err) {
               return false, nil
            }
            if err != nil {
               return false, err
            }
            return true, nil
         }, hookContext.StopCh); err != nil {
            return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
               s.GenericAPIServer.APIServerID, err)
         }
         // Technically an apiserver only needs to update storage version once during bootstrap.
         // Reconcile StorageVersion objects every 10 minutes will help in the case that the
         // StorageVersion objects get accidentally modified/deleted by a different agent. In that
         // case, the reconciliation ensures future storage migration still works. If nothing gets
         // changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
         // therefore won't change the resource version and trigger storage migration.
         go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
            // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
            // share the same generic apiserver config. The same StorageVersion manager is used
            // to register all built-in resources when the generic apiservers install APIs.
            s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
            return false, nil
         }, hookContext.StopCh)
         // Once the storage version updater finishes the first round of update,
         // the PostStartHook will return to unblock /healthz. The handler chain
         // won't block write requests anymore. Check every second since it's not
         // expensive.
         wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
            return s.GenericAPIServer.StorageVersionManager.Completed(), nil
         }, hookContext.StopCh)
         return nil
      })
   }

   return s, nil
}
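One detail in NewWithDelegate worth isolating is the "apiservice-registration-controller" post-start hook: it starts the controller in a goroutine and then blocks until the controller signals that all known APIServices are installed, or until the server stops. The sketch below reproduces just that channel pattern; runController and postStartHook are hypothetical names, and the sleep stands in for the initial sync.

```go
package main

import (
	"fmt"
	"time"
)

// runController simulates the registration controller: it performs an
// initial sync, signals completion by closing initiated, then keeps
// running until stopCh is closed.
func runController(stopCh <-chan struct{}, initiated chan<- struct{}) {
	time.Sleep(10 * time.Millisecond) // stand-in for installing all known APIServices
	close(initiated)
	<-stopCh
}

// postStartHook starts the controller and blocks until either the initial
// sync has finished or the server is shutting down.
func postStartHook(stopCh <-chan struct{}) string {
	initiated := make(chan struct{})
	go runController(stopCh, initiated)
	select {
	case <-stopCh:
		return "stopped"
	case <-initiated:
		return "initiated"
	}
}

func main() {
	stopCh := make(chan struct{})
	defer close(stopCh)
	fmt.Println(postStartHook(stopCh))
}
```

Blocking in the hook matters because, as the upstream comment explains, serving requests before the proxy handlers are installed could return 404 responses with serious consequences for controllers such as GC.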

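4. Miscellaneous

apiserver-builder

There is an official tool, apiserver-builder, similar to kubebuilder, which helps scaffold a project quickly. Projects built with apiserver-builder have a clear directory layout, which makes later maintenance easier.

Installing apiserver-builder

Install it with go install: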

go install sigs.k8s.io/apiserver-builder-alpha/cmd/apiserver-boot@v1.23.0

Initializing a project

Once apiserver-boot is installed, an Aggregated APIServer project can be initialized with:

# syntax: apiserver-boot init repo --domain <your-domain>
apiserver-boot init repo --domain github.com

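Overall it is used much like kubebuilder. See the official documentation for a full tutorial; it is not demonstrated here.

CRDs or Aggregated APIServer

Besides API aggregation, Kubernetes offers another way to extend the standard Kubernetes API: CRDs (Custom Resource Definitions). CRDs provide largely the same functionality as aggregated APIs while being easier to use and cheaper to develop, whereas aggregated APIs are more flexible. The official documentation gives guidance on choosing between the two.

In general, CRDs may be a better fit if:

  • the custom resource has few fields;
  • you use the resource within your organization or in a small open-source project, rather than in a commercial product.

Aggregated APIs offer more advanced API features and allow other features to be customized, for example a custom storage layer, protobuf support, and support for operations such as logs and patch.

The core difference between the two approaches is how an api-resource is defined. With an Aggregated APIServer, resource types are registered with the API through code, whereas Custom Resources are registered directly through YAML files.

In short:

  • A CRD teaches kube-apiserver to recognize more kinds of objects (Kind).

  • An Aggregated APIServer means building your own APIServer.

CRDs are simpler but less flexible. For a more detailed comparison of CRDs and Aggregated APIs, see the official documentation.

That said, most needs can be met with CRDs, and CRDs are the officially recommended way to extend the API; Aggregated APIs are generally used to plug in an existing apiserver.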

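5. Summary

This article analyzed roughly how the Aggregated API is implemented: a controller watches APIService objects and dynamically registers handlers.

It then briefly described the structure of kube-apiserver: AggregatorServer, KubeAPIServer, and ApiExtensionsServer linked in a chain of responsibility.

Next, it introduced apiserver-builder as a way to quickly scaffold an Aggregated APIServer.

Finally, it briefly compared CRDs and Aggregated APIServer: most needs can be met with CRDs, which are also the officially recommended way to extend the API; Aggregated APIs are recommended only when CRDs cannot do the job.

6. References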

design-proposals#aggregated-api-servers.md

docs#Kubernetes API Aggregation Layer

code#kubernetes

Best Practices for Building Cloud-Native Applications with Aggregated APIServer

How the API Aggregation Mechanism Is Implemented

An Analysis of Kubernetes Aggregated API Servers