Contents

K8s 多集群(三)---KubeVela 核心逻辑:Application Controller 源码分析(上)

本文主要分析 KubeVela 中的 App Controller 部分源码,分享 app 对象 apply 到集群之后 KubeVela 的运作流程,从而更好的理解 KubeVela。

本文旨在通过分析源码,解决一个大问题和几个小问题。

一个大问题:KubeVela 中的 Application 对象是怎么工作的

几个小问题:

  • App 中的 components 是怎么转换为 k8s object 的
  • App 中的 policy 分别是怎么工作的
  • App 中的 workflow 是怎么运行的

由于篇幅比较长,因此拆分成了上下两篇文章。

1. Application 对象是什么?

基于 OAM 模型,KubeVela将应用抽象成了一个 Application 对象,中文翻译可以叫做:应用部署计划。一个完整 Application 对象包含以下 4 部分内容:

  • Component
  • Trains
  • Policy
  • Workflow

具体可以参考这篇文章:初识 KubeVela:基于 OAM 模型的应用交付平台

Demo

以下就是一个完整的 Application 对象

apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
  name: first-vela-app
spec:
  components:
    - name: express-server
      type: webservice
      properties:
        image: oamdev/hello-world
        ports:
          - port: 8000
            expose: true
      traits:
        - type: scaler
          properties:
            replicas: 1
  policies:
    - name: target-default
      type: topology
      properties:
        clusters: ["local"]
        namespace: "default"
    - name: target-prod
      type: topology
      properties:
        clusters: ["local"]
        namespace: "prod"
    - name: deploy-ha
      type: override
      properties:
        components:
          - type: webservice
            traits:
              - type: scaler
                properties:
                  replicas: 2
  workflow:
    mode:
      steps: StepByStep
      subSteps: StepByStep
    steps:
      - name: deploy2default
        type: deploy
        properties:
          policies: ["target-default"]
      - name: deploy2prod
        type: deploy
        properties:
          policies: ["target-prod", "deploy-ha"]

根据 Yaml 可知,该 App 对象里包含了以下内容:

  • 一个 webservice 类型的 Component
  • 一个 scaler 类型的 traints
  • 两个 topology policy 和一个 override policy
  • 以及 两个 workflowstep

具体效果就是在 local 集群 default 命名空间部署一个单副本 express-server 服务,在 prod 命名空间部署一个两副本的 express-server 服务。

那么 KubeVela 是怎么处理这个 Application 对象的呢?

我们执行 kubectl apply 将这个 app 对象 apply 到集群之后会经过哪些流程呢?

这就是本文分析的内容,从源码入手,分析 KubeVela 是如何处理 Application 对象的。

2. 大致流程

看源码之前,先给到大家一个大致的流程,后续就按照这个顺序分析。

使用以下命令将一个 app 对象 apply 到集群之后,一般会经过以下流程:

kubectl apply -f app.yaml

首先是 KubeVela 运行的 vela-core pod 里面有一个 Application Controller 他会 watch Application 对象,我们的 app 对象一创建就会被 watch 到,然后进入 controller 流程。

  • 1)首先解析 app 对象,解析为 内部的 appfile
    • App 对象是个用户看的,KubeVela 内部使用的是一个叫做 appfile 的结构体
    • 这里就会分离 app 里的 component、policy、workflow 等结构
  • 2)查询 CRD 拿到对应插件里的 spec.cue.template
    • 因为 KubeVela 里面的插件也是通过 CRD 形式注册的,因此这里直接通过查询 CRD 拿到插件对象
    • CRD 的名字就是查询的类型
  • 3)将 CUE 模板和组件里的参数合并生成 k8s object
    • 这部分就是调用的 CUE 的包了
  • 4)将 k8s object 应用到对应集群里

3. 源码分析

看这部内容之前,需要对 KubeVela 有一个大致的认识,比如

  • 知道 Application 对象 由 Component、Traints、Policy、Workflow 等组成
  • 知道 KubeVela 中的 Component 注册机制

再次建议看一下这篇文章:初识 KubeVela:基于 OAM 模型的应用交付平台

