/* Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package watch import ( "errors" "flag" "fmt" "reflect" "strconv" "sync/atomic" "testing" "time" "github.com/davecgh/go-spew/spew" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" ) func init() { // Enable klog which is used in dependencies klog.InitFlags(nil) flag.Set("logtostderr", "true") flag.Set("v", "9") } type testObject struct { resourceVersion string } func (o testObject) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind } func (o testObject) DeepCopyObject() runtime.Object { return o } func (o testObject) GetResourceVersion() string { return o.resourceVersion } func withCounter(w cache.Watcher) (*uint32, cache.Watcher) { var counter uint32 return &counter, &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { atomic.AddUint32(&counter, 1) return w.Watch(options) }, } } func makeTestEvent(rv int) watch.Event { return watch.Event{ Type: watch.Added, Object: testObject{ resourceVersion: fmt.Sprintf("%d", rv), }, } } func arrayToChannel(array []watch.Event) chan watch.Event { ch := make(chan watch.Event, len(array)) for _, event := range array { ch <- event } return ch } // parseResourceVersionOrDie is test-only that code simulating the server and thus can interpret resourceVersion func parseResourceVersionOrDie(resourceVersion string) uint64 { // We can't use etcdstorage.Versioner.ParseResourceVersion() because of imports restrictions if resourceVersion == "" { return 0 } version, err := strconv.ParseUint(resourceVersion, 10, 64) if err != nil { panic(fmt.Errorf("failed to parse resourceVersion %q", resourceVersion)) } return version } func fromRV(resourceVersion string, array []watch.Event) []watch.Event { var result []watch.Event rv := parseResourceVersionOrDie(resourceVersion) for _, event := range array { if event.Type == watch.Error { if len(result) == 0 { // Skip error events until we find an object matching RV requirement continue } } else { rvGetter, ok := event.Object.(resourceVersionGetter) if ok { if parseResourceVersionOrDie(rvGetter.GetResourceVersion()) <= rv { continue } } } result = append(result, event) } return result } func closeAfterN(n int, source chan watch.Event) chan watch.Event { result := make(chan watch.Event, 0) go func() { defer close(result) defer close(source) for i := 0; i < n; i++ { result <- <-source } }() return result } type unexpectedError struct { // Inheriting any struct fulfilling runtime.Object interface would do. metav1.Status } var _ runtime.Object = &unexpectedError{} func TestNewRetryWatcher(t *testing.T) { tt := []struct { name string initialRV string err error }{ { name: "empty RV should fail", initialRV: "", err: errors.New("initial RV \"\" is not supported due to issues with underlying WATCH"), }, { name: "RV \"0\" should fail", initialRV: "0", err: errors.New("initial RV \"0\" is not supported due to issues with underlying WATCH"), }, } for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { _, err := NewRetryWatcher(tc.initialRV, nil) if !reflect.DeepEqual(err, tc.err) { t.Errorf("Expected error: %v, got: %v", tc.err, err) } }) } } func TestRetryWatcher(t *testing.T) { tt := []struct { name string initialRV string watchClient cache.Watcher watchCount uint32 expected []watch.Event }{ { name: "recovers if watchClient returns error", initialRV: "1", watchClient: &cache.ListWatch{ WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) { firstRun := true return func(options metav1.ListOptions) (watch.Interface, error) { if firstRun { firstRun = false return nil, fmt.Errorf("test error") } return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(2), }))), nil } }(), }, watchCount: 2, expected: []watch.Event{ makeTestEvent(2), }, }, { name: "recovers if watchClient returns nil watcher", initialRV: "1", watchClient: &cache.ListWatch{ WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) { firstRun := true return func(options metav1.ListOptions) (watch.Interface, error) { if firstRun { firstRun = false return nil, nil } return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(2), }))), nil } }(), }, watchCount: 2, expected: []watch.Event{ makeTestEvent(2), }, }, { name: "works with empty initialRV", initialRV: "1", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(2), }))), nil }, }, watchCount: 1, expected: []watch.Event{ makeTestEvent(2), }, }, { name: "works with initialRV set, skipping the preceding items but reading those directly following", initialRV: "1", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(1), makeTestEvent(2), }))), nil }, }, watchCount: 1, expected: []watch.Event{ makeTestEvent(2), }, }, { name: "works with initialRV set, skipping the preceding items with none following", initialRV: "3", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(2), }))), nil }, }, watchCount: 1, expected: nil, }, { name: "fails on Gone (RV too old error)", initialRV: "5", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(5), makeTestEvent(6), {Type: watch.Error, Object: &apierrors.NewGone("").ErrStatus}, makeTestEvent(7), makeTestEvent(8), }))), nil }, }, watchCount: 1, expected: []watch.Event{ makeTestEvent(6), { Type: watch.Error, Object: &apierrors.NewGone("").ErrStatus, }, }, }, { name: "recovers from timeout error", initialRV: "5", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(6), { Type: watch.Error, Object: &apierrors.NewTimeoutError("", 0).ErrStatus, }, makeTestEvent(7), }))), nil }, }, watchCount: 2, expected: []watch.Event{ makeTestEvent(6), makeTestEvent(7), }, }, { name: "recovers from internal server error", initialRV: "5", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(6), { Type: watch.Error, Object: &apierrors.NewInternalError(errors.New("")).ErrStatus, }, makeTestEvent(7), }))), nil }, }, watchCount: 2, expected: []watch.Event{ makeTestEvent(6), makeTestEvent(7), }, }, { name: "recovers from unexpected error code", initialRV: "5", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(6), { Type: watch.Error, Object: &metav1.Status{ Code: 666, }, }, makeTestEvent(7), }))), nil }, }, watchCount: 2, expected: []watch.Event{ makeTestEvent(6), makeTestEvent(7), }, }, { name: "recovers from unexpected error type", initialRV: "5", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(6), { Type: watch.Error, Object: &unexpectedError{}, }, makeTestEvent(7), }))), nil }, }, watchCount: 2, expected: []watch.Event{ makeTestEvent(6), makeTestEvent(7), }, }, { name: "survives 1 closed watch and reads 1 item", initialRV: "5", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(6), })))), nil }, }, watchCount: 2, expected: []watch.Event{ makeTestEvent(6), }, }, { name: "survives 2 closed watches and reads 2 items", initialRV: "4", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(5), makeTestEvent(6), })))), nil }, }, watchCount: 3, expected: []watch.Event{ makeTestEvent(5), makeTestEvent(6), }, }, { name: "survives 2 closed watches and reads 2 items for nonconsecutive RVs", initialRV: "4", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(5), makeTestEvent(7), })))), nil }, }, watchCount: 3, expected: []watch.Event{ makeTestEvent(5), makeTestEvent(7), }, }, { name: "survives 2 closed watches and reads 2 items for nonconsecutive RVs starting at much lower RV", initialRV: "2", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(5), makeTestEvent(7), })))), nil }, }, watchCount: 3, expected: []watch.Event{ makeTestEvent(5), makeTestEvent(7), }, }, { name: "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs", initialRV: "2", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(5), makeTestEvent(6), makeTestEvent(7), makeTestEvent(11), })))), nil }, }, watchCount: 5, expected: []watch.Event{ makeTestEvent(5), makeTestEvent(6), makeTestEvent(7), makeTestEvent(11), }, }, { name: "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs and skips those with lower or equal RV", initialRV: "2", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(1), makeTestEvent(2), makeTestEvent(5), makeTestEvent(6), makeTestEvent(7), makeTestEvent(11), })))), nil }, }, watchCount: 5, expected: []watch.Event{ makeTestEvent(5), makeTestEvent(6), makeTestEvent(7), makeTestEvent(11), }, }, { name: "survives 2 closed watches and reads 2+2+1 items skipping those with equal RV", initialRV: "1", watchClient: &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(closeAfterN(2, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ makeTestEvent(1), makeTestEvent(2), makeTestEvent(5), makeTestEvent(6), makeTestEvent(7), makeTestEvent(11), })))), nil }, }, watchCount: 3, expected: []watch.Event{ makeTestEvent(2), makeTestEvent(5), makeTestEvent(6), makeTestEvent(7), makeTestEvent(11), }, }, } for _, tc := range tt { tc := tc t.Run(tc.name, func(t *testing.T) { t.Parallel() atomicCounter, watchFunc := withCounter(tc.watchClient) watcher, err := newRetryWatcher(tc.initialRV, watchFunc, time.Duration(0)) if err != nil { t.Fatalf("failed to create a RetryWatcher: %v", err) } defer func() { watcher.Stop() t.Log("Waiting on RetryWatcher to stop...") <-watcher.Done() }() var got []watch.Event for i := 0; i < len(tc.expected); i++ { event, ok := <-watcher.ResultChan() if !ok { t.Error(spew.Errorf("expected event %#+v, but channel is closed"), tc.expected[i]) break } got = append(got, event) } // (Sanity check, best effort) Make sure there are no more events to be received // RetryWatcher proxies the source channel so we can't try reading it immediately // but have to tolerate some delay. Given this is best effort detection we can use short duration. // It also makes sure that for 0 events the watchFunc has time to be called. select { case event, ok := <-watcher.ResultChan(): if ok { t.Error(spew.Errorf("Unexpected event received after reading all the expected ones: %#+v", event)) } case <-time.After(10 * time.Millisecond): break } var counter uint32 // We always count with the last watch reestablishing which is imminent but still a race. // We will wait for the last watch to reestablish to avoid it. err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (done bool, err error) { counter = atomic.LoadUint32(atomicCounter) return counter == tc.watchCount, nil }) if err == wait.ErrWaitTimeout { t.Errorf("expected %d watcher starts, but it has started %d times", tc.watchCount, counter) } else if err != nil { t.Fatal(err) } if !reflect.DeepEqual(tc.expected, got) { t.Fatal(spew.Errorf("expected %#+v, got %#+v;\ndiff: %s", tc.expected, got, diff.ObjectReflectDiff(tc.expected, got))) } }) } } func TestRetryWatcherToFinishWithUnreadEvents(t *testing.T) { watcher, err := NewRetryWatcher("1", &cache.ListWatch{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { return watch.NewProxyWatcher(arrayToChannel([]watch.Event{ makeTestEvent(2), })), nil }, }) if err != nil { t.Fatalf("failed to create a RetryWatcher: %v", err) } // Give the watcher a chance to get to sending events (blocking) time.Sleep(10 * time.Millisecond) watcher.Stop() maxTime := time.Second select { case <-watcher.Done(): break case <-time.After(maxTime): t.Errorf("The watcher failed to be closed in %s", maxTime) } // RetryWatcher result channel should be closed _, ok := <-watcher.ResultChan() if ok { t.Error("ResultChan is not closed") } }