K8s sharedInformer源码阅读总结
@ wgjak47 | 星期四,八月 29 日,2019 年 | 18 分钟阅读 | 更新于 星期四,八月 29 日,2019 年

k8s informer源码阅读

K8s sharedInformer源码阅读总结

简介

k8s中的informer是用于k8s Controller和k8s APIServer通讯的缓存机制,informer通过ListAndWatch机制,从apiserver同步数据, k8s的controller从informer获取相关数据。通过这种缓存机制,减少了与k8s APIServer的通讯压力。

架构图

组件

Reflector

reflector的作用是将从ListAndWatch机制获取的数据转换成对象。位于tools/cache/reflector.go。数据结构定义如下:

// Reflector 监听一种特定的资源,并将这种资源的全部变化通过反射机制转换成响应对象存储在store中
type Reflector struct {
// reflector名称. 默认为 文件名:行号
name string
// 监控信息
metrics *reflectorMetrics
// store中存放对象的类型
expectedType reflect.Type
// 存储介质,这里在sharedInformer里一般是DeltaFIFO
store Store
// ListerWatcher接口,提供ListAndWatchAPI接口调用
listerWatcher ListerWatcher
// watch间隔
period       time.Duration
// 重新同步的间隔,注意,这里不是重新调用List,而是store的resync
resyncPeriod time.Duration
// 是否可以进行完整同步
ShouldResync func() bool
// 时钟
clock clock.Clock
// 最后一次同步的资源版本
lastSyncResourceVersion string
// 用于读取lastSyncResourceVersionMutex的保护锁
lastSyncResourceVersionMutex sync.RWMutex
// 调用的Watch接口时的单页数量(chunk size)
// 现在通过分页机制,减轻k8s api server的请求压力
WatchListPageSize int64
}

方法如下定义如下:

// 创建一个使用Namespace为key的indexer作为store的reflector,返回indexer和reflector
func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector)
// 使用文件名:行号创建一个reflector
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector
// 创建一个有名字的reflector
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector
// reflector执行主流程
func (r *Reflector) Run(stopCh <-chan struct{})
// 设置store的定期同步
func (r *Reflector) resyncChan() (<-chan time.Time, func() bool)
// 通过List API获取所有数据,存储在store中,并通过Watch进行更新
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error
// 更新store中的所有数据
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error
// 从watcher那里ResultChan接收数据并存储,并更新lastResourceVersion
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error
// 获取最后一次同步的资源版本
func (r *Reflector) LastSyncResourceVersion() string
// 设置最后一次同步的资源版本
func (r *Reflector) setLastSyncResourceVersion(v string)

reflector这里有一个非常有趣的地方,就是resyncPeriod这个参数,根据源代码注释,看起来是每个一段时间完整调用List API,全量更新数据,但是实际上,这个更新是 把Index中的所有数据 重新同步到DeltaFIFO。我猜想这里是完全信任Watch api处理不会产生问题,而自定义controller那里可能无法确认(毕竟不是自己写的),所以提供 一个全量同步的机会,让controller能有机会修复错误,达到对象正确的状态。

DeltaFIFO

DeltaFIFO是一个队列,用于存储k8s对象变化的数据,以便更新到local store,同时通过callbacks发送到WorkQueue。 数据结构定义:

// DelatType类型
type DeltaType string

// Delta常量
const (
Added   DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// Sync状态主要会在一下状态存在:
//  * watch出现异常/超时或者一个新的list/watch周期开始
//  * 启用周期同步(resyncPeriod不为0)
// 以及任何可能出发DeltaFIFO的replace的操作
Sync DeltaType = "Sync"
)

// Delta类型定义,DeltaFIFO中存储的类型,除非是删除变化,你得到的是对先在删除前的
// 最终状态
type Delta struct {
// 变更的类型
Type   DeltaType
// 变更的对象
Object interface{}
}

// Deltas包含一个或多个独立对象的变化,按照时间先后升序排列
type Deltas []Delta

type DeltaFIFO struct {
// 安全访问`items`和`queue`的信号量
lock sync.RWMutex
cond sync.Cond

// 存放Deltas的Map
items map[string]Deltas
// 存放deltas的key的队列
queue []string

// 第一次执行Replace操作或者第一次调用Delete/Add/Update时返回true
populated bool
// 第一次执行Replace操作时放入队列的数量
initialPopulationCount int

// 对象的key计算函数,用于queue
keyFunc KeyFunc

// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
// knownObjects保存这所有已知对象的key,用于执行Replace/Delete
// 时识别那些items已经被删除了
knownObjects KeyListerGetter

// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
// 是否已经被关闭
closed     bool
// 用于变更关闭状态的锁
closedLock sync.Mutex
}
// 对于一个删除的对象,如果watch api没有捕捉到这个变更
// 那么就会使用这个类型来代替
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
}

// keyLister和KeyGetter的组合
// 由于store Interface实现了ListKeys和GetBykey,因此
// 可以使用store对象为KeyListerGetter变量的值
// 根据controller里的代码,DeltaFIFO中的KnownObjects是一个Indexer对象
type KeyListerGetter interface {
KeyLister
KeyGetter
}

type KeyLister interface {
ListKeys() []string
}

type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
}

方法定义:

// 创建一个新的DeltaFIFO对象
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO
// 关闭DeltaFIFO队列
func (f *DeltaFIFO) Close()
// 获取对象的Key
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error)
// 如果最开始调用Add/Update/Delete/AddIfNotPresent或者
// 当执行第一个update操作的时候,但是被replace操作入队的对象都被处理完了,返回True
// 其实就是通过List的对象是否已经全部存放到Store中了
func (f *DeltaFIFO) HasSynced() bool
// 添加一个对象到队列,该对象的类型为Added
func (f *DeltaFIFO) Add(obj interface{}) error
// 添加一个对象到队列,该对象的类型为Updated
func (f *DeltaFIFO) Update(obj interface{}) error
// 添加一个对象到队列,该对象的类型为Deleted,如果对象已经不存在了,就会被忽略
func (f *DeltaFIFO) Delete(obj interface{}) error
// 添加一个对象到队列,注意这个对象的类型必须是Deltas类型,如果对象已经存在,则忽略
func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error
// 合并最近两个Delta对象,如果两个对象相同
// 由于Deltas是从items中以object key取出的,
// 可以认为相同的对象的不同变更。
func dedupDeltas(deltas Deltas) Deltas
// 判断Delta对象是否时相同的
func isDup(a, b *Delta) *Delta
// 判断Deleted类型的Delta是否相同
func isDeletionDup(a, b *Delta) *Delta
// 用于判对象是否会被删除,如果最新的变更时删除,返回true
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool
// 添加对象到队列,调用这个函数时默认已经处理好了锁
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error
// 返回所有items中最近的变更对象
func (f *DeltaFIFO) List() []interface{}
// 返回所有items中的keys
func (f *DeltaFIFO) ListKeys() []string
// 根据一个对象,返回该对象的所有变更
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error)
// 根据Key返回对应对象的所有变更
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error)
// 判断队列是否关闭
func (f *DeltaFIFO) IsClosed() bool
// 该函数会在队列为空的时候阻塞,直到有对象进入队列,并返回这个对象
// 由于返回对象前会从队列里清除,因此如果调用process失败,会通过AddIfNotPresent重新添加到队列中
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error)
// 将f的内容用给定的对象集完全替换掉,用于实现对象的全量更新
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error
// 使用knownObjects中的对象重新加入到队列中,类型为sync
// 一般在设置的了ResyncPeriod不为0的时候定时执行
func (f *DeltaFIFO) Resync() error

InformerController

InformerController是驱动informer整个流程的引擎。数据结构定义如下:

// InformerController的配置对象
type Config struct {
// 对象的队列,这里必须是一个DeltaFIFO
Queue

// 调用ListAndWatch接口的Interface
ListerWatcher

// 处理对象的接口,这个函数将接收Queue.Pop方法获取的对象
Process ProcessFunc

// 要处理对象的类型
ObjectType runtime.Object

// 从Store(Indexer)中定期向DeltaFIFO中同步的周期
FullResyncPeriod time.Duration

// 是否应该进行resync操作,如果返回True,则Controller就开始进行resync操作
ShouldResync ShouldResyncFunc

// 如果是True,当对象处理一场,会将对象重新加入到队列之中
RetryOnError bool
}

// 判断是否进行resync操作的函数
type ShouldResyncFunc func() bool

// 处理对象的方法
type ProcessFunc func(obj interface{}) error

// informer的Controller
type controller struct {
// 配置对象
config         Config
// 反射器,用于将API的返回转换成对象
reflector      *Reflector
// 反射器的锁
reflectorMutex sync.RWMutex
// 时钟
clock          clock.Clock
}

// Controller的Interface
type Controller interface {
// 主逻辑
Run(stopCh <-chan struct{})
// 资源是否已经同步到Store中了
HasSynced() bool
// 最后一次同步的资源版本
LastSyncResourceVersion() string
}

// ResourceEventHandler用于处理资源产生的变更
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}

// 提供给用户接口以处理不同的资源变更事件
type ResourceEventHandlerFuncs struct {
AddFunc    func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
// 增加了FilterFunc函数的ResourceEventHandler
// 所有操作都会经过FilterFunc进行过滤,确认是否可以操作
type FilteringResourceEventHandler struct {
FilterFunc func(obj interface{}) bool
Handler    ResourceEventHandler
}

方法定义:

// 通过一个新的Config创建一个新的Controller
// 我觉得叫NewController比较合适吧...
func New(c *Config) Controller
// 主流流程,并发执行reflector.run和定期执行processloop方法
func (c *controller) Run(stopCh <-chan struct{})
// 资源是否已经同步到Store中了
func (c *controller) HasSynced() bool
// 最后一次同步的资源版本
func (c *controller) LastSyncResourceVersion()
// 处理队列中的对象。
func (c *controller) processLoop()
// 处理新增,更新和删除时间,调用用户定义的ResourceEventHandlerFuncs的方法
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{})
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{})
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{})
// 处理新增,更新和删除时间,调用用户定义的ResourceEventHandlerFuncs的方法
// 在次之前会对变更对象使用filterfunc以判断是否满足操作条件
func (r FilteringResourceEventHandler) OnAdd(obj interface{})
// 这里会根据oldObj和newObj的filterFunc执行情况分别执行add, update, delete
func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{})
func (r FilteringResourceEventHandler) OnDelete(obj interface{})
// 检查对象是否为DeleteFinalStatusUnknown,如果是,返回key,否则调用MetaNamespaceKeyFunc
// 返回对象的index key。
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error)
// 创建一个InformerController,返回存储介质和Controller
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller)
// 创建一个带索引的InformerController,返回Indexer和Controller
func NewIndexerInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
indexers Indexers,
) (Indexer, Controller)
// 创建Controller核心逻辑
// 创建Controller的cfg,并且使用内部的匿名函数作为Process,
// 对于一个对象的所有变更,按照时间先后顺序,添加到Indexer,并调用用户定义
// 的ResourceEventHandler的回调接口
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller

Indexer

Indexer是一种可以使用不同索引函数为对象进行索引的存储介质,作为sharedInformer的存储。 数据结构定义如下:

// Indexer包含三种key:
//   * 对象的存储key,也是对象的唯一识别key
//   * 索引名称,关联不同的对象索引计算函数
//   * 对象的索引Key,用于对对象分组
type Indexer interface {
// 底层基本存储类型
Store
// 根据indexName返回对象的Index keys(注意这里有多个)
Index(indexName string, obj interface{}) ([]interface{}, error)
// 根据index类型和index key返回所有对象的存储key
IndexKeys(indexName, indexedValue string) ([]string, error)
// 根据IndexName获取所有的索引建
ListIndexFuncValues(indexName string) []string
// 根据index类型和index key返回所有对象
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// 获取所有的index计算函数
GetIndexers() Indexers
// 添加一个或多个index计算函数,注意不能在store中有数据的时候使用
AddIndexers(newIndexers Indexers) error
}

// 计算对象index的函数
type IndexFunc func(obj interface{}) ([]string, error)

const (
// 最常用的index函数,即对象的namespace
NamespaceIndex string = "namespace"
)

// Indexer可能有多种计算方式,进而产生多个index key,而index key可能相同,因此使用Sets
type Index map[string]sets.String

// 根据IndexName存放不同的index计算函数
type Indexers map[string]IndexFunc

// 根据indexName存放不同类型的索引值
type Indices map[string]Index

方法定义如下:

// 最常用的index计算函数,返回对象的namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error)

// 将一个indexFunc转换为一个KeyFunc(计算对象唯一的key),如果你的index计算函数可以为所欧的object产生不同的index值
// 则可以使用这个方法将你的计算函数转换成一个KeyFunc。在执行转换后的KeyFunc时,如果有key冲突,会报错
func IndexFuncToKeyFuncAdapter(indexFunc IndexFunc) KeyFunc

Cache

cache是一种最常见的Indexer实现,将所有对象存放在内存之中。 数据结构定义如下:

// Store 通用存储接口
type Store interface {
// 添加对象
Add(obj interface{}) error
// 更新对象
Update(obj interface{}) error
// 删除对象
Delete(obj interface{}) error
// 列出所有对象
List() []interface{}
// 列出所有对象的key
ListKeys() []string
// 返回与传入对象的唯一key相同的对象
Get(obj interface{}) (item interface{}, exists bool, err error)
// 通过对象唯一key获取对象
GetByKey(key string) (item interface{}, exists bool, err error)

// 使用一些对象完全替换调当前存储的对象
Replace([]interface{}, string) error
// 重新同步
Resync() error
}

// KeyFunc返回的异常
type KeyError struct {
Obj interface{}
Err error
}

// 外部key,当没有完整对象,只有对象的唯一key时,可以用来计算
// 出对象的namespace
type ExplicitKey string

// cache是一种indexer的实现,主要功能有两个:
//  1. 调用cacheStorage的方法
//  2. 通过keyFunc计算对象的唯一key
type cache struct {
// 线程安全的存储结构
cacheStorage ThreadSafeStore
// 计算对象key的函数,要求保证每个对象的key唯一
keyFunc KeyFunc
}

// 所有操作都保证线程安全的存储
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
// 在已经存在数据的ThreadSafeStore中操作,会造成未知结果
AddIndexers(newIndexers Indexers) error
Resync() error
}

// 实现ThreadSafeStore的实际类型
type threadSafeMap struct {
// 添加的读写锁,这里没有用sync.Map
// 是因为sync.Map的写性能很差
lock  sync.RWMutex
// 核心存储介质
items map[string]interface{}

// 存储Indexer函数
indexers Indexers
// 存储不同的Indexer计算出来的Index key
// 每个index key下都对应着多个index key相同的对象的key
indices Indices
}

定义方法如下:

// 实现Error接口
func (k KeyError) Error() string
// 计算对象唯一的key的计算函数,具体的key为<namespace>/<name>
// 如果namespace为空,那么就只有name
func MetaNamespaceKeyFunc(obj interface{}) (string, error)
// 分别返回对象的namespace和name
func SplitMetaNamespaceKey(key string) (namespace, name string, err error)
// cache的函数实现基本上是通过keyFunc计算key,然后
// 调用ThreadSafeStore实现的
func (c *cache) Add(obj interface{}) error
func (c *cache) Replace(list []interface{}, resourceVersion string) error
func (c *cache) Update(obj interface{}) error
func (c *cache) Delete(obj interface{}) error
func (c *cache) List() []interface{}
func (c *cache) ListKeys() []string
func (c *cache) GetIndexers() Indexers
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error)
func (c *cache) IndexKeys(indexName, indexKey string) ([]string, error)
func (c *cache) ListIndexFuncValues(indexName string) []string
func (c *cache) ByIndex(indexName, indexKey string) ([]interface{}, error)
func (c *cache) AddIndexers(newIndexers Indexers) error
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error)
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error)
func (c *cache) Replace(list []interface{}, resourceVersion string) error
func (c *cache) Resync() error

// 根据KeyFunc创建一个Store(cache),索引函数为空
func NewStore(keyFunc KeyFunc) Store
// 根据KeyFunc和Indexers(索引函数)创建新的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer

// 根据object和它的key添加到存储中
func (c *threadSafeMap) Add(key string, obj interface{})

// 根据object和它的key进行更新
func (c *threadSafeMap) Update(key string, obj interface{})
// 根据key,删除对象
func (c *threadSafeMap) Delete(key string)
// 根据key获取对象
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool)
// 获取全部对象
func (c *threadSafeMap) List() []interface{}
// 获取全部对象的唯一key
func (c *threadSafeMap) ListKeys() []string
// 替换所有对象,并重建索引
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string)
// 根据index函数名称和查询对象,返回与查询对象有相同index key的对象
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error)
// 根据index函数名称和index,返回有相同index key的对象
func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error)
// 根据返回indexName和IndexKey返回相关对象的key
func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error)
// 根据indexName返回所有的index key
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string
// 获取所有的index计算函数
func (c *threadSafeMap) GetIndexers() Indexers
// 添加index计算函数,如果items不为空,会报错
func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error
// 更新索引,从索引中删除就的对象的key,然后重新计算新的对象的index key,添加到索引中
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string)
// 从索引中删除就的对象的key
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string)
// 实现为空
func (c *threadSafeMap) Resync() error
// 创建一个新的ThreadSafeStore
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore

SharedInformer

数据结构定义如下:

// shardinformer接口
type SharedInformer interface {
// 添加时间处理器到Informer,当有资源变化就可以通知使用者
AddEventHandler(handler ResourceEventHandler)
// 同上,但是会定期进行重新同步操作
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// 获取存储对象,这里是indexer
GetStore() Store
// 返回SharedInformer的Controller
GetController() Controller
// 主逻辑函数
Run(stopCh <-chan struct{})
// 如果store已经存储完成所有的LIST 请求的结果,返回True
HasSynced() bool
// 最近一次同步对象的版本,
LastSyncResourceVersion() string
}

// 在sharedinformer之上增加添加indexer接口
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}

// 判断informer是否同步完成
type InformerSynced func() bool
// 实现SharedIndexInformer接口的类型
type sharedIndexInformer struct {
// 作为内部存储的indexer
indexer    Indexer
// 驱动整个流程的Controller
controller Controller
// 有Processor处理各种事件
processor             *sharedProcessor
// 突变检查,默认关闭
cacheMutationDetector MutationDetector

// This block is tracked to handle late initialization of the controller
// 调用ListAndWatch接口的Interface
listerWatcher ListerWatcher
// 要转换的成类型
objectType    runtime.Object

// 定时同步周期。会选择所有ResourceEventHandler的最小同步周期
resyncCheckPeriod time.Duration
// 默认同步周期(通过AddEventHandler)
defaultEventHandlerResyncPeriod time.Duration
// 测试使用的时钟
clock clock.Clock

// 启动前,已启动和已停止
started, stopped bool
// 启动过程中互斥资源需要的锁
startedLock      sync.Mutex

// 保证在添加event handler的时候delta已经被正确的处理,并且在添加完event handler之前不会被处理,防止崩溃。
blockDeltas sync.Mutex
}

// shardInformer的Processer,用于处理变更事件
type sharedProcessor struct {
// 判断sharedProcess里的Listener是否已经启动
listenersStarted bool
// 控制listeners的锁
listenersLock    sync.RWMutex
// 处理一般变更的listeners
listeners        []*processorListener
// 处理sync变更的listener
syncingListeners []*processorListener
// 用于判断是否可以进行resync的时钟
clock            clock.Clock
// 管理listener协程的Group
wg               wait.Group
}

// 用于监听事件的变化
type processorListener struct {
// 下面四个属性的关系是 addCh -> pendingNotifications -> nextCh -> ResourceEventHandler
// 由于chan满了会造成发送阻塞,因此要通过两个chan之间的ringbuffer作为缓冲,防止出现阻塞
// 处理队列
nextCh chan interface{}
// 输入队列
addCh  chan interface{}
// 事件处理器
handler ResourceEventHandler
// 缓冲队列,如果listener出现问题,可能会造成OOM
pendingNotifications buffer.RingGrowing

// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
// listener从sharedinformer进行数据同步的周期
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
// 又有informer中的processor要跟其它的processor同步周期保持一致,
// 这里存储的是实际上的同步周期
resyncPeriod time.Duration
// 下一次同步的时间点
nextResync time.Time
// 用于读取和写入下一次事件点的锁
resyncLock sync.Mutex
}

方法定义如下:

// 创建一个新的SharedInformer (其实是indexer为空的SharedIndexInformer)
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer
// 创建一个新的SharedIndexInformer
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer
// WatiForCacheSync的Wrapper函数,多了一些log
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
// 如果所有的cache都已经完成首次数据同步,返回true,否则返回false
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool
// sharedIndexInformer 主逻辑,主要是创建controller,启动processor和controller
func (s *sharedIndexInformer) Run(stopCh <-chan struct{})
// 是否已经完成首次数据同步,调用的是controller的HasSynced
func (s *sharedIndexInformer) HasSynced() bool
// 返回controller的LastSyncResourceVersion
func (s *sharedIndexInformer) LastSyncResourceVersion() string
// 返回indexer
func (s *sharedIndexInformer) GetStore() Store
// 返回indexer
func (s *sharedIndexInformer) GetIndexer() Indexer
// 添加indexer,前提是informer没有启动,否则会报错
func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error
// 获取informerController
func (s *sharedIndexInformer) GetController() Controller
// 添加资源变更事件处理,会使用informer默认的同步间隔
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler)
// 决定同步间隔,如果shardInformer的间隔是0,表示该informer不进行同步,如果desired < check,
// 那么将会把同步周期扩展到check
func determineResyncPeriod(desired, check time.Duration) time.Duration
// 添加资源变更事件处理,使用指定的同步间隔,会经过determineResyncPeriod检查
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// 针对不同的事件类型分发到不同的listener,sync到syncingListeners,其余到Listeners
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error
// 添加Listener,并启动listener run和pop流程
func (p *sharedProcessor) addListener(listener *processorListener)
// 添加Listener,用于已经上锁的情况
// 对于每个listenr,分别添加到listeners和普通的syncingListeners
func (p *sharedProcessor) addListenerLocked(listener *processorListener)
// 分发变更对象,sync类型到syncingListeners,其余到Listeners
func (p *sharedProcessor) distribute(obj interface{}, sync bool)
// sharedProcessor的主逻辑,启动所有listener的run和pop工作流,
// 这里用stopCh阻塞等待,如果受到响应信号,则关闭listener的addCh channel,
// 然后等待所有任务结束,注意这里启动的只有listeners,没有syncinglisteners
func (p *sharedProcessor) run(stopCh <-chan struct{})
// 判断是否需要进行同步,这里会把syncingListeners清空,重新判断并添加
// listeners中需要同步的listener
func (p *sharedProcessor) shouldResync() bool
// 检查并重新设置所有listener的同步周期,这里调用的是determineResyncPeroid来进行判断
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration)
// 创建一个新的ProcessListener
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener
// 添加一个变更事件,发送信息到addCh
func (p *processorListener) add(notification interface{})
// 从addCh取出一个事件放到pendingNotifications,或者
// 从pendingNotifications里取出一个事件写入nextCh
func (p *processorListener) pop()
// 主要是从nextCh读取事件,根据不同的事件,调用handler中定义的接口
func (p *processorListener) run()
// 根据当前事件推算是否需要同步
func (p *processorListener) shouldResync(now time.Time) bool
// 决定下一次同步的事件
func (p *processorListener) determineNextResync(now time.Time)
// 设定同步周期
func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration)

总结

sharedinformer同时提供了缓存和事件变更队列,并支持用户使用回调函数对不同的事件进行处理。这套机制十分复杂,但是有很精妙。下面是我个人画的总结图: k8s sharedInformer