// Copyright 2016 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package runner import ( "context" "fmt" "log" "sync" "time" "github.com/coreos/etcd/clientv3" "github.com/spf13/cobra" "golang.org/x/time/rate" ) // shared flags var ( totalClientConnections int // total number of client connections to be made with server endpoints []string dialTimeout time.Duration rounds int // total number of rounds to run; set to <= 0 to run forever. reqRate int // maximum number of requests per second. ) type roundClient struct { c *clientv3.Client progress int acquire func() error validate func() error release func() error } func newClient(eps []string, timeout time.Duration) *clientv3.Client { c, err := clientv3.New(clientv3.Config{ Endpoints: eps, DialTimeout: time.Duration(timeout) * time.Second, }) if err != nil { log.Fatal(err) } return c } func doRounds(rcs []roundClient, rounds int, requests int) { var wg sync.WaitGroup wg.Add(len(rcs)) finished := make(chan struct{}) limiter := rate.NewLimiter(rate.Limit(reqRate), reqRate) for i := range rcs { go func(rc *roundClient) { defer wg.Done() for rc.progress < rounds || rounds <= 0 { if err := limiter.WaitN(context.Background(), requests/len(rcs)); err != nil { log.Panicf("rate limiter error %v", err) } for rc.acquire() != nil { /* spin */ } if err := rc.validate(); err != nil { log.Fatal(err) } time.Sleep(10 * time.Millisecond) rc.progress++ finished <- struct{}{} for rc.release() != nil { /* spin */ } } }(&rcs[i]) } start := time.Now() for i := 1; i < len(rcs)*rounds+1 || rounds <= 0; i++ { select { case <-finished: if i%100 == 0 { fmt.Printf("finished %d, took %v\n", i, time.Since(start)) start = time.Now() } case <-time.After(time.Minute): log.Panic("no progress after 1 minute!") } } wg.Wait() for _, rc := range rcs { rc.c.Close() } } func endpointsFromFlag(cmd *cobra.Command) []string { eps, err := cmd.Flags().GetStringSlice("endpoints") if err != nil { ExitWithError(ExitError, err) } return eps }