From 42faa0a5a952fc4ae03d9c1f2ee36aa664aeb255 Mon Sep 17 00:00:00 2001 From: Cameron Chambers Date: Thu, 25 Jan 2024 10:14:17 -0500 Subject: [PATCH] Add failed write message dispatch --- CHANGES.md | 2 ++ lib/GRNOC/TSDS/Writer/Worker.pm | 30 ++++++++++++++++++++---------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index b9766f1..1b82310 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,8 @@ * Adds in local changes made to code as 'hot fixes' that were never added to the Git repo. * Adds in support for OL8 packaging +* Allows writers to move failed writers to separate queue to keep pipeline clear and allow for +examination later. ## GRNOC TSDS Services 1.6.9 -- Wed Mar 16 2022 diff --git a/lib/GRNOC/TSDS/Writer/Worker.pm b/lib/GRNOC/TSDS/Writer/Worker.pm index 9d7c299..7553c4e 100644 --- a/lib/GRNOC/TSDS/Writer/Worker.pm +++ b/lib/GRNOC/TSDS/Writer/Worker.pm @@ -32,6 +32,8 @@ use constant MEASUREMENT_CACHE_EXPIRATION => 60 * 60; use constant QUEUE_PREFETCH_COUNT => 5; use constant QUEUE_FETCH_TIMEOUT => 10 * 1000; use constant RECONNECT_TIMEOUT => 10; +use constant PENDING_QUEUE_CHANNEL => 1; +use constant FAILED_QUEUE_CHANNEL => 2; ### required attributes ### @@ -215,7 +217,7 @@ sub _consume_loop { try { # reject the message and do NOT requeue it since its malformed JSON - $self->rabbit->reject( 1, $rabbit_message->{'delivery_tag'}, 0 ); + $self->rabbit->reject( PENDING_QUEUE_CHANNEL, $rabbit_message->{'delivery_tag'}, 0 ); } catch { @@ -238,7 +240,7 @@ sub _consume_loop { try { # reject the message and do NOT requeue since its not properly formed - $self->rabbit->reject( 1, $rabbit_message->{'delivery_tag'}, 0 ); + $self->rabbit->reject( PENDING_QUEUE_CHANNEL, $rabbit_message->{'delivery_tag'}, 0 ); } catch { @@ -270,8 +272,9 @@ sub _consume_loop { $self->logger->debug( "Rejecting rabbit message, requeueing." ); try { - - $self->rabbit->reject( 1, $rabbit_message->{'delivery_tag'}, 1 ); + # push message to failed queue and ack the original message + $self->rabbit->publish( FAILED_QUEUE_CHANNEL, $self->queue . "_failed", $self->json->encode( \@$messages ), {'exchange' => ''} ); + $self->rabbit->ack( PENDING_QUEUE_CHANNEL, $rabbit_message->{'delivery_tag'} ); } catch { @@ -290,7 +293,7 @@ sub _consume_loop { try { - $self->rabbit->ack( 1, $rabbit_message->{'delivery_tag'} ); + $self->rabbit->ack( PENDING_QUEUE_CHANNEL, $rabbit_message->{'delivery_tag'} ); } catch { @@ -1630,14 +1633,21 @@ sub _rabbit_connect { my $connected = 0; try { - my $rabbit = Net::AMQP::RabbitMQ->new(); $rabbit->connect( $rabbit_host, {'port' => $rabbit_port} ); - $rabbit->channel_open( 1 ); - $rabbit->queue_declare( 1, $rabbit_queue, {'auto_delete' => 0} ); - $rabbit->basic_qos( 1, { prefetch_count => QUEUE_PREFETCH_COUNT } ); - $rabbit->consume( 1, $rabbit_queue, {'no_ack' => 0} ); + + # open channel & declare queue for pending writes + $rabbit->channel_open( PENDING_QUEUE_CHANNEL ); + $rabbit->queue_declare( PENDING_QUEUE_CHANNEL, $rabbit_queue, {'auto_delete' => 0} ); + $rabbit->basic_qos( PENDING_QUEUE_CHANNEL, { prefetch_count => QUEUE_PREFETCH_COUNT } ); + + # open channel & declare queue for failed writes + $rabbit->channel_open( FAILED_QUEUE_CHANNEL ); + $rabbit->queue_declare( FAILED_QUEUE_CHANNEL, $self->queue . "_failed", {'auto_delete' => 0} ); + + # start consuming messages + $rabbit->consume( PENDING_QUEUE_CHANNEL, $rabbit_queue, {'no_ack' => 0} ); $self->_set_rabbit( $rabbit );