以下分析基于 KubeVela v1.9.6

具体代码在pkg/controller/core.oam.dev/v1beta1/application/application_controller.go 文件里。

逻辑还是比较复杂,按照上面记录的步骤分析吧

整体分为 3 个大逻辑:

  • 1)解析得到 appFile
  • 2)构建 applicationStep
  • 3)将资源部署到 k8s 集群

上篇里主要分析第一部分:如何解析 Application 对象,得到 appFile对象。

这部分就是解析我们 apply 到集群的这个 Application 对象,将其转换为 KubeVela 内部的一个叫做 appfile 的对象。

这也是 Controller 中的第一部分逻辑,后续所有逻辑都是对 appfile 对象的处理。

Controller 中将 app 对象解析为 appfile 结构体,大概就是下面这部分代码:

func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    // 1. 获取 app 对象
    app := new(v1beta1.Application)
    if err := r.Get(ctx, client.ObjectKey{
       Name:      req.Name,
       Namespace: req.Namespace,
    }, app); err != nil {
       if !kerrors.IsNotFound(err) {
          logCtx.Error(err, "get application")
       }
       return r.result(client.IgnoreNotFound(err)).ret()
    }

    // 2. 构建一个 parser
    appParser := appfile.NewApplicationParser(r.Client, r.pd)
    handler, err := NewAppHandler(logCtx, r, app)
    if err != nil {
       return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationStarting)
    }
    endReconcile, result, err := r.handleFinalizers(logCtx, app, handler)
    if err != nil {
       if app.GetDeletionTimestamp() == nil {
          return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), common.ApplicationStarting)
       }
       return result, err
    }
    if endReconcile {
       return result, nil
    }
    // 3. 使用 parser 解析 app 对象,得到 appFile
    appFile, err := appParser.GenerateAppFile(logCtx, app)
    if err != nil {
       r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedParse, err))
       return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition("Parsed", err), common.ApplicationRendering)
    }
    // 省略
    return result, nil
  }

整体流程还是比较简单:

  • 首先就是 controller 的标准操作,根据 name + namespace 调用 API 拿到 app 对象
  • 然后就是构建了一个 parser,用于解析 app 对象
  • 接着就是使用 parser 对象将 app 对象解析成了 appfile。

接下来就是分析一下 parser 是怎么做解析的。

3.1 构建 Parser:NewApplicationParser

首先是构建 appParser 对象用于解析 application 对象。

就这么一句话 appParser 对象 就创建好了。

 appParser := appfile.NewApplicationParser(r.Client, r.pd)

具体如下:

// NewApplicationParser create appfile parser
func NewApplicationParser(cli client.Client, pd *packages.PackageDiscover) *Parser {
    return &Parser{
       client:     cli,
       pd:         pd,
       tmplLoader: LoadTemplate,
    }
}

一共三个参数:

  • 1)k8s client,用于和 k8s 交互
  • 2)packageDiscover:定义了 CUE 相关的包,由外部传进来,暂时不清楚是具体做什么的 todo
  • 3)tmplLoader:这里面描述了 parser 该怎么和 k8s 交互拿到对应的目标对象

3.2 生成 AppFile:GenerateAppFile

拿到 parser 之后就开始解析 Application 了:

    appFile, err := appParser.GenerateAppFile(logCtx, app)
    if err != nil {
       r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedParse, err))
       return r.endWithNegativeCondition(logCtx, app, condition.ErrorCondition("Parsed", err), common.ApplicationRendering)
    }

具体如下:

func (p *Parser) GenerateAppFile(ctx context.Context, app *v1beta1.Application) (*Appfile, error) {
    if ctx, ok := ctx.(monitorContext.Context); ok {
       subCtx := ctx.Fork("generate-app-file", monitorContext.DurationMetric(func(v float64) {
          metrics.AppReconcileStageDurationHistogram.WithLabelValues("generate-appfile").Observe(v)
       }))
       defer subCtx.Commit("finish generate appFile")
    }
    if isLatest, appRev, err := p.isLatestPublishVersion(ctx, app); err != nil {
       return nil, err
    } else if isLatest {
       app.Spec = appRev.Spec.Application.Spec
       return p.GenerateAppFileFromRevision(appRev)
    }
    return p.GenerateAppFileFromApp(ctx, app)
}

