Skip to content

Commit

Permalink
Remove queue message validation from the server worker monitor
Browse files Browse the repository at this point in the history
It's a bit overkill to be checking for timed out queue messages
every time we monitor workers (by default every 15 seconds).

This has been moved to a schedule, which means that we can also
remove all of the "processed worker" tracking from the monitor_workers
method.
  • Loading branch information
carbonin committed Dec 6, 2019
1 parent b7e52c3 commit d3e9a78
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 148 deletions.
28 changes: 7 additions & 21 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,19 @@ def monitor_workers

MiqWorker.status_update_all

processed_worker_ids = []
processed_worker_ids += check_not_responding
processed_worker_ids += check_pending_stop
processed_worker_ids += clean_worker_records
check_not_responding
check_pending_stop
clean_worker_records

# Monitor all remaining current worker records
processed_worker_ids += miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker|
# Check their queue messages for timeout
worker.validate_active_messages
miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker|
# Push the heartbeat into the database
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
next unless validate_worker(worker)
# Tell the valid workers to sync config if needed
worker_set_message(worker, "sync_config") if resync_needed
end.map(&:id)

validate_active_messages(processed_worker_ids)
end

do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted?
end
Expand Down Expand Up @@ -72,42 +67,33 @@ def sync_workers
end

def clean_worker_records
processed_workers = []
miq_workers.each do |w|
next unless w.is_stopped?
_log.info("SQL Record for #{w.format_full_log_msg}, Status: [#{w.status}] is being deleted")
processed_workers << w
worker_delete(w.pid)
w.destroy
miq_workers.delete(w)
end
miq_workers.delete(*processed_workers) unless processed_workers.empty?
processed_workers.collect(&:id)
end

def check_pending_stop
processed_worker_ids = []
miq_workers.each do |w|
next unless w.is_stopped?
next unless worker_get_monitor_status(w.pid) == :waiting_for_stop
worker_set_monitor_status(w.pid, nil)
processed_worker_ids << w.id
end
processed_worker_ids
end

def check_not_responding
return [] if MiqEnvironment::Command.is_podified?

processed_workers = []
miq_workers.each do |w|
next unless monitor_reason_not_responding?(w)
next unless worker_get_monitor_status(w.pid) == :waiting_for_stop
processed_workers << w
worker_not_responding(w)
worker_delete(w.pid)
miq_workers.delete(w)
end
miq_workers.delete(*processed_workers) unless processed_workers.empty?
processed_workers.collect(&:id)
end

def monitor_reason_not_responding?(w)
Expand Down
16 changes: 0 additions & 16 deletions app/models/miq_server/worker_management/monitor/validation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,6 @@ def exceeded_memory_threshold?(w)
false
end

def validate_active_messages(processed_worker_ids = [])
actives = MiqQueue.where(:state => 'dequeue').includes(:handler)
actives.each do |msg|
next if processed_worker_ids.include?(msg.handler_id)

# Exclude messages on starting/started servers
handler = msg.handler
handler_server = handler.respond_to?(:miq_server) ? handler.miq_server : handler
if handler_server.kind_of?(MiqServer) && handler_server != self
next if [MiqServer::STATUS_STARTED, MiqServer::STATUS_STARTING].include?(handler_server.status)
end

msg.check_for_timeout(_log.prefix)
end
end

private

def usage_exceeds_threshold?(usage, threshold)
Expand Down
61 changes: 2 additions & 59 deletions spec/models/miq_server/worker_monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
@worker.update(:status => MiqWorker::STATUS_STOPPED)

expect(@miq_server.miq_workers.length).to eq(2)
ids = @miq_server.clean_worker_records
@miq_server.clean_worker_records
expect(@miq_server.miq_workers.length).to eq(1)
expect(ids).to eq([@worker.id])
end

it "MiqServer#check_not_responding" do
Expand All @@ -36,9 +35,8 @@
@worker.update(:status => MiqWorker::STATUS_STOPPING)

expect(@miq_server.miq_workers.length).to eq(2)
ids = @miq_server.check_not_responding
@miq_server.check_not_responding
expect(@miq_server.miq_workers.length).to eq(1)
expect(ids).to eq([@worker.id])
end

describe "#do_system_limit_exceeded" do
Expand Down Expand Up @@ -147,61 +145,6 @@
end

context "A WorkerMonitor" do
context "with active messages without worker" do
before do
@actives = []
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes)
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 5.minutes)
end

it "should timeout the right active messages" do
actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length)

Timecop.travel 5.minutes do
@miq_server.validate_active_messages
end

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length - 1)
end
end

context "with expired active messages assigned to workers from multiple" do
before do
@miq_server2 = FactoryBot.create(:miq_server, :zone => @miq_server.zone)
@worker1 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id)
@worker2 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server2.id)

@actives = []
end

it "should timeout messages on my server or servers that are down" do
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes, :handler => @worker1)
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes, :handler => @worker2)

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length)

Timecop.travel 5.minutes do
@miq_server.validate_active_messages
end

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length - 1)
expect(actives.first.handler).to eq(@worker2)

@miq_server2.update_attribute(:status, 'stopped')

Timecop.travel 5.minutes do
@miq_server.validate_active_messages
end

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(0)
end
end

context "with vanilla generic worker" do
before do
@worker1 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id, :pid => 42, :type => 'MiqGenericWorker')
Expand Down
52 changes: 0 additions & 52 deletions spec/models/miq_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -318,58 +318,6 @@
expect(@miq_server.quiesce_workers_loop_timeout?).not_to be_truthy
end

context "with an active messsage and a second server" do
before do
@msg = FactoryBot.create(:miq_queue, :state => 'dequeue')
@miq_server2 = FactoryBot.create(:miq_server, :is_master => true, :zone => @zone)
end

it "will validate the 'started' first server's active message when called on it" do
@msg.handler = @miq_server.reload
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will validate the 'not responding' first server's active message when called on it" do
@miq_server.update_attribute(:status, 'not responding')
@msg.handler = @miq_server.reload
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will validate the 'not resonding' second server's active message when called on first server" do
@miq_server2.update_attribute(:status, 'not responding')
@msg.handler = @miq_server2
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will NOT validate the 'started' second server's active message when called on first server" do
@miq_server2.update_attribute(:status, 'started')
@msg.handler = @miq_server2
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout).never
@miq_server.validate_active_messages
end

it "will validate a worker's active message when called on the worker's server" do
@msg.handler = @worker
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will not validate a worker's active message when called on the worker's server if already processed" do
@msg.handler = @worker
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout).never
@miq_server.validate_active_messages([@worker.id])
end
end

context "#server_timezone" do
it "utc with no system default" do
stub_settings(:server => {:timezone => nil})
Expand Down

0 comments on commit d3e9a78

Please sign in to comment.