package hystrix

import (
	"bytes"
	"encoding/json"
	"net/http"
	"sync"
	"time"

	"github.com/afex/hystrix-go/hystrix/rolling"
)

const (
	// streamEventBufferSize is the capacity of each client's event channel.
	// When a client's channel is full, new events for that client are dropped.
	streamEventBufferSize = 10
)

// NewStreamHandler returns a server capable of exposing dashboard metrics via HTTP.
// Start must be called before the handler serves requests.
func NewStreamHandler() *StreamHandler {
	return &StreamHandler{}
}

// StreamHandler publishes metrics for each command and each pool once a second
// to all connected HTTP clients.
type StreamHandler struct {
	// requests maps every connected request to the channel its events are sent on.
	requests map[*http.Request]chan []byte
	// mu guards requests.
	mu sync.RWMutex
	// done is closed by Stop to terminate the publishing goroutine.
	done chan struct{}
}

// Start begins watching the in-memory circuit breakers for metrics.
// It (re)initializes the client registry and launches the publishing goroutine.
func (sh *StreamHandler) Start() {
	sh.requests = make(map[*http.Request]chan []byte)
	sh.done = make(chan struct{})
	go sh.loop()
}

// Stop shuts down the metric collection routine.
// It closes the done channel, so it must be called after Start and at most
// once per Start.
func (sh *StreamHandler) Stop() {
	close(sh.done)
}

var _ http.Handler = (*StreamHandler)(nil)

// ServeHTTP streams published metrics to the client as server-sent events
// until the request's context is done or a write to the client fails.
func (sh *StreamHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	// Make sure that the writer supports flushing.
	f, ok := rw.(http.Flusher)
	if !ok {
		http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	events := sh.register(req)
	defer sh.unregister(req)

	// The request context replaces the deprecated http.CloseNotifier; unlike
	// an unchecked type assertion it cannot panic on writers that do not
	// implement that interface.
	gone := req.Context().Done()

	rw.Header().Set("Content-Type", "text/event-stream")
	rw.Header().Set("Cache-Control", "no-cache")
	rw.Header().Set("Connection", "keep-alive")

	for {
		select {
		case <-gone:
			// client is gone
			return
		case event := <-events:
			if _, err := rw.Write(event); err != nil {
				return
			}
			f.Flush()
		}
	}
}

// loop publishes the metrics of every registered circuit breaker and its
// executor pool once a second until Stop is called.
func (sh *StreamHandler) loop() {
	ticker := time.NewTicker(1 * time.Second)
	// Release the ticker's resources once the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			circuitBreakersMutex.RLock()
			for _, cb := range circuitBreakers {
				// Publishing is best effort: a failure for one circuit must
				// not prevent the remaining circuits from being reported.
				_ = sh.publishMetrics(cb)
				_ = sh.publishThreadPools(cb.executorPool)
			}
			circuitBreakersMutex.RUnlock()
		case <-sh.done:
			return
		}
	}
}

// publishMetrics encodes the current command metrics of cb as JSON and sends
// them to all connected clients. It returns any encoding or delivery error.
func (sh *StreamHandler) publishMetrics(cb *CircuitBreaker) error {
	now := time.Now()

	// Look these up once instead of once per reported field.
	collector := cb.metrics.DefaultCollector()
	settings := getSettings(cb.Name)

	reqCount := cb.metrics.Requests().Sum(now)
	errCount := collector.Errors().Sum(now)
	errPct := cb.metrics.ErrorPercent(now)

	eventBytes, err := json.Marshal(&streamCmdMetric{
		Type:           "HystrixCommand",
		Name:           cb.Name,
		Group:          cb.Name,
		Time:           currentTime(),
		ReportingHosts: 1,

		RequestCount:       uint32(reqCount),
		ErrorCount:         uint32(errCount),
		ErrorPct:           uint32(errPct),
		CircuitBreakerOpen: cb.IsOpen(),

		RollingCountSuccess:            uint32(collector.Successes().Sum(now)),
		RollingCountFailure:            uint32(collector.Failures().Sum(now)),
		RollingCountThreadPoolRejected: uint32(collector.Rejects().Sum(now)),
		RollingCountShortCircuited:     uint32(collector.ShortCircuits().Sum(now)),
		RollingCountTimeout:            uint32(collector.Timeouts().Sum(now)),
		RollingCountFallbackSuccess:    uint32(collector.FallbackSuccesses().Sum(now)),
		RollingCountFallbackFailure:    uint32(collector.FallbackFailures().Sum(now)),

		LatencyTotal:       generateLatencyTimings(collector.TotalDuration()),
		LatencyTotalMean:   collector.TotalDuration().Mean(),
		LatencyExecute:     generateLatencyTimings(collector.RunDuration()),
		LatencyExecuteMean: collector.RunDuration().Mean(),

		// TODO: all hard-coded values should become configurable settings, per circuit
		RollingStatsWindow:         10000,
		ExecutionIsolationStrategy: "THREAD",
		CircuitBreakerEnabled:      true,
		CircuitBreakerForceClosed:  false,
		CircuitBreakerForceOpen:    cb.forceOpen,

		CircuitBreakerErrorThresholdPercent:  uint32(settings.ErrorPercentThreshold),
		CircuitBreakerSleepWindow:            uint32(settings.SleepWindow.Seconds() * 1000),
		CircuitBreakerRequestVolumeThreshold: uint32(settings.RequestVolumeThreshold),
	})
	if err != nil {
		return err
	}

	return sh.writeToRequests(eventBytes)
}