可以看到,KubeVela 会根据情况使用不同的方法来解析:

  • GenerateAppFileFromRevision
  • GenerateAppFileFromApp

这是因为 KubeVela 给 app 增加了版本的概念,用一个 app.oam.dev/publishVersion 的 annotation 来处理,只有当用户手动修改了这个 annotation 之后,新的 app 对象才会生效,否则会一直使用旧版本。

也就是说只有第一次会直接解析 app 对象本身,后续都会先判断 revision 是否变化。

KubeVela 使用 ApplicationRevision CRD 对象来存储旧版本

这里我们先不管 GenerateAppFileFromRevision,直接看 GenerateAppFileFromApp 就像,整体逻辑是差不多的。

具体如下:

// GenerateAppFileFromApp converts an application to an Appfile
func (p *Parser) GenerateAppFileFromApp(ctx context.Context, app *v1beta1.Application) (*Appfile, error) {

    // 给 Policy 设置默认名字
    for idx := range app.Spec.Policies {
       if app.Spec.Policies[idx].Name == "" {
          app.Spec.Policies[idx].Name = fmt.Sprintf("%s:auto-gen:%d", app.Spec.Policies[idx].Type, idx)
       }
    }
    
    // 初始化一个 appFile 结构体,内部主要是一些初始化操作
    appFile := newAppFile(app)
    if app.Status.LatestRevision != nil {
       appFile.AppRevisionName = app.Status.LatestRevision.Name
    }

    // 核心逻辑,使用不同方法分别解析不同的类型的数据
    var err error
    if err = p.parseComponents(ctx, appFile); err != nil {
       return nil, errors.Wrap(err, "failed to parseComponents")
    }
    if err = p.parseWorkflowSteps(ctx, appFile); err != nil {
       return nil, errors.Wrap(err, "failed to parseWorkflowSteps")
    }
    if err = p.parsePolicies(ctx, appFile); err != nil {
       return nil, errors.Wrap(err, "failed to parsePolicies")
    }
    if err = p.parseReferredObjects(ctx, appFile); err != nil {
       return nil, errors.Wrap(err, "failed to parseReferredObjects")
    }

    return appFile, nil
}

通过 newAppFile 方法拿到了一个 appFile 结构体,该方法内部没有太多逻辑,主要是对 map 做一些 make 操作,防止后续 panic。

另外分别有 4 个方法来解析不同的组件,这是核心逻辑:

  • parseComponents
  • parseWorkflowSteps:这里会生成一些默认的 step,如果 app 里面没有定义指定的话
    • 即:没指定 Workflow 也会自动生成,用于执行部署操作
  • parsePolicies
  • parseReferredObjects

没有单独的 parseTraits 是因为 Traits 和 Component 是一起处理的,都在 parseComponents 里。

解析 Component:parseComponents

KubeVela 中对于 component 的解析逻辑大致是这样的:

  • 1)从 k8s 集群里加载对应的 XDefinition 对象,从该对象中拿到 CUE 模版
  • 2)然后从 app 的 component 中拿到参数
  • 3)最后将参数填充到模版里就得到最终的 k8s object 对象了。

向终端用户屏蔽底层的复杂度,用户只需要提供少量数据,经过插件处理后即可生成完整信息。

就向这样:

../../../img/multi-cluster/kubevela/xdefinition-usage.png

这里就使用 parseComponents 来分析具体的解析逻辑,其他几个都是类似的:

// parseComponents resolve an Application Components and Traits to generate Component
func (p *Parser) parseComponents(ctx context.Context, af *Appfile) error {
    var comps []*Component
    for _, c := range af.app.Spec.Components {
       comp, err := p.parseComponent(ctx, c)
       if err != nil {
          return err
       }
       comps = append(comps, comp)
    }

    af.ParsedComponents = comps
    af.Components = af.app.Spec.Components
    setComponentDefinitions(af, comps)

    return nil
}

