Skip to content

Commit

Permalink
CSHARP-2960: Unacknowledged writes fail silently when retryWrites=true.
Browse files Browse the repository at this point in the history
  • Loading branch information
rstam committed Feb 12, 2020
1 parent f8d38a8 commit 61000da
Show file tree
Hide file tree
Showing 9 changed files with 806 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ public interface IRetryableReadOperation<TResult> : IExecutableInRetryableReadCo
/// <typeparam name="TResult">The type of the result.</typeparam>
public interface IRetryableWriteOperation<TResult> : IExecutableInRetryableWriteContext<TResult>
{
/// <summary>
/// Gets the write concern for the operation.
/// </summary>
WriteConcern WriteConcern { get; }

/// <summary>
/// Executes the first attempt.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static TResult Execute<TResult>(IRetryableWriteOperation<TResult> operati

public static TResult Execute<TResult>(IRetryableWriteOperation<TResult> operation, RetryableWriteContext context, CancellationToken cancellationToken)
{
if (!context.RetryRequested || !AreRetryableWritesSupported(context.Channel.ConnectionDescription) || context.Binding.Session.IsInTransaction)
if (!AreRetriesAllowed(operation, context))
{
return operation.ExecuteAttempt(context, 1, null, cancellationToken);
}
Expand Down Expand Up @@ -86,7 +86,7 @@ public async static Task<TResult> ExecuteAsync<TResult>(IRetryableWriteOperation

public static async Task<TResult> ExecuteAsync<TResult>(IRetryableWriteOperation<TResult> operation, RetryableWriteContext context, CancellationToken cancellationToken)
{
if (!context.RetryRequested || !AreRetryableWritesSupported(context.Channel.ConnectionDescription) || context.Binding.Session.IsInTransaction)
if (!AreRetriesAllowed(operation, context))
{
return await operation.ExecuteAttemptAsync(context, 1, null, cancellationToken).ConfigureAwait(false);
}
Expand Down Expand Up @@ -128,13 +128,34 @@ public static async Task<TResult> ExecuteAsync<TResult>(IRetryableWriteOperation
}

// privates static methods
private static bool AreRetriesAllowed<TResult>(IRetryableWriteOperation<TResult> operation, RetryableWriteContext context)
{
return IsOperationAcknowledged(operation) && DoesContextAllowRetries(context);
}

private static bool AreRetryableWritesSupported(ConnectionDescription connectionDescription)
{
return
connectionDescription.IsMasterResult.LogicalSessionTimeout != null &&
connectionDescription.IsMasterResult.ServerType != ServerType.Standalone;
}

private static bool DoesContextAllowRetries(RetryableWriteContext context)
{
return
context.RetryRequested &&
AreRetryableWritesSupported(context.Channel.ConnectionDescription) &&
!context.Binding.Session.IsInTransaction;
}

private static bool IsOperationAcknowledged<TResult>(IRetryableWriteOperation<TResult> operation)
{
var writeConcern = operation.WriteConcern;
return
writeConcern == null || // null means use server default write concern which implies acknowledged
writeConcern.IsAcknowledged;
}

private static bool ShouldThrowOriginalException(Exception retryException)
{
return retryException is MongoException && !(retryException is MongoConnectionException);
Expand Down
20 changes: 20 additions & 0 deletions tests/MongoDB.Bson.TestHelpers/BsonDocumentAssertions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ public AndConstraint<BsonDocumentAssertions> Be(string json, string because = ""
return Be(expected, because, reasonArgs);
}

public AndConstraint<BsonDocumentAssertions> Contain(string name, string because = "", params object[] reasonArgs)
{
Execute.Assertion
.BecauseOf(because, reasonArgs)
.ForCondition(Subject.Contains(name))
.FailWith("Expected {context:object} to contain element {0}{reason}.", name);

return new AndConstraint<BsonDocumentAssertions>(this);
}

public AndConstraint<BsonDocumentAssertions> NotBe(BsonDocument unexpected, string because = "", params object[] reasonArgs)
{
Execute.Assertion
Expand All @@ -75,6 +85,16 @@ public AndConstraint<BsonDocumentAssertions> NotBe(string json, string because =
return NotBe(expected, because, reasonArgs);
}

public AndConstraint<BsonDocumentAssertions> NotContain(string name, string because = "", params object[] reasonArgs)
{
Execute.Assertion
.BecauseOf(because, reasonArgs)
.ForCondition(!Subject.Contains(name))
.FailWith("Expected {context:object} to not contain element {0}{reason}.", name);

return new AndConstraint<BsonDocumentAssertions>(this);
}

protected override string Context
{
get { return "BsonDocument"; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1095,6 +1095,8 @@ public void Execute_with_an_error_in_the_second_batch_and_ordered_is_true(
[SkippableTheory]
[ParameterAttributeData]
public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_is_false(
[Values(false, true)]
bool retryRequested,
[Values(false, true)]
bool async)
{
Expand All @@ -1112,6 +1114,7 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_
var subject = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings)
{
IsOrdered = false,
RetryRequested = retryRequested,
WriteConcern = WriteConcern.Unacknowledged
};

Expand All @@ -1133,6 +1136,8 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_
[SkippableTheory]
[ParameterAttributeData]
public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_is_true(
[Values(false, true)]
bool retryRequested,
[Values(false, true)]
bool async)
{
Expand All @@ -1156,6 +1161,7 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_
var subject = new BulkMixedWriteOperation(_collectionNamespace, requests, _messageEncoderSettings)
{
IsOrdered = true,
RetryRequested = retryRequested,
WriteConcern = WriteConcern.Unacknowledged
};

Expand All @@ -1176,6 +1182,8 @@ public void Execute_unacknowledged_with_an_error_in_the_first_batch_and_ordered_
[SkippableTheory]
[ParameterAttributeData]
public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered_is_false(
[Values(false, true)]
bool retryRequested,
[Values(false, true)]
bool async)
{
Expand All @@ -1194,6 +1202,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered
{
IsOrdered = false,
MaxBatchCount = 2,
RetryRequested = retryRequested,
WriteConcern = WriteConcern.Unacknowledged
};

Expand All @@ -1214,6 +1223,8 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered
[SkippableTheory]
[ParameterAttributeData]
public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered_is_true(
[Values(false, true)]
bool retryRequested,
[Values(false, true)]
bool async)
{
Expand All @@ -1232,6 +1243,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered
{
IsOrdered = true,
MaxBatchCount = 2,
RetryRequested = retryRequested,
WriteConcern = WriteConcern.Unacknowledged
};

Expand All @@ -1252,6 +1264,7 @@ public void Execute_unacknowledged_with_an_error_in_the_second_batch_and_ordered
[SkippableTheory]
[ParameterAttributeData]
public void Execute_with_delete_should_not_send_session_id_when_unacknowledged_writes(
[Values(false, true)] bool retryRequested,
[Values(false, true)] bool useImplicitSession,
[Values(false, true)] bool async)
{
Expand All @@ -1262,6 +1275,7 @@ public void Execute_with_delete_should_not_send_session_id_when_unacknowledged_w
var requests = new[] { new DeleteRequest(BsonDocument.Parse("{ x : 1 }")) };
var subject = new BulkMixedWriteOperation(collectionNamespace, requests, _messageEncoderSettings)
{
RetryRequested = retryRequested,
WriteConcern = WriteConcern.Unacknowledged
};

Expand All @@ -1284,6 +1298,7 @@ public void Execute_with_delete_should_send_session_id_when_supported(
[SkippableTheory]
[ParameterAttributeData]
public void Execute_with_insert_should_not_send_session_id_when_unacknowledged_writes(
[Values(false, true)] bool retryRequested,
[Values(false, true)] bool useImplicitSession,
[Values(false, true)] bool async)
{
Expand All @@ -1294,6 +1309,7 @@ public void Execute_with_insert_should_not_send_session_id_when_unacknowledged_w
var requests = new[] { new InsertRequest(BsonDocument.Parse("{ _id : 1, x : 3 }")) };
var subject = new BulkMixedWriteOperation(collectionNamespace, requests, _messageEncoderSettings)
{
RetryRequested = retryRequested,
WriteConcern = WriteConcern.Unacknowledged
};

Expand All @@ -1316,6 +1332,7 @@ public void Execute_with_insert_should_send_session_id_when_supported(
[SkippableTheory]
[ParameterAttributeData]
public void Execute_with_update_should_not_send_session_id_when_unacknowledged_writes(
[Values(false, true)] bool retryRequested,
[Values(false, true)] bool useImplicitSession,
[Values(false, true)] bool async)
{
Expand All @@ -1326,6 +1343,7 @@ public void Execute_with_update_should_not_send_session_id_when_unacknowledged_w
var requests = new[] { new UpdateRequest(UpdateType.Update, BsonDocument.Parse("{ x : 1 }"), BsonDocument.Parse("{ $set : { a : 1 } }")) };
var subject = new BulkMixedWriteOperation(collectionNamespace, requests, _messageEncoderSettings)
{
RetryRequested = retryRequested,
WriteConcern = WriteConcern.Unacknowledged
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/* Copyright 2020-present MongoDB Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

using System.Linq;
using System.Net;
using System.Reflection;
using System.Threading;
using FluentAssertions;
using MongoDB.Bson;
using MongoDB.Bson.TestHelpers;
using MongoDB.Driver.Core.Bindings;
using MongoDB.Driver.Core.Clusters;
using MongoDB.Driver.Core.Connections;
using MongoDB.Driver.Core.Operations;
using MongoDB.Driver.Core.Servers;
using Moq;
using Xunit;

namespace MongoDB.Driver.Core.Tests.Core.Operations
{
public class RetryableWriteOperationExecutorTests
{
[Theory]
[InlineData(false, false, false, false)]
[InlineData(false, false, true, false)]
[InlineData(false, true, false, false)]
[InlineData(false, true, true, false)]
[InlineData(true, false, false, false)]
[InlineData(true, false, true, false)]
[InlineData(true, true, false, true)]
[InlineData(true, true, true, false)]
public void DoesContextAllowRetries_should_return_expected_result(
bool retryRequested,
bool areRetryableWritesSupported,
bool isInTransaction,
bool expectedResult)
{
var context = CreateContext(retryRequested, areRetryableWritesSupported, isInTransaction);

var result = RetryableWriteOperationExecutorReflector.DoesContextAllowRetries(context);

result.Should().Be(expectedResult);
}

[Theory]
[InlineData(false, false, true)]
[InlineData(false, true, true)]
[InlineData(true, false, false)]
[InlineData(true, true, true)]
public void IsOperationAcknowledged_should_return_expected_result(
bool withWriteConcern,
bool isAcknowledged,
bool expectedResult)
{
var operation = CreateOperation(withWriteConcern, isAcknowledged);

var result = RetryableWriteOperationExecutorReflector.IsOperationAcknowledged(operation);

result.Should().Be(expectedResult);
}

// private methods
private IWriteBinding CreateBinding(bool areRetryableWritesSupported, bool isInTransaction)
{
var mockBinding = new Mock<IWriteBinding>();
var session = CreateSession(isInTransaction);
var channelSource = CreateChannelSource(areRetryableWritesSupported);
mockBinding.SetupGet(m => m.Session).Returns(session);
mockBinding.Setup(m => m.GetWriteChannelSource(CancellationToken.None)).Returns(channelSource);
return mockBinding.Object;
}

private IChannelHandle CreateChannel(bool areRetryableWritesSupported)
{
var mockChannel = new Mock<IChannelHandle>();
var connectionDescription = CreateConnectionDescription(areRetryableWritesSupported);
mockChannel.SetupGet(m => m.ConnectionDescription).Returns(connectionDescription);
return mockChannel.Object;
}

private IChannelSourceHandle CreateChannelSource(bool areRetryableWritesSupported)
{
var mockChannelSource = new Mock<IChannelSourceHandle>();
var channel = CreateChannel(areRetryableWritesSupported);
mockChannelSource.Setup(m => m.GetChannel(CancellationToken.None)).Returns(channel);
return mockChannelSource.Object;
}

private ConnectionDescription CreateConnectionDescription(bool areRetryableWritesSupported)
{
var clusterId = new ClusterId(1);
var endPoint = new DnsEndPoint("localhost", 27017);
var serverId = new ServerId(clusterId, endPoint);
var connectionId = new ConnectionId(serverId, 1);
var isMasterResultDocument = BsonDocument.Parse("{ ok : 1 }");
if (areRetryableWritesSupported)
{
isMasterResultDocument["logicalSessionTimeoutMinutes"] = 1;
isMasterResultDocument["msg"] = "isdbgrid"; // mongos
}
var isMasterResult = new IsMasterResult(isMasterResultDocument);
var buildInfoResult = new BuildInfoResult(BsonDocument.Parse("{ ok : 1, version : '4.2.0' }"));
var connectionDescription = new ConnectionDescription(connectionId, isMasterResult, buildInfoResult);
return connectionDescription;
}

private RetryableWriteContext CreateContext(bool retryRequested, bool areRetryableWritesSupported, bool isInTransaction)
{
var binding = CreateBinding(areRetryableWritesSupported, isInTransaction);
return RetryableWriteContext.Create(binding, retryRequested, CancellationToken.None);
}

private IRetryableWriteOperation<BsonDocument> CreateOperation(bool withWriteConcern, bool isAcknowledged)
{
var mockOperation = new Mock<IRetryableWriteOperation<BsonDocument>>();
var writeConcern = withWriteConcern ? (isAcknowledged ? WriteConcern.Acknowledged : WriteConcern.Unacknowledged) : null;
mockOperation.SetupGet(m => m.WriteConcern).Returns(writeConcern);
return mockOperation.Object;
}

private ICoreSessionHandle CreateSession(bool isInTransaction)
{
var mockSession = new Mock<ICoreSessionHandle>();
mockSession.SetupGet(m => m.IsInTransaction).Returns(isInTransaction);
return mockSession.Object;
}
}

// nested types
public static class RetryableWriteOperationExecutorReflector
{
public static bool DoesContextAllowRetries(RetryableWriteContext context) =>
(bool)Reflector.InvokeStatic(typeof(RetryableWriteOperationExecutor), nameof(DoesContextAllowRetries), context);

public static bool IsOperationAcknowledged(IRetryableWriteOperation<BsonDocument> operation)
{
var methodInfoDefinition = typeof(RetryableWriteOperationExecutor).GetMethods(BindingFlags.NonPublic | BindingFlags.Static)
.Where(m => m.Name == nameof(IsOperationAcknowledged))
.Single();
var methodInfo = methodInfoDefinition.MakeGenericMethod(typeof(BsonDocument));
try
{
return (bool)methodInfo.Invoke(null, new object[] { operation });
}
catch (TargetInvocationException exception)
{
throw exception.InnerException;
}
}
}
}
Loading

0 comments on commit 61000da

Please sign in to comment.