From 61000da0d3a6a14e681470a4e8f3aea99e127cd7 Mon Sep 17 00:00:00 2001 From: rstam Date: Wed, 12 Feb 2020 14:54:53 -0500 Subject: [PATCH] CSHARP-2960: Unacknowledged writes fail silently when retryWrites=true. --- .../Core/Operations/IRetryableOperation.cs | 5 + .../RetryableWriteOperationExecutor.cs | 25 +- .../BsonDocumentAssertions.cs | 20 + .../BulkMixedWriteOperationTests.cs | 18 + .../RetryableWriteOperationExecutorTests.cs | 163 ++++++ .../MongoCollectionTests.cs | 60 ++- .../Operations/BulkWriteOperationTests.cs | 9 +- .../prose-tests/CommandConstructionTests.cs | 466 ++++++++++++++++++ .../prose-tests/MMapV1Tests.cs | 67 +++ 9 files changed, 806 insertions(+), 27 deletions(-) create mode 100644 tests/MongoDB.Driver.Core.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs create mode 100644 tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/CommandConstructionTests.cs create mode 100644 tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/MMapV1Tests.cs diff --git a/src/MongoDB.Driver.Core/Core/Operations/IRetryableOperation.cs b/src/MongoDB.Driver.Core/Core/Operations/IRetryableOperation.cs index 45a58c07b02..ca857ec8089 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/IRetryableOperation.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/IRetryableOperation.cs @@ -97,6 +97,11 @@ public interface IRetryableReadOperation : IExecutableInRetryableReadCo /// The type of the result. public interface IRetryableWriteOperation : IExecutableInRetryableWriteContext { + /// + /// Gets the write concern for the operation. + /// + WriteConcern WriteConcern { get; } + /// /// Executes the first attempt. /// diff --git a/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs b/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs index b7ebd239303..98845bcb8fb 100644 --- a/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs +++ b/src/MongoDB.Driver.Core/Core/Operations/RetryableWriteOperationExecutor.cs @@ -35,7 +35,7 @@ public static TResult Execute(IRetryableWriteOperation operati public static TResult Execute(IRetryableWriteOperation operation, RetryableWriteContext context, CancellationToken cancellationToken) { - if (!context.RetryRequested || !AreRetryableWritesSupported(context.Channel.ConnectionDescription) || context.Binding.Session.IsInTransaction) + if (!AreRetriesAllowed(operation, context)) { return operation.ExecuteAttempt(context, 1, null, cancellationToken); } @@ -86,7 +86,7 @@ public async static Task ExecuteAsync(IRetryableWriteOperation public static async Task ExecuteAsync(IRetryableWriteOperation operation, RetryableWriteContext context, CancellationToken cancellationToken) { - if (!context.RetryRequested || !AreRetryableWritesSupported(context.Channel.ConnectionDescription) || context.Binding.Session.IsInTransaction) + if (!AreRetriesAllowed(operation, context)) { return await operation.ExecuteAttemptAsync(context, 1, null, cancellationToken).ConfigureAwait(false); } @@ -128,6 +128,11 @@ public static async Task ExecuteAsync(IRetryableWriteOperation } // privates static methods + private static bool AreRetriesAllowed(IRetryableWriteOperation operation, RetryableWriteContext context) + { + return IsOperationAcknowledged(operation) && DoesContextAllowRetries(context); + } + private static bool AreRetryableWritesSupported(ConnectionDescription connectionDescription) { return @@ -135,6 +140,22 @@ private static bool AreRetryableWritesSupported(ConnectionDescription connection connectionDescription.IsMasterResult.ServerType != ServerType.Standalone; } + private static bool DoesContextAllowRetries(RetryableWriteContext context) + { + return + context.RetryRequested && + AreRetryableWritesSupported(context.Channel.ConnectionDescription) && + !context.Binding.Session.IsInTransaction; + } + + private static bool IsOperationAcknowledged(IRetryableWriteOperation operation) + { + var writeConcern = operation.WriteConcern; + return + writeConcern == null || // null means use server default write concern which implies acknowledged + writeConcern.IsAcknowledged; + } + private static bool ShouldThrowOriginalException(Exception retryException) { return retryException is MongoException && !(retryException is MongoConnectionException); diff --git a/tests/MongoDB.Bson.TestHelpers/BsonDocumentAssertions.cs b/tests/MongoDB.Bson.TestHelpers/BsonDocumentAssertions.cs index a89daad165d..b8457ba6be0 100644 --- a/tests/MongoDB.Bson.TestHelpers/BsonDocumentAssertions.cs +++ b/tests/MongoDB.Bson.TestHelpers/BsonDocumentAssertions.cs @@ -59,6 +59,16 @@ public AndConstraint Be(string json, string because = "" return Be(expected, because, reasonArgs); } + public AndConstraint Contain(string name, string because = "", params object[] reasonArgs) + { + Execute.Assertion + .BecauseOf(because, reasonArgs) + .ForCondition(Subject.Contains(name)) + .FailWith("Expected {context:object} to contain element {0}{reason}.", name); + + return new AndConstraint(this); + } + public AndConstraint NotBe(BsonDocument unexpected, string because = "", params object[] reasonArgs) { Execute.Assertion @@ -75,6 +85,16 @@ public AndConstraint NotBe(string json, string because = return NotBe(expected, because, reasonArgs); } + public AndConstraint NotContain(string name, string because = "", params object[] reasonArgs) + { + Execute.Assertion + .BecauseOf(because, reasonArgs) + .ForCondition(!Subject.Contains(name)) + .FailWith("Expected {context:object} to not contain element {0}{reason}.", name); + + return new AndConstraint(this); + } + protected override string Context { get { return "BsonDocument"; } diff --git a/tests/MongoDB.Driver.Core.Tests/Core/Operations/BulkMixedWriteOperationTests.cs b/tests/MongoDB.Driver.Core.Tests/Core/Operations/BulkMixedWriteOperationTests.cs index 1726f9ea607..81b4292d371 100644 --- a/tests/MongoDB.Driver.Core.Tests/Core/Operations/BulkMixedWriteOperationTests.cs +++ b/tests/MongoDB.Driver.Core.Tests/Core/Operations/BulkMixedWriteOperationTests.cs @@ -1095,6 +1095,8 @@ public void Execute_with_an_error_in_the_second_batch_and_ordered_is_true( [SkippableTheory] [ParameterAttributeData] public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_is_false( + [Values(false, true)] + bool retryRequested, [Values(false, true)] bool async) { @@ -1112,6 +1114,7 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_ var subject = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings) { IsOrdered = false, + RetryRequested = retryRequested, WriteConcern = WriteConcern.Unacknowledged }; @@ -1133,6 +1136,8 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_ [SkippableTheory] [ParameterAttributeData] public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_is_true( + [Values(false, true)] + bool retryRequested, [Values(false, true)] bool async) { @@ -1156,6 +1161,7 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_ var subject = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings) { IsOrdered = true, + RetryRequested = retryRequested, WriteConcern = WriteConcern.Unacknowledged }; @@ -1176,6 +1182,8 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_ [SkippableTheory] [ParameterAttributeData] public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered_is_false( + [Values(false, true)] + bool retryRequested, [Values(false, true)] bool async) { @@ -1194,6 +1202,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered { IsOrdered = false, MaxBatchCount = 2, + RetryRequested = retryRequested, WriteConcern = WriteConcern.Unacknowledged }; @@ -1214,6 +1223,8 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered [SkippableTheory] [ParameterAttributeData] public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered_is_true( + [Values(false, true)] + bool retryRequested, [Values(false, true)] bool async) { @@ -1232,6 +1243,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered { IsOrdered = true, MaxBatchCount = 2, + RetryRequested = retryRequested, WriteConcern = WriteConcern.Unacknowledged }; @@ -1252,6 +1264,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered [SkippableTheory] [ParameterAttributeData] public void Execute_with_delete_should_not_send_session_id_when_unacknowledged_writes( + [Values(false, true)] bool retryRequested, [Values(false, true)] bool useImplicitSession, [Values(false, true)] bool async) { @@ -1262,6 +1275,7 @@ public void Execute_with_delete_should_not_send_session_id_when_unacknowledged_w var requests = new[] { new DeleteRequest(BsonDocument.Parse("{ x : 1 }")) }; var subject = new BulkMixedWriteOperation(collectionNamespace, requests, _messageEncoderSettings) { + RetryRequested = retryRequested, WriteConcern = WriteConcern.Unacknowledged }; @@ -1284,6 +1298,7 @@ public void Execute_with_delete_should_send_session_id_when_supported( [SkippableTheory] [ParameterAttributeData] public void Execute_with_insert_should_not_send_session_id_when_unacknowledged_writes( + [Values(false, true)] bool retryRequested, [Values(false, true)] bool useImplicitSession, [Values(false, true)] bool async) { @@ -1294,6 +1309,7 @@ public void Execute_with_insert_should_not_send_session_id_when_unacknowledged_w var requests = new[] { new InsertRequest(BsonDocument.Parse("{ _id : 1, x : 3 }")) }; var subject = new BulkMixedWriteOperation(collectionNamespace, requests, _messageEncoderSettings) { + RetryRequested = retryRequested, WriteConcern = WriteConcern.Unacknowledged }; @@ -1316,6 +1332,7 @@ public void Execute_with_insert_should_send_session_id_when_supported( [SkippableTheory] [ParameterAttributeData] public void Execute_with_update_should_not_send_session_id_when_unacknowledged_writes( + [Values(false, true)] bool retryRequested, [Values(false, true)] bool useImplicitSession, [Values(false, true)] bool async) { @@ -1326,6 +1343,7 @@ public void Execute_with_update_should_not_send_session_id_when_unacknowledged_w var requests = new[] { new UpdateRequest(UpdateType.Update, BsonDocument.Parse("{ x : 1 }"), BsonDocument.Parse("{ $set : { a : 1 } }")) }; var subject = new BulkMixedWriteOperation(collectionNamespace, requests, _messageEncoderSettings) { + RetryRequested = retryRequested, WriteConcern = WriteConcern.Unacknowledged }; diff --git a/tests/MongoDB.Driver.Core.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs b/tests/MongoDB.Driver.Core.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs new file mode 100644 index 00000000000..66e76b3a9fb --- /dev/null +++ b/tests/MongoDB.Driver.Core.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs @@ -0,0 +1,163 @@ +/* Copyright 2020-present MongoDB Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System.Linq; +using System.Net; +using System.Reflection; +using System.Threading; +using FluentAssertions; +using MongoDB.Bson; +using MongoDB.Bson.TestHelpers; +using MongoDB.Driver.Core.Bindings; +using MongoDB.Driver.Core.Clusters; +using MongoDB.Driver.Core.Connections; +using MongoDB.Driver.Core.Operations; +using MongoDB.Driver.Core.Servers; +using Moq; +using Xunit; + +namespace MongoDB.Driver.Core.Tests.Core.Operations +{ + public class RetryableWriteOperationExecutorTests + { + [Theory] + [InlineData(false, false, false, false)] + [InlineData(false, false, true, false)] + [InlineData(false, true, false, false)] + [InlineData(false, true, true, false)] + [InlineData(true, false, false, false)] + [InlineData(true, false, true, false)] + [InlineData(true, true, false, true)] + [InlineData(true, true, true, false)] + public void DoesContextAllowRetries_should_return_expected_result( + bool retryRequested, + bool areRetryableWritesSupported, + bool isInTransaction, + bool expectedResult) + { + var context = CreateContext(retryRequested, areRetryableWritesSupported, isInTransaction); + + var result = RetryableWriteOperationExecutorReflector.DoesContextAllowRetries(context); + + result.Should().Be(expectedResult); + } + + [Theory] + [InlineData(false, false, true)] + [InlineData(false, true, true)] + [InlineData(true, false, false)] + [InlineData(true, true, true)] + public void IsOperationAcknowledged_should_return_expected_result( + bool withWriteConcern, + bool isAcknowledged, + bool expectedResult) + { + var operation = CreateOperation(withWriteConcern, isAcknowledged); + + var result = RetryableWriteOperationExecutorReflector.IsOperationAcknowledged(operation); + + result.Should().Be(expectedResult); + } + + // private methods + private IWriteBinding CreateBinding(bool areRetryableWritesSupported, bool isInTransaction) + { + var mockBinding = new Mock(); + var session = CreateSession(isInTransaction); + var channelSource = CreateChannelSource(areRetryableWritesSupported); + mockBinding.SetupGet(m => m.Session).Returns(session); + mockBinding.Setup(m => m.GetWriteChannelSource(CancellationToken.None)).Returns(channelSource); + return mockBinding.Object; + } + + private IChannelHandle CreateChannel(bool areRetryableWritesSupported) + { + var mockChannel = new Mock(); + var connectionDescription = CreateConnectionDescription(areRetryableWritesSupported); + mockChannel.SetupGet(m => m.ConnectionDescription).Returns(connectionDescription); + return mockChannel.Object; + } + + private IChannelSourceHandle CreateChannelSource(bool areRetryableWritesSupported) + { + var mockChannelSource = new Mock(); + var channel = CreateChannel(areRetryableWritesSupported); + mockChannelSource.Setup(m => m.GetChannel(CancellationToken.None)).Returns(channel); + return mockChannelSource.Object; + } + + private ConnectionDescription CreateConnectionDescription(bool areRetryableWritesSupported) + { + var clusterId = new ClusterId(1); + var endPoint = new DnsEndPoint("localhost", 27017); + var serverId = new ServerId(clusterId, endPoint); + var connectionId = new ConnectionId(serverId, 1); + var isMasterResultDocument = BsonDocument.Parse("{ ok : 1 }"); + if (areRetryableWritesSupported) + { + isMasterResultDocument["logicalSessionTimeoutMinutes"] = 1; + isMasterResultDocument["msg"] = "isdbgrid"; // mongos + } + var isMasterResult = new IsMasterResult(isMasterResultDocument); + var buildInfoResult = new BuildInfoResult(BsonDocument.Parse("{ ok : 1, version : '4.2.0' }")); + var connectionDescription = new ConnectionDescription(connectionId, isMasterResult, buildInfoResult); + return connectionDescription; + } + + private RetryableWriteContext CreateContext(bool retryRequested, bool areRetryableWritesSupported, bool isInTransaction) + { + var binding = CreateBinding(areRetryableWritesSupported, isInTransaction); + return RetryableWriteContext.Create(binding, retryRequested, CancellationToken.None); + } + + private IRetryableWriteOperation CreateOperation(bool withWriteConcern, bool isAcknowledged) + { + var mockOperation = new Mock>(); + var writeConcern = withWriteConcern ? (isAcknowledged ? WriteConcern.Acknowledged : WriteConcern.Unacknowledged) : null; + mockOperation.SetupGet(m => m.WriteConcern).Returns(writeConcern); + return mockOperation.Object; + } + + private ICoreSessionHandle CreateSession(bool isInTransaction) + { + var mockSession = new Mock(); + mockSession.SetupGet(m => m.IsInTransaction).Returns(isInTransaction); + return mockSession.Object; + } + } + + // nested types + public static class RetryableWriteOperationExecutorReflector + { + public static bool DoesContextAllowRetries(RetryableWriteContext context) => + (bool)Reflector.InvokeStatic(typeof(RetryableWriteOperationExecutor), nameof(DoesContextAllowRetries), context); + + public static bool IsOperationAcknowledged(IRetryableWriteOperation operation) + { + var methodInfoDefinition = typeof(RetryableWriteOperationExecutor).GetMethods(BindingFlags.NonPublic | BindingFlags.Static) + .Where(m => m.Name == nameof(IsOperationAcknowledged)) + .Single(); + var methodInfo = methodInfoDefinition.MakeGenericMethod(typeof(BsonDocument)); + try + { + return (bool)methodInfo.Invoke(null, new object[] { operation }); + } + catch (TargetInvocationException exception) + { + throw exception.InnerException; + } + } + } +} diff --git a/tests/MongoDB.Driver.Legacy.Tests/MongoCollectionTests.cs b/tests/MongoDB.Driver.Legacy.Tests/MongoCollectionTests.cs index c58f63adcd0..ec61031417f 100644 --- a/tests/MongoDB.Driver.Legacy.Tests/MongoCollectionTests.cs +++ b/tests/MongoDB.Driver.Legacy.Tests/MongoCollectionTests.cs @@ -2307,12 +2307,16 @@ public void TestInsertBatchContinueOnError() } } - [Fact] - public void TestInsertBatchMultipleBatchesWriteConcernDisabledContinueOnErrorFalse() + [Theory] + [ParameterAttributeData] + public void TestInsertBatchMultipleBatchesWriteConcernDisabledContinueOnErrorFalse( + [Values(false, true)] bool retryWrites) { + var server = LegacyTestConfiguration.GetServer(retryWrites); + var database = server.GetDatabase(LegacyTestConfiguration.Database.Name); var collectionName = LegacyTestConfiguration.Collection.Name; var collectionSettings = new MongoCollectionSettings { WriteConcern = WriteConcern.Unacknowledged }; - var collection = LegacyTestConfiguration.Database.GetCollection(collectionName, collectionSettings); + var collection = database.GetCollection(collectionName, collectionSettings); collection.Drop(); var maxMessageLength = _primary.MaxMessageLength; @@ -2342,12 +2346,16 @@ public void TestInsertBatchMultipleBatchesWriteConcernDisabledContinueOnErrorFal Assert.Equal(0, collection.Count(Query.EQ("_id", 5))); } - [Fact] - public void TestInsertBatchMultipleBatchesWriteConcernDisabledContinueOnErrorTrue() + [Theory] + [ParameterAttributeData] + public void TestInsertBatchMultipleBatchesWriteConcernDisabledContinueOnErrorTrue( + [Values(false, true)] bool retryWrites) { + var server = LegacyTestConfiguration.GetServer(retryWrites); + var database = server.GetDatabase(LegacyTestConfiguration.Database.Name); var collectionName = LegacyTestConfiguration.Collection.Name; var collectionSettings = new MongoCollectionSettings { WriteConcern = WriteConcern.Unacknowledged }; - var collection = LegacyTestConfiguration.Database.GetCollection(collectionName, collectionSettings); + var collection = database.GetCollection(collectionName, collectionSettings); collection.Drop(); var maxMessageLength = _primary.MaxMessageLength; @@ -3064,17 +3072,22 @@ public void TestRemoveNoMatchingDocument() Assert.Equal(0, _collection.Count()); } - [Fact] - public void TestRemoveUnacknowledeged() + [Theory] + [ParameterAttributeData] + public void TestRemoveUnacknowledeged( + [Values(false, true)] bool retryWrites) { - using (_server.RequestStart()) + var server = LegacyTestConfiguration.GetServer(retryWrites); + using (server.RequestStart()) { - _collection.Drop(); - _collection.Insert(new BsonDocument("x", 1)); - var result = _collection.Remove(Query.EQ("x", 1), WriteConcern.Unacknowledged); + var database = server.GetDatabase(LegacyTestConfiguration.Database.Name); + var collection = database.GetCollection(GetType().Name); + collection.Drop(); + collection.Insert(new BsonDocument("x", 1)); + var result = collection.Remove(Query.EQ("x", 1), WriteConcern.Unacknowledged); Assert.Equal(null, result); - Assert.Equal(0, _collection.Count()); + Assert.Equal(0, collection.Count()); } } @@ -3339,20 +3352,25 @@ public void TestUpdateNullQuery() Assert.Equal(2, _collection.Count()); } - [Fact] - public void TestUpdateUnacknowledged() + [Theory] + [ParameterAttributeData] + public void TestUpdateUnacknowledged( + [Values(false, true)] bool retryWrites) { - using (_server.RequestStart()) + var server = LegacyTestConfiguration.GetServer(retryWrites); + using (server.RequestStart()) { - _collection.Drop(); - _collection.Insert(new BsonDocument("x", 1)); - var result = _collection.Update(Query.EQ("x", 1), Update.Set("x", 2), WriteConcern.Unacknowledged); + var database = server.GetDatabase(LegacyTestConfiguration.Database.Name); + var collection = database.GetCollection(GetType().Name); + collection.Drop(); + collection.Insert(new BsonDocument("x", 1)); + var result = collection.Update(Query.EQ("x", 1), Update.Set("x", 2), WriteConcern.Unacknowledged); Assert.Equal(null, result); - var document = _collection.FindOne(); + var document = collection.FindOne(); Assert.Equal(2, document["x"].AsInt32); - Assert.Equal(1, _collection.Count()); + Assert.Equal(1, collection.Count()); } } diff --git a/tests/MongoDB.Driver.Legacy.Tests/Operations/BulkWriteOperationTests.cs b/tests/MongoDB.Driver.Legacy.Tests/Operations/BulkWriteOperationTests.cs index 765781eb42f..6f41737f475 100644 --- a/tests/MongoDB.Driver.Legacy.Tests/Operations/BulkWriteOperationTests.cs +++ b/tests/MongoDB.Driver.Legacy.Tests/Operations/BulkWriteOperationTests.cs @@ -1125,11 +1125,12 @@ public void TestUpsertWithOneMatchingDocument(bool ordered) } [Theory] - [InlineData(false)] - [InlineData(true)] - public void TestW0DoesNotReportErrors(bool ordered) + [ParameterAttributeData] + public void TestW0DoesNotReportErrors( + [Values(false, true)] bool retryWrites, + [Values(false, true)] bool ordered) { - var server = LegacyTestConfiguration.GetServer(retryWrites: false); + var server = LegacyTestConfiguration.GetServer(retryWrites); var collection = GetCollection(server); // use a request so we can read our own writes even with older servers diff --git a/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/CommandConstructionTests.cs b/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/CommandConstructionTests.cs new file mode 100644 index 00000000000..c7667b37aba --- /dev/null +++ b/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/CommandConstructionTests.cs @@ -0,0 +1,466 @@ +/* Copyright 2020-present MongoDB Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using FluentAssertions; +using MongoDB.Bson; +using MongoDB.Bson.TestHelpers.XunitExtensions; +using MongoDB.Driver.Core; +using MongoDB.Driver.Core.Clusters; +using MongoDB.Driver.Core.Events; +using MongoDB.Driver.Core.Misc; +using MongoDB.Driver.Core.TestHelpers.XunitExtensions; +using MongoDB.Driver.TestHelpers; +using Xunit; + +namespace MongoDB.Driver.Tests.Specifications.retryable_writes.prose_tests +{ + public class CommandConstructionTests + { + private readonly string _collectionName = CoreTestConfiguration.GetCollectionNamespaceForTestClass(typeof(CommandConstructionTests)).CollectionName; + private readonly string _databaseName = CoreTestConfiguration.DatabaseNamespace.DatabaseName; + + [SkippableTheory] + [ParameterAttributeData] + public void Unacknowledged_writes_should_not_have_transaction_id( + [Values("delete", "insert", "update")] string operation, + [Values(false, true)] bool async) + { + RequireServer.Check().ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded); + + DropCollection(); + var eventCapturer = CreateEventCapturer(); + using (var client = CreateDisposableClient(eventCapturer)) + { + var database = client.GetDatabase(_databaseName); + var collection = database.GetCollection(_collectionName).WithWriteConcern(WriteConcern.Unacknowledged); + + switch (operation) + { + case "delete": + var deleteFilter = Builders.Filter.Eq("_id", 1); + if (async) + { + collection.DeleteOneAsync(deleteFilter).GetAwaiter().GetResult(); + } + else + { + collection.DeleteOne(deleteFilter); + } + break; + + case "insert": + var document = new BsonDocument("_id", 1); + if (async) + { + collection.InsertOneAsync(document).GetAwaiter().GetResult(); ; + } + else + { + collection.InsertOne(document); + } + SpinUntilCollectionIsNotEmpty(); // wait for unacknowledged insert to complete so it won't execute later while another test is running + break; + + case "update": + var updateFilter = Builders.Filter.Eq("_id", 1); + var update = Builders.Update.Set("x", 1); + if (async) + { + collection.UpdateOneAsync(updateFilter, update).GetAwaiter().GetResult(); + } + else + { + collection.UpdateOne(updateFilter, update); + } + break; + + default: + throw new Exception($"Unexpected operation: {operation}."); + } + + AssertCommandDoesNotHaveTransactionId(eventCapturer); + } + } + + [SkippableTheory] + [ParameterAttributeData] + public void Unsupported_single_statement_writes_should_not_have_transaction_id( + [Values("deleteMany", "updateMany")] string operation, + [Values(false, true)] bool async) + { + RequireServer.Check().ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded); + + DropCollection(); + var eventCapturer = CreateEventCapturer(); + using (var client = CreateDisposableClient(eventCapturer)) + { + var database = client.GetDatabase(_databaseName); + var collection = database.GetCollection(_collectionName); + + switch (operation) + { + case "deleteMany": + var deleteManyFilter = Builders.Filter.Eq("_id", 1); + if (async) + { + collection.DeleteManyAsync(deleteManyFilter).GetAwaiter().GetResult(); + } + else + { + collection.DeleteMany(deleteManyFilter); + } + break; + + case "updateMany": + var updateManyFilter = Builders.Filter.Eq("_id", 1); + var update = Builders.Update.Set("x", 1); + if (async) + { + collection.UpdateManyAsync(updateManyFilter, update).GetAwaiter().GetResult(); + } + else + { + collection.UpdateMany(updateManyFilter, update); + } + break; + + default: + throw new Exception($"Unexpected operation: {operation}."); + } + + AssertCommandDoesNotHaveTransactionId(eventCapturer); + } + } + + [SkippableTheory] + [ParameterAttributeData] + public void Unsupported_multi_statement_writes_should_not_have_transaction_id( + [Values("deleteMany", "updateMany")] string operation, + [Values(false, true)] bool async) + { + RequireServer.Check().ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded); + + DropCollection(); + var eventCapturer = CreateEventCapturer(); + using (var client = CreateDisposableClient(eventCapturer)) + { + var database = client.GetDatabase(_databaseName); + var collection = database.GetCollection(_collectionName); + + WriteModel[] requests; + switch (operation) + { + case "deleteMany": + var deleteManyFilter = Builders.Filter.Eq("_id", 1); + requests = new[] { new DeleteManyModel(deleteManyFilter) }; + break; + + case "updateMany": + var updateManyFilter = Builders.Filter.Eq("_id", 1); + var update = Builders.Update.Set("x", 1); + requests = new[] { new UpdateManyModel(updateManyFilter, update) }; + break; + + default: + throw new Exception($"Unexpected operation: {operation}."); + } + if (async) + { + collection.BulkWriteAsync(requests).GetAwaiter().GetResult(); + } + else + { + collection.BulkWrite(requests); + } + + AssertCommandDoesNotHaveTransactionId(eventCapturer); + } + } + + [SkippableTheory] + [ParameterAttributeData] + public void Aggregate_with_write_stage_should_not_have_transaction_id( + [Values("$out", "$merge")] string outStage, + [Values(false, true)] bool async) + { + RequireServer.Check().ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded); + if (outStage == "$merge") + { + RequireServer.Check().Supports(Feature.AggregateMerge); + } + + DropCollection(); + var eventCapturer = CreateEventCapturer(); + using (var client = CreateDisposableClient(eventCapturer)) + { + var database = client.GetDatabase(_databaseName); + var collection = database.GetCollection(_collectionName); + + PipelineDefinition pipeline = new EmptyPipelineDefinition(); + var outputCollection = database.GetCollection(_collectionName + "-outputCollection"); + switch (outStage) + { + case "$out": + pipeline = pipeline.Out(outputCollection); + break; + + case "$merge": + var mergeOptions = new MergeStageOptions(); + pipeline = pipeline.Merge(outputCollection, mergeOptions); + break; + + default: + throw new Exception($"Unexpected outStage: {outStage}."); + } + if (async) + { + collection.AggregateAsync(pipeline).GetAwaiter().GetResult(); + } + else + { + collection.Aggregate(pipeline); + } + + AssertCommandDoesNotHaveTransactionId(eventCapturer); + } + } + + [SkippableTheory] + [ParameterAttributeData] + public void Supported_single_statement_writes_should_have_transaction_id( + [Values("insertOne", "updateOne", "replaceOne", "deleteOne", "findOneAndDelete", "findOneAndReplace", "findOneAndUpdate")] string operation, + [Values(false, true)] bool async) + { + RequireServer.Check().VersionGreaterThanOrEqualTo("3.6.0").ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded); + + DropCollection(); + var eventCapturer = CreateEventCapturer(); + using (var client = CreateDisposableClient(eventCapturer)) + { + var database = client.GetDatabase(_databaseName); + var collection = database.GetCollection(_collectionName); + + switch (operation) + { + case "deleteOne": + var deleteOneFilter = Builders.Filter.Eq("_id", 1); + if (async) + { + collection.DeleteOneAsync(deleteOneFilter).GetAwaiter().GetResult(); + } + else + { + collection.DeleteOne(deleteOneFilter); + } + break; + + case "findOneAndDelete": + var findOneAndDeleteFilter = Builders.Filter.Eq("_id", 1); + if (async) + { + collection.FindOneAndDeleteAsync(findOneAndDeleteFilter).GetAwaiter().GetResult(); + } + else + { + collection.FindOneAndDelete(findOneAndDeleteFilter); + } + break; + + case "findOneAndReplace": + var findOneAndReplaceFilter = Builders.Filter.Eq("_id", 1); + var findOneAndReplaceReplacement = new BsonDocument("_id", 1); + if (async) + { + collection.FindOneAndReplaceAsync(findOneAndReplaceFilter, findOneAndReplaceReplacement).GetAwaiter().GetResult(); + } + else + { + collection.FindOneAndReplace(findOneAndReplaceFilter, findOneAndReplaceReplacement); + } + break; + + case "findOneAndUpdate": + var findOneAndUpdateFilter = Builders.Filter.Eq("_id", 1); + var findOneAndUpdateUpdate = Builders.Update.Set("x", 2); + if (async) + { + collection.FindOneAndUpdateAsync(findOneAndUpdateFilter, findOneAndUpdateUpdate).GetAwaiter().GetResult(); + } + else + { + collection.FindOneAndUpdate(findOneAndUpdateFilter, findOneAndUpdateUpdate); + } + break; + + case "insertOne": + var document = new BsonDocument("_id", 1); + if (async) + { + collection.InsertOneAsync(document).GetAwaiter().GetResult(); + } + else + { + collection.InsertOne(document); + } + break; + + case "replaceOne": + var replaceOneFilter = Builders.Filter.Eq("_id", 1); + var replacement = new BsonDocument("_id", 1); + if (async) + { + collection.ReplaceOneAsync(replaceOneFilter, replacement).GetAwaiter().GetResult(); + } + else + { + collection.ReplaceOne(replaceOneFilter, replacement); + } + break; + + case "updateOne": + var updateOneFilter = Builders.Filter.Eq("_id", 1); + var updateOne = Builders.Update.Set("x", 2); + if (async) + { + collection.UpdateOneAsync(updateOneFilter, updateOne).GetAwaiter().GetResult(); + } + else + { + collection.UpdateOne(updateOneFilter, updateOne); + } + break; + + default: + throw new Exception($"Unexpected operation: {operation}."); + } + + AssertCommandHasTransactionId(eventCapturer); + } + } + + [SkippableTheory] + [ParameterAttributeData] + public void Supported_multi_statement_writes_should_have_transaction_id( + [Values("insertMany", "bulkWrite")] string operation, + [Values(false, true)] bool ordered, + [Values(false, true)] bool async) + { + RequireServer.Check().VersionGreaterThanOrEqualTo("3.6.0").ClusterTypes(ClusterType.ReplicaSet, ClusterType.Sharded); + + DropCollection(); + var eventCapturer = CreateEventCapturer(); + using (var client = CreateDisposableClient(eventCapturer)) + { + var database = client.GetDatabase(_databaseName); + var collection = database.GetCollection(_collectionName); + + switch (operation) + { + case "bulkWrite": + var requests = new[] { new InsertOneModel(new BsonDocument("_id", 1)) }; + var bulkWriteOptions = new BulkWriteOptions { IsOrdered = ordered }; + if (async) + { + collection.BulkWriteAsync(requests, bulkWriteOptions).GetAwaiter().GetResult(); + } + else + { + collection.BulkWrite(requests, bulkWriteOptions); + } + break; + + case "insertMany": + var documents = new[] { new BsonDocument("_id", 1) }; + var insertManyOptions = new InsertManyOptions { IsOrdered = ordered }; + if (async) + { + collection.InsertManyAsync(documents, insertManyOptions).GetAwaiter().GetResult(); + } + else + { + collection.InsertMany(documents, insertManyOptions); + } + break; + + default: + throw new Exception($"Unexpected operation: {operation}."); + } + + AssertCommandHasTransactionId(eventCapturer); + } + } + + // private methods + private void AssertCommandDoesNotHaveTransactionId(EventCapturer eventCapturer) + { + var commandStartedEvent = eventCapturer.Events.OfType().Single(); + var command = commandStartedEvent.Command; + command.Should().NotContain("txnNumber"); + } + + private void AssertCommandHasTransactionId(EventCapturer eventCapturer) + { + var commandStartedEvent = eventCapturer.Events.OfType().Single(); + var command = commandStartedEvent.Command; + command.Should().Contain("txnNumber"); + } + + private DisposableMongoClient CreateDisposableClient(EventCapturer eventCapturer) + { + return DriverTestConfiguration.CreateDisposableClient((MongoClientSettings settings) => + { + settings.ClusterConfigurator = c => c.Subscribe(eventCapturer); + settings.RetryWrites = true; + }); + } + + private EventCapturer CreateEventCapturer() + { + var commandsToNotCapture = new HashSet + { + "isMaster", + "buildInfo", + "getLastError", + "authenticate", + "saslStart", + "saslContinue", + "getnonce" + }; + + return + new EventCapturer() + .Capture(e => !commandsToNotCapture.Contains(e.CommandName)); + } + + private void DropCollection() + { + var client = DriverTestConfiguration.Client; + var database = client.GetDatabase(_databaseName); + database.DropCollection(_collectionName); + } + + private void SpinUntilCollectionIsNotEmpty() + { + var client = DriverTestConfiguration.Client; + var database = client.GetDatabase(_databaseName); + var collection = database.GetCollection(_collectionName); + SpinWait.SpinUntil(() => collection.CountDocuments("{}") > 0, TimeSpan.FromSeconds(10)).Should().BeTrue(); + } + } +} diff --git a/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/MMapV1Tests.cs b/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/MMapV1Tests.cs new file mode 100644 index 00000000000..7327c23f6a4 --- /dev/null +++ b/tests/MongoDB.Driver.Tests/Specifications/retryable-writes/prose-tests/MMapV1Tests.cs @@ -0,0 +1,67 @@ +/* Copyright 2020-present MongoDB Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using System; +using FluentAssertions; +using MongoDB.Bson; +using MongoDB.Bson.TestHelpers.XunitExtensions; +using MongoDB.Driver.Core.Clusters; +using MongoDB.Driver.Core.TestHelpers.XunitExtensions; +using MongoDB.Driver.TestHelpers; +using Xunit; + +namespace MongoDB.Driver.Tests.Specifications.retryable_writes.prose_tests +{ + public class MMapV1Tests + { + [SkippableTheory] + [ParameterAttributeData] + public void Write_operation_should_throw_when_retry_writes_is_true_and_storage_engine_is_MMMAPv1( + [Values(false, true)] bool async) + { + RequireServer.Check() + .VersionGreaterThanOrEqualTo("3.6.0") + .ClusterType(ClusterType.ReplicaSet) + .StorageEngine("mmapv1"); + + using (var client = CreateDisposableMongoClient()) + { + var database = client.GetDatabase(DriverTestConfiguration.DatabaseNamespace.DatabaseName); + var collection = database.GetCollection(DriverTestConfiguration.CollectionNamespace.CollectionName); + database.DropCollection(collection.CollectionNamespace.CollectionName); + + var document = new BsonDocument("_id", 1); + Exception exception; + if (async) + { + exception = Record.Exception(() => collection.InsertOneAsync(document).GetAwaiter().GetResult()); + } + else + { + exception = Record.Exception(() => collection.InsertOne(document)); + } + + var e = exception.Should().BeOfType().Subject; + e.Message.Should().Be("This MongoDB deployment does not support retryable writes. Please add retryWrites=false to your connection string."); + } + } + + // private methods + private DisposableMongoClient CreateDisposableMongoClient() + { + return DriverTestConfiguration.CreateDisposableClient(s => s.RetryWrites = true); + } + } +}