parseComponents 里面又调用了 parseComponent 组件,继续追踪:

func (p *Parser) parseComponent(ctx context.Context, comp common.ApplicationComponent) (*Component, error) {
    // 解析 component
    workload, err := p.makeComponent(ctx, comp.Name, comp.Type, types.TypeComponentDefinition, comp.Properties)
    if err != nil {
       return nil, err
    }
    // 解析 traits
    if err = p.parseTraits(ctx, workload, comp); err != nil {
       return nil, err
    }
    return workload, nil
}

解析 component 和 traits 流程都是一样的,这里就只展示一个了,继续追踪 makeComponent 方法:

func (p *Parser) makeComponent(ctx context.Context, name, typ string, capType types.CapType, props *runtime.RawExtension) (*Component, error) {
    templ, err := p.tmplLoader.LoadTemplate(ctx, p.client, typ, capType)
    if err != nil {
       return nil, errors.WithMessagef(err, "fetch component/policy type of %s", name)
    }
    return p.convertTemplate2Component(name, typ, props, templ)
}

Ok,已经到核心方法了,p.tmplLoader.LoadTemplate(ctx, p.client, typ, capType) 这里就是核心方法,

func (fn TemplateLoaderFn) LoadTemplate(ctx context.Context, c client.Client, capName string, capType types.CapType) (*Template, error) {
    return fn(ctx, c, capName, capType)
}

我们的 tmplLoader 实际上是一个 func 类型,这里就是在调用自己,回过头去,我们前面创建 appParser 的时候给这个 tmplLoader 赋值了,这里就是用的那个

// NewApplicationParser create appfile parser
func NewApplicationParser(cli client.Client, pd *packages.PackageDiscover) *Parser {
    return &Parser{
       client:     cli,
       pd:         pd,
       tmplLoader: LoadTemplate,
    }
}

那么核心的解析逻辑就是在 LoadTemplate 这个方法里面,简化后的代码如下:

func LoadTemplate(ctx context.Context, cli client.Client, capName string, capType types.CapType) (*Template, error) {
    ctx = multicluster.WithCluster(ctx, multicluster.Local)
    switch capType {
    case types.TypeComponentDefinition, types.TypeWorkload:
    case types.TypeTrait:
    case types.TypePolicy:
    case types.TypeWorkflowStep:
    }
    return nil, fmt.Errorf("kind(%s) of %s not supported", capType, capName)
}

可以看到,这个方法里根据组件类型,走了不同逻辑去解析,这里我们需要追踪的是 component 的解析,因此就关注第一个 case,

