Skip to content

Commit 526d6b5

Browse files
committed
Merge pull request #14 from gaillard/master
Add newTimestamp parameter and functionality to AckSend() and Requeue() ...
2 parents 9ef2ead + 274629a commit 526d6b5

File tree

2 files changed

+68
-11
lines changed

2 files changed

+68
-11
lines changed

DominionEnterprises.Mongo.Tests/QueueTests.cs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,34 @@ public void GetWithTimeBasedPriority()
275275
Assert.AreEqual(messageThree, resultThree);
276276
}
277277

278+
[Test]
279+
public void GetWithTimeBasedPriorityAndOldTimestamp()
280+
{
281+
var messageOne = new BsonDocument { { "key", 0 } };
282+
var messageTwo = new BsonDocument { { "key", 1 } };
283+
var messageThree = new BsonDocument { { "key", 2 } };
284+
285+
queue.Send(messageOne);
286+
queue.Send(messageTwo);
287+
queue.Send(messageThree);
288+
289+
var resultTwo = queue.Get(new QueryDocument(), TimeSpan.MaxValue);
290+
//ensuring using old timestamp shouldn't affect normal time order of Send()s
291+
queue.Requeue(resultTwo, DateTime.UtcNow, 0.0, false);
292+
293+
var resultOne = queue.Get(new QueryDocument(), TimeSpan.MaxValue);
294+
resultTwo = queue.Get(new QueryDocument(), TimeSpan.MaxValue);
295+
var resultThree = queue.Get(new QueryDocument(), TimeSpan.MaxValue);
296+
297+
messageOne.InsertAt(0, new BsonElement("id", resultOne["id"]));
298+
messageTwo.InsertAt(0, new BsonElement("id", resultTwo["id"]));
299+
messageThree.InsertAt(0, new BsonElement("id", resultThree["id"]));
300+
301+
Assert.AreEqual(messageOne, resultOne);
302+
Assert.AreEqual(messageTwo, resultTwo);
303+
Assert.AreEqual(messageThree, resultThree);
304+
}
305+
278306
[Test]
279307
public void GetWait()
280308
{

DominionEnterprises.Mongo/Queue.cs

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
using System.Reflection;
77
using System.Security.Cryptography;
88

9-
[assembly: AssemblyVersion("1.1.0.*")]
9+
[assembly: AssemblyVersion("1.2.0.*")]
1010

1111
namespace DominionEnterprises.Mongo
1212
{
@@ -310,7 +310,7 @@ public void Ack(BsonDocument message)
310310

311311
#region AckSend
312312
/// <summary>
313-
/// Ack message and send payload to queue, atomically, with earliestGet as Now and 0.0 priority
313+
/// Ack message and send payload to queue, atomically, with earliestGet as Now, 0.0 priority and new timestamp
314314
/// </summary>
315315
/// <param name="message">message to ack received from Get()</param>
316316
/// <param name="payload">payload to send</param>
@@ -322,7 +322,7 @@ public void AckSend(BsonDocument message, BsonDocument payload)
322322
}
323323

324324
/// <summary>
325-
/// Ack message and send payload to queue, atomically, with 0.0 priority
325+
/// Ack message and send payload to queue, atomically, with 0.0 priority and new timestamp
326326
/// </summary>
327327
/// <param name="message">message to ack received from Get()</param>
328328
/// <param name="payload">payload to send</param>
@@ -334,17 +334,32 @@ public void AckSend(BsonDocument message, BsonDocument payload, DateTime earlies
334334
AckSend(message, payload, earliestGet, 0.0);
335335
}
336336

337+
/// <summary>
338+
/// Ack message and send payload to queue, atomically, with new timestamp
339+
/// </summary>
340+
/// <param name="message">message to ack received from Get()</param>
341+
/// <param name="payload">payload to send</param>
342+
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
343+
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
344+
/// <exception cref="ArgumentNullException">message or payload is null</exception>
345+
/// <exception cref="ArgumentException">message id must be a BsonObjectId</exception>
346+
public void AckSend(BsonDocument message, BsonDocument payload, DateTime earliestGet, double priority)
347+
{
348+
AckSend(message, payload, earliestGet, priority, true);
349+
}
350+
337351
/// <summary>
338352
/// Ack message and send payload to queue, atomically.
339353
/// </summary>
340354
/// <param name="message">message to ack received from Get()</param>
341355
/// <param name="payload">payload to send</param>
342356
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
343357
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
358+
/// <param name="newTimestamp">true to give the payload a new timestamp or false to use given message timestamp</param>
344359
/// <exception cref="ArgumentNullException">message or payload is null</exception>
345360
/// <exception cref="ArgumentException">message id must be a BsonObjectId</exception>
346361
/// <exception cref="ArgumentException">priority was NaN</exception>
347-
public void AckSend(BsonDocument message, BsonDocument payload, DateTime earliestGet, double priority)
362+
public void AckSend(BsonDocument message, BsonDocument payload, DateTime earliestGet, double priority, bool newTimestamp)
348363
{
349364
if (message == null) throw new ArgumentNullException("message");
350365
if (payload == null) throw new ArgumentNullException("payload");
@@ -353,24 +368,25 @@ public void AckSend(BsonDocument message, BsonDocument payload, DateTime earlies
353368
var messageId = message["id"];
354369
if (messageId.GetType() != typeof(BsonObjectId)) throw new ArgumentException("message id must be a BsonObjectId", "message");
355370

356-
var newMessage = new UpdateDocument
371+
var toSet = new BsonDocument
357372
{
358373
{"payload", payload},
359374
{"running", false},
360375
{"resetTimestamp", DateTime.MaxValue},
361376
{"earliestGet", earliestGet},
362377
{"priority", priority},
363-
{"created", DateTime.UtcNow},
364378
};
379+
if (newTimestamp)
380+
toSet["created"] = DateTime.UtcNow;
365381

366382
//using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY) so we can just send
367-
collection.Update(new QueryDocument("_id", messageId), newMessage, UpdateFlags.Upsert);
383+
collection.Update(new QueryDocument("_id", messageId), new UpdateDocument("$set", toSet), UpdateFlags.Upsert);
368384
}
369385
#endregion
370386

371387
#region Requeue
372388
/// <summary>
373-
/// Requeue message with earliestGet as Now and 0.0 priority. Same as AckSend() with the same message.
389+
/// Requeue message with earliestGet as Now, 0.0 priority and new timestamp. Same as AckSend() with the same message.
374390
/// </summary>
375391
/// <param name="message">message</param>
376392
/// <exception cref="ArgumentNullException">message is null</exception>
@@ -381,7 +397,7 @@ public void Requeue(BsonDocument message)
381397
}
382398

383399
/// <summary>
384-
/// Requeue message with 0.0 priority. Same as AckSend() with the same message.
400+
/// Requeue message with 0.0 priority and new timestamp. Same as AckSend() with the same message.
385401
/// </summary>
386402
/// <param name="message">message</param>
387403
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
@@ -392,22 +408,35 @@ public void Requeue(BsonDocument message, DateTime earliestGet)
392408
Requeue(message, earliestGet, 0.0);
393409
}
394410

411+
/// <summary>
412+
/// Requeue message with new timestamp. Same as AckSend() with the same message.
413+
/// </summary>
414+
/// <param name="message">message</param>
415+
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
416+
/// <exception cref="ArgumentNullException">message is null</exception>
417+
/// <exception cref="ArgumentException">message id must be a BsonObjectId</exception>
418+
public void Requeue(BsonDocument message, DateTime earliestGet, double priority)
419+
{
420+
Requeue(message, earliestGet, priority, true);
421+
}
422+
395423
/// <summary>
396424
/// Requeue message. Same as AckSend() with the same message.
397425
/// </summary>
398426
/// <param name="message">message</param>
399427
/// <param name="earliestGet">earliest instant that a call to Get() can return message</param>
400428
/// <param name="priority">priority for order out of Get(). 0 is higher priority than 1</param>
429+
/// <param name="newTimestamp">true to give the payload a new timestamp or false to use given message timestamp</param>
401430
/// <exception cref="ArgumentNullException">message is null</exception>
402431
/// <exception cref="ArgumentException">message id must be a BsonObjectId</exception>
403432
/// <exception cref="ArgumentException">priority was NaN</exception>
404-
public void Requeue(BsonDocument message, DateTime earliestGet, double priority)
433+
public void Requeue(BsonDocument message, DateTime earliestGet, double priority, bool newTimestamp)
405434
{
406435
if (message == null) throw new ArgumentNullException("message");
407436

408437
var forRequeue = new BsonDocument(message);
409438
forRequeue.Remove("id");
410-
AckSend(message, forRequeue, earliestGet, priority);
439+
AckSend(message, forRequeue, earliestGet, priority, newTimestamp);
411440
}
412441
#endregion
413442

0 commit comments

Comments
 (0)