package pubsub // import "github.com/docker/docker/pkg/pubsub" import ( "fmt" "testing" "time" ) func TestSendToOneSub(t *testing.T) { p := NewPublisher(100*time.Millisecond, 10) c := p.Subscribe() p.Publish("hi") msg := <-c if msg.(string) != "hi" { t.Fatalf("expected message hi but received %v", msg) } } func TestSendToMultipleSubs(t *testing.T) { p := NewPublisher(100*time.Millisecond, 10) var subs []chan interface{} subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe()) p.Publish("hi") for _, c := range subs { msg := <-c if msg.(string) != "hi" { t.Fatalf("expected message hi but received %v", msg) } } } func TestEvictOneSub(t *testing.T) { p := NewPublisher(100*time.Millisecond, 10) s1 := p.Subscribe() s2 := p.Subscribe() p.Evict(s1) p.Publish("hi") if _, ok := <-s1; ok { t.Fatal("expected s1 to not receive the published message") } msg := <-s2 if msg.(string) != "hi" { t.Fatalf("expected message hi but received %v", msg) } } func TestClosePublisher(t *testing.T) { p := NewPublisher(100*time.Millisecond, 10) var subs []chan interface{} subs = append(subs, p.Subscribe(), p.Subscribe(), p.Subscribe()) p.Close() for _, c := range subs { if _, ok := <-c; ok { t.Fatal("expected all subscriber channels to be closed") } } } const sampleText = "test" type testSubscriber struct { dataCh chan interface{} ch chan error } func (s *testSubscriber) Wait() error { return <-s.ch } func newTestSubscriber(p *Publisher) *testSubscriber { ts := &testSubscriber{ dataCh: p.Subscribe(), ch: make(chan error), } go func() { for data := range ts.dataCh { s, ok := data.(string) if !ok { ts.ch <- fmt.Errorf("Unexpected type %T", data) break } if s != sampleText { ts.ch <- fmt.Errorf("Unexpected text %s", s) break } } close(ts.ch) }() return ts } // for testing with -race func TestPubSubRace(t *testing.T) { p := NewPublisher(0, 1024) var subs []*testSubscriber for j := 0; j < 50; j++ { subs = append(subs, newTestSubscriber(p)) } for j := 0; j < 1000; j++ { p.Publish(sampleText) } time.AfterFunc(1*time.Second, func() { for _, s := range subs { p.Evict(s.dataCh) } }) for _, s := range subs { s.Wait() } } func BenchmarkPubSub(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() p := NewPublisher(0, 1024) var subs []*testSubscriber for j := 0; j < 50; j++ { subs = append(subs, newTestSubscriber(p)) } b.StartTimer() for j := 0; j < 1000; j++ { p.Publish(sampleText) } time.AfterFunc(1*time.Second, func() { for _, s := range subs { p.Evict(s.dataCh) } }) for _, s := range subs { if err := s.Wait(); err != nil { b.Fatal(err) } } } }