内容导读:

文章目录

背景源码粗读源码下载定位webhooks接口定义代码定位CanaryWebhook相关代码解读CallEventWebhook和CallWebhook小结:问题1结论分析loadtester接口实现代码

解读Webhook type一共有8个可选值,7个阶段confirm-rollout 源码解读pre-rollout 源码解读rollout(默认值) 源码解读confirm-traffic-increase 源码解读confirm-promotion 源码解读post-rollout 源码解读rollback 源码解读event 源码解读

附录Goland本地启动服务

结语

背景

上文通过下面的配置就实现了验收测试和压力测试,对此有以下疑问:

metadata定义脚本和类型,说明接口能执行shell,那它是怎么实现的?type未设置是怎样的执行逻辑?type有哪些值,各有什么作用?

本文将通过源码来解答以上问题

源码粗读

本文采用粗读源码方式,因为webhook是一个功能点,不算Flagger核心流程。

源码下载

源码:https://github.com/fluxcd/flagger

git clone https://github.com/fluxcd/flagger.git

定位webhooks接口定义代码

接口定义信息:Kind为Canary,webhooks位于spec.analysis.webhooks

Canary代码位置:pkg/apis/flagger/v1beta1/canary.go:44

接着点击CanarySpec、CanaryAnalysis就找到了webhooks属性定义。

webhooks对应的结构体为CanaryWebhook:

// CanaryWebhook holds the reference to external checks used for canary analysis

type CanaryWebhook struct {

// Type of this webhook

Type HookType `json:"type"`

// Name of this webhook

Name string `json:"name"`

// URL address of this webhook

URL string `json:"url"`

// false会触发告警,目前支持confirm-rollout、confirm-traffic-increase、confirm-promotion阶段

MuteAlert bool `json:"muteAlert"`

// Request timeout for this webhook

Timeout string `json:"timeout,omitempty"`

// Metadata (key-value pairs) for this webhook

// +optional

Metadata *map[string]string `json:"metadata,omitempty"`

// Number of retries for this webhook

// +optional

Retries int `json:"retries,omitempty"`

}

定位CanaryWebhook相关代码

只需关注两个函数:

CallEventWebhook:位于pkg/controller/webhook.go:106CallWebhook:位于pkg/controller/webhook.go:87

说明:events.go为系统的创建Webhook(用户创建的由spec.analysis.webhooks定义),最终是调用CallEventWebhook

解读CallEventWebhook和CallWebhook

比较两个函数差异点和共同点如下图:

最终调用callWebhook:发起httpPOST请求的常规代码。

小结:问题1结论

Flagger Webhook会向目标地址发送方法的http POST请求,发送数据结构如下:

// CanaryWebhookPayload holds the deployment info and metadata sent to webhooks

type CanaryWebhookPayload struct {

// Name of the canary

Name string `json:"name"`

// Namespace of the canary

Namespace string `json:"namespace"`

// Phase of the canary analysis

Phase CanaryPhase `json:"phase"`

// Hash from the TrackedConfigs and LastAppliedSpec of the Canary.

// Can be used to identify a Canary for a specific configuration of the

// deployed resources.

Checksum string `json:"checksum"`

// Metadata (key-value pairs) for this webhook

Metadata map[string]string `json:"metadata,omitempty"`

}

以上就解答了问题1:Flagger仅发送http请求,具体逻辑由目标接口实现。

分析loadtester接口实现代码

我们用到了http://flagger-loadtester.test/

loadtester服务启动入口 找到接口http://flagger-loadtester.test/处理逻辑

最后按metadata.type执行任务。

解读Webhook type

用到Webhook type的相关代码如下

一共有8个可选值,7个阶段

各值含义如下:

