diff --git a/app/models/miq_queue_worker_base/runner.rb b/app/models/miq_queue_worker_base/runner.rb index b9f251e1fb9..f4a1d73b67b 100644 --- a/app/models/miq_queue_worker_base/runner.rb +++ b/app/models/miq_queue_worker_base/runner.rb @@ -35,7 +35,7 @@ def thresholds_exceeded? def get_message_via_drb loop do begin - msg_id, lock_version = @worker_monitor_drb.get_queue_message(@worker.pid) + msg_id, lock_version = worker_monitor_drb.get_queue_message(@worker.pid) rescue DRb::DRbError => err do_exit("Error communicating with WorkerMonitor because <#{err.message}>", 1) end @@ -64,6 +64,7 @@ def get_message_via_drb _log.debug("#{log_prefix} #{MiqQueue.format_short_log_msg(msg)} stale, retrying...") next rescue => err + msg.update_column(:state, MiqQueue::STATUS_ERROR) raise _("%{log} \"%{error}\" attempting to get next message") % {:log => log_prefix, :error => err} end end diff --git a/spec/factories/miq_worker.rb b/spec/factories/miq_worker.rb index 7f2f9651654..698a62e6cde 100644 --- a/spec/factories/miq_worker.rb +++ b/spec/factories/miq_worker.rb @@ -4,6 +4,7 @@ status "ready" end + factory :miq_generic_worker, :class => "MiqGenericWorker", :parent => :miq_worker factory :miq_ui_worker, :class => "MiqUiWorker", :parent => :miq_worker factory :miq_websocket_worker, :class => "MiqWebsocketWorker", :parent => :miq_worker diff --git a/spec/models/miq_queue_worker_base/runner_spec.rb b/spec/models/miq_queue_worker_base/runner_spec.rb new file mode 100644 index 00000000000..63193f6a3ab --- /dev/null +++ b/spec/models/miq_queue_worker_base/runner_spec.rb @@ -0,0 +1,24 @@ +describe MiqQueueWorkerBase::Runner do + context "#get_message_via_drb" do + let(:server) { EvmSpecHelper.local_miq_server } + let(:worker) { FactoryGirl.create(:miq_generic_worker, :miq_server => server, :pid => 123) } + let(:runner) do + allow_any_instance_of(MiqQueueWorkerBase::Runner).to receive(:sync_active_roles) + allow_any_instance_of(MiqQueueWorkerBase::Runner).to receive(:sync_config) + allow_any_instance_of(MiqQueueWorkerBase::Runner).to receive(:set_connection_pool_size) + described_class.new(:guid => worker.guid) + end + + it "sets message to 'error' and raises for unhandled exceptions" do + # simulate what may happen if invalid yaml is deserialized + allow_any_instance_of(MiqQueue).to receive(:args).and_raise(ArgumentError) + q1 = FactoryGirl.create(:miq_queue) + allow(runner) + .to receive(:worker_monitor_drb) + .and_return(double(:get_queue_message => [q1.id, q1.lock_version])) + + expect { runner.get_message_via_drb }.to raise_error(StandardError) + expect(q1.reload.state).to eql(MiqQueue::STATUS_ERROR) + end + end +end