// Copyright 2016 Google LLC. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package bundler supports bundling (batching) of items. Bundling amortizes an // action with fixed costs over multiple items. For example, if an API provides // an RPC that accepts a list of items as input, but clients would prefer // adding items one at a time, then a Bundler can accept individual items from // the client and bundle many of them into a single RPC. // // This package is experimental and subject to change without notice. package bundler import ( "context" "errors" "reflect" "sync" "time" "golang.org/x/sync/semaphore" ) type mode int const ( DefaultDelayThreshold = time.Second DefaultBundleCountThreshold = 10 DefaultBundleByteThreshold = 1e6 // 1M DefaultBufferedByteLimit = 1e9 // 1G ) const ( none mode = iota add addWait ) var ( // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit. ErrOverflow = errors.New("bundler reached buffered byte limit") // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size. ErrOversizedItem = errors.New("item size exceeds bundle byte limit") // errMixedMethods indicates that mutually exclusive methods has been // called subsequently. errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed") ) // A Bundler collects items added to it into a bundle until the bundle // exceeds a given size, then calls a user-provided function to handle the // bundle. // // The exported fields are only safe to modify prior to the first call to Add // or AddWait. type Bundler struct { // Starting from the time that the first message is added to a bundle, once // this delay has passed, handle the bundle. The default is DefaultDelayThreshold. DelayThreshold time.Duration // Once a bundle has this many items, handle the bundle. Since only one // item at a time is added to a bundle, no bundle will exceed this // threshold, so it also serves as a limit. The default is // DefaultBundleCountThreshold. BundleCountThreshold int // Once the number of bytes in current bundle reaches this threshold, handle // the bundle. The default is DefaultBundleByteThreshold. This triggers handling, // but does not cap the total size of a bundle. BundleByteThreshold int // The maximum size of a bundle, in bytes. Zero means unlimited. BundleByteLimit int // The maximum number of bytes that the Bundler will keep in memory before // returning ErrOverflow. The default is DefaultBufferedByteLimit. BufferedByteLimit int // The maximum number of handler invocations that can be running at once. // The default is 1. HandlerLimit int handler func(interface{}) // called to handle a bundle itemSliceZero reflect.Value // nil (zero value) for slice of items mu sync.Mutex // guards access to fields below flushTimer *time.Timer // implements DelayThreshold handlerCount int // # of bundles currently being handled (i.e. handler is invoked on them) sem *semaphore.Weighted // enforces BufferedByteLimit semOnce sync.Once // guards semaphore initialization // The current bundle we're adding items to. Not yet in the queue. // Appended to the queue once the flushTimer fires or the bundle // thresholds/limits are reached. If curBundle is nil and tail is // not, we first try to add items to tail. Once tail is full or handled, // we create a new curBundle for the incoming item. curBundle *bundle // The next bundle in the queue to be handled. Nil if the queue is // empty. head *bundle // The last bundle in the queue to be handled. Nil if the queue is // empty. If curBundle is nil and tail isn't, we attempt to add new // items to the tail until if becomes full or has been passed to the // handler. tail *bundle curFlush *sync.WaitGroup // counts outstanding bundles since last flush prevFlush chan bool // signal used to wait for prior flush // The first call to Add or AddWait, mode will be add or addWait respectively. // If there wasn't call yet then mode is none. mode mode // TODO: consider alternative queue implementation for head/tail bundle. see: // https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74 } // A bundle is a group of items that were added individually and will be passed // to a handler as a slice. type bundle struct { items reflect.Value // slice of T size int // size in bytes of all items next *bundle // bundles are handled in order as a linked list queue flush *sync.WaitGroup // the counter that tracks flush completion } // add appends item to this bundle and increments the total size. It requires // that b.mu is locked. func (bu *bundle) add(item interface{}, size int) { bu.items = reflect.Append(bu.items, reflect.ValueOf(item)) bu.size += size } // NewBundler creates a new Bundler. // // itemExample is a value of the type that will be bundled. For example, if you // want to create bundles of *Entry, you could pass &Entry{} for itemExample. // // handler is a function that will be called on each bundle. If itemExample is // of type T, the argument to handler is of type []T. handler is always called // sequentially for each bundle, and never in parallel. // // Configure the Bundler by setting its thresholds and limits before calling // any of its methods. func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler { b := &Bundler{ DelayThreshold: DefaultDelayThreshold, BundleCountThreshold: DefaultBundleCountThreshold, BundleByteThreshold: DefaultBundleByteThreshold, BufferedByteLimit: DefaultBufferedByteLimit, HandlerLimit: 1, handler: handler, itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))), curFlush: &sync.WaitGroup{}, } return b } func (b *Bundler) initSemaphores() { // Create the semaphores lazily, because the user may set limits // after NewBundler. b.semOnce.Do(func() { b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit)) }) } // enqueueCurBundle moves curBundle to the end of the queue. The bundle may be // handled immediately if we are below HandlerLimit. It requires that b.mu is // locked. func (b *Bundler) enqueueCurBundle() { // We don't require callers to check if there is a pending bundle. It // may have already been appended to the queue. If so, return early. if b.curBundle == nil { return } // If we are below the HandlerLimit, the queue must be empty. Handle // immediately with a new goroutine. if b.handlerCount < b.HandlerLimit { b.handlerCount++ go b.handle(b.curBundle) } else if b.tail != nil { // There are bundles on the queue, so append to the end b.tail.next = b.curBundle b.tail = b.curBundle } else { // The queue is empty, so initialize the queue b.head = b.curBundle b.tail = b.curBundle } b.curBundle = nil if b.flushTimer != nil { b.flushTimer.Stop() b.flushTimer = nil } } // setMode sets the state of Bundler's mode. If mode was defined before // and passed state is different from it then return an error. func (b *Bundler) setMode(m mode) error { b.mu.Lock() defer b.mu.Unlock() if b.mode == m || b.mode == none { b.mode = m return nil } return errMixedMethods } // canFit returns true if bu can fit an additional item of size bytes based // on the limits of Bundler b. func (b *Bundler) canFit(bu *bundle, size int) bool { return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) && (b.BundleCountThreshold <= 0 || bu.items.Len() < b.BundleCountThreshold) } // Add adds item to the current bundle. It marks the bundle for handling and // starts a new one if any of the thresholds or limits are exceeded. // The type of item must be assignable to the itemExample parameter of the NewBundler // method, otherwise there will be a panic. // // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then // the item can never be handled. Add returns ErrOversizedItem in this case. // // If adding the item would exceed the maximum memory allowed // (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for // memory, Add returns ErrOverflow. // // Add never blocks. func (b *Bundler) Add(item interface{}, size int) error { if err := b.setMode(add); err != nil { return err } // If this item exceeds the maximum size of a bundle, // we can never send it. if b.BundleByteLimit > 0 && size > b.BundleByteLimit { return ErrOversizedItem } // If adding this item would exceed our allotted memory // footprint, we can't accept it. // (TryAcquire also returns false if anything is waiting on the semaphore, // so calls to Add and AddWait shouldn't be mixed.) b.initSemaphores() if !b.sem.TryAcquire(int64(size)) { return ErrOverflow } b.mu.Lock() defer b.mu.Unlock() return b.add(item, size) } // add adds item to the tail of the bundle queue or curBundle depending on space // and nil-ness (see inline comments). It marks curBundle for handling (by // appending it to the queue) if any of the thresholds or limits are exceeded. // curBundle is lazily initialized. It requires that b.mu is locked. func (b *Bundler) add(item interface{}, size int) error { // If we don't have a curBundle, see if we can add to the queue tail. if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) { b.tail.add(item, size) return nil } // If we can't fit in the existing curBundle, move it onto the queue. if b.curBundle != nil && !b.canFit(b.curBundle, size) { b.enqueueCurBundle() } // Create a curBundle if we don't have one. if b.curBundle == nil { b.curFlush.Add(1) b.curBundle = &bundle{ items: b.itemSliceZero, flush: b.curFlush, } } // Add the item. b.curBundle.add(item, size) // If curBundle is ready for handling, move it to the queue. if b.curBundle.size >= b.BundleByteThreshold || b.curBundle.items.Len() == b.BundleCountThreshold { b.enqueueCurBundle() } // If we created a new bundle and it wasn't immediately handled, set a timer if b.curBundle != nil && b.flushTimer == nil { b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles) } return nil } // tryHandleBundles is the timer callback that handles or queues any current // bundle after DelayThreshold time, even if the bundle isn't completely full. func (b *Bundler) tryHandleBundles() { b.mu.Lock() b.enqueueCurBundle() b.mu.Unlock() } // next returns the next bundle that is ready for handling and removes it from // the internal queue. It requires that b.mu is locked. func (b *Bundler) next() *bundle { if b.head == nil { return nil } out := b.head b.head = b.head.next if b.head == nil { b.tail = nil } out.next = nil return out } // handle calls the user-specified handler on the given bundle. handle is // intended to be run as a goroutine. After the handler returns, we update the // byte total. handle continues processing additional bundles that are ready. // If no more bundles are ready, the handler count is decremented and the // goroutine ends. func (b *Bundler) handle(bu *bundle) { for bu != nil { b.handler(bu.items.Interface()) bu = b.postHandle(bu) } } func (b *Bundler) postHandle(bu *bundle) *bundle { b.mu.Lock() defer b.mu.Unlock() b.sem.Release(int64(bu.size)) bu.flush.Done() bu = b.next() if bu == nil { b.handlerCount-- } return bu } // AddWait adds item to the current bundle. It marks the bundle for handling and // starts a new one if any of the thresholds or limits are exceeded. // // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then // the item can never be handled. AddWait returns ErrOversizedItem in this case. // // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), // AddWait blocks until space is available or ctx is done. // // Calls to Add and AddWait should not be mixed on the same Bundler. func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error { if err := b.setMode(addWait); err != nil { return err } // If this item exceeds the maximum size of a bundle, // we can never send it. if b.BundleByteLimit > 0 && size > b.BundleByteLimit { return ErrOversizedItem } // If adding this item would exceed our allotted memory footprint, block // until space is available. The semaphore is FIFO, so there will be no // starvation. b.initSemaphores() if err := b.sem.Acquire(ctx, int64(size)); err != nil { return err } b.mu.Lock() defer b.mu.Unlock() return b.add(item, size) } // Flush invokes the handler for all remaining items in the Bundler and waits // for it to return. func (b *Bundler) Flush() { b.mu.Lock() // If a curBundle is pending, move it to the queue. b.enqueueCurBundle() // Store a pointer to the WaitGroup that counts outstanding bundles // in the current flush and create a new one to track the next flush. wg := b.curFlush b.curFlush = &sync.WaitGroup{} // Flush must wait for all prior, outstanding flushes to complete. // We use a channel to communicate completion between each flush in // the sequence. prev := b.prevFlush next := make(chan bool) b.prevFlush = next b.mu.Unlock() // Wait until the previous flush is finished. if prev != nil { <-prev } // Wait until this flush is finished. wg.Wait() // Allow the next flush to finish. close(next) }