Skip to content

Commit

Permalink
Add failure handling for BatchingSqlJournal SelectCurrentPersistenceI…
Browse files Browse the repository at this point in the history
…ds message batching (#5094)

* Add failure handling for BatchingSqlJournal SelectCurrentPersistenceIds message batching

* Do not throw an exception inside a failure handler, throwing will swallow the original failure cause, create a confusing error log, and blow up BatchingSqlJournal failure handling.

* Add missing using to make sure reader is properly disposed
  • Loading branch information
Arkatufus committed Jun 15, 2021
1 parent 0210e21 commit fa5ca50
Showing 1 changed file with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Runtime.Serialization;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -800,8 +801,15 @@ private void FailChunkExecution(ChunkExecutionFailure message)
replayAll.ReplyTo.Tell(new EventReplayFailure(cause));
break;

case SelectCurrentPersistenceIds select:
// SqlJournal handled this failure case by using the default PipeTo failure
// handler which sends a Status.Failure message back to the sender.
select.ReplyTo.Tell(new Status.Failure(cause));
break;

default:
throw new Exception($"Unknown persistence journal request type [{request.GetType()}]");
Log.Error(cause, $"Batching failure not reported to original sender. Unknown batched persistence journal request type [{request.GetType()}].");
break;
}
}

Expand Down Expand Up @@ -1096,10 +1104,12 @@ protected virtual async Task HandleSelectCurrentPersistenceIds(SelectCurrentPers
command.Parameters.Clear();
AddParameter(command, "@Ordering", DbType.Int64, message.Offset);

var reader = await command.ExecuteReaderAsync();
while (await reader.ReadAsync())
using (var reader = await command.ExecuteReaderAsync())
{
result.Add(reader.GetString(0));
while (await reader.ReadAsync())
{
result.Add(reader.GetString(0));
}
}

message.ReplyTo.Tell(new CurrentPersistenceIds(result, highestOrderingNumber));
Expand Down

0 comments on commit fa5ca50

Please sign in to comment.