// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package unified

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/handshake"
	"go.mongodb.org/mongo-driver/internal/integtest"
	"go.mongodb.org/mongo-driver/internal/logger"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

// There are no automated tests for truncation. Given that, setting
// "MaxDocumentLength" to 10_000 ensures that the default truncation length
// does not interfere with tests whose commands/replies exceed it.
const defaultMaxDocumentLen = 10_000

// Security-sensitive commands that should be ignored in command monitoring by default.
var securitySensitiveCommands = []string{
	"authenticate", "saslStart", "saslContinue", "getnonce", "createUser",
	"updateUser", "copydbgetnonce", "copydbsaslstart", "copydb",
}

// clientEntity is a wrapper for a mongo.Client object that also holds additional information required during test
// execution.
type clientEntity struct {
	*mongo.Client

	disconnected bool

	recordEvents                atomic.Value
	started                     []*event.CommandStartedEvent
	succeeded                   []*event.CommandSucceededEvent
	failed                      []*event.CommandFailedEvent
	pooled                      []*event.PoolEvent
	serverDescriptionChanged    []*event.ServerDescriptionChangedEvent
	serverHeartbeatFailedEvent  []*event.ServerHeartbeatFailedEvent
	serverHeartbeatStartedEvent []*event.ServerHeartbeatStartedEvent
	serverHeartbeatSucceeded    []*event.ServerHeartbeatSucceededEvent
	topologyDescriptionChanged  []*event.TopologyDescriptionChangedEvent
	ignoredCommands             map[string]struct{}
	observeSensitiveCommands    *bool
	numConnsCheckedOut          int32

	// These should not be changed after the clientEntity is initialized.
	observedEvents map[monitoringEventType]struct{}
	storedEvents   map[monitoringEventType][]string

	eventsCount                         map[monitoringEventType]int32
	serverDescriptionChangedEventsCount map[serverDescriptionChangedEventInfo]int32

	eventsCountLock                         sync.RWMutex
	serverDescriptionChangedEventsCountLock sync.RWMutex
	eventProcessMu                          sync.RWMutex

	entityMap *EntityMap

	logQueue chan orderedLogMessage
}

func newClientEntity(ctx context.Context, em *EntityMap, entityOptions *entityOptions) (*clientEntity, error) {
	// The "configureFailPoint" command should always be ignored.
	ignoredCommands := map[string]struct{}{
		"configureFailPoint": {},
	}

	// If not observing sensitive commands, add security-sensitive commands
	// to ignoredCommands by default.
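	// A nil ObserveSensitiveCommands value is treated the same as false, so
	// security-sensitive commands are ignored unless a test explicitly opts in.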
	if entityOptions.ObserveSensitiveCommands == nil || !*entityOptions.ObserveSensitiveCommands {
		for _, cmd := range securitySensitiveCommands {
			ignoredCommands[cmd] = struct{}{}
		}
	}

	entity := &clientEntity{
		ignoredCommands:                     ignoredCommands,
		observedEvents:                      make(map[monitoringEventType]struct{}),
		storedEvents:                        make(map[monitoringEventType][]string),
		eventsCount:                         make(map[monitoringEventType]int32),
		serverDescriptionChangedEventsCount: make(map[serverDescriptionChangedEventInfo]int32),
		entityMap:                           em,
		observeSensitiveCommands:            entityOptions.ObserveSensitiveCommands,
	}
	entity.setRecordEvents(true)

	// Construct a ClientOptions instance by first applying the cluster URI and then the URIOptions map to ensure that
	// the options specified in the test file take precedence.
	uri := getURIForClient(entityOptions)
	clientOpts := options.Client().ApplyURI(uri)
	if entityOptions.URIOptions != nil {
		if err := setClientOptionsFromURIOptions(clientOpts, entityOptions.URIOptions); err != nil {
			return nil, fmt.Errorf("error parsing URI options: %w", err)
		}
	}

	if olm := entityOptions.ObserveLogMessages; olm != nil {
		clientLogger := newLogger(olm, expectedLogMessageCount(ctx))

		wrap := func(str string) options.LogLevel {
			return options.LogLevel(logger.ParseLevel(str))
		}

		// Assign the log queue to the entity so that it can be used to
		// retrieve log messages.
		entity.logQueue = clientLogger.logQueue

		// Update the client options to add the clientLogger.
		clientOpts.LoggerOptions = options.Logger().
			SetComponentLevel(options.LogComponentCommand, wrap(olm.Command)).
			SetComponentLevel(options.LogComponentTopology, wrap(olm.Topology)).
			SetComponentLevel(options.LogComponentServerSelection, wrap(olm.ServerSelection)).
			SetComponentLevel(options.LogComponentConnection, wrap(olm.Connection)).
			SetMaxDocumentLength(defaultMaxDocumentLen).
			SetSink(clientLogger)
	}

	// UseMultipleMongoses requires validation when connecting to a sharded cluster. Option changes and validation are
	// only required if the option is explicitly set. If it's unset, we make no changes because the cluster URI already
	// includes all nodes and we don't enforce any limits on the number of nodes.
	if mtest.ClusterTopologyKind() == mtest.Sharded && entityOptions.UseMultipleMongoses != nil {
		if err := evaluateUseMultipleMongoses(clientOpts, *entityOptions.UseMultipleMongoses); err != nil {
			return nil, err
		}
	}

	if entityOptions.ObserveEvents != nil || entityOptions.StoreEventsAsEntities != nil {
		// Configure a command monitor that listens for the specified event types. We don't take the IgnoredCommands
		// option into account here because it can be overridden at the test level after the entity has already been
		// created, so we store the events for now but account for it when iterating over them later.
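		// Every event is routed through the entity's process* callbacks below, which append to the
		// per-event-type slices, update the event counters, and store event documents for any
		// storeEventsAsEntities configuration.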
		commandMonitor := &event.CommandMonitor{
			Started:   entity.processStartedEvent,
			Succeeded: entity.processSucceededEvent,
			Failed:    entity.processFailedEvent,
		}

		poolMonitor := &event.PoolMonitor{
			Event: entity.processPoolEvent,
		}

		serverMonitor := &event.ServerMonitor{
			ServerDescriptionChanged:   entity.processServerDescriptionChangedEvent,
			ServerHeartbeatFailed:      entity.processServerHeartbeatFailedEvent,
			ServerHeartbeatStarted:     entity.processServerHeartbeatStartedEvent,
			ServerHeartbeatSucceeded:   entity.processServerHeartbeatSucceededEvent,
			TopologyDescriptionChanged: entity.processTopologyDescriptionChangedEvent,
		}

		clientOpts.SetMonitor(commandMonitor).SetPoolMonitor(poolMonitor).SetServerMonitor(serverMonitor)

		for _, eventTypeStr := range entityOptions.ObserveEvents {
			eventType, ok := monitoringEventTypeFromString(eventTypeStr)
			if !ok {
				return nil, fmt.Errorf("unrecognized observed event type %q", eventTypeStr)
			}
			entity.observedEvents[eventType] = struct{}{}
		}

		for _, eventsAsEntity := range entityOptions.StoreEventsAsEntities {
			for _, eventTypeStr := range eventsAsEntity.Events {
				eventType, ok := monitoringEventTypeFromString(eventTypeStr)
				if !ok {
					return nil, fmt.Errorf("unrecognized stored event type %q", eventTypeStr)
				}
				entity.storedEvents[eventType] = append(entity.storedEvents[eventType], eventsAsEntity.EventListID)
			}
		}
	}

	if entityOptions.ServerAPIOptions != nil {
		if err := entityOptions.ServerAPIOptions.ServerAPIVersion.Validate(); err != nil {
			return nil, err
		}
		clientOpts.SetServerAPIOptions(entityOptions.ServerAPIOptions.ServerAPIOptions)
	} else {
		integtest.AddTestServerAPIVersion(clientOpts)
	}

	for _, cmd := range entityOptions.IgnoredCommands {
		entity.ignoredCommands[cmd] = struct{}{}
	}

	client, err := mongo.Connect(ctx, clientOpts)
	if err != nil {
		return nil, fmt.Errorf("error creating mongo.Client: %w", err)
	}

	entity.Client = client
	return entity, nil
}

func getURIForClient(opts *entityOptions) string {
	if mtest.Serverless() || mtest.ClusterTopologyKind() != mtest.LoadBalanced {
		return mtest.ClusterURI()
	}

	// For non-serverless load-balanced deployments, UseMultipleMongoses is used to determine the
	// load balancer URI. If set to false, the LB fronts a single server. If unset or explicitly
	// true, the LB fronts multiple mongos servers.
	switch {
	case opts.UseMultipleMongoses != nil && !*opts.UseMultipleMongoses:
		return mtest.SingleMongosLoadBalancerURI()
	default:
		return mtest.MultiMongosLoadBalancerURI()
	}
}

// disconnect disconnects the client associated with this entity. It is an
// idempotent operation, unlike the mongo client's disconnect method. This
// property helps avoid unnecessary errors when calling disconnect on a
// client that has already been disconnected, such as when the test runner
// is required to run the closure as part of an operation.
func (c *clientEntity) disconnect(ctx context.Context) error {
	if c.disconnected {
		return nil
	}

	if err := c.Client.Disconnect(ctx); err != nil {
		return err
	}

	c.disconnected = true

	return nil
}

func (c *clientEntity) stopListeningForEvents() {
	c.setRecordEvents(false)
}

func (c *clientEntity) isIgnoredEvent(commandName string, eventDoc bson.Raw) bool {
	// Check if command is in ignoredCommands.
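	// This covers "configureFailPoint", the security-sensitive commands (unless observation of
	// sensitive commands was requested), and any commands ignored via test options.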
	if _, ok := c.ignoredCommands[commandName]; ok {
		return true
	}

	if commandName == "hello" || strings.ToLower(commandName) == handshake.LegacyHelloLowercase {
		// If observeSensitiveCommands is false (or unset) and the hello command has been
		// redacted at the operation level, the hello command should be ignored because it
		// contained speculativeAuthenticate.
		sensitiveCommandsIgnored := c.observeSensitiveCommands == nil || !*c.observeSensitiveCommands
		redacted := len(eventDoc) == 0
		if sensitiveCommandsIgnored && redacted {
			return true
		}
	}

	return false
}

func (c *clientEntity) startedEvents() []*event.CommandStartedEvent {
	var events []*event.CommandStartedEvent
	for _, evt := range c.started {
		if !c.isIgnoredEvent(evt.CommandName, evt.Command) {
			events = append(events, evt)
		}
	}

	return events
}

func (c *clientEntity) succeededEvents() []*event.CommandSucceededEvent {
	var events []*event.CommandSucceededEvent
	for _, evt := range c.succeeded {
		if !c.isIgnoredEvent(evt.CommandName, evt.Reply) {
			events = append(events, evt)
		}
	}

	return events
}

func (c *clientEntity) failedEvents() []*event.CommandFailedEvent {
	var events []*event.CommandFailedEvent
	for _, evt := range c.failed {
		if _, ok := c.ignoredCommands[evt.CommandName]; !ok {
			events = append(events, evt)
		}
	}

	return events
}

func (c *clientEntity) poolEvents() []*event.PoolEvent {
	return c.pooled
}

func (c *clientEntity) numberConnectionsCheckedOut() int32 {
	return c.numConnsCheckedOut
}

func (c *clientEntity) addEventsCount(eventType monitoringEventType) {
	c.eventsCountLock.Lock()
	defer c.eventsCountLock.Unlock()

	c.eventsCount[eventType]++
}

func (c *clientEntity) addServerDescriptionChangedEventCount(evt serverDescriptionChangedEventInfo) {
	c.serverDescriptionChangedEventsCountLock.Lock()
	defer c.serverDescriptionChangedEventsCountLock.Unlock()

	c.serverDescriptionChangedEventsCount[evt]++
}

func (c *clientEntity) getEventCount(eventType monitoringEventType) int32 {
	c.eventsCountLock.RLock()
	defer c.eventsCountLock.RUnlock()

	return c.eventsCount[eventType]
}

func (c *clientEntity) getServerDescriptionChangedEventCount(evt serverDescriptionChangedEventInfo) int32 {
	c.serverDescriptionChangedEventsCountLock.Lock()
	defer c.serverDescriptionChangedEventsCountLock.Unlock()

	return c.serverDescriptionChangedEventsCount[evt]
}

func getSecondsSinceEpoch() float64 {
	return float64(time.Now().UnixNano()) / float64(time.Second/time.Nanosecond)
}

func (c *clientEntity) processStartedEvent(_ context.Context, evt *event.CommandStartedEvent) {
	if !c.getRecordEvents() {
		return
	}

	if _, ok := c.observedEvents[commandStartedEvent]; ok {
		c.started = append(c.started, evt)
	}

	c.addEventsCount(commandStartedEvent)

	eventListIDs, ok := c.storedEvents[commandStartedEvent]
	if !ok {
		return
	}

	bsonBuilder := bsoncore.NewDocumentBuilder().
		AppendString("name", "CommandStartedEvent").
		AppendDouble("observedAt", getSecondsSinceEpoch()).
		AppendString("databaseName", evt.DatabaseName).
		AppendString("commandName", evt.CommandName).
		AppendInt64("requestId", evt.RequestID).
AppendString("connectionId", evt.ConnectionID) if evt.ServiceID != nil { bsonBuilder.AppendString("serviceId", evt.ServiceID.String()) } doc := bson.Raw(bsonBuilder.Build()) for _, id := range eventListIDs { c.entityMap.appendEventsEntity(id, doc) } } func (c *clientEntity) processSucceededEvent(_ context.Context, evt *event.CommandSucceededEvent) { if !c.getRecordEvents() { return } if _, ok := c.observedEvents[commandSucceededEvent]; ok { c.succeeded = append(c.succeeded, evt) } c.addEventsCount(commandSucceededEvent) eventListIDs, ok := c.storedEvents["CommandSucceededEvent"] if !ok { return } bsonBuilder := bsoncore.NewDocumentBuilder(). AppendString("name", "CommandSucceededEvent"). AppendDouble("observedAt", getSecondsSinceEpoch()). AppendString("commandName", evt.CommandName). AppendInt64("requestId", evt.RequestID). AppendString("connectionId", evt.ConnectionID) if evt.ServiceID != nil { bsonBuilder.AppendString("serviceId", evt.ServiceID.String()) } doc := bson.Raw(bsonBuilder.Build()) for _, id := range eventListIDs { c.entityMap.appendEventsEntity(id, doc) } } func (c *clientEntity) processFailedEvent(_ context.Context, evt *event.CommandFailedEvent) { if !c.getRecordEvents() { return } if _, ok := c.observedEvents[commandFailedEvent]; ok { c.failed = append(c.failed, evt) } c.addEventsCount(commandFailedEvent) eventListIDs, ok := c.storedEvents["CommandFailedEvent"] if !ok { return } bsonBuilder := bsoncore.NewDocumentBuilder(). AppendString("name", "CommandFailedEvent"). AppendDouble("observedAt", getSecondsSinceEpoch()). AppendInt64("durationNanos", evt.Duration.Nanoseconds()). AppendString("commandName", evt.CommandName). AppendInt64("requestId", evt.RequestID). AppendString("connectionId", evt.ConnectionID). AppendString("failure", evt.Failure) if evt.ServiceID != nil { bsonBuilder.AppendString("serviceId", evt.ServiceID.String()) } doc := bson.Raw(bsonBuilder.Build()) for _, id := range eventListIDs { c.entityMap.appendEventsEntity(id, doc) } } func getPoolEventDocument(evt *event.PoolEvent, eventType monitoringEventType) bson.Raw { bsonBuilder := bsoncore.NewDocumentBuilder(). AppendString("name", string(eventType)). AppendDouble("observedAt", getSecondsSinceEpoch()). AppendString("address", evt.Address) if evt.ConnectionID != 0 { bsonBuilder.AppendString("connectionId", fmt.Sprint(evt.ConnectionID)) } if evt.PoolOptions != nil { optionsDoc := bsoncore.NewDocumentBuilder(). AppendString("maxPoolSize", fmt.Sprint(evt.PoolOptions.MaxPoolSize)). AppendString("minPoolSize", fmt.Sprint(evt.PoolOptions.MinPoolSize)). AppendString("maxIdleTimeMS", fmt.Sprint(evt.PoolOptions.WaitQueueTimeoutMS)). Build() bsonBuilder.AppendDocument("poolOptions", optionsDoc) } if evt.Reason != "" { bsonBuilder.AppendString("reason", evt.Reason) } if evt.ServiceID != nil { bsonBuilder.AppendString("serviceId", evt.ServiceID.String()) } return bson.Raw(bsonBuilder.Build()) } func (c *clientEntity) processPoolEvent(evt *event.PoolEvent) { if !c.getRecordEvents() { return } // Update the connection counter. This happens even if we're not storing any events. 
	switch evt.Type {
	case event.GetSucceeded:
		c.numConnsCheckedOut++
	case event.ConnectionReturned:
		c.numConnsCheckedOut--
	}

	eventType := monitoringEventTypeFromPoolEvent(evt)
	if _, ok := c.observedEvents[eventType]; ok {
		c.pooled = append(c.pooled, evt)
	}

	c.addEventsCount(eventType)

	if eventListIDs, ok := c.storedEvents[eventType]; ok {
		eventBSON := getPoolEventDocument(evt, eventType)
		for _, id := range eventListIDs {
			c.entityMap.appendEventsEntity(id, eventBSON)
		}
	}
}

func (c *clientEntity) processServerDescriptionChangedEvent(evt *event.ServerDescriptionChangedEvent) {
	c.eventProcessMu.Lock()
	defer c.eventProcessMu.Unlock()

	if !c.getRecordEvents() {
		return
	}

	if _, ok := c.observedEvents[serverDescriptionChangedEvent]; ok {
		c.serverDescriptionChanged = append(c.serverDescriptionChanged, evt)
	}

	// Record object-specific unified spec test data on an event.
	c.addServerDescriptionChangedEventCount(*newServerDescriptionChangedEventInfo(evt))

	// Record the event generally.
	c.addEventsCount(serverDescriptionChangedEvent)
}

func (c *clientEntity) processServerHeartbeatFailedEvent(evt *event.ServerHeartbeatFailedEvent) {
	c.eventProcessMu.Lock()
	defer c.eventProcessMu.Unlock()

	if !c.getRecordEvents() {
		return
	}

	if _, ok := c.observedEvents[serverHeartbeatFailedEvent]; ok {
		c.serverHeartbeatFailedEvent = append(c.serverHeartbeatFailedEvent, evt)
	}

	c.addEventsCount(serverHeartbeatFailedEvent)
}

func (c *clientEntity) processServerHeartbeatStartedEvent(evt *event.ServerHeartbeatStartedEvent) {
	c.eventProcessMu.Lock()
	defer c.eventProcessMu.Unlock()

	if !c.getRecordEvents() {
		return
	}

	if _, ok := c.observedEvents[serverHeartbeatStartedEvent]; ok {
		c.serverHeartbeatStartedEvent = append(c.serverHeartbeatStartedEvent, evt)
	}

	c.addEventsCount(serverHeartbeatStartedEvent)
}

func (c *clientEntity) processServerHeartbeatSucceededEvent(evt *event.ServerHeartbeatSucceededEvent) {
	c.eventProcessMu.Lock()
	defer c.eventProcessMu.Unlock()

	if !c.getRecordEvents() {
		return
	}

	if _, ok := c.observedEvents[serverHeartbeatSucceededEvent]; ok {
		c.serverHeartbeatSucceeded = append(c.serverHeartbeatSucceeded, evt)
	}

	c.addEventsCount(serverHeartbeatSucceededEvent)
}

func (c *clientEntity) processTopologyDescriptionChangedEvent(evt *event.TopologyDescriptionChangedEvent) {
	c.eventProcessMu.Lock()
	defer c.eventProcessMu.Unlock()

	if !c.getRecordEvents() {
		return
	}

	if _, ok := c.observedEvents[topologyDescriptionChangedEvent]; ok {
		c.topologyDescriptionChanged = append(c.topologyDescriptionChanged, evt)
	}

	c.addEventsCount(topologyDescriptionChangedEvent)
}

func (c *clientEntity) setRecordEvents(record bool) {
	c.recordEvents.Store(record)
}

func (c *clientEntity) getRecordEvents() bool {
	return c.recordEvents.Load().(bool)
}

func setClientOptionsFromURIOptions(clientOpts *options.ClientOptions, uriOpts bson.M) error {
	// A write concern can be constructed across multiple URI options (e.g. "w", "j", and "wTimeoutMS") so we declare an
	// empty writeConcern instance here that can be populated in the loop below.
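	// wcSet records whether any write concern option was seen so that a write concern is only
	// built and applied when the test actually specifies one.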
	var wc writeConcern
	var wcSet bool

	for key, value := range uriOpts {
		switch strings.ToLower(key) {
		case "appname":
			clientOpts.SetAppName(value.(string))
		case "heartbeatfrequencyms":
			clientOpts.SetHeartbeatInterval(time.Duration(value.(int32)) * time.Millisecond)
		case "loadbalanced":
			clientOpts.SetLoadBalanced(value.(bool))
		case "maxidletimems":
			clientOpts.SetMaxConnIdleTime(time.Duration(value.(int32)) * time.Millisecond)
		case "minpoolsize":
			clientOpts.SetMinPoolSize(uint64(value.(int32)))
		case "maxpoolsize":
			clientOpts.SetMaxPoolSize(uint64(value.(int32)))
		case "maxconnecting":
			clientOpts.SetMaxConnecting(uint64(value.(int32)))
		case "readconcernlevel":
			clientOpts.SetReadConcern(readconcern.New(readconcern.Level(value.(string))))
		case "retryreads":
			clientOpts.SetRetryReads(value.(bool))
		case "retrywrites":
			clientOpts.SetRetryWrites(value.(bool))
		case "sockettimeoutms":
			clientOpts.SetSocketTimeout(time.Duration(value.(int32)) * time.Millisecond)
		case "w":
			wc.W = value
			wcSet = true
		case "waitqueuetimeoutms":
			return newSkipTestError("the waitQueueTimeoutMS client option is not supported")
		case "waitqueuesize":
			return newSkipTestError("the waitQueueSize client option is not supported")
		case "timeoutms":
			clientOpts.SetTimeout(time.Duration(value.(int32)) * time.Millisecond)
		case "serverselectiontimeoutms":
			clientOpts.SetServerSelectionTimeout(time.Duration(value.(int32)) * time.Millisecond)
		case "servermonitoringmode":
			clientOpts.SetServerMonitoringMode(value.(string))
		default:
			return fmt.Errorf("unrecognized URI option %s", key)
		}
	}

	if wcSet {
		converted, err := wc.toWriteConcernOption()
		if err != nil {
			return fmt.Errorf("error creating write concern: %w", err)
		}
		clientOpts.SetWriteConcern(converted)
	}

	return nil
}

func evaluateUseMultipleMongoses(clientOpts *options.ClientOptions, useMultipleMongoses bool) error {
	hosts := mtest.ClusterConnString().Hosts

	if !useMultipleMongoses {
		clientOpts.SetHosts(hosts[:1])
		return nil
	}

	if len(hosts) < 2 {
		return fmt.Errorf("multiple mongoses required but cluster URI %q only contains one host", mtest.ClusterURI())
	}

	return nil
}