func LoadTemplate(ctx context.Context, cli client.Client, capName string, capType types.CapType) (*Template, error) {
    ctx = multicluster.WithCluster(ctx, multicluster.Local)
    // Application Controller only loads template from ComponentDefinition and TraitDefinition
    switch capType {
    case types.TypeComponentDefinition, types.TypeWorkload:
       // 根据名字去查询 ComponentDefinition 对象,也就是在 k8s 里面查询 CRD 对象
       cd := new(v1beta1.ComponentDefinition)
       err := oamutil.GetCapabilityDefinition(ctx, cli, cd, capName)
       if err !=nil {
        // 省略...
       }
       // 然后使用这个 CRD 对象构建出一个模版
       tmpl, err := newTemplateOfCompDefinition(cd)
       if err != nil {
          return nil, err
       }
       return tmpl, nil
 }

GetCapabilityDefinition 就不细讲了,就是使用 k8s client 从集群里查询 CRD,具体如下:

func GetDefinition(ctx context.Context, cli client.Reader, definition client.Object, definitionName string) error {
    appNs := GetDefinitionNamespaceWithCtx(ctx)
    // 首先在 app 所在 namespace 查询
    if err := cli.Get(ctx, types.NamespacedName{Name: definitionName, Namespace: appNs}, definition); err != nil {
       if !apierrors.IsNotFound(err) {
          return err
       }
      // 没有的话再去 系统 namespace 下查询
       for _, ns := range []string{GetXDefinitionNamespaceWithCtx(ctx), oam.SystemDefinitionNamespace} {
          err = GetDefinitionFromNamespace(ctx, cli, definition, definitionName, ns)
          if !apierrors.IsNotFound(err) {
             return err
          }
       }
       return err
    }
    return nil
}

这里贴一个 ComponentDefinition 对象,看起来就比较清晰了,下面这个 raw 应该就是最简单的了:

apiVersion: core.oam.dev/v1beta1
kind: ComponentDefinition
metadata:
  annotations:
    definition.oam.dev/description: Raw allow users to specify raw K8s object in properties.
      This definition is DEPRECATED, please use 'k8s-objects' instead.
    meta.helm.sh/release-name: kubevela
    meta.helm.sh/release-namespace: vela-system
  name: raw
  namespace: vela-system
spec:
  schematic:
    cue:
      template: |
        output: parameter
        parameter: {}        
  workload:
    type: autodetects.core.oam.dev

其中核心就是 spec.schematic.cue.template,这里面就是 CUE 语法写的模版,这部分逻辑的核心就是拿这个模板文件。

然后再回到 makeComponent 方法:

func (p *Parser) makeComponent(ctx context.Context, name, typ string, capType types.CapType, props *runtime.RawExtension) (*Component, error) {
    templ, err := p.tmplLoader.LoadTemplate(ctx, p.client, typ, capType)
    if err != nil {
       return nil, errors.WithMessagef(err, "fetch component/policy type of %s", name)
    }
    return p.convertTemplate2Component(name, typ, props, templ)
}

拿到模版后进入 convertTemplate2Component 方法,这里就是将参数和模板进行组合:

func (p *Parser) convertTemplate2Component(name, typ string, props *runtime.RawExtension, templ *Template) (*Component, error) {
    // 这里面的 props 是一个 json 格式数据,就是 app 里面定义的一些参数,
    // 因为 CUE 模板需要接收参数,所以从 app 里解析出来并传递给模板
    settings, err := util.RawExtension2Map(props)
    if err != nil {
       return nil, errors.WithMessagef(err, "fail to parse settings for %s", name)
    }
    cpType, err := util.ConvertDefinitionRevName(typ)
    if err != nil {
       cpType = typ
    }
    return &Component{
       Traits:             []*Trait{},
       Name:               name,
       Type:               cpType,
       CapabilityCategory: templ.CapabilityCategory,
       FullTemplate:       templ,
       Params:             settings,
       engine:             definition.NewWorkloadAbstractEngine(name, p.pd),
    }, nil
}

最终会通过 component.engine 这个解析引擎来组合参数和 CUE 模板,这个 engine 实际是一个接口:

type AbstractEngine interface {
    Complete(ctx process.Context, abstractTemplate string, params interface{}) error
    HealthCheck(templateContext map[string]interface{}, healthPolicyTemplate string, parameter interface{}) (bool, error)
    Status(templateContext map[string]interface{}, customStatusTemplate string, parameter interface{}) (string, error)
    GetTemplateContext(ctx process.Context, cli client.Client, accessor util.NamespaceAccessor) (map[string]interface{}, error)
}

其中的 Complete 方法就是组合参数和模版的,解析工作比较繁琐,主要就是在调 CUE 的包,这里就不展示了,最终 Complete 完成后就得到了 yaml 内容,一般都是 k8s object 对象。

具体是个什么 k8s object 就看插件里面模版怎么定义的了

小结:根据组件类型,去 k8s 集群中查询 ComponentDefinition 对象,拿到组件定义中的 CUE 模板,然后使用 CUE engine 将模板以及从 Application 对象中解析到的参数进行组合,得到最终的结果。

这个实际就是 KubeVela 中的自定义组件工作流程,不熟悉的话可以看一下这篇文章:初识 KubeVela:基于 OAM 模型的应用交付平台#插件机制

其他组件都是使用类似的流程来解析的,就不一一分析了。

解析工作流步骤:parseWorkflowSteps

由于 Workflow 比较特殊逻辑,KubeVela 中应用的部署依赖于 Workflow,因此如果用户创建 Application 对象时未指定 Workflow 那么这个 Application 对象就不会被部署起来,那前面的这些逻辑都没有任何意义了。

因此 KubeVela 在这部分添加了一些自定义逻辑,用于生成默认的 Workflow**,以保证 Application 对象能够正常部署。**

所以我们也单独分析一下,就是前面提到的 parseWorkflowSteps 方法:

func (p *Parser) parseWorkflowSteps(ctx context.Context, af *Appfile) error {
    if err := p.loadWorkflowToAppfile(ctx, af); err != nil {
       return err
    }
    for _, workflowStep := range af.WorkflowSteps {
       err := p.fetchAndSetWorkflowStepDefinition(ctx, af, workflowStep.Type)
       if err != nil {
          return err
       }

       if workflowStep.SubSteps != nil {
          for _, workflowSubStep := range workflowStep.SubSteps {
             err := p.fetchAndSetWorkflowStepDefinition(ctx, af, workflowSubStep.Type)
             if err != nil {
                return err
             }
          }
       }
    }
    return nil
}

继续追踪 loadWorkflowToAppfile 方法

func (p *Parser) loadWorkflowToAppfile(ctx context.Context, af *Appfile) error {
    var err error
    // parse workflow steps
    af.WorkflowMode = &workflowv1alpha1.WorkflowExecuteMode{
       Steps:    workflowv1alpha1.WorkflowModeDAG,
       SubSteps: workflowv1alpha1.WorkflowModeDAG,
    }
    // 如果用户手动指定了 Workflow 这里就解析一下
    if wfSpec := af.app.Spec.Workflow; wfSpec != nil {
       app := af.app
       mode := wfSpec.Mode
       // 根据 ref 中填的 name 找到对应 Workflow 
       if wfSpec.Ref != "" && mode == nil {
          wf := &workflowv1alpha1.Workflow{}
          if err := af.WorkflowClient(p.client).Get(ctx, ktypes.NamespacedName{Namespace: af.app.Namespace, Name: app.Spec.Workflow.Ref}, wf); err != nil {
             return err
          }
          mode = wf.Mode
       }
       af.WorkflowSteps = wfSpec.Steps
       af.WorkflowMode.Steps = workflowv1alpha1.WorkflowModeStep
       if mode != nil {
          if mode.Steps != "" {
             af.WorkflowMode.Steps = mode.Steps
          }
          if mode.SubSteps != "" {
             af.WorkflowMode.SubSteps = mode.SubSteps
          }
       }
    }
    // 然后开始生成 Workflow
    af.WorkflowSteps, err = step.NewChainWorkflowStepGenerator(
       &step.RefWorkflowStepGenerator{Client: af.WorkflowClient(p.client), Context: ctx},
       &step.DeployWorkflowStepGenerator{},
       &step.Deploy2EnvWorkflowStepGenerator{},
       &step.ApplyComponentWorkflowStepGenerator{},
    ).Generate(af.app, af.WorkflowSteps)
    return err
}

核心是下面这一部分

    af.WorkflowSteps, err = step.NewChainWorkflowStepGenerator(
       &step.RefWorkflowStepGenerator{Client: af.WorkflowClient(p.client), Context: ctx},
       &step.DeployWorkflowStepGenerator{},
       &step.Deploy2EnvWorkflowStepGenerator{},
       &step.ApplyComponentWorkflowStepGenerator{},
    ).Generate(af.app, af.WorkflowSteps)

通过 chain 形式,按顺序执行多个 generator,直到其中某一个步骤生成 Workflow 为止:

  • RefWorkflowStepGenerator: KubeVela 中支持引用集群中已经创建好的 Workflow,这里则是在处理这部分逻辑,根据 ref 字段名找到对应的 Workflow。
  • DeployWorkflowStepGenerator:根据 topology 类型的 policy 来生成 Workflow
  • Deploy2EnvWorkflowStepGenerator:部署到指定环境,这个应该是和 VelaUX 配合使用的,暂时忽略
  • ApplyComponentWorkflowStepGenerator:如果前面几个步骤都没有成功生成才会执行这部分逻辑,生成一个 apply-component 类型的 WorkflowStep,直接将组件部署到 local 集群的 default 命名空间。

以上 3 个 Generateor 按照层级分可以这样

  • RefWorkflowStepGenerator:正常逻辑,只是解析了用户指定的 RefWorkflow
  • DeployWorkflowStepGenerator:优化逻辑,自动根据 topology policy 生成 Workflow
  • ApplyComponentWorkflowStepGenerator:兜底逻辑,就算没有任何 Workflow 以及 topology 类型的 policy 都能生成一个 apply-component 类型的 WorkflowStep 保证 Component 正常部署

也就是说,在最极限的情况下,Application 对象里我们可以只写 Component,就能保证正常运行。

注意:如果我们手动指定了一个 WorkflowStep 就会导致所有默认逻辑被会忽略,因为他们都有下面这个判断

    if len(existingSteps) > 0 {
       return existingSteps, nil
    }
DeployWorkflowStepGenerator

这里看一下 DeployWorkflowStepGenerator 是怎么根据 topology policy 生成 Workflow 的,具体实现如下:

func (g *DeployWorkflowStepGenerator) Generate(app *v1beta1.Application, existingSteps []workflowv1alpha1.WorkflowStep) (steps []workflowv1alpha1.WorkflowStep, err error) {
    // 如果上一步生成了 step 这里直接返回
    if len(existingSteps) > 0 {
       return existingSteps, nil
    }
    // 然后根据 topology 策略生成 step
    var topologies []string
    var overrides []string
    for _, policy := range app.Spec.Policies {
       switch policy.Type {
       case v1alpha1.TopologyPolicyType:
          topologies = append(topologies, policy.Name)
       case v1alpha1.OverridePolicyType:
          overrides = append(overrides, policy.Name)
       }
    }
    for _, topology := range topologies {
       steps = append(steps, workflowv1alpha1.WorkflowStep{
          WorkflowStepBase: workflowv1alpha1.WorkflowStepBase{
             Name: "deploy-" + topology,
             Type: "deploy",
             Properties: util.Object2RawExtension(map[string]interface{}{
                "policies": append(overrides, topology),
             }),
          },
       })
    }
    // 特殊处理一下带 refObjets 组件的
    if len(topologies) == 0 {
       containsRefObjects := false
       for _, comp := range app.Spec.Components {
          if comp.Type == v1alpha1.RefObjectsComponentType {
             containsRefObjects = true
             break
          }
       }
       if containsRefObjects || len(overrides) > 0 {
          steps = append(steps, workflowv1alpha1.WorkflowStep{
             WorkflowStepBase: workflowv1alpha1.WorkflowStepBase{
                Name:       "deploy",
                Type:       DeployWorkflowStep,
                Properties: util.Object2RawExtension(map[string]interface{}{"policies": append([]string{}, overrides...)}),
             },
          })
       }
    }
    return steps, nil
}

具体逻辑:为每个 topology policy 生成一个 step,每个 step 中除了关联当前 topology 之外,还关联了全部的 override policy。

核心代码如下:

    var topologies []string
    var overrides []string
    for _, policy := range app.Spec.Policies {
       switch policy.Type {
       case v1alpha1.TopologyPolicyType:
          topologies = append(topologies, policy.Name)
       case v1alpha1.OverridePolicyType:
          overrides = append(overrides, policy.Name)
       }
    }
    // 为每个 topology 策略生成一个步骤
    for _, topology := range topologies {
       steps = append(steps, workflowv1alpha1.WorkflowStep{
          WorkflowStepBase: workflowv1alpha1.WorkflowStepBase{
             Name: "deploy-" + topology,
             Type: "deploy",
             Properties: util.Object2RawExtension(map[string]interface{}{
                "policies": append(overrides, topology),
             }),
          },
       })
    }
ApplyComponentWorkflowStepGenerator

最后则是兜底逻辑的 ApplyComponentWorkflowStepGenerator,到这里如果前面几个 Generateor 都没有生成 WorkflowStep 那么就直接为每个 Component 生成一个apply-component类型的步骤,保证 Component 能够被部署出去。

apply-component 类型的 step 用于实现一个 component 组件的部署,默认会部署到 local 集群的 default 命名空间(这个部署位置由第三部分逻辑指定的)。

func (g *ApplyComponentWorkflowStepGenerator) Generate(app *v1beta1.Application, existingSteps []workflowv1alpha1.WorkflowStep) (steps []workflowv1alpha1.WorkflowStep, err error) {
    if len(existingSteps) > 0 {
       return existingSteps, nil
    }
    for _, comp := range app.Spec.Components {
       steps = append(steps, workflowv1alpha1.WorkflowStep{
          WorkflowStepBase: workflowv1alpha1.WorkflowStepBase{
             Name: comp.Name,
             Type: wftypes.WorkflowStepTypeApplyComponent,
             Properties: util.Object2RawExtension(map[string]string{
                "component": comp.Name,
             }),
          },
       })
    }
    return
}
Demo

这里准备了两个 Application 对象,大家可以用来测试一下这个逻辑。

首先是一个最简单的 app,只有 component,按照之前的分析,会走兜底逻辑,最终会被部署到 local 集群的 default 命名空间。

apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
  name: simple-app
spec:
  components:
    - name: express-server
      type: webservice
      properties:
        image: oamdev/hello-world
        ports:
          - port: 8000
            expose: true

然后是一个带 topology policy的 app,按之前分析,会根据 topology 生成 step,因此会被部署到 local 集群的 default 和 pord 两个命名空间,同时生成的步骤会带上所有 override policy,因此,两个命名空间下都是两副本。

apiVersion: core.oam.dev/v1beta1
kind: Application
metadata:
  name: simple-app-policy
spec:
  components:
    - name: express-server
      type: webservice
      properties:
        image: oamdev/hello-world
        ports:
          - port: 8000
            expose: true
  policies:
    - name: target-default
      type: topology
      properties:
        # local 集群即 Kubevela 所在的集群
        clusters: ["local"]
        namespace: "default"
    - name: target-prod
      type: topology
      properties:
        clusters: ["local"]
        # 此命名空间需要在应用部署前完成创建
        namespace: "prod"
    - name: deploy-ha
      type: override
      properties:
        components:
          - type: webservice
            traits:
              - type: scaler
                properties:
                  replicas: 2

另外的解析就不在一一分析了,都是类似的逻辑。

至此,我们就成功从 Application 对象中解析得到 appFile 对象了,这一部分逻辑到此结束。

3.3 分支逻辑:AppRevision

拿到 appFile 之后,有一些 AppRevision 相关的逻辑,前面说了 KubeVela 中的 app 是有版本控制的,因此这里会将 appFile 保存一下。

相关代码如下:

appFile, err := appParser.GenerateAppFile(logCtx, app)

// 处理 AppRevision
if err := handler.PrepareCurrentAppRevision(logCtx, appFile); err != nil {}
if err := handler.FinalizeAndApplyAppRevision(logCtx); err != nil {}

if err := handler.UpdateAppLatestRevisionStatus(logCtx, r.patchStatus); err != nil {}

主要就是维护 AppRevision 对象,然后在 app 对象上打一些 label 来进行关联,不是主线逻辑就不深入分析了。

4. 小结

至此,appFile 对象就拿到了,上篇分析就结束了。

主要分为两块:

  • 1)解析 component:从 k8s 集群里加载对应的 XDefinition 对象,从该对象中拿到 CUE 模版,然后从 app 的 component 中拿到参数,然后将参数填充到模版里就得到最终的 k8s object 对象了。
  • 2)解析 WorkflowStep:这部分其实比较简单,不过由于又优化逻辑和兜底逻辑存在,导致没有严格按照 application 对象生成 step,因此使用的时候会比较迷惑。

下篇会分析拿到 appFile 对象后,怎么生成 ApplicationStep 将这些k8s object 对象部署出去的,以及过程中是如何处理这些 policy的。