欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > k8s-Informer之Reflector的解析

k8s-Informer之Reflector的解析

2025/7/3 17:38:45 来源:https://blog.csdn.net/realize_dream/article/details/144310749  浏览:    关键词:k8s-Informer之Reflector的解析

Reflect 概述

Reflector 从 kube-apiserver 中 list&watch 资源对象,用于监听指定资源的 Kubernetes 。当资源对象发生变化时(如:添加和删除等事件),Reflector 会将其这些资源对象的变化包装成Delta并将其丢到DeltaFIFO中。其实就是将 Etcd 的对象及其变化反射到DeltaFIFO中,实时更新本地缓存,确保本地数据和 ETCD 数据一致。

源码位置:k8s.io/client-go/tools/cache/reflector.go
(1)Reflector 它的数据结构如下:

 type Reflector struct {name stringexpectedTypeName stringexpectedType reflect.Type // 放到Store中(即DeltaFIFO中)的对象类型expectedGVK *schema.GroupVersionKindstore Store // 与 Watch 源同步的⽬标,会赋值为 DeltaFIFOlisterWatcher ListerWatcher // ListerWatcher是个interface(含list和watch)backoffManager wait.BackoffManagerinitConnBackoffManager wait.BackoffManagerMaxInternalErrorRetryDuration time.DurationresyncPeriod time.Duration // 重新同步周期ShouldResync func() boolclock clock.ClockpaginatedResult boollastSyncResourceVersion stringisLastSyncResourceVersionUnavailable boollastSyncResourceVersionMutex sync.RWMutexWatchListPageSize int64watchErrorHandler WatchErrorHandler
}

(2)Reflector 初始化

通过 NewReflector 实例化 Reflector 对象,在实例中需要传入的 ListerWatcher 数据接口对象,这个包含核心 List 和 Watch 方法,主要是负责 List 和 Watch 指定的 Kubernetes APIServer 资源。


func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {  return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)  
}// NewNamedReflector same as NewReflector, but with a specified name for logging  
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {  realClock := &clock.RealClock{}  r := &Reflector{  name:          name,  listerWatcher: lw,  store:         store,  initConnBackoffManager: wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock),  resyncPeriod:           resyncPeriod,  clock:                  realClock,  watchErrorHandler:      WatchErrorHandler(DefaultWatchErrorHandler),  }  r.setExpectedType(expectedType)  return r  
}

(3)ListerWatcher interface

type Lister interface {  List(options metav1.ListOptions) (runtime.Object, error)  
}  type Watcher interface {  
Watch(options metav1.ListOptions) (watch.Interface, error)  
}  type ListerWatcher interface {  Lister  Watcher
}

(4)ListWatch struct

type ListFunc func(options metav1.ListOptions) (runtime.Object, error)type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)type ListWatch struct {  ListFunc  ListFunc  WatchFunc WatchFunc  DisableChunking bool  
}

(5)Reflector 启动

创建 Reflector 对象后, Run 方法启动监听并处理事件,通过 wait.BackoffUntil 不断调用 ListAndWatch 方法,如果该方法 return 了,那么就会发生re-list,watch过程则被嵌套在for循环中。 Run() 中最核心的就是 List-Watch 方法。

func (r *Reflector) Run(stopCh <-chan struct{}) {  klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  wait.BackoffUntil(func() {  if err := r.ListAndWatch(stopCh); err != nil {  r.watchErrorHandler(r, err)  }  }, r.backoffManager, true, stopCh)  klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)  
}

ListAndWatch 核心代码:

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {  // ...// list 获取资源下的所有对象的数据  err := r.list(stopCh)  if err != nil {  return err  }  // go 部分go func() {  // 返回重新同步的定时通道  resyncCh, cleanup := r.resyncChan()  // ...for {  select {  case <-resyncCh:  case <-stopCh:  return  case <-cancelCh:  return  }  // 判断是否需要执行Resync操作,即重新同步  if r.ShouldResync == nil || r.ShouldResync() {  klog.V(4).Infof("%s: forcing resync", r.name)  // Resync 机制会将本地存储(LocalStore)的资源对象同步到 DeltaFIFO 中  if err := r.store.Resync(); err != nil {  resyncerrc <- err  return  }  }  cleanup()  // 重新启⽤定时器定时触发  resyncCh, cleanup = r.resyncChan()  }  }()  // for 部分 for {  // 1、stopCh处理,判断是否需要退出循环 select {  case <-stopCh:  return nil  default:  }  // 2、将resourceVersion为最新的resourceVersion,即从list回来的最新resourceVersion开始执行watch操作timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))  options := metav1.ListOptions{  ResourceVersion: r.LastSyncResourceVersion(),TimeoutSeconds: &timeoutSeconds,AllowWatchBookmarks: true}   // 3、 开始监听start := r.clock.Now()  w, err := r.listerWatcher.Watch(options)// 4、Reflctor 组件的功能: 事件处理函数  // 事件处理函数,当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新  err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.expectedTypeName, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)  // ... }  
}

r.watchHandler() 函数:

watchHandler()函数是 reflector组件的一个重要函数,它负责监听Kubernetes API server中的对象变更事件。如当触发增删改时,将对应的资源对象更新到本地缓存 DeltaFIFO,并设置 ResouceVersion 最新。

主要逻辑:
(1)从watch操作返回来的结果中获取event事件;
(2)接收到每个事件后,watchHandler()函数会判断该事件是否为错误事件及根据事件类型作出处理;
(3)获得当前watch到资源的ResourceVersion;
(4)判断不同类型的event事件作出相应处理;
(5)调用r.setLastSyncResourceVersion,更新Reflector对象中存储的最新的资源版本号。

循环操作,直至event事件处理完毕。


func watchHandler(start time.Time,  w watch.Interface,  store Store,  expectedType reflect.Type,  expectedGVK *schema.GroupVersionKind,  name string,  expectedTypeName string,  setLastSyncResourceVersion func(string),  clock clock.Clock,  errc chan error,  stopCh <-chan struct{},  
) error {  eventCount := 0  // Stopping the watcher should be idempotent and if we return from this function there's no way  // we're coming back in with the same watch interface.   defer w.Stop()  loop:  for {  select {  case <-stopCh:  return errorStopRequested  case err := <-errc:  return err  case event, ok := <-w.ResultChan():  if !ok {  // 错误事件,可能与客户端的连接已断开,则重试机制下尝试重新连接break loop  }  if event.Type == watch.Error {  return apierrors.FromObject(event.Object)  }  if expectedType != nil {  if e, a := expectedType, reflect.TypeOf(event.Object); e != a {  utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", name, e, a))  continue  }  }  if expectedGVK != nil {  if e, a := *expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {  utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", name, e, a))  continue  }  }  meta, err := meta.Accessor(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))  continue  }  // 获得当前watch到资源的ResourceVersionresourceVersion := meta.GetResourceVersion()  switch event.Type {  // 不同类型的event事件,调用不同函数处理。如事件为Added则调用 store.Add 处理case watch.Added:  err := store.Add(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))  }  case watch.Modified:  err := store.Update(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))  }  case watch.Deleted:  // TODO: Will any consumers need access to the "last known  // state", which is passed in event.Object? If so, may need  // to change this.            err := store.Delete(event.Object)  if err != nil {  utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))  }  case watch.Bookmark:  // A `Bookmark` means watch has synced here, just update the resourceVersion  default:  utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", name, event))  }  // 记录Reflector对象已经处理过的最新的资源版本号,以便在下次请求资源数据时能够从该版本号开始监听资源变更。setLastSyncResourceVersion(resourceVersion)  if rvu, ok := store.(ResourceVersionUpdater); ok {  rvu.UpdateResourceVersion(resourceVersion)  }  eventCount++  }  }  watchDuration := clock.Since(start)  if watchDuration < 1*time.Second && eventCount == 0 {  return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", name)  }  klog.V(4).Infof("%s: Watch close - %v total %v items received", name, expectedTypeName, eventCount)  return nil  
}

到这里 Reflector 组件的功能基本就结束了,接下来分析 DeltaFIFO 组件。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词