/* Copyright 2014 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package cache import ( "errors" "fmt" "sync" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog" ) // NewDeltaFIFO returns a Queue which can be used to process changes to items. // // keyFunc is used to figure out what key an object should have. (It is // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling // around deleted objects and queue state). // // 'knownObjects' may be supplied to modify the behavior of Delete, // Replace, and Resync. It may be nil if you do not need those // modifications. // // TODO: consider merging keyLister with this object, tracking a list of // "known" keys when Pop() is called. Have to think about how that // affects error retrying. // NOTE: It is possible to misuse this and cause a race when using an // external known object source. // Whether there is a potential race depends on how the comsumer // modifies knownObjects. In Pop(), process function is called under // lock, so it is safe to update data structures in it that need to be // in sync with the queue (e.g. knownObjects). // // Example: // In case of sharedIndexInformer being a consumer // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/ // src/k8s.io/client-go/tools/cache/shared_informer.go#L192), // there is no race as knownObjects (s.indexer) is modified safely // under DeltaFIFO's lock. The only exceptions are GetStore() and // GetIndexer() methods, which expose ways to modify the underlying // storage. Currently these two methods are used for creating Lister // and internal tests. // // Also see the comment on DeltaFIFO. // // Warning: This constructs a DeltaFIFO that does not differentiate between // events caused by a call to Replace (e.g., from a relist, which may // contain object updates), and synthetic events caused by a periodic resync // (which just emit the existing object). See https://issue.k8s.io/86015 for details. // // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` // instead to receive a `Replaced` event depending on the type. // // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects}) func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KeyFunction: keyFunc, KnownObjects: knownObjects, }) } // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are // optional. type DeltaFIFOOptions struct { // KeyFunction is used to figure out what key an object should have. (It's // exposed in the returned DeltaFIFO's KeyOf() method, with additional // handling around deleted objects and queue state). // Optional, the default is MetaNamespaceKeyFunc. KeyFunction KeyFunc // KnownObjects is expected to return a list of keys that the consumer of // this queue "knows about". It is used to decide which items are missing // when Replace() is called; 'Deleted' deltas are produced for the missing items. // KnownObjects may be nil if you can tolerate missing deletions on Replace(). KnownObjects KeyListerGetter // EmitDeltaTypeReplaced indicates that the queue consumer // understands the Replaced DeltaType. Before the `Replaced` event type was // added, calls to Replace() were handled the same as Sync(). For // backwards-compatibility purposes, this is false by default. // When true, `Replaced` events will be sent for items passed to a Replace() call. // When false, `Sync` events will be sent instead. EmitDeltaTypeReplaced bool } // NewDeltaFIFOWithOptions returns a Store which can be used process changes to // items. See also the comment on DeltaFIFO. func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc } f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f } // DeltaFIFO is like FIFO, but differs in two ways. One is that the // accumulator associated with a given object's key is not that object // but rather a Deltas, which is a slice of Delta values for that // object. Applying an object to a Deltas means to append a Delta // except when the potentially appended Delta is a Deleted and the // Deltas already ends with a Deleted. In that case the Deltas does // not grow, although the terminal Deleted will be replaced by the new // Deleted if the older Deleted's object is a // DeletedFinalStateUnknown. // // The other difference is that DeltaFIFO has an additional way that // an object can be applied to an accumulator, called Sync. // // DeltaFIFO is a producer-consumer queue, where a Reflector is // intended to be the producer, and the consumer is whatever calls // the Pop() method. // // DeltaFIFO solves this use case: // * You want to process every object change (delta) at most once. // * When you process an object, you want to see everything // that's happened to it since you last processed it. // * You want to process the deletion of some of the objects. // * You might want to periodically reprocess objects. // // DeltaFIFO's Pop(), Get(), and GetByKey() methods return // interface{} to satisfy the Store/Queue interfaces, but they // will always return an object of type Deltas. // // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities // to list Store keys and to get objects by Store key. The objects in // question are called "known objects" and this set of objects // modifies the behavior of the Delete, Replace, and Resync methods // (each in a different way). // // A note on threading: If you call Pop() in parallel from multiple // threads, you could end up with multiple threads processing slightly // different versions of the same object. type DeltaFIFO struct { // lock/cond protects access to 'items' and 'queue'. lock sync.RWMutex cond sync.Cond // We depend on the property that items in the set are in // the queue and vice versa, and that all Deltas in this // map have at least one Delta. items map[string]Deltas queue []string // populated is true if the first batch of items inserted by Replace() has been populated // or Delete/Add/Update was called first. populated bool // initialPopulationCount is the number of items inserted by the first call of Replace() initialPopulationCount int // keyFunc is used to make the key used for queued item // insertion and retrieval, and should be deterministic. keyFunc KeyFunc // knownObjects list keys that are "known" --- affecting Delete(), // Replace(), and Resync() knownObjects KeyListerGetter // Indication the queue is closed. // Used to indicate a queue is closed so a control loop can exit when a queue is empty. // Currently, not used to gate any of CRED operations. closed bool closedLock sync.Mutex // emitDeltaTypeReplaced is whether to emit the Replaced or Sync // DeltaType when Replace() is called (to preserve backwards compat). emitDeltaTypeReplaced bool } var ( _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue ) var ( // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas // object with zero length is encountered (should be impossible, // but included for completeness). ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") ) // Close the queue. func (f *DeltaFIFO) Close() { f.closedLock.Lock() defer f.closedLock.Unlock() f.closed = true f.cond.Broadcast() } // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or // DeletedFinalStateUnknown objects. func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { if d, ok := obj.(Deltas); ok { if len(d) == 0 { return "", KeyError{obj, ErrZeroLengthDeltasObject} } obj = d.Newest().Object } if d, ok := obj.(DeletedFinalStateUnknown); ok { return d.Key, nil } return f.keyFunc(obj) } // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, // or an Update called first but the first batch of items inserted by Replace() has been popped func (f *DeltaFIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() return f.populated && f.initialPopulationCount == 0 } // Add inserts an item, and puts it in the queue. The item is only enqueued // if it doesn't already exist in the set. func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true return f.queueActionLocked(Added, obj) } // Update is just like Add, but makes an Updated Delta. func (f *DeltaFIFO) Update(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() f.populated = true return f.queueActionLocked(Updated, obj) } // Delete is just like Add, but makes a Deleted Delta. If the given // object does not already exist, it will be ignored. (It may have // already been deleted by a Replace (re-list), for example.) In this // method `f.knownObjects`, if not nil, provides (via GetByKey) // _additional_ objects that are considered to already exist. func (f *DeltaFIFO) Delete(obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.populated = true if f.knownObjects == nil { if _, exists := f.items[id]; !exists { // Presumably, this was deleted when a relist happened. // Don't provide a second report of the same deletion. return nil } } else { // We only want to skip the "deletion" action if the object doesn't // exist in knownObjects and it doesn't have corresponding item in items. // Note that even if there is a "deletion" action in items, we can ignore it, // because it will be deduped automatically in "queueActionLocked" _, exists, err := f.knownObjects.GetByKey(id) _, itemsExist := f.items[id] if err == nil && !exists && !itemsExist { // Presumably, this was deleted when a relist happened. // Don't provide a second report of the same deletion. return nil } } return f.queueActionLocked(Deleted, obj) } // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already // present in the set, it is neither enqueued nor added to the set. // // This is useful in a single producer/consumer scenario so that the consumer can // safely retry items without contending with the producer and potentially enqueueing // stale items. // // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is // different from the Add/Update/Delete functions. func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { deltas, ok := obj.(Deltas) if !ok { return fmt.Errorf("object must be of type deltas, but got: %#v", obj) } id, err := f.KeyOf(deltas.Newest().Object) if err != nil { return KeyError{obj, err} } f.lock.Lock() defer f.lock.Unlock() f.addIfNotPresent(id, deltas) return nil } // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller // already holds the fifo lock. func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { f.populated = true if _, exists := f.items[id]; exists { return } f.queue = append(f.queue, id) f.items[id] = deltas f.cond.Broadcast() } // re-listing and watching can deliver the same update multiple times in any // order. This will combine the most recent two deltas if they are the same. func dedupDeltas(deltas Deltas) Deltas { n := len(deltas) if n < 2 { return deltas } a := &deltas[n-1] b := &deltas[n-2] if out := isDup(a, b); out != nil { d := append(Deltas{}, deltas[:n-2]...) return append(d, *out) } return deltas } // If a & b represent the same event, returns the delta that ought to be kept. // Otherwise, returns nil. // TODO: is there anything other than deletions that need deduping? func isDup(a, b *Delta) *Delta { if out := isDeletionDup(a, b); out != nil { return out } // TODO: Detect other duplicate situations? Are there any? return nil } // keep the one with the most information if both are deletions. func isDeletionDup(a, b *Delta) *Delta { if b.Type != Deleted || a.Type != Deleted { return nil } // Do more sophisticated checks, or is this sufficient? if _, ok := b.Object.(DeletedFinalStateUnknown); ok { return a } return b } // queueActionLocked appends to the delta list for the object. // Caller must lock first. func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } newDeltas := append(f.items[id], Delta{actionType, obj}) newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { // This never happens, because dedupDeltas never returns an empty list // when given a non-empty list (as it is here). // But if somehow it ever does return an empty list, then // We need to remove this from our map (extra items in the queue are // ignored if they are not in the map). delete(f.items, id) } return nil } // List returns a list of all the items; it returns the object // from the most recent Delta. // You should treat the items returned inside the deltas as immutable. func (f *DeltaFIFO) List() []interface{} { f.lock.RLock() defer f.lock.RUnlock() return f.listLocked() } func (f *DeltaFIFO) listLocked() []interface{} { list := make([]interface{}, 0, len(f.items)) for _, item := range f.items { list = append(list, item.Newest().Object) } return list } // ListKeys returns a list of all the keys of the objects currently // in the FIFO. func (f *DeltaFIFO) ListKeys() []string { f.lock.RLock() defer f.lock.RUnlock() list := make([]string, 0, len(f.items)) for key := range f.items { list = append(list, key) } return list } // Get returns the complete list of deltas for the requested item, // or sets exists=false. // You should treat the items returned inside the deltas as immutable. func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { key, err := f.KeyOf(obj) if err != nil { return nil, false, KeyError{obj, err} } return f.GetByKey(key) } // GetByKey returns the complete list of deltas for the requested item, // setting exists=false if that list is empty. // You should treat the items returned inside the deltas as immutable. func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { f.lock.RLock() defer f.lock.RUnlock() d, exists := f.items[key] if exists { // Copy item's slice so operations on this slice // won't interfere with the object we return. d = copyDeltas(d) } return d, exists, nil } // IsClosed checks if the queue is closed func (f *DeltaFIFO) IsClosed() bool { f.closedLock.Lock() defer f.closedLock.Unlock() return f.closed } // Pop blocks until an item is added to the queue, and then returns it. If // multiple items are ready, they are returned in the order in which they were // added/updated. The item is removed from the queue (and the store) before it // is returned, so if you don't successfully process it, you need to add it back // with AddIfNotPresent(). // process function is called under lock, so it is safe update data structures // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc // may return an instance of ErrRequeue with a nested error to indicate the current // item should be requeued (equivalent to calling AddIfNotPresent under the lock). // // Pop returns a 'Deltas', which has a complete list of all the things // that happened to the object (deltas) while it was sitting in the queue. func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. // When Close() is called, the f.closed is set and the condition is broadcasted. // Which causes this loop to continue and return from the Pop(). if f.IsClosed() { return nil, ErrFIFOClosed } f.cond.Wait() } id := f.queue[0] f.queue = f.queue[1:] if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) err := process(item) if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err } } // Replace atomically does two things: (1) it adds the given objects // using the Sync or Replace DeltaType and then (2) it does some deletions. // In particular: for every pre-existing key K that is not the key of // an object in `list` there is the effect of // `Delete(DeletedFinalStateUnknown{K, O})` where O is current object // of K. If `f.knownObjects == nil` then the pre-existing keys are // those in `f.items` and the current object of K is the `.Newest()` // of the Deltas associated with K. Otherwise the pre-existing keys // are those listed by `f.knownObjects` and the current object of K is // what `f.knownObjects.GetByKey(K)` returns. func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() keys := make(sets.String, len(list)) // keep backwards compat for old clients action := Sync if f.emitDeltaTypeReplaced { action = Replaced } for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } if f.knownObjects == nil { // Do deletion detection against our own list. queuedDeletions := 0 for k, oldItem := range f.items { if keys.Has(k) { continue } var deletedObj interface{} if n := oldItem.Newest(); n != nil { deletedObj = n.Object } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } if !f.populated { f.populated = true // While there shouldn't be any queued deletions in the initial // population of the queue, it's better to be on the safe side. f.initialPopulationCount = len(list) + queuedDeletions } return nil } // Detect deletions not already in the queue. knownKeys := f.knownObjects.ListKeys() queuedDeletions := 0 for _, k := range knownKeys { if keys.Has(k) { continue } deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) } else if !exists { deletedObj = nil klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } if !f.populated { f.populated = true f.initialPopulationCount = len(list) + queuedDeletions } return nil } // Resync adds, with a Sync type of Delta, every object listed by // `f.knownObjects` whose key is not already queued for processing. // If `f.knownObjects` is `nil` then Resync does nothing. func (f *DeltaFIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() if f.knownObjects == nil { return nil } keys := f.knownObjects.ListKeys() for _, k := range keys { if err := f.syncKeyLocked(k); err != nil { return err } } return nil } func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) return nil } else if !exists { klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) return nil } // If we are doing Resync() and there is already an event queued for that object, // we ignore the Resync for it. This is to avoid the race, in which the resync // comes with the previous value of object (since queueing an event for the object // doesn't trigger changing the underlying store . id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } if len(f.items[id]) > 0 { return nil } if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf("couldn't queue object: %v", err) } return nil } // A KeyListerGetter is anything that knows how to list its keys and look up by key. type KeyListerGetter interface { KeyLister KeyGetter } // A KeyLister is anything that knows how to list its keys. type KeyLister interface { ListKeys() []string } // A KeyGetter is anything that knows how to get the value stored under a given key. type KeyGetter interface { GetByKey(key string) (interface{}, bool, error) } // DeltaType is the type of a change (addition, deletion, etc) type DeltaType string // Change type definition const ( Added DeltaType = "Added" Updated DeltaType = "Updated" Deleted DeltaType = "Deleted" // Replaced is emitted when we encountered watch errors and had to do a // relist. We don't know if the replaced object has changed. // // NOTE: Previous versions of DeltaFIFO would use Sync for Replace events // as well. Hence, Replaced is only emitted when the option // EmitDeltaTypeReplaced is true. Replaced DeltaType = "Replaced" // Sync is for synthetic events during a periodic resync. Sync DeltaType = "Sync" ) // Delta is the type stored by a DeltaFIFO. It tells you what change // happened, and the object's state after* that change. // // [*] Unless the change is a deletion, and then you'll get the final // state of the object before it was deleted. type Delta struct { Type DeltaType Object interface{} } // Deltas is a list of one or more 'Delta's to an individual object. // The oldest delta is at index 0, the newest delta is the last one. type Deltas []Delta // Oldest is a convenience function that returns the oldest delta, or // nil if there are no deltas. func (d Deltas) Oldest() *Delta { if len(d) > 0 { return &d[0] } return nil } // Newest is a convenience function that returns the newest delta, or // nil if there are no deltas. func (d Deltas) Newest() *Delta { if n := len(d); n > 0 { return &d[n-1] } return nil } // copyDeltas returns a shallow copy of d; that is, it copies the slice but not // the objects in the slice. This allows Get/List to return an object that we // know won't be clobbered by a subsequent modifications. func copyDeltas(d Deltas) Deltas { d2 := make(Deltas, len(d)) copy(d2, d) return d2 } // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where // an object was deleted but the watch deletion event was missed. In this // case we don't know the final "resting" state of the object, so there's // a chance the included `Obj` is stale. type DeletedFinalStateUnknown struct { Key string Obj interface{} }