1
背景
在现代软件开发中,持续集成和持续交付(CI/CD)已成为提高开发效率和软件质量的关键实践。然而,确保测试环境(Test Environment)的数据与生产环境(Production Environment)的数据保持一致,仍然是一个挑战。尤其是在防止预发环境中的操作意外删除或更新生产环境的数据方面,更是面临巨大的风险。为了解决这些问题,基于360AI平台的解决方案通过整合Kubernetes Informer和Webhook方法,不仅能够有效地阻止测试环境中的操作对生产环境的影响,还可以结合平台的智能分析能力,进一步提升环境间的安全性和数据一致性管理。
2
Informer的工作原理
2.1 Informer工作流程
初始化 Informer
当Informer初始化时,它首先通过调用API Server的List API获取所有目标资源(例如 Pod)的当前状态。这一步骤全量拉取(Full Sync)资源,然后Informer 将这些资源存储在一个本地缓存(Local Store)中。
设置 Watch
接下来,Informer通过设置一个Watch监听目标资源的变更。Watch是一种异步机制,它会在资源发生更改时通知Informer。
事件处理
一旦资源发生变化,API Server会将事件推送到Informer。Informer 会将这些事件通过Dispatch操作分发给一个名为 DeltaFIFO 的队列。DeltaFIFO是一个先进先出(FIFO)队列,它会按顺序处理事件,并且保证不会丢失任何事件。
更新本地缓存
DeltaFIFO队列中的事件会依次被处理。对于每个事件,Informer会更新其本地缓存中的资源状态。例如,如果接收到一个 Pod被删除的事件,Informer会从本地缓存中移除对应的Pod。
触发回调函数
除了更新本地缓存外,Informer还会触发一个回调函数(Callback),这个函数由使用者指定。回调函数通常会执行一些业务逻辑,比如更新数据库、发送通知等。
重复循环
Informer会不断地重复上述步骤,监听资源的变更并将它们反映到本地缓存中。这样,应用程序就可以依赖于 Informer 提供的本地缓存,而不是频繁地查询API Server,从而提高效率。
2.2 Informer核心机制 (List-Watch)
核心组件介绍:
Reflector:
Reflector 是 Kubernetes 中一个重要的组件,用于监控指定的资源类型,并确保集群中本地缓存的数据与 API 服务器上的资源状态保持一致。它通过 Watch 机制实现对资源的实时监控,捕获资源对象的各种变化事件,如添加、更新和删除等。同时,为了处理可能出现的错误情况,Reflector 还具有错误重试机制。工作流程如下:
实时监控资源变化:Reflector利用Watch机制,从Kubernetes API 服务器接收资源对象的更新流,并将这些变化(增、删、改操作)推送到本地存储中。
定期同步机制(Resync):为了防止与API服务器的数据不一致,Reflector会定期执行Resync 操作,根据resyncPeriod 配置的时间间隔,将本地存储(Local Store)与DeltaFIFO 队列中的数据进行同步。这样可以确保即使错过了某些Watch事件,本地存储的数据也会与 API服务器保持一致。
处理错误重试机制:当发生网络中断或其他错误导致Reflector无法从 API服务器获取最新的Watch事件时,Reflector会执行退避重试策略,通过backoffManager控制重试间隔,逐步恢复与API服务器的连接,直到重新建立Watch流为止。
DeltaFIFO:
先进先出队列,用于存储Reflector触发的事件(存储Watch返回的各种事件)和对应的资源对象,通过下面DeltaFIFO数据结构的内部机制,可以看到Kubernetes中常用于资源对象随时间的变化。
在上图左边描绘了DeltaFIFO队列中包含了ObjA Key、ObjB Key 和 ObjC Key等资源对象,右边是资源对象到资源变更(Add、Update和Delete)的映射。DeltaFIFO有助于高效管理操作顺序,确保Kubernetes控制器按先进先出(FIFO)顺序处理变化。
Local Store:
本地存储,用于存储资源对象并提供索引功能。Informer 从 API Server 拉取资源数据,并将其存储在本地缓存中。这减少了对 API Server的直接请求,降低了API Server的负载,同时提高了数据访问的速度。通过二级缓存机制能够有效减少直接对API Server的请求。下图中描述了Kubernetes 索引在Local Store中如何被索引和访问的。
顶部标为 Items 的部分 包含键(objKey1, objKey2, ..., objKeyN)及其对应的值(objValue1, objValue2, ..., objValueN),每个 objKey 表示一个对象的唯一标识符(例如 Kubernetes 中的 Pod 或节点),而 objValue 包含对象的数据;索引器部分定义了索引函数(IndexFuncName, IndexFunc1, IndexFunc2, ...),这些函数指定了对象如何被索引,IndexFuncName 可能表示一个按命名空间索引对象的函数;索引 (Indices) 存储索引函数结果与实际对象之间的映射,每个索引函数生成一组唯一的值。例如,如果 IndexFuncName 是按命名空间索引的,那么 namespace1, namespace2, ..., namespaceN 就是从对象中提取的不同的命名空间值;索引组部分(如 Index1 和 Index2)存储基于索引值的对象键集合,Index1 可能表示一个命名空间索引,而 Index2 表示一个节点索引;结果集 (Namespace1IndexSets1 和 NodeName1IndexSets1),如 Namespace1IndexSets1 和 NodeName1IndexSets1,展示了按特定索引值(如命名空间或节点名)分组的对象键子集。
Processor:
记录了所有的回调函数(即ResourceEventHandler)的实例,并负责触发回调函数。存储和管理回调函数:Processor 记录了所有 ResourceEventHandler 实例,并将它们管理起来;监听和接收事件:它会持续监听资源事件(增删改);分发事件:在监听到事件后,负责将事件传递到每个 ResourceEventHandler 的回调函数中,执行相应的处理逻辑;事件的并发处理:在高并发场景中,Processor 可以采用并发机制来分发和处理事件,提升性能。
Controller:
用于调度和执行业务逻辑。它通过注册事件处理函数来响应资源的变化,并执行相应的操作。它的核心是一个循环(Reconciliation Loop),其任务是:获取资源的当前状态(从 Informer 缓存或直接访问 API Server);比较当前状态和期望状态;调用相应的逻辑以调整当前状态,使之与期望状态匹配。
2.3 整体流程图
下图展示了client-go中各组件的工作流程,通过Pod对象举例具体流程如下:
Informer 在初始化时,Reflector会先通过List API接口获得所有Pod。
Reflect获取全量Pod后,会将全部Pod放到Local Store中。
若有方法通过Informer调用Lister的List/Get方法获取 Pod,那么 Lister会直接从Local Store中获取Cache中最近ResourceVersion的Pod对象。
Informer初始化完成之后,Reflector开始Watch Pod,监听操作Pod的所有事件;若有Pod被删除,那么Reflector会监听到这个事件后将这个事件发送到 DeltaFIFO。
DeltaFIFO 首先会将这个事件存储在自己的Queue(先进先出且去重的队列),然后会直接操作Store 中的数据,删除 Store 中的Pod。
DeltaFIFO再Pop这个事件到 Controller 中。
Controller收到这个事件,会触发Processor的回调函数。
LocalStore会周期性地把所有的Pod信息重新放到DeltaFIFO中。
3
360AI平台Informer和Webhook的结合
在Kubernetes中,Webhook是一种常用的外部扩展机制,允许在资源生命周期的不同阶段进行拦截。结合Kubernetes Informer,可以实现对资源变化的精细化控制,因此360AI平台使用二者结合实现预发拦截生产。具体来说,Webhook和Informer的配合可以在以下几个方面发挥作用:
资源准入控制:通过admission webhook拦截资源创建、修改请求,配合Informer 获取当前资源的状态,验证资源是否符合策略要求。例如,在 Pod 创建前,webhook可以通过Informer获取集群的节点状态和资源使用情况,判断是否允许该 Pod 创建。
实时响应事件:利用Informer接收资源更新事件,并触发webhook进行验证和处理。如在Pod更新或删除时,Informer捕捉事件并通知webhook进行审计或资源更新验证。
自动化处理:通过Informer与Mutating Webhook的结合,自动修改资源对象的配置。例如,当新Pod创建时,通过Informer监听创建事件,并调用webhook自动注入默认标签、环境变量或配置。
针对Pod/Deployment/Statefulset/Crd/Namespace/Job等资源分别进行拦截,下面是360AI平台Pod拦截具体实现流程,相关实现和代码如下:
1. Informer 的实现
创建 SharedInformerFactory和添加事件处理器
factory := informers.NewSharedInformerFactory(clientset, 0)
podInformer := factory.Core().V1().Pods().Informer()podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {// 处理新增事件},UpdateFunc: func(oldObj, newObj interface{}) {// 处理更新事件},DeleteFunc: func(obj interface{}) {// 处理删除事件},
})
2. Webhook Validatin配置定义
Validating Webhook配置如下,API Server会通过client.caBundle提供的CA证书验证Webhook服务器的证书,确保通信双方的身份可信,这种验证机制是基于TLS(Transport Layer Security)协议的,提供数据加密和认证功能,防止中间人攻击或未经授权的访问。Webhook配置中Service映射到具体的Pod,在Validating Webhook的配置中,通常会指定一个Service地址,例如my-webhook-service.default.svc,
Service根据配置的选择器规则(Selector)将请求路由到具体的Webhook Pod。
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:creationTimestamp: "2024-01-23T06:36:35Z"generation: 1name: hbox-guard-podresourceVersion: "418441561"selfLink: /apis/admissionregistration.k8s.io/v1/validatingwebhookconfigurations/hbox-guard-pod
webhooks:
- admissionReviewVersions:- v1clientConfig:caBundle: LS0tLS1CRUdJTiBDRVJUSUZJQ0FU......0tLS0tCg==service:name: my-webhook-service.default.svcnamespace: hboxpath: /pod/validateport: 443failurePolicy: FailmatchPolicy: Equivalentname: pod.validate.hboxnamespaceSelector: {}objectSelector: {}rules:- apiGroups:- '*'apiVersions:- v1operations:- DELETE- CREATE- UPDATEresources:- podsscope: '*'sideEffects: NoneOnDryRuntimeoutSeconds: 10
Webhook与API Server通信过程如下:
当用户提交的资源请求到达API Server时,API Server会调用配置的 Validating Webhook地址。
API Server使用client.caBundle验证目标Webhook服务的身份,确保安全。
验证通过后,API Server将请求转发到Webhook对应的Service。
Service负责将请求分发到后端的具体Pod,最终由Pod内的Webhook服务处理请求并返回验证结果。
3. Webhook服务开发
func admitPodEnv(ar admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {req := ar.Requestklog.Infof("pod name %s, account: %+v, account name: %s, opt %s", req.Name, req.UserInfo, req.UserInfo.Username, req.Operation)env := utils.ProdEnvif strings.Contains(req.UserInfo.Username, utils.PreEnv) {env = utils.PreEnv}opt := req.Operationraw := req.Object.Rawif opt == admissionv1.Delete {raw = req.OldObject.Raw}var pod corev1.Poddeserializer := scheme.GetCodecs().UniversalDeserializer()if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {klog.Error("Decode pod err: ", err)return utils.PackAdmissionResponse(false, err.Error())}
}
首先,通过 AdmissionReview 的 Request 对象解析出 Pod 和请求方的相关信息。具体步骤如下:
解析请求方信息:从req.UserInfo.Username中提取请求方的用户名;如果用户名中包含pre字符串,则说明当前操作请求方来自预发环境。
解析 Pod 信息:从req.Object中解析出 Pod 的详细信息;如果请求方来自预发环境,则进一步解析 Pod 的具体信息,以决定是否需要进行拦截操作。
处理删除操作:在处理 Pod 删除操作时,需要注意以下特殊情况:当操作类型为删除(DELETE)时,req.Object 中的 Pod 信息将被清空,无法直接获取当前 Pod 的数据;此时需要通过 req.OldObject 获取已被删除的 Pod 的完整信息。req.OldObject 包含的是在删除之前的资源快照,这对于日志记录、验证或其他自定义逻辑(如审计或资源状态备份)非常重要。
通用逻辑:这一逻辑的实现不仅适用于删除操作,还为其他资源类型的拦截提供了模板。通过巧妙地利用请求上下文中的 Object 和 OldObject,可以处理资源操作的多样性,确保在不同操作类型下都能正确解析和处理资源信息。
if v, ok := pod.Labels[utils.PrePodAnnotation]; ok && v == utils.PreEnvPod {if env == utils.PreEnv {klog.V(5).Infof("Pre-release environment not allowed to %s production pod %s", string(opt), pod.Name)return utils.PackAdmissionResponse(false, fmt.Sprintf("Pre-release environment not allowed to %s production pod", string(opt)))}
}
接着,根据Pod对象内容解析Label相关信息,获取Pod所在的环境信息。具体步骤如下:
a. 解析 Pod 标签:从 Pod 对象中提取标签信息,特别是用于标识环境的标签(例如environment标签)。
b. 获取 Pod 所在环境:根据标签信息确定Pod所在的环境。例如:标签environment:production表示Pod运行在生产环境中。
c. 验证发送方环境:检查请求的发送方环境。发送方环境信息通常包含在req.UserInfo.Username中,可以通过字符串匹配来判断发送方是否来自预发环境(例如:用户名包含pre前缀)。
d. 环境对比和拦截:如果 Pod 所在环境为生产环境,而发送方环境为预发环境,则拦截此次操作;通过返回Allowed: false的响应来拒绝此次针对Pod 的操作,并附带适当的错误消息。
now := time.Now().Format(utils.TimeLayout)
createTime := pod.CreationTimestamp.AddDate(0, 1, 0).Format(utils.TimeLayout)
//tmpTime := pod.CreationTimestamp.Add(time.Minute * 2).Format(utils.TimeLayout)
klog.V(5).Infof("env %s, now %s, create: %s, compare %v", env, now, createTime, now > createTime)
if env == utils.PreEnv && (now > createTime) {klog.V(5).Infof("Pre-release environment not allowed to %s too old pod %s", string(opt), string(opt), pod.Name)return utils.PackAdmissionResponse(false, fmt.Sprintf("Pre-release environment not allowed to %s too old pod", string(opt)))
}
klog.V(4).Infof("Pod %s validate pass ", pod.Name)
return utils.PackAdmissionResponse(true, "pass delete")
检测来自预发的删除请求时,对Pod的创建时间进行校验。如果Pod创建时间超过1个月,直接拒绝操作。
4. 预发KubeConfig的生成
cat <<EOF > $KUBECONFIG_NAME
apiVersion: v1
kind: Config
clusters:
- name: ${USER}cluster:server: $APISERVER_ADDRESScertificate-authority-data: $clusterCA
users:
- name: $USERuser:token: $token
contexts:
- name: ${USER}context:user: $USERcluster: ${USER}
current-context: ${USER}
EOFecho "Congratulations! The kubeconfig file is generated at $KUBECONFIG_NAME"
一个标准的 KubeConfig 文件由以下几部分组成:clusters:集群信息(如 API 服务器的地址);users:用户信息(如认证方式和凭据);contexts:上下文信息(将用户、集群和命名空间绑定在一起);current-context:当前的上下文(用于指定默认使用的集群和用户)。生成一个 KubeConfig
文件的步骤如下:
确认 Kubernetes 集群信息,如APISERVER_ADDRESS、USER=hbox-pre、NAMESPACE=hbox、KUBECONFIG_NAME=${USER}.yaml; CA 证书
创建认证信息,主要通过Client Certificate生成客户端证书和私钥
配置 KubeConfig 文件,通过上面代码配置生成KubeConfig文件。
5. 360AI平台拦截最终实现的效果
采用预发环境的KubeConfig删除生产环境pod时,不允许删除操作,收到Error from server: admission webhook "pod.validate.xxx" denied the request: Pre-release environment not allowed to DELETE production pod;生产环境KubeConfig能够正常删除生产环境的Pod。
推荐阅读:
基于eni弹性网卡实现k8s自研网络组件
openstack内部rpc消息通信源码分析
微服务的熔断、限流和降级
更多技术和产品文章,请关注👆
如果您对哪个产品感兴趣,欢迎留言给我们,我们会定向邀文~
360智汇云是以"汇聚数据价值,助力智能未来"为目标的企业应用开放服务平台,融合360丰富的产品、技术力量,为客户提供平台服务。
目前,智汇云提供数据库、中间件、存储、大数据、人工智能、计算、网络、视联物联与通信等多种产品服务以及一站式解决方案,助力客户降本增效,累计服务业务1000+。
智汇云致力于为各行各业的业务及应用提供强有力的产品、技术服务,帮助企业和业务实现更大的商业价值。
官网:https://zyun.360.cn 或搜索“360智汇云”
客服电话:4000052360
欢迎使用我们的产品!😊