K8s 多集群(四)---KubeVela 核心逻辑:Application Controller 源码分析(下)
本文主要分析 KubeVela 中的 App Controller 部分源码,分享 app 对象 apply 到集群之后 KubeVela 的运作流程,从而更好的理解 KubeVela。
本文旨在通过分析源码,解决一个大问题和几个小问题。
一个大问题:KubeVela 中的 Application 对象是怎么工作的
几个小问题:
- App 中的 components 是怎么转换为 k8s object 的
- App 中的 policy 分别是怎么工作的
- App 中的 workflow 是怎么运行的
1. 前言
看这部内容之前,需要对 KubeVela 有一个大致的认识,比如
- 知道 Application 对象 由 Component、Traints、Policy、Workflow 等组成
- 知道 KubeVela 中的 Component 注册机制
再次建议看一下这篇文章:初识 KubeVela:基于 OAM 模型的应用交付平台
以下分析基于 KubeVela v1.9.6
具体代码在pkg/controller/core.oam.dev/v1beta1/application/application_controller.go
文件里。
逻辑还是比较复杂,按照上面记录的步骤分析吧
整体分为 3 个大逻辑:
- 1)解析得到 appFile
- 2)构建 applicationStep
- 3)将资源部署到 k8s 集群
下篇里主要分析第二和第三部分:如何解析 Application 对象,得到 appFile对象。
2. 生成应用步骤:GenerateApplicationSteps
这里就进入了第二个大的逻辑,根据 appFile 生成 ApplicationStep,这里的 Step 和前面提到的 WorkflowStep 有所区别,这里的 ApplicationStep 是真正用于执行的。
相当于 WorkflowStep 定义了需要执行什么操作,ApplicationStep 则是真正的执行对象。
获取 workflowInstance & Runners
拿到 appFile 之后下一个比较重要的逻辑就是GenerateApplicationSteps
:
workflowInstance, runners, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile)
这个方法还是挺复杂的,最终得到了一个 instance 和一个 runner,其中 runner 是比较重要的,
这里面实际就是注册了一些 func,具体如下:
核心为里面的 xxx.Install 方法,后续用到时再具体分析,这里先略过
func (h *AppHandler) GenerateApplicationSteps(ctx monitorContext.Context,
app *v1beta1.Application,
appParser *appfile.Parser,
af *appfile.Appfile) (*wfTypes.WorkflowInstance, []wfTypes.TaskRunner, error) {
handlerProviders := providers.NewProviders()
kube.Install(handlerProviders, h.Client, appLabels, &kube.Handlers{
Apply: h.Dispatch,
Delete: h.Delete,
})
configprovider.Install(handlerProviders, h.Client, func(ctx context.Context, resources []*unstructured.Unstructured, applyOptions []apply.ApplyOption) error {
for _, res := range resources {
res.SetLabels(util.MergeMapOverrideWithDst(res.GetLabels(), appLabels))
}
return h.resourceKeeper.Dispatch(ctx, resources, applyOptions)
})
oamProvider.Install(handlerProviders, app, af, h.Client, h.applyComponentFunc(
appParser, appRev, af), h.renderComponentFunc(appParser, appRev, af))
pCtx := velaprocess.NewContext(generateContextDataFromApp(app, appRev.Name))
renderer := func(ctx context.Context, comp common.ApplicationComponent) (*appfile.Component, error) {
return appParser.ParseComponentFromRevisionAndClient(ctx, comp, appRev)
}
multiclusterProvider.Install(handlerProviders, h.Client, app, af,
h.applyComponentFunc(appParser, appRev, af),
h.checkComponentHealth(appParser, appRev, af),
renderer)
terraformProvider.Install(handlerProviders, app, renderer)
query.Install(handlerProviders, h.Client, nil)
// 省略...
}
可以看到有各种 Install,这里注册上之后,后面的逻辑里就会用到这些东西,因此暂时不展开分析了,等用到了再细说。
至此,我们获取到了一个 workflowInstance 和 runners 对象。
ExecuteRunners
核心就下面两句:
workflowExecutor := executor.New(workflowInstance, r.Client, nil)
workflowState, err := workflowExecutor.ExecuteRunners(authCtx, runners)
使用前面的 workflowInstance 构建了 workflowExecutor,然后通过 workflowExecutor 运行了前面拿到的 runners 对象。这里的 ExecuteRunners 是一个接口,最终实现在 这里,精简后的代码如下:
func (w *workflowExecutor) ExecuteRunners(ctx monitorContext.Context, taskRunners []types.TaskRunner) (v1alpha1.WorkflowRunPhase, error) {
// new 一个用于执行 Workflow 的 engine
e := newEngine(ctx, wfCtx, w, status, taskRunners)
// 核心方法
err = e.Run(ctx, taskRunners, dagMode)
if err != nil {
ctx.Error(err, "run steps")
StepStatusCache.Store(cacheKey, len(status.Steps))
return v1alpha1.WorkflowStateExecuting, err
}
return e.checkWorkflowPhase(), nil
}
继续追踪这个 Run 方法:
func (e *engine) Run(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
var err error
if dag {
err = e.runAsDAG(ctx, taskRunners, false)
} else {
err = e.steps(ctx, taskRunners, dag)
}
e.checkFailedAfterRetries()
e.setNextExecuteTime(ctx)
return err
}
这里会根据是否是 DAG 走不同的 case,默认是 false,所以走 e.steps
:
这个 DAG 的值应该是来源于 Application 中的 workflow.mode 字段,默认为 StepByStep,因此 这个 dag 就是 false
func (e *engine) steps(ctx monitorContext.Context, taskRunners []types.TaskRunner, dag bool) error {
for index, runner := range taskRunners {
options := e.generateRunOptions(ctx, e.findDependPhase(taskRunners, index, dag))
// 核心方法
status, operation, err := runner.Run(wfCtx, options)
if err != nil {
return err
}
e.finishStep(operation)
// 省略...
}
return nil
}
就是 for 循环里面执行runner.Run
方法,然后由于这里的 runners 是上一步GenerateApplicationSteps
中返回的,因此现在在回过头去看一下这些 runners 是什么,这里直接把相关代码贴过来:
runners, err := generator.GenerateRunners(ctx, instance, wfTypes.StepGeneratorOptions{
Providers: handlerProviders,
PackageDiscover: h.pd,
ProcessCtx: pCtx,
TemplateLoader: template.NewWorkflowStepTemplateRevisionLoader(appRev, h.Client.RESTMapper()),
Client: h.Client,
StepConvertor: map[string]func(step workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error){
wfTypes.WorkflowStepTypeApplyComponent: func(lstep workflowv1alpha1.WorkflowStep) (workflowv1alpha1.WorkflowStep, error) {
copierStep := lstep.DeepCopy()
if err := convertStepProperties(copierStep, app); err != nil {
return lstep, errors.WithMessage(err, "convert [apply-component]")
}
copierStep.Type = wfTypes.WorkflowStepTypeBuiltinApplyComponent
return *copierStep, nil
},
},
})
然后 GenerateRunners 方法内部比较复杂,连续跳了好几个接口才走到最终的实现,具体代码在 这里,核心代码如下:
func (t *TaskLoader) makeTaskGenerator(templ string) (types.TaskGenerator, error) {
return func(wfStep v1alpha1.WorkflowStep, genOpt *types.TaskGeneratorOptions) (types.TaskRunner, error) {
// 省略其他逻辑...
tRunner := new(taskRunner)
tRunner.name = wfStep.Name
// 核心部分,设置 run 方法
tRunner.run = func(ctx wfContext.Context, options *types.TaskRunOptions) (stepStatus v1alpha1.StepStatus, operations *types.Operation, rErr error) {
// 省略其他逻辑...
if err := exec.doSteps(tracer, ctx, taskv); err != nil {
tracer.Error(err, "do steps")
exec.err(ctx, true, err, types.StatusReasonExecute)
return exec.status(), exec.operation(), nil
}
return exec.status(), exec.operation(), nil
}
return tRunner, nil
}, nil
}
里面的核心方法就是创建了 runner,然后为 run 方法赋值了,那么我们在前面看到的 taskrunner.run 方法实际就是在执行这个方法。
然后这个 run 方法里面比较重要的就是这句:
if err := exec.doSteps(tracer, ctx, taskv); err != nil {
tracer.Error(err, "do steps")
exec.err(ctx, true, err, types.StatusReasonExecute)
return exec.status(), exec.operation(), nil
}
这里面就是在真的执行 Workflow 了
func (exec *executor) doSteps(ctx monitorContext.Context, wfCtx wfContext.Context, v *value.Value) error {
// 省略其他逻辑...
return v.StepByFields(func(fieldName string, in *value.Value) (bool, error) {
// 省略其他逻辑...
do := OpTpy(in)
if do == "" {
return false, nil
}
if do == "steps" {
if err := exec.doSteps(ctx, wfCtx, in); err != nil {
return false, err
}
} else {
provider := opProvider(in)
if err := exec.Handle(ctx, wfCtx, provider, do, in); err != nil {
return false, errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do)
}
}
if exec.suspend || exec.terminated || exec.wait {
return true, nil
}
return false, nil
})
}
然后这里根据 operation type 也分了 3 个 case:
do := OpTpy(in)
if do == "" {
return false, nil
}
if do == "steps" {
if err := exec.doSteps(ctx, wfCtx, in); err != nil {
return false, err
}
} else {
provider := opProvider(in)
if err := exec.Handle(ctx, wfCtx, provider, do, in); err != nil {
return false, errors.WithMessagef(err, "run step(provider=%s,do=%s)", provider, do)
}
}
一般我们手动指定的 Workflow 都是 deploy,因此会进入到最后的 else 这个 case,那么继续追踪 ecec.Handle 方法:
func (exec *executor) Handle(ctx monitorContext.Context, wfCtx wfContext.Context, provider string, do string, v *value.Value) error {
// 根据 provider 和 do 的类型找到对应的handler
h, exist := exec.handlers.GetHandler(provider, do)
if !exist {
return errors.Errorf("handler not found")
}
// 最后就是执行这个 handler
return h(ctx, wfCtx, v, exec)
}
这里面的 handler 实际上就是前面 GenerateApplicationSteps
的时候不是很多 Install 方法吗,就是在注册 hanlder,就像这样:
// pkg/controller/core.oam.dev/v1beta1/application/generator.go#L114
multiclusterProvider.Install(handlerProviders, h.Client, app, af,
h.applyComponentFunc(appParser, appRev, af),
h.checkComponentHealth(appParser, appRev, af),
renderer)
继续追踪 Install :
func Install(p wfTypes.Providers, c client.Client, app *v1beta1.Application, af *appfile.Appfile, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck, renderer oamProvider.WorkloadRenderer) {
prd := &provider{Client: c, app: app, af: af, apply: apply, healthCheck: healthCheck, renderer: renderer}
p.Register(ProviderName, map[string]wfTypes.Handler{
"make-placement-decisions": prd.MakePlacementDecisions,
"patch-application": prd.PatchApplication,
"list-clusters": prd.ListClusters,
"get-placements-from-topology-policies": prd.GetPlacementsFromTopologyPolicies,
"deploy": prd.Deploy,
})
}
可以看到就是调用了 Register 方法,把这些 hander 给注册进来了。
这也是一种常见的写法,方法的注册和调用分发,通过注册的方式来解耦。
那么我们这里是 deploy 对应的就是 prd.Deploy 这个方法:
// pkg/workflow/providers/multicluster/multicluster.go#L153
func (p *provider) Deploy(ctx monitorContext.Context, _ wfContext.Context, v *value.Value, act wfTypes.Action) error {
param := DeployParameter{}
if err := v.CueValue().Decode(¶m); err != nil {
return err
}
if param.Parallelism <= 0 {
return errors.Errorf("parallelism cannot be smaller than 1")
}
executor := NewDeployWorkflowStepExecutor(p.Client, p.af, p.apply, p.healthCheck, p.renderer, param)
healthy, reason, err := executor.Deploy(ctx)
if err != nil {
return err
}
if !healthy {
act.Wait(reason)
}
return nil
}
直接进入 executor.Deploy 方法。
这里先小结一下,这部分主要根据 WorkflowStep 生成 ApplicationStep,然后构建 workflow engine 来执行具体步骤。
然后具体执行的方法是在GenerateApplicationSteps
方法里注册上去的,根据名字选择执行不同的方法。
下面就选择最核心的 Deploy 方法来分析,其他的步骤根据各种实现就有所不同了,大家感兴趣的可以去GenerateApplicationSteps
方法里翻一下。
3. 部署逻辑:Deploy
这部分开始就是具体的部署逻辑,前面在解析 appFile 的时候我们的 component 已经解析完成了,从 XDefine 里拿到 CUE 模板,从 app 对象的 component 里拿到对应参数,然后将二者合并生成最终的 k8s object 对象。
如果对这部分逻辑不清楚,可以回过头去看一下上篇第一部分 解析 appFIle
**这里就会将前面得到的 k8s object 对象部署到集群里去。**这里需要两部分参数:
- 1)部署到哪个集群那个命名空间
- 2)部署什么内容
下面的代码会告诉我们答案,Deploy 具体代码如下:
func (executor *deployWorkflowStepExecutor) Deploy(ctx context.Context) (bool, string, error) {
policies, err := selectPolicies(executor.af.Policies, executor.parameter.Policies)
if err != nil {
return false, "", err
}
policies = append(policies, fillInlinePolicyNames(executor.parameter.InlinePolicies)...)
// 首先是 loadComponents,这里的 render 方法是通过外部传进去的,实际上就是前面分析过的方法
components, err := loadComponents(ctx, executor.renderer, executor.cli, executor.af, executor.af.Components, executor.parameter.IgnoreTerraformComponent)
if err != nil {
return false, "", err
}
// 这里就是在处理 topology 类型的 policy,解析出 component 需要被分发到哪些集群去
placements, err := pkgpolicy.GetPlacementsFromTopologyPolicies(ctx, executor.cli, executor.af.Namespace, policies, resourcekeeper.AllowCrossNamespaceResource)
if err != nil {
return false, "", err
}
// 这里就是在处理 override policy,对不同集群内容分别处理
components, err = overrideConfiguration(policies, components)
if err != nil {
return false, "", err
}
// 这里在处理 replication 类型的 policy,他是把一个应用多个副本数分到不同集群
components, err = pkgpolicy.ReplicateComponents(policies, components)
if err != nil {
return false, "", err
}
// 核心,apply 到 k8s 集群
return applyComponents(ctx, executor.apply, executor.healthCheck, components, placements, int(executor.parameter.Parallelism))
}
分为三个步骤:
- 1)加载 component
- 2)分别处理不同类型的 policy
- Topology
- Override
- Replication
- 3)applyComponents
注意📢:这里是 deployWorkflowStepExecutor ,也就是说执行的步骤对应的是 Application 对象里面 spec.workflow.steps 中的某一个步骤 ,因此如果有多个 step 则会分别走多次流程,而且是互不干扰的
所以每个 step 关联了哪些策略是影响最大的
比如 step1 关联了 topology1 和 override1这两个策略:
- 1) topology 策略指定要部署到 cluster1 的default 命名空间
- 2)override 策略对默认参数做了一些自定义
等该 step 执行之后,效果就是部署到 cluster1 的default 命名空间的应用是有自定义效果的
step2 只关联了 topology2:要部署到 cluster2 的 default 命名空间,由于没有 override 策略,因此部署到 cluster2 default 命名空间的应用就是默认值。
topology 策略:GetPlacementsFromTopologyPolicies
这部分主要是根据 topology 类型的 policy 拿到具体要部署的集群的命名空间。
func GetPlacementsFromTopologyPolicies(ctx context.Context, cli client.Client, appNs string, policies []v1beta1.AppPolicy, allowCrossNamespace bool) ([]v1alpha1.PlacementDecision, error) {
placements := make([]v1alpha1.PlacementDecision, 0)
placementMap := map[string]struct{}{}
addCluster := func(cluster string, ns string, validateCluster bool) error {
if validateCluster {
if _, e := multicluster.NewClusterClient(cli).Get(ctx, cluster); e != nil {
return errors.Wrapf(e, "failed to get cluster %s", cluster)
}
}
if !allowCrossNamespace && (ns != appNs && ns != "") {
return errors.Errorf("cannot cross namespace")
}
placement := v1alpha1.PlacementDecision{Cluster: cluster, Namespace: ns}
name := placement.String()
if _, found := placementMap[name]; !found {
placementMap[name] = struct{}{}
placements = append(placements, placement)
}
return nil
}
hasTopologyPolicy := false
for _, policy := range policies {
// 只处理 topology 类型的额 policy
if policy.Type == v1alpha1.TopologyPolicyType {
if policy.Properties == nil {
return nil, fmt.Errorf("topology policy %s must not have empty properties", policy.Name)
}
hasTopologyPolicy = true
clusterLabelSelector := GetClusterLabelSelectorInTopology(topologySpec)
// 核心逻辑
switch {
case topologySpec.Clusters != nil:
// 如果policy 里面指定了要分发到哪些集群就直接使用,调用 addCluster 方法
// 将其加入到 placements 数组里
for _, cluster := range topologySpec.Clusters {
if err := addCluster(cluster, topologySpec.Namespace, true); err != nil {
return nil, err
}
}
case clusterLabelSelector != nil:
// 如果是通过 labelSelector 方式来选择集群,那就先把集群查询出来
// 然后调用 addCluster 方法添加
clusterList, err := multicluster.NewClusterClient(cli).List(ctx, client.MatchingLabels(clusterLabelSelector))
for _, cluster := range clusterList.Items {
if err = addCluster(cluster.Name, topologySpec.Namespace, false); err != nil {
return nil, err
}
}
default:
if err := addCluster(pkgmulticluster.Local, topologySpec.Namespace, false); err != nil {
return nil, err
}
}
}
}
// 兜底策略,如果没有指定 topology 类型的 policy 那就部署到 hub 集群
if !hasTopologyPolicy {
placements = []v1alpha1.PlacementDecision{{Cluster: multicluster.ClusterLocalName}}
}
return placements, nil
}
补上一个 placement 的定义:
type PlacementDecision struct {
Cluster string `json:"cluster"`
Namespace string `json:"namespace"`
}
逻辑比较简单,topology 类型的 policy 里面有两种方式指定集群,一个是直接指定集群名,另一个是指定 label,然后这里就根据 label 查询到对应的集群。
至于命名空间暂时只提供了直接指定名字的方式,因为只能指定一个命名空间,因此对于多个集群也只能部署到同一个命名空间。
现在 topology 类型的 policy 是怎么生效的应该就比较清晰了,
就是根据 topology 策略里的配置拿到需要部署的集群,如果没有就默认部署到 local 集群****的 default 命名空间。
这也就是为什么有时候没有指定 Policy 应用也能部署起来。
覆盖策略:overrideConfiguration
再看一下覆盖策略,代码比较简单,拿到 override 类型的策略,然后解析 properties 中的配置并覆盖到 component 上。
这里是用的 for 循环,因此指定了多个 override 策略则是都会生效,如果都修改同一个值则是后面的策略会覆盖前面的策略。
多个策略,按顺序依次覆盖,后者覆盖前者。
func overrideConfiguration(policies []v1beta1.AppPolicy, components []common.ApplicationComponent) ([]common.ApplicationComponent, error) {
var err error
for _, policy := range policies {
if policy.Type == v1alpha1.OverridePolicyType {
if policy.Properties == nil {
return nil, fmt.Errorf("override policy %s must not have empty properties", policy.Name)
}
overrideSpec := &v1alpha1.OverridePolicySpec{}
if err := utils.StrictUnmarshal(policy.Properties.Raw, overrideSpec); err != nil {
return nil, errors.Wrapf(err, "failed to parse override policy %s", policy.Name)
}
components, err = envbinding.PatchComponents(components, overrideSpec.Components, overrideSpec.Selector)
if err != nil {
return nil, errors.Wrapf(err, "failed to apply override policy %s", policy.Name)
}
}
}
return components, nil
}
关于这个 override 策略具体是怎和某个组件关联的,有必要提一下, 因为比较容易理解错。
具体的组件定义:override.cue
内容如下:
"override": {
annotations: {}
description: "Describe the configuration to override when deploying resources, it only works with specified `deploy` step in workflow."
labels: {}
attributes: {}
type: "policy"
}
template: {
#PatchParams: {
// +usage=Specify the name of the patch component, if empty, all components will be merged
name?: string
// +usage=Specify the type of the patch component.
type?: string
// +usage=Specify the properties to override.
properties?: {...}
// +usage=Specify the traits to override.
traits?: [...{
// +usage=Specify the type of the trait to be patched.
type: string
// +usage=Specify the properties to override.
properties?: {...}
// +usage=Specify if the trait should be remove, default false
disable: *false | bool
}]
}
parameter: {
// +usage=Specify the overridden component configuration.
components: [...#PatchParams]
// +usage=Specify a list of component names to use, if empty, all components will be selected.
selector?: [...string]
}
}
有 components 和 selector 两个参数,具体代码见:pkg/policy/envbinding/patch.go#L135
- components:这部分用于指定修改某个组件内的某些参数,通过 name + type 来定位到唯一组件,可以修改多个组件参数。
- 由于组件的 name 不是必填的,因此没有指定 name 时会按照 type 匹配 component,如果同类型有多个 component 都会被覆盖
- selector:直接修改要部署的 component 列表,上一步 components 里只能做参数差异化,如果直接不想部署某些组件就可以使用 selector 参数来实现。未在 selector 列表中的 component 会被过滤掉,不会真正被部署。
- 不填(参数为 nil )默认会部署所有 component
- 如果是空数组则所有 component 都会被过滤掉
Components 匹配规则如下:
- 没有 name 就按照 type 匹配
- 有就同时按 name + type 匹配
for _, comp := range patchComponents {
if comp.Name == "" {
// when no component name specified in the patch
// 1. if no type name specified in the patch, it will merge all components
// 2. if type name specified, it will merge components with the specified type
for compName, baseComp := range compMaps {
if comp.Type == "" || comp.Type == baseComp.Type {
compMaps[compName], err = MergeComponent(baseComp, comp.DeepCopy())
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to merge component %s", compName))
}
}
}
} else {
// when component name (pattern) specified in the patch, it will find the component with the matched name
// 1. if the component type is not specified in the patch, the matched component will be merged with the patch
// 2. if the matched component uses the same type, the matched component will be merged with the patch
// 3. if the matched component uses a different type, the matched component will be overridden by the patch
// 4. if no component matches, and the component name is a valid kubernetes name, a new component will be added
addComponent := regexp.MustCompile("[a-z]([a-z-]{0,61}[a-z])?").MatchString(comp.Name)
if re, err := regexp.Compile(strings.ReplaceAll(comp.Name, "*", ".*")); err == nil {
for compName, baseComp := range compMaps {
if re.MatchString(compName) {
addComponent = false
if baseComp.Type != comp.Type && comp.Type != "" {
compMaps[compName] = comp.ToApplicationComponent()
} else {
compMaps[compName], err = MergeComponent(baseComp, comp.DeepCopy())
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to merge component %s", comp.Name))
}
}
}
}
}
if addComponent {
compMaps[comp.Name] = comp.ToApplicationComponent()
compOrders = append(compOrders, comp.Name)
}
}
}
selector 逻辑也比较简单,不在里面的组件就直接过滤掉。
// if selector is enabled, filter
compOrders = utils.FilterComponents(compOrders, selector)
// fill in new application
newComponents := []common.ApplicationComponent{}
for _, compName := range compOrders {
newComponents = append(newComponents, *compMaps[compName])
}
小结一下,override policy 有两个作用:
- 1)修改某个组件的参数,实现差异化部署,比如可以调整副本数或者镜像等信息
- 2)直接指定不部署某个策略,进一步实现差异化部署,比如某些组件可能不用部署到测试环境,就可以在这里过滤掉。
部署到集群:applyComponents
到这里,我们已经拿到有要部署的集群和命名空间,以及要部署的内容,这里就开始真正的 apply 操作了。
这里的 apply 可以看做 kubectl apply中的 apply。
已经接近真相了,再加把劲,追踪一下 applyComponents:
这个方法也很复杂,我们只看重点,就是 apply 方法
func applyComponents(ctx context.Context, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck, components []common.ApplicationComponent, placements []v1alpha1.PlacementDecision, parallelism int) (bool, string, error) {
// 省略...
_, _, healthy, err := apply(ctx, task.component, nil, task.placement.Cluster, task.placement.Namespace)
if err != nil {
return &applyTaskResult{healthy: healthy, err: err, task: task}
}
// 省略...
}
然后这个 apply 方法实际是外部传进来的,是这样调用的:
applyComponents(ctx, executor.apply, executor.healthCheck, components, placements, int(executor.parameter.Parallelism))
合理推测,这个可能是之前 Install 的时候赋值的,翻回去看一下:
// Install register handlers to provider discover.
func Install(p wfTypes.Providers, c client.Client, app *v1beta1.Application, af *appfile.Appfile, apply oamProvider.ComponentApply, healthCheck oamProvider.ComponentHealthCheck, renderer oamProvider.WorkloadRenderer) {
prd := &provider{Client: c, app: app, af: af, apply: apply, healthCheck: healthCheck, renderer: renderer}
p.Register(ProviderName, map[string]wfTypes.Handler{
"make-placement-decisions": prd.MakePlacementDecisions,
"patch-application": prd.PatchApplication,
"list-clusters": prd.ListClusters,
"get-placements-from-topology-policies": prd.GetPlacementsFromTopologyPolicies,
"deploy": prd.Deploy,
})
}
可以看到,这里正好有一个 apply 参数传进来:
prd := &provider{Client: c, app: app, af: af, apply: apply, healthCheck: healthCheck, renderer: renderer}
p.Register(ProviderName, map[string]wfTypes.Handler{
回到外面,这个 apply 对应的就是:
multiclusterProvider.Install(handlerProviders, h.Client, app, af,
h.applyComponentFunc(appParser, appRev, af),
h.checkComponentHealth(appParser, appRev, af),
renderer)
applyComponentFunc 如下:
// pkg/controller/core.oam.dev/v1beta1/application/generator.go#L361
func (h *AppHandler) applyComponentFunc(appParser *appfile.Parser, appRev *v1beta1.ApplicationRevision, af *appfile.Appfile) oamProvider.ComponentApply {
return func(baseCtx context.Context, comp common.ApplicationComponent, patcher *value.Value, clusterName string, overrideNamespace string) (*unstructured.Unstructured, []*unstructured.Unstructured, bool, error) {
// 省略其他逻辑...
if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
manifestDispatchers, err := h.generateDispatcher(appRev, readyWorkload, readyTraits, overrideNamespace)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "generateDispatcher")
}
for _, dispatcher := range manifestDispatchers {
if isHealth, err := dispatcher.run(ctx, wl, appRev, clusterName); !isHealth || err != nil {
return nil, nil, false, err
}
}
} else {
dispatchResources := readyTraits
if !wl.SkipApplyWorkload {
dispatchResources = append([]*unstructured.Unstructured{readyWorkload}, readyTraits...)
}
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
return nil, nil, false, errors.WithMessage(err, "Dispatch")
}
_, _, _, isHealth, err = h.collectHealthStatus(ctx, wl, appRev, overrideNamespace, false)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "CollectHealthStatus")
}
}
if DisableResourceApplyDoubleCheck {
return readyWorkload, readyTraits, isHealth, nil
}
workload, traits, err := getComponentResources(auth.ContextWithUserInfo(ctx, h.app), manifest, wl.SkipApplyWorkload, h.Client)
return workload, traits, isHealth, err
}
}
这里根据是否开启了 features.MultiStageComponentApply
特性分为两个分支
MultiStageComponentApply
: 启用多阶段组件资源部署能力。当启用时,组件内的资源下发可分不同批次下发。
if utilfeature.DefaultMutableFeatureGate.Enabled(features.MultiStageComponentApply) {
manifestDispatchers, err := h.generateDispatcher(appRev, readyWorkload, readyTraits, overrideNamespace)
if err != nil {
return nil, nil, false, errors.WithMessage(err, "generateDispatcher")
}
for _, dispatcher := range manifestDispatchers {
if isHealth, err := dispatcher.run(ctx, wl, appRev, clusterName); !isHealth || err != nil {
return nil, nil, false, err
}
}
} else {
if err := h.Dispatch(ctx, clusterName, common.WorkflowResourceCreator, dispatchResources...); err != nil {
return nil, nil, false, errors.WithMessage(err, "Dispatch")
}
这个特性默认为 false,因此我们直接进入下面的h.Dispatch
方法:
// Dispatch apply manifests into k8s.
func (h *AppHandler) Dispatch(ctx context.Context, cluster string, owner string, manifests ...*unstructured.Unstructured) error {
manifests = multicluster.ResourcesWithClusterName(cluster, manifests...)
// 核心逻辑
if err := h.resourceKeeper.Dispatch(ctx, manifests, nil); err != nil {
return err
}
for _, mf := range manifests {
// 记录 apply 过的资源
h.addAppliedResource(false, ref)
}
return nil
}
里面又调用了一个 Dispatch 方法,继续追踪:
func (h *resourceKeeper) Dispatch(ctx context.Context, manifests []*unstructured.Unstructured, applyOpts []apply.ApplyOption, options ...DispatchOption) (err error) {
if utilfeature.DefaultMutableFeatureGate.Enabled(features.ApplyOnce) ||
(h.applyOncePolicy != nil && h.applyOncePolicy.Enable && h.applyOncePolicy.Rules == nil) {
options = append(options, MetaOnlyOption{})
}
h.ClearNamespaceForClusterScopedResources(manifests)
// 0. check admission
if err = h.AdmissionCheck(ctx, manifests); err != nil {
return err
}
// 1. pre-dispatch check
opts := []apply.ApplyOption{apply.MustBeControlledByApp(h.app), apply.NotUpdateRenderHashEqual()}
if len(applyOpts) > 0 {
opts = append(opts, applyOpts...)
}
if utilfeature.DefaultMutableFeatureGate.Enabled(features.PreDispatchDryRun) {
if err = h.dispatch(ctx,
velaslices.Map(manifests, func(manifest *unstructured.Unstructured) *unstructured.Unstructured { return manifest.DeepCopy() }),
append([]apply.ApplyOption{apply.DryRunAll()}, opts...)); err != nil {
return fmt.Errorf("pre-dispatch dryrun failed: %w", err)
}
}
// 2. record manifests in resourcetracker
if err = h.record(ctx, manifests, options...); err != nil {
return err
}
// 3. apply manifests
if err = h.dispatch(ctx, manifests, opts); err != nil {
return err
}
return nil
}
里面做了一些检查,然后做了记录,最终 apply 的实现在 h.dispatch(ctx, manifests, opts)
里面:
一路往下跳,最终进入 apply 方法:
func (h *resourceKeeper) dispatch(ctx context.Context, manifests []*unstructured.Unstructured, applyOpts []apply.ApplyOption) error {
errs := velaslices.ParMap(manifests, func(manifest *unstructured.Unstructured) error {
return h.applicator.Apply(applyCtx, manifest, ao...)
}, velaslices.Parallelism(MaxDispatchConcurrent))
return velaerrors.AggregateErrors(errs)
}
核心就在h.applicator.Apply(applyCtx, manifest, ao...)
func (a *APIApplicator) Apply(ctx context.Context, desired client.Object, ao ...ApplyOption) error {
// 如果已经存在就返回,没有就创建
existing, err := a.createOrGetExisting(ctx, applyAct, a.c, desired, ao...)
if err != nil {
return err
}
if existing == nil {
return nil
}
// 然后判断是否需要 recreate
shouldRecreate, err := needRecreate(strategy.RecreateFields, existing, desired)
if err != nil {
return fmt.Errorf("failed to evaluate recreateFields: %w", err)
}
if shouldRecreate {
// 需要的话就 delete 然后在 create
if existing.GetDeletionTimestamp() == nil { // check if recreation needed
if err = a.c.Delete(ctx, existing); err != nil {
return errors.Wrap(err, "cannot delete object")
}
}
return errors.Wrap(a.c.Create(ctx, desired), "cannot recreate object")
}
// 最后在有更新的话就,区分是 update 还是 patch 调用不同的方法
switch strategy.Op {
case v1alpha1.ResourceUpdateStrategyReplace:
return errors.Wrapf(a.c.Update(ctx, desired, options...), "cannot update object")
case v1alpha1.ResourceUpdateStrategyPatch:
fallthrough
default:
return errors.Wrapf(a.c.Patch(ctx, desired, patch), "cannot patch object")
}
}
可以看到,这里就真正的在调用 k8s api 来实现部署操作了。
OK,至此,我们渲染出来的 k8s object 终于是 apply 到集群里了。
整个流程还是有这么复杂..
这里分析的时候还是只看了核心逻辑,加上其他非核心逻辑的话,简直不敢想。
所以:源码分析首先要抓住主线,否则这么多代码是看不完的
4. FAQ
这里记录了一下,自己的一些疑问,实际上分析完源码后大部分都被解开了。
多个 override policy 会怎么执行?
根据覆盖策略:overrideConfiguration
章节可知,多个 override 会按先后顺序进行覆盖,以后生效的为准。
为什么 Application 中不指定 Workflow 也能执行?
因为在 Application Controller 逻辑中检测到未指定任何 Workflow 时会自动生成用于部署的 Workflow。
情况一:指定了 topology policy 只是没有指定 Workflow
根据第一部分DeployWorkflowStepGenerator
可知,会根据 topology policy 生成默认 Workflow。
情况二:连 topology policy 都没有指定
第一部分ApplyComponentWorkflowStepGenerator
会默认会每个 component 生成一个 step 用于部署。
第三部分获取目标集群和命名空间:GetPlacementsFromTopologyPolicies
在没有指定部署位置时会默认部署到 local 集群的 default 命名空间。
因此:未指定 topology policy 和 Workflow 的 Application 也会被部署到 local 集群的 default 命名空间。
注意:但是如果手动指定了一个 WorkflowStep 就会导致上述逻辑失效,最终 Application 对象可能无法被部署出来。
因此要么不写 Workflow,要么就保证 Workflow 是对的。
5. 小结
流程和之前说的差不多,大致分为这几个步骤:
- 1)首先解析 app 对象,解析为 内部的 appfile
- App 对象是个用户看的,KubeVela 内部使用的是一个叫做 appfile 的结构体
- 这里就会分离 app 里的 component、policy、workflow 等结构
- 2)查询 CRD 拿到对应插件里的 spec.cue.template
- 因为 KubeVela 里面的插件也是通过 CRD 形式注册的,因此这里直接通过查询 CRD 拿到插件对象
- CRD 的名字就是查询的类型
- 3)将 CUE 模板和组件里的参数合并生成 k8s object
- 这部分就是调用的 CUE 的包了
- 4)生成 Workflow 并执行,Workflow 里有一个 apply 类型的 handler,就是把 k8s object 应用到对应集群里
- 每个步骤是单独执行的,互不干扰。
- 每个步骤执行都会根据关联的策略计算目标集群、组件参数覆盖等数据
比较重要的就是两部分吧:
- 1)app 对象在 KubeVela 内部是怎么解析成 appFIle 然后渲染成 k8s object 的
- 2)ExecuteRunners 部分,Workflow 是什么生成的,各种类型的 policy 是怎么处理的