// publishThreadPools encodes the current metrics of pool as JSON and sends
// them to all connected clients. It returns any encoding or delivery error.
func (sh *StreamHandler) publishThreadPools(pool *executorPool) error {
	now := time.Now()

	eventBytes, err := json.Marshal(&streamThreadPoolMetric{
		Type:           "HystrixThreadPool",
		Name:           pool.Name,
		ReportingHosts: 1,

		CurrentActiveCount:        uint32(pool.ActiveCount()),
		CurrentTaskCount:          0,
		CurrentCompletedTaskCount: 0,

		RollingCountThreadsExecuted: uint32(pool.Metrics.Executed.Sum(now)),
		RollingMaxActiveThreads:     uint32(pool.Metrics.MaxActiveRequests.Max(now)),

		CurrentPoolSize:        uint32(pool.Max),
		CurrentCorePoolSize:    uint32(pool.Max),
		CurrentLargestPoolSize: uint32(pool.Max),
		CurrentMaximumPoolSize: uint32(pool.Max),

		RollingStatsWindow:          10000,
		QueueSizeRejectionThreshold: 0,
		CurrentQueueSize:            0,
	})
	if err != nil {
		return err
	}

	// Propagate the delivery error instead of discarding it.
	return sh.writeToRequests(eventBytes)
}

// writeToRequests frames eventBytes as a server-sent event ("data:" prefix,
// blank-line terminator) and offers it to every registered client.
// The send is non-blocking: a client whose buffer is full misses the event.
// It currently always returns nil.
func (sh *StreamHandler) writeToRequests(eventBytes []byte) error {
	// Writes to a bytes.Buffer always return a nil error, so they are not checked.
	var b bytes.Buffer
	b.Grow(len("data:") + len(eventBytes) + len("\n\n"))
	b.WriteString("data:")
	b.Write(eventBytes)
	b.WriteString("\n\n")
	dataBytes := b.Bytes()

	sh.mu.RLock()
	defer sh.mu.RUnlock()

	for _, requestEvents := range sh.requests {
		select {
		case requestEvents <- dataBytes:
		default:
			// Slow consumer: drop the event rather than stall the publisher.
		}
	}

	return nil
}

// register returns the event channel associated with req, creating and
// storing a new buffered channel if req is not registered yet.
func (sh *StreamHandler) register(req *http.Request) <-chan []byte {
	// Lookup and insert happen in one critical section so that concurrent
	// registrations of the same request cannot replace each other's channel.
	sh.mu.Lock()
	defer sh.mu.Unlock()

	if events, ok := sh.requests[req]; ok {
		return events
	}

	events := make(chan []byte, streamEventBufferSize)
	sh.requests[req] = events

	return events
}

// unregister removes req from the set of clients that receive events.
func (sh *StreamHandler) unregister(req *http.Request) {
	sh.mu.Lock()
	defer sh.mu.Unlock()

	delete(sh.requests, req)
}

// generateLatencyTimings samples the percentiles reported in a latency
// section of the stream from r.
func generateLatencyTimings(r *rolling.Timing) streamCmdLatency {
	return streamCmdLatency{
		Timing0:   r.Percentile(0),
		Timing25:  r.Percentile(25),
		Timing50:  r.Percentile(50),
		Timing75:  r.Percentile(75),
		Timing90:  r.Percentile(90),
		Timing95:  r.Percentile(95),
		Timing99:  r.Percentile(99),
		Timing995: r.Percentile(99.5),
		Timing100: r.Percentile(100),
	}
}

