// Copyright (C) MongoDB, Inc. 2017-present. // // Licensed under the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. You may obtain // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 package options import ( "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" ) // ChangeStreamOptions represents options that can be used to configure a Watch operation. type ChangeStreamOptions struct { // The maximum number of documents to be included in each batch returned by the server. BatchSize *int32 // Specifies a collation to use for string comparisons during the operation. This option is only valid for MongoDB // versions >= 3.4. For previous server versions, the driver will return an error if this option is used. The // default value is nil, which means the default collation of the collection will be used. Collation *Collation // A string that will be included in server logs, profiling logs, and currentOp queries to help trace the operation. // The default is nil, which means that no comment will be included in the logs. Comment *string // Specifies how the updated document should be returned in change notifications for update operations. The default // is options.Default, which means that only partial update deltas will be included in the change notification. FullDocument *FullDocument // Specifies how the pre-update document should be returned in change notifications for update operations. The default // is options.Off, which means that the pre-update document will not be included in the change notification. FullDocumentBeforeChange *FullDocument // The maximum amount of time that the server should wait for new documents to satisfy a tailable cursor query. MaxAwaitTime *time.Duration // A document specifying the logical starting point for the change stream. Only changes corresponding to an oplog // entry immediately after the resume token will be returned. If this is specified, StartAtOperationTime and // StartAfter must not be set. ResumeAfter interface{} // ShowExpandedEvents specifies whether the server will return an expanded list of change stream events. Additional // events include: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection and // refineCollectionShardKey. This option is only valid for MongoDB versions >= 6.0. ShowExpandedEvents *bool // If specified, the change stream will only return changes that occurred at or after the given timestamp. This // option is only valid for MongoDB versions >= 4.0. If this is specified, ResumeAfter and StartAfter must not be // set. StartAtOperationTime *primitive.Timestamp // A document specifying the logical starting point for the change stream. This is similar to the ResumeAfter // option, but allows a resume token from an "invalidate" notification to be used. This allows a change stream on a // collection to be resumed after the collection has been dropped and recreated or renamed. Only changes // corresponding to an oplog entry immediately after the specified token will be returned. If this is specified, // ResumeAfter and StartAtOperationTime must not be set. This option is only valid for MongoDB versions >= 4.1.1. StartAfter interface{} // Custom options to be added to the initial aggregate for the change stream. Key-value pairs of the BSON map should // correlate with desired option names and values. Values must be Marshalable. Custom options may conflict with // non-custom options, and custom options bypass client-side validation. Prefer using non-custom options where possible. Custom bson.M // Custom options to be added to the $changeStream stage in the initial aggregate. Key-value pairs of the BSON map should // correlate with desired option names and values. Values must be Marshalable. Custom pipeline options bypass client-side // validation. Prefer using non-custom options where possible. CustomPipeline bson.M } // ChangeStream creates a new ChangeStreamOptions instance. func ChangeStream() *ChangeStreamOptions { cso := &ChangeStreamOptions{} cso.SetFullDocument(Default) return cso } // SetBatchSize sets the value for the BatchSize field. func (cso *ChangeStreamOptions) SetBatchSize(i int32) *ChangeStreamOptions { cso.BatchSize = &i return cso } // SetCollation sets the value for the Collation field. func (cso *ChangeStreamOptions) SetCollation(c Collation) *ChangeStreamOptions { cso.Collation = &c return cso } // SetComment sets the value for the Comment field. func (cso *ChangeStreamOptions) SetComment(comment string) *ChangeStreamOptions { cso.Comment = &comment return cso } // SetFullDocument sets the value for the FullDocument field. func (cso *ChangeStreamOptions) SetFullDocument(fd FullDocument) *ChangeStreamOptions { cso.FullDocument = &fd return cso } // SetFullDocumentBeforeChange sets the value for the FullDocumentBeforeChange field. func (cso *ChangeStreamOptions) SetFullDocumentBeforeChange(fdbc FullDocument) *ChangeStreamOptions { cso.FullDocumentBeforeChange = &fdbc return cso } // SetMaxAwaitTime sets the value for the MaxAwaitTime field. func (cso *ChangeStreamOptions) SetMaxAwaitTime(d time.Duration) *ChangeStreamOptions { cso.MaxAwaitTime = &d return cso } // SetResumeAfter sets the value for the ResumeAfter field. func (cso *ChangeStreamOptions) SetResumeAfter(rt interface{}) *ChangeStreamOptions { cso.ResumeAfter = rt return cso } // SetShowExpandedEvents sets the value for the ShowExpandedEvents field. func (cso *ChangeStreamOptions) SetShowExpandedEvents(see bool) *ChangeStreamOptions { cso.ShowExpandedEvents = &see return cso } // SetStartAtOperationTime sets the value for the StartAtOperationTime field. func (cso *ChangeStreamOptions) SetStartAtOperationTime(t *primitive.Timestamp) *ChangeStreamOptions { cso.StartAtOperationTime = t return cso } // SetStartAfter sets the value for the StartAfter field. func (cso *ChangeStreamOptions) SetStartAfter(sa interface{}) *ChangeStreamOptions { cso.StartAfter = sa return cso } // SetCustom sets the value for the Custom field. Key-value pairs of the BSON map should correlate // with desired option names and values. Values must be Marshalable. Custom options may conflict // with non-custom options, and custom options bypass client-side validation. Prefer using non-custom // options where possible. func (cso *ChangeStreamOptions) SetCustom(c bson.M) *ChangeStreamOptions { cso.Custom = c return cso } // SetCustomPipeline sets the value for the CustomPipeline field. Key-value pairs of the BSON map // should correlate with desired option names and values. Values must be Marshalable. Custom pipeline // options bypass client-side validation. Prefer using non-custom options where possible. func (cso *ChangeStreamOptions) SetCustomPipeline(cp bson.M) *ChangeStreamOptions { cso.CustomPipeline = cp return cso } // MergeChangeStreamOptions combines the given ChangeStreamOptions instances into a single ChangeStreamOptions in a // last-one-wins fashion. func MergeChangeStreamOptions(opts ...*ChangeStreamOptions) *ChangeStreamOptions { csOpts := ChangeStream() for _, cso := range opts { if cso == nil { continue } if cso.BatchSize != nil { csOpts.BatchSize = cso.BatchSize } if cso.Collation != nil { csOpts.Collation = cso.Collation } if cso.Comment != nil { csOpts.Comment = cso.Comment } if cso.FullDocument != nil { csOpts.FullDocument = cso.FullDocument } if cso.FullDocumentBeforeChange != nil { csOpts.FullDocumentBeforeChange = cso.FullDocumentBeforeChange } if cso.MaxAwaitTime != nil { csOpts.MaxAwaitTime = cso.MaxAwaitTime } if cso.ResumeAfter != nil { csOpts.ResumeAfter = cso.ResumeAfter } if cso.ShowExpandedEvents != nil { csOpts.ShowExpandedEvents = cso.ShowExpandedEvents } if cso.StartAtOperationTime != nil { csOpts.StartAtOperationTime = cso.StartAtOperationTime } if cso.StartAfter != nil { csOpts.StartAfter = cso.StartAfter } if cso.Custom != nil { csOpts.Custom = cso.Custom } if cso.CustomPipeline != nil { csOpts.CustomPipeline = cso.CustomPipeline } } return csOpts }