confirm-rollout:在扩展金丝雀部署之前执行,可用于手动批准。Canary 将暂停,直到 webhook 返回成功的 HTTP 状态代码。pre-rollout:在将流量路由到金丝雀之前执行。如果预部署钩子失败,则金丝雀前进将暂停,并且如果失败数量达到阈值,金丝雀将回滚。rollout:在指标检查之前的每次迭代分析过程中执行。如果 rollout 调用失败,则金丝雀进度将暂停并最终回滚。confirm-traffic-increase:在金丝雀的权重增加之前执行。金丝雀前进将暂停,直到该钩子返回 HTTP 200。confirm-promotion:在升级步骤之前执行。金丝雀升级将暂停,直到挂钩返回 HTTP 200。升级暂停时,Flagger 将继续运行指标检查和推出挂钩。post-rollout:在金丝雀升级或回滚后执行。如果发布后 webhook 失败,则会记录错误。rollback:当金丝雀部署处于“正在进行”或“等待”状态时,会执行回滚钩子。这提供了在分析期间或等待确认时回滚的能力。如果回滚钩子返回成功的 HTTP 状态代码,Flagger 将停止分析并将金丝雀发布标记为失败。event:每次 Flagger 发出 Kubernetes 事件时都会执行事件挂钩。配置后,Flagger 在金丝雀部署期间执行的每个操作都将通过 HTTP POST 请求以 JSON 形式发送。

说明:前面7个值是Canary对应的七个阶段,event表示Canary创建了k8s事件就会触发。

Webhook type定义如下:

confirm-rollout 源码解读

类型为ConfirmRolloutHook反向引用找到使用ConfirmRolloutHook代码(仅保留关键代码)

func (c *Controller) runConfirmRolloutHooks(canary *flaggerv1.Canary, canaryController canary.Controller) bool {

for _, webhook := range canary.GetAnalysis().Webhooks {

if webhook.Type == flaggerv1.ConfirmRolloutHook {

err := CallWebhook(*canary, canary.Status.Phase, webhook)

if err != nil {

return false

}

}

}

return true

}

反向引用找到使用runConfirmRolloutHooks代码(仅保留关键代码)

func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryController canary.Controller, scalerReconciler canary.ScalerReconciler, shouldAdvance bool) bool {

c.recorder.SetStatus(canary, canary.Status.Phase)

if canary.Status.Phase == flaggerv1.CanaryPhaseProgressing ||

canary.Status.Phase == flaggerv1.CanaryPhaseWaitingPromotion ||

canary.Status.Phase == flaggerv1.CanaryPhasePromoting ||

canary.Status.Phase == flaggerv1.CanaryPhaseFinalising {

return true

}

var err error

canary, err = c.flaggerClient.FlaggerV1beta1().Canaries(canary.Namespace).Get(context.TODO(), canary.Name, metav1.GetOptions{})

if err != nil {

// 按ns和name获取,err表示Canary不存在

return false

}

if shouldAdvance {

// 调用runConfirmRolloutHooks

if isApproved := c.runConfirmRolloutHooks(canary, canaryController); !isApproved {

// 接口返回false,即未通过审批

return false

}

canaryPhaseProgressing := canary.DeepCopy()

// 审批通过后,canary状态为CanaryPhaseProgressing(将由上面的逻辑直接返回true)

canaryPhaseProgressing.Status.Phase = flaggerv1.CanaryPhaseProgressing

if err := canaryController.SyncStatus(canary, flaggerv1.CanaryStatus{Phase: flaggerv1.CanaryPhaseProgressing}); err != nil {

return false

}

return false

}

return false

}

值为true的条件:shouldAdvance为true的条件:canary状态为(Progressing、Waiting、WaitingPromotion、Promoting、Finalising)、worklod有变化、worklod依赖资源(ConfigMap+Secret)有变化。查看shouldAdvance代码:

func (c *Controller) shouldAdvance(canary *flaggerv1.Canary, canaryController canary.Controller) (bool, error) {

if canary.Status.Phase == flaggerv1.CanaryPhaseProgressing ||

canary.Status.Phase == flaggerv1.CanaryPhaseWaiting ||

canary.Status.Phase == flaggerv1.CanaryPhaseWaitingPromotion ||

canary.Status.Phase == flaggerv1.CanaryPhasePromoting ||

canary.Status.Phase == flaggerv1.CanaryPhaseFinalising {

return true, nil

}

// Make sure to sync lastAppliedSpec even if the canary is in a failed state.

if canary.Status.Phase == flaggerv1.CanaryPhaseFailed {

return false, err

}

}

newTarget, err := canaryController.HasTargetChanged(canary)

if err != nil {

return false, err

}

if newTarget {

return newTarget, nil

}

newCfg, err := canaryController.HaveDependenciesChanged(canary)

if err != nil {

return false, err

}

return newCfg, nil

}

pre-rollout 源码解读

类型为PreRolloutHook反向引用找到使用PreRolloutHook代码(仅保留关键代码)

func (c *Controller) runPreRolloutHooks(canary *flaggerv1.Canary) bool {

for _, webhook := range canary.GetAnalysis().Webhooks {

if webhook.Type == flaggerv1.PreRolloutHook {

err := CallWebhook(*canary, flaggerv1.CanaryPhaseProgressing, webhook)

if err != nil {

return false

} else {

c.recordEventInfof(canary, "Pre-rollout check %s passed", webhook.Name)

}

}

}

return true

}

反向引用找到使用runPreRolloutHooks代码(仅保留关键代码)

// 灰度流量为0 且 canary的迭代数为0(0表示对业务有效,像AB测试和蓝绿测试迭代数非0) 且 非影子/镜像流量

if canaryWeight == 0 && cd.Status.Iterations == 0 &&

!(cd.GetAnalysis().Mirror && mirrored) {

c.recordEventInfof(cd, "Starting canary analysis for %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)

// run pre-rollout web hooks

if ok := c.runPreRolloutHooks(cd); !ok {

if err := canaryController.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil {

c.recordEventWarningf(cd, "%v", err)

}

return

}

} else {

// rollout执行代码

if ok := c.runAnalysis(cd); !ok {

if err := canaryController.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil {

c.recordEventWarningf(cd, "%v", err)

}

return

}

}

rollout(默认值) 源码解读

类型为RolloutHook反向引用找到使用RolloutHook代码(仅保留关键代码)

func (c *Controller) runAnalysis(canary *flaggerv1.Canary) bool {

// run external checks

for _, webhook := range canary.GetAnalysis().Webhooks {

// type为空也走此逻辑

if webhook.Type == "" || webhook.Type == flaggerv1.RolloutHook {

err := CallWebhook(*canary, flaggerv1.CanaryPhaseProgressing, webhook)

if err != nil {

return false

}

}

}

return true

}

执行runAnalysis代码位于pre-rollout执行代码处,执行条件为pre-rollout的相反条件

// 灰度流量为0 且 canary的迭代数为0(0表示对业务有效,像AB测试和蓝绿测试迭代数非0) 且 非影子/镜像流量

if canaryWeight == 0 && cd.Status.Iterations == 0 &&

!(cd.GetAnalysis().Mirror && mirrored) {

c.recordEventInfof(cd, "Starting canary analysis for %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)

// run pre-rollout web hooks

if ok := c.runPreRolloutHooks(cd); !ok {

if err := canaryController.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil {

c.recordEventWarningf(cd, "%v", err)

}

return

}

} else {

if ok := c.runAnalysis(cd); !ok {

if err := canaryController.SetStatusFailedChecks(cd, cd.Status.FailedChecks+1); err != nil {

c.recordEventWarningf(cd, "%v", err)

}

return

}

}

confirm-traffic-increase 源码解读

类型为ConfirmTrafficIncreaseHook反向引用找到使用ConfirmTrafficIncreaseHook代码(仅保留关键代码)

func (c *Controller) runConfirmTrafficIncreaseHooks(canary *flaggerv1.Canary) bool {

for _, webhook := range canary.GetAnalysis().Webhooks {

if webhook.Type == flaggerv1.ConfirmTrafficIncreaseHook {

err := CallWebhook(*canary, flaggerv1.CanaryPhaseProgressing, webhook)

if err != nil {

return false

}

}

}

return true

}

反向引用找到使用runConfirmTrafficIncreaseHooks代码(仅保留关键代码)

// 计算下次要增加流量

if c.nextStepWeight(cd, canaryWeight) > 0 {

if !mirrored &&

(cd.Status.Phase != flaggerv1.CanaryPhasePromoting &&

cd.Status.Phase != flaggerv1.CanaryPhaseWaitingPromotion &&

cd.Status.Phase != flaggerv1.CanaryPhaseFinalising) {

if promote := c.runConfirmTrafficIncreaseHooks(cd); !promote {

return

}

}

c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)

}

confirm-promotion 源码解读

类型为ConfirmPromotionHook反向引用找到使用ConfirmPromotionHook代码(仅保留关键代码)

func (c *Controller) runConfirmPromotionHooks(canary *flaggerv1.Canary, canaryController canary.Controller) bool {

for _, webhook := range canary.GetAnalysis().Webhooks {

if webhook.Type == flaggerv1.ConfirmPromotionHook {

err := CallWebhook(*canary, flaggerv1.CanaryPhaseProgressing, webhook)

if err != nil {

return false

} else {

c.recordEventInfof(canary, "Confirm-promotion check %s passed", webhook.Name)

}

}

}

return true

}

反向引用找到使用runConfirmPromotionHooks代码(仅保留关键代码)

if c.nextStepWeight(cd, canaryWeight) > 0 {

// run hook only if traffic is not mirrored

if !mirrored &&

(cd.Status.Phase != flaggerv1.CanaryPhasePromoting &&

cd.Status.Phase != flaggerv1.CanaryPhaseWaitingPromotion &&

cd.Status.Phase != flaggerv1.CanaryPhaseFinalising) {

if promote := c.runConfirmTrafficIncreaseHooks(cd); !promote {

return

}

}

c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)

}

func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,

meshRouter router.Interface, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) {

// 灰度流量定义的最大灰度流量(analysis.maxWeight):下一步将把流量全部切换到primary

if canaryWeight >= maxWeight {

// check promotion gate

if promote := c.runConfirmPromotionHooks(canary, canaryController); !promote {

return

}

}

}

post-rollout 源码解读

类型为PostRolloutHook反向引用找到使用PostRolloutHook代码(仅保留关键代码)

func (c *Controller) runPostRolloutHooks(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) bool {

for _, webhook := range canary.GetAnalysis().Webhooks {

if webhook.Type == flaggerv1.PostRolloutHook {

err := CallWebhook(*canary, phase, webhook)

if err != nil {

c.recordEventWarningf(canary, "Post-rollout hook %s failed %v", webhook.Name, err)

return false

} else {

c.recordEventInfof(canary, "Post-rollout check %s passed", webhook.Name)

}

}

}

return true

}

反向引用找到使用runPostRolloutHooks代码(仅保留关键代码)

// scale canary to zero if promotion has finished

if cd.Status.Phase == flaggerv1.CanaryPhaseFinalising {

if scalerReconciler != nil {

if err := scalerReconciler.PauseTargetScaler(cd); err != nil {

c.recordEventWarningf(cd, "%v", err)

return

}

}

if err := canaryController.ScaleToZero(cd); err != nil {

c.recordEventWarningf(cd, "%v", err)

return

}

// set status to succeeded

if err := canaryController.SetStatusPhase(cd, flaggerv1.CanaryPhaseSucceeded); err != nil {

c.recordEventWarningf(cd, "%v", err)

return

}

c.recorder.SetStatus(cd, flaggerv1.CanaryPhaseSucceeded)

// Canary状态为成功触发

c.runPostRolloutHooks(cd, flaggerv1.CanaryPhaseSucceeded)

c.recordEventInfof(cd, "Promotion completed! Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace)

c.alert(cd, "Canary analysis completed successfully, promotion finished.",

false, flaggerv1.SeverityInfo)

return

}

rollback 源码解读

类型为RollbackHook反向引用找到使用RollbackHook代码(仅保留关键代码)

func (c *Controller) runRollbackHooks(canary *flaggerv1.Canary, phase flaggerv1.CanaryPhase) bool {

for _, webhook := range canary.GetAnalysis().Webhooks {

if webhook.Type == flaggerv1.RollbackHook {

err := CallWebhook(*canary, phase, webhook)

if err != nil {

c.recordEventInfof(canary, "Rollback hook %s not signaling a rollback", webhook.Name)

} else {

c.recordEventWarningf(canary, "Rollback check %s passed", webhook.Name)

return true

}

}

}

return false

}

反向引用找到使用runRollbackHooks代码(仅保留关键代码)

if cd.Status.Phase == flaggerv1.CanaryPhaseProgressing ||

cd.Status.Phase == flaggerv1.CanaryPhaseWaiting ||

cd.Status.Phase == flaggerv1.CanaryPhaseWaitingPromotion {

if ok := c.runRollbackHooks(cd, cd.Status.Phase); ok {

c.recordEventWarningf(cd, "Rolling back %s.%s manual webhook invoked", cd.Name, cd.Namespace)

c.alert(cd, "Rolling back manual webhook invoked", false, flaggerv1.SeverityWarn)

// 真正回滚逻辑

c.rollback(cd, canaryController, meshRouter, scalerReconciler)

return

}

}

event 源码解读

类型为EventHook反向引用找到使用EventHook代码(仅保留关键代码)

func (c *Controller) sendEventToWebhook(r *flaggerv1.Canary, eventType, template string, args []interface{}) {

webhookOverride := false

for _, canaryWebhook := range r.GetAnalysis().Webhooks {

if canaryWebhook.Type == flaggerv1.EventHook {

webhookOverride = true

err := CallEventWebhook(r, canaryWebhook, fmt.Sprintf(template, args...), eventType)

if err != nil {

c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf("error sending event to webhook: %s", err)

}

}

}

// c.eventWebhook来源于环境变量"EVENT_WEBHOOK_URL"

if c.eventWebhook != "" && !webhookOverride {

hook := flaggerv1.CanaryWebhook{

Name: "events",

URL: c.eventWebhook,

}

err := CallEventWebhook(r, hook, fmt.Sprintf(template, args...), eventType)

if err != nil {

c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf("error sending event to webhook: %s", err)

}

}

}

反向引用找到使用sendEventToWebhook代码(仅保留关键代码):Canary产生的所有k8s Event都会执行sendEventToWebhook

func (c *Controller) recordEventInfof(r *flaggerv1.Canary, template string, args ...interface{}) {

c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Infof(template, args...)

// 记录event到k8s

c.eventRecorder.Event(r, corev1.EventTypeNormal, "Synced", fmt.Sprintf(template, args...))

c.sendEventToWebhook(r, corev1.EventTypeNormal, template, args)

}

func (c *Controller) recordEventErrorf(r *flaggerv1.Canary, template string, args ...interface{}) {

c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf(template, args...)

c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))

c.sendEventToWebhook(r, corev1.EventTypeWarning, template, args)

}

func (c *Controller) recordEventWarningf(r *flaggerv1.Canary, template string, args ...interface{}) {

c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Infof(template, args...)

c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))

c.sendEventToWebhook(r, corev1.EventTypeWarning, template, args)

}

附录

Goland本地启动服务

参考文档:https://docs.flagger.app/dev/dev-guide#manual-testing

增加启动参数:-kubeconfig=/Users/admin/.kube/config -log-level=info -mesh-provider=istio -metrics-server=http://prom.istio.cn:9090

启动配置

执行日志:

调试效果:

结语

本文以Webhook疑问为出发点,通过粗读源码全面解读了Webhook相关知识,同时附上了Goland本地调试方法。

请用微信扫码关注下 ,持续更新云原生DevOps最佳实践。

参考链接

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。