// streamCmdMetric is the JSON payload emitted for one command.
// Fields that publishMetrics does not set are sent with their zero value.
type streamCmdMetric struct {
	Type           string `json:"type"`
	Name           string `json:"name"`
	Group          string `json:"group"`
	Time           int64  `json:"currentTime"`
	ReportingHosts uint32 `json:"reportingHosts"`

	// Health
	RequestCount       uint32 `json:"requestCount"`
	ErrorCount         uint32 `json:"errorCount"`
	ErrorPct           uint32 `json:"errorPercentage"`
	CircuitBreakerOpen bool   `json:"isCircuitBreakerOpen"`

	RollingCountCollapsedRequests  uint32 `json:"rollingCountCollapsedRequests"`
	RollingCountExceptionsThrown   uint32 `json:"rollingCountExceptionsThrown"`
	RollingCountFailure            uint32 `json:"rollingCountFailure"`
	RollingCountFallbackFailure    uint32 `json:"rollingCountFallbackFailure"`
	RollingCountFallbackRejection  uint32 `json:"rollingCountFallbackRejection"`
	RollingCountFallbackSuccess    uint32 `json:"rollingCountFallbackSuccess"`
	RollingCountResponsesFromCache uint32 `json:"rollingCountResponsesFromCache"`
	RollingCountSemaphoreRejected  uint32 `json:"rollingCountSemaphoreRejected"`
	RollingCountShortCircuited     uint32 `json:"rollingCountShortCircuited"`
	RollingCountSuccess            uint32 `json:"rollingCountSuccess"`
	RollingCountThreadPoolRejected uint32 `json:"rollingCountThreadPoolRejected"`
	RollingCountTimeout            uint32 `json:"rollingCountTimeout"`

	CurrentConcurrentExecutionCount uint32 `json:"currentConcurrentExecutionCount"`

	LatencyExecuteMean uint32           `json:"latencyExecute_mean"`
	LatencyExecute     streamCmdLatency `json:"latencyExecute"`
	LatencyTotalMean   uint32           `json:"latencyTotal_mean"`
	LatencyTotal       streamCmdLatency `json:"latencyTotal"`

	// Properties
	CircuitBreakerRequestVolumeThreshold uint32 `json:"propertyValue_circuitBreakerRequestVolumeThreshold"`
	CircuitBreakerSleepWindow            uint32 `json:"propertyValue_circuitBreakerSleepWindowInMilliseconds"`
	CircuitBreakerErrorThresholdPercent  uint32 `json:"propertyValue_circuitBreakerErrorThresholdPercentage"`
	CircuitBreakerForceOpen              bool   `json:"propertyValue_circuitBreakerForceOpen"`
	CircuitBreakerForceClosed            bool   `json:"propertyValue_circuitBreakerForceClosed"`
	CircuitBreakerEnabled                bool   `json:"propertyValue_circuitBreakerEnabled"`

	ExecutionIsolationStrategy                       string `json:"propertyValue_executionIsolationStrategy"`
	ExecutionIsolationThreadTimeout                  uint32 `json:"propertyValue_executionIsolationThreadTimeoutInMilliseconds"`
	ExecutionIsolationThreadInterruptOnTimeout       bool   `json:"propertyValue_executionIsolationThreadInterruptOnTimeout"`
	ExecutionIsolationThreadPoolKeyOverride          string `json:"propertyValue_executionIsolationThreadPoolKeyOverride"`
	ExecutionIsolationSemaphoreMaxConcurrentRequests uint32 `json:"propertyValue_executionIsolationSemaphoreMaxConcurrentRequests"`
	FallbackIsolationSemaphoreMaxConcurrentRequests  uint32 `json:"propertyValue_fallbackIsolationSemaphoreMaxConcurrentRequests"`

	RollingStatsWindow  uint32 `json:"propertyValue_metricsRollingStatisticalWindowInMilliseconds"`
	RequestCacheEnabled bool   `json:"propertyValue_requestCacheEnabled"`
	RequestLogEnabled   bool   `json:"propertyValue_requestLogEnabled"`
}

// streamCmdLatency holds latency values keyed by percentile in the JSON output.
type streamCmdLatency struct {
	Timing0   uint32 `json:"0"`
	Timing25  uint32 `json:"25"`
	Timing50  uint32 `json:"50"`
	Timing75  uint32 `json:"75"`
	Timing90  uint32 `json:"90"`
	Timing95  uint32 `json:"95"`
	Timing99  uint32 `json:"99"`
	Timing995 uint32 `json:"99.5"`
	Timing100 uint32 `json:"100"`
}

// streamThreadPoolMetric is the JSON payload emitted for one executor pool.
type streamThreadPoolMetric struct {
	Type           string `json:"type"`
	Name           string `json:"name"`
	ReportingHosts uint32 `json:"reportingHosts"`

	CurrentActiveCount        uint32 `json:"currentActiveCount"`
	CurrentCompletedTaskCount uint32 `json:"currentCompletedTaskCount"`
	CurrentCorePoolSize       uint32 `json:"currentCorePoolSize"`
	CurrentLargestPoolSize    uint32 `json:"currentLargestPoolSize"`
	CurrentMaximumPoolSize    uint32 `json:"currentMaximumPoolSize"`
	CurrentPoolSize           uint32 `json:"currentPoolSize"`
	CurrentQueueSize          uint32 `json:"currentQueueSize"`
	CurrentTaskCount          uint32 `json:"currentTaskCount"`

	RollingMaxActiveThreads     uint32 `json:"rollingMaxActiveThreads"`
	RollingCountThreadsExecuted uint32 `json:"rollingCountThreadsExecuted"`

	RollingStatsWindow          uint32 `json:"propertyValue_metricsRollingStatisticalWindowInMilliseconds"`
	QueueSizeRejectionThreshold uint32 `json:"propertyValue_queueSizeRejectionThreshold"`
}

// currentTime returns the current Unix time in milliseconds.
func currentTime() int64 {
	return time.Now().UnixNano() / int64(time.Millisecond)
}