diff --git a/app/models/miq_server/worker_management/monitor.rb b/app/models/miq_server/worker_management/monitor.rb index 7e3ada5ba1a4..6fd1ce13b5c5 100644 --- a/app/models/miq_server/worker_management/monitor.rb +++ b/app/models/miq_server/worker_management/monitor.rb @@ -22,24 +22,19 @@ def monitor_workers MiqWorker.status_update_all - processed_worker_ids = [] - processed_worker_ids += check_not_responding - processed_worker_ids += check_pending_stop - processed_worker_ids += clean_worker_records + check_not_responding + check_pending_stop + clean_worker_records # Monitor all remaining current worker records - processed_worker_ids += miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker| - # Check their queue messages for timeout - worker.validate_active_messages + miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker| # Push the heartbeat into the database persist_last_heartbeat(worker) # Check the worker record for heartbeat timeouts next unless validate_worker(worker) # Tell the valid workers to sync config if needed worker_set_message(worker, "sync_config") if resync_needed - end.map(&:id) - - validate_active_messages(processed_worker_ids) + end do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted? end @@ -72,42 +67,33 @@ def sync_workers end def clean_worker_records - processed_workers = [] miq_workers.each do |w| next unless w.is_stopped? _log.info("SQL Record for #{w.format_full_log_msg}, Status: [#{w.status}] is being deleted") - processed_workers << w worker_delete(w.pid) w.destroy + miq_workers.delete(w) end - miq_workers.delete(*processed_workers) unless processed_workers.empty? - processed_workers.collect(&:id) end def check_pending_stop - processed_worker_ids = [] miq_workers.each do |w| next unless w.is_stopped? next unless worker_get_monitor_status(w.pid) == :waiting_for_stop worker_set_monitor_status(w.pid, nil) - processed_worker_ids << w.id end - processed_worker_ids end def check_not_responding return [] if MiqEnvironment::Command.is_podified? - processed_workers = [] miq_workers.each do |w| next unless monitor_reason_not_responding?(w) next unless worker_get_monitor_status(w.pid) == :waiting_for_stop - processed_workers << w worker_not_responding(w) worker_delete(w.pid) + miq_workers.delete(w) end - miq_workers.delete(*processed_workers) unless processed_workers.empty? - processed_workers.collect(&:id) end def monitor_reason_not_responding?(w) diff --git a/app/models/miq_server/worker_management/monitor/validation.rb b/app/models/miq_server/worker_management/monitor/validation.rb index 87f114bdf1a9..cb74345ed7c6 100644 --- a/app/models/miq_server/worker_management/monitor/validation.rb +++ b/app/models/miq_server/worker_management/monitor/validation.rb @@ -52,22 +52,6 @@ def exceeded_memory_threshold?(w) false end - def validate_active_messages(processed_worker_ids = []) - actives = MiqQueue.where(:state => 'dequeue').includes(:handler) - actives.each do |msg| - next if processed_worker_ids.include?(msg.handler_id) - - # Exclude messages on starting/started servers - handler = msg.handler - handler_server = handler.respond_to?(:miq_server) ? handler.miq_server : handler - if handler_server.kind_of?(MiqServer) && handler_server != self - next if [MiqServer::STATUS_STARTED, MiqServer::STATUS_STARTING].include?(handler_server.status) - end - - msg.check_for_timeout(_log.prefix) - end - end - private def usage_exceeds_threshold?(usage, threshold) diff --git a/spec/models/miq_server/worker_monitor_spec.rb b/spec/models/miq_server/worker_monitor_spec.rb index ed3b91e8183e..810f86d1769a 100644 --- a/spec/models/miq_server/worker_monitor_spec.rb +++ b/spec/models/miq_server/worker_monitor_spec.rb @@ -20,9 +20,8 @@ @worker.update(:status => MiqWorker::STATUS_STOPPED) expect(@miq_server.miq_workers.length).to eq(2) - ids = @miq_server.clean_worker_records + @miq_server.clean_worker_records expect(@miq_server.miq_workers.length).to eq(1) - expect(ids).to eq([@worker.id]) end it "MiqServer#check_not_responding" do @@ -36,9 +35,8 @@ @worker.update(:status => MiqWorker::STATUS_STOPPING) expect(@miq_server.miq_workers.length).to eq(2) - ids = @miq_server.check_not_responding + @miq_server.check_not_responding expect(@miq_server.miq_workers.length).to eq(1) - expect(ids).to eq([@worker.id]) end describe "#do_system_limit_exceeded" do @@ -147,61 +145,6 @@ end context "A WorkerMonitor" do - context "with active messages without worker" do - before do - @actives = [] - @actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes) - @actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 5.minutes) - end - - it "should timeout the right active messages" do - actives = MiqQueue.where(:state => 'dequeue') - expect(actives.length).to eq(@actives.length) - - Timecop.travel 5.minutes do - @miq_server.validate_active_messages - end - - actives = MiqQueue.where(:state => 'dequeue') - expect(actives.length).to eq(@actives.length - 1) - end - end - - context "with expired active messages assigned to workers from multiple" do - before do - @miq_server2 = FactoryBot.create(:miq_server, :zone => @miq_server.zone) - @worker1 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id) - @worker2 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server2.id) - - @actives = [] - end - - it "should timeout messages on my server or servers that are down" do - @actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes, :handler => @worker1) - @actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes, :handler => @worker2) - - actives = MiqQueue.where(:state => 'dequeue') - expect(actives.length).to eq(@actives.length) - - Timecop.travel 5.minutes do - @miq_server.validate_active_messages - end - - actives = MiqQueue.where(:state => 'dequeue') - expect(actives.length).to eq(@actives.length - 1) - expect(actives.first.handler).to eq(@worker2) - - @miq_server2.update_attribute(:status, 'stopped') - - Timecop.travel 5.minutes do - @miq_server.validate_active_messages - end - - actives = MiqQueue.where(:state => 'dequeue') - expect(actives.length).to eq(0) - end - end - context "with vanilla generic worker" do before do @worker1 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id, :pid => 42, :type => 'MiqGenericWorker') diff --git a/spec/models/miq_server_spec.rb b/spec/models/miq_server_spec.rb index c7ff7f1a66c8..8728eda39200 100644 --- a/spec/models/miq_server_spec.rb +++ b/spec/models/miq_server_spec.rb @@ -318,58 +318,6 @@ expect(@miq_server.quiesce_workers_loop_timeout?).not_to be_truthy end - context "with an active messsage and a second server" do - before do - @msg = FactoryBot.create(:miq_queue, :state => 'dequeue') - @miq_server2 = FactoryBot.create(:miq_server, :is_master => true, :zone => @zone) - end - - it "will validate the 'started' first server's active message when called on it" do - @msg.handler = @miq_server.reload - @msg.save - expect_any_instance_of(MiqQueue).to receive(:check_for_timeout) - @miq_server.validate_active_messages - end - - it "will validate the 'not responding' first server's active message when called on it" do - @miq_server.update_attribute(:status, 'not responding') - @msg.handler = @miq_server.reload - @msg.save - expect_any_instance_of(MiqQueue).to receive(:check_for_timeout) - @miq_server.validate_active_messages - end - - it "will validate the 'not resonding' second server's active message when called on first server" do - @miq_server2.update_attribute(:status, 'not responding') - @msg.handler = @miq_server2 - @msg.save - expect_any_instance_of(MiqQueue).to receive(:check_for_timeout) - @miq_server.validate_active_messages - end - - it "will NOT validate the 'started' second server's active message when called on first server" do - @miq_server2.update_attribute(:status, 'started') - @msg.handler = @miq_server2 - @msg.save - expect_any_instance_of(MiqQueue).to receive(:check_for_timeout).never - @miq_server.validate_active_messages - end - - it "will validate a worker's active message when called on the worker's server" do - @msg.handler = @worker - @msg.save - expect_any_instance_of(MiqQueue).to receive(:check_for_timeout) - @miq_server.validate_active_messages - end - - it "will not validate a worker's active message when called on the worker's server if already processed" do - @msg.handler = @worker - @msg.save - expect_any_instance_of(MiqQueue).to receive(:check_for_timeout).never - @miq_server.validate_active_messages([@worker.id]) - end - end - context "#server_timezone" do it "utc with no system default" do stub_settings(:server => {:timezone => nil})