Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule the check for timed out queue messages once for the region #19585

Merged
merged 5 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions app/models/miq_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,14 @@ def check_for_timeout(log_prefix = "MIQ(MiqQueue.check_for_timeout)", grace = 10
end
end

def self.candidates_for_timeout
where(:state => STATE_DEQUEUE).where("(select date_part('epoch', updated_on) + msg_timeout) < ?", Time.now.to_i)
end
carbonin marked this conversation as resolved.
Show resolved Hide resolved

def self.check_for_timeout
candidates_for_timeout.each(&:check_for_timeout)
end

def finished?
FINISHED_STATES.include?(state)
end
Expand Down
4 changes: 4 additions & 0 deletions app/models/miq_schedule_worker/jobs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ def database_maintenance_vacuum_timer
end
end

def queue_miq_queue_check_for_timeout
queue_work(:class_name => "MiqQueue", :method_name => "check_for_timeout", :zone => nil)
carbonin marked this conversation as resolved.
Show resolved Hide resolved
end

def check_for_stuck_dispatch(threshold_seconds)
class_n = "JobProxyDispatcher"
method_n = "dispatch"
Expand Down
6 changes: 5 additions & 1 deletion app/models/miq_schedule_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def schedules_for_all_roles
end

def schedules_for_scheduler_role
# These schedules need to run only once in a zone per interval, so let the single scheduler role handle them
# These schedules need to run only once in a region per interval, so let the single scheduler role handle them
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return unless schedule_enabled?(:scheduler)
scheduler = scheduler_for(:scheduler)
# Schedule - Check for timed out jobs
Expand Down Expand Up @@ -201,6 +201,10 @@ def schedules_for_scheduler_role
enqueue(:vim_performance_states_purge_timer)
end

scheduler.schedule_every(worker_settings[:queue_timeout_interval]) do
enqueue(:queue_miq_queue_check_for_timeout)
end

# Schedule every 24 hours
at = worker_settings[:storage_file_collection_time_utc]
time_at = if Time.zone.today.to_time(:utc) + at.seconds < Time.now.utc
Expand Down
37 changes: 15 additions & 22 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,19 @@ def monitor_workers

MiqWorker.status_update_all

processed_worker_ids = []
processed_worker_ids += check_not_responding
processed_worker_ids += check_pending_stop
processed_worker_ids += clean_worker_records
check_not_responding
check_pending_stop
clean_worker_records

# Monitor all remaining current worker records
processed_worker_ids += miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker|
# Check their queue messages for timeout
worker.validate_active_messages
miq_workers.where(:status => MiqWorker::STATUSES_CURRENT_OR_STARTING).each do |worker|
Copy link
Member

@jrafanie jrafanie Dec 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carbonin FYI, this commit: e6cbfa8 fixed the "can't modify frozen Hash" error. If I change the above line to remove the .where, I can recreate that error for a simulated "exceeding memory worker" because it attempts to update a deleted worker from the cached association. The .where makes it run the query each time through the method.

Copy link
Member Author

@carbonin carbonin Dec 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, interesting. We may still want to make that explicit at some point though ... I wonder if we should do a miq_workers.all.select here and a miq_workers.reload at the start of the method? I guess we're really splitting hairs at that point though ...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok here as I'd expect a query like this to not be cached, so, there's no need to reload anything. Relying on an association previously made it very unclear whether we had any expectations on caching. I believe your change is much more explicit. Maybe it's just me though?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is more clearly expressed in https://github.com/ManageIQ/manageiq/pull/19638/files#r357318923

I'm worried that someone removing the where for whatever reason wouldn't realize that we were relying on this line reloading the association for reasons other than this particular query.

# Push the heartbeat into the database
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
next unless validate_worker(worker)
# Tell the valid workers to sync config if needed
worker_set_message(worker, "sync_config") if resync_needed
end.map(&:id)

validate_active_messages(processed_worker_ids)
end

do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted?
end
Expand Down Expand Up @@ -72,42 +67,40 @@ def sync_workers
end

def clean_worker_records
processed_workers = []
worker_deleted = false
miq_workers.each do |w|
next unless w.is_stopped?
_log.info("SQL Record for #{w.format_full_log_msg}, Status: [#{w.status}] is being deleted")
processed_workers << w
worker_delete(w.pid)
w.destroy
worker_deleted = true
end
miq_workers.delete(*processed_workers) unless processed_workers.empty?
processed_workers.collect(&:id)

miq_workers.reload if worker_deleted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 I like this better than modifying the in-memory association

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want this merged to test if my other change in this area is needed on master... 😉

end

def check_pending_stop
processed_worker_ids = []
miq_workers.each do |w|
next unless w.is_stopped?
next unless worker_get_monitor_status(w.pid) == :waiting_for_stop
worker_set_monitor_status(w.pid, nil)
processed_worker_ids << w.id
end
processed_worker_ids
end

def check_not_responding
return [] if MiqEnvironment::Command.is_podified?
return if MiqEnvironment::Command.is_podified?

processed_workers = []
worker_deleted = false
miq_workers.each do |w|
next unless monitor_reason_not_responding?(w)
next unless worker_get_monitor_status(w.pid) == :waiting_for_stop
processed_workers << w
worker_not_responding(w)
worker_delete(w.pid)
w.destroy
worker_deleted = true
end
miq_workers.delete(*processed_workers) unless processed_workers.empty?
processed_workers.collect(&:id)

miq_workers.reload if worker_deleted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 Would recommend

end

def monitor_reason_not_responding?(w)
Expand Down
16 changes: 0 additions & 16 deletions app/models/miq_server/worker_management/monitor/validation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,22 +52,6 @@ def exceeded_memory_threshold?(w)
false
end

def validate_active_messages(processed_worker_ids = [])
actives = MiqQueue.where(:state => 'dequeue').includes(:handler)
actives.each do |msg|
next if processed_worker_ids.include?(msg.handler_id)

# Exclude messages on starting/started servers
handler = msg.handler
handler_server = handler.respond_to?(:miq_server) ? handler.miq_server : handler
if handler_server.kind_of?(MiqServer) && handler_server != self
next if [MiqServer::STATUS_STARTED, MiqServer::STATUS_STARTING].include?(handler_server.status)
end

msg.check_for_timeout(_log.prefix)
end
end

private

def usage_exceeds_threshold?(usage, threshold)
Expand Down
1 change: 1 addition & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,7 @@
:performance_rollup_purging_start_delay: 5.minutes
:policy_events_purge_interval: 1.day
:poll: 15.seconds
:queue_timeout_interval: 15.seconds
:report_result_purge_interval: 1.week
:server_log_stats_interval: 5.minutes
:server_stats_interval: 60.seconds
Expand Down
31 changes: 31 additions & 0 deletions spec/models/miq_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,37 @@
end
end

describe ".check_for_timeout" do
it "will destroy all timed out dequeued messages" do
handler = FactoryBot.create(:miq_ems_refresh_worker)
msg1 = FactoryBot.create(:miq_queue, :state => MiqQueue::STATE_DEQUEUE, :handler => handler, :msg_timeout => 1.minute)
msg2 = FactoryBot.create(:miq_queue, :state => MiqQueue::STATE_DEQUEUE, :handler => handler, :msg_timeout => 2.minutes)
msg3 = FactoryBot.create(:miq_queue, :state => MiqQueue::STATE_DEQUEUE, :handler => handler, :msg_timeout => 10.minutes)

Timecop.travel(5.minutes) do
described_class.check_for_timeout
expect(described_class.find_by(:id => msg1.id)).to be_nil
expect(described_class.find_by(:id => msg2.id)).to be_nil
expect(described_class.find_by(:id => msg3.id)).to_not be_nil
end
end
end

describe ".candidates_for_timeout" do
it "returns only messages in dequeue state which are outside their timeout" do
FactoryBot.create(:miq_queue, :state => MiqQueue::STATE_READY, :msg_timeout => 1.minute) # not in dequeue
FactoryBot.create(:miq_queue, :state => MiqQueue::STATE_DEQUEUE, :msg_timeout => 10.minutes) # not timed out

expected_ids = []
expected_ids << FactoryBot.create(:miq_queue, :state => MiqQueue::STATE_DEQUEUE, :msg_timeout => 1.minute).id
expected_ids << FactoryBot.create(:miq_queue, :state => MiqQueue::STATE_DEQUEUE, :msg_timeout => 2.minutes).id

Timecop.travel(5.minutes) do
expect(described_class.candidates_for_timeout.pluck(:id)).to match_array(expected_ids)
end
end
end

it "should validate formatting of message for logging" do
# Add various key/value combos as needs arise...
message_parms = [{
Expand Down
61 changes: 2 additions & 59 deletions spec/models/miq_server/worker_monitor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
@worker.update(:status => MiqWorker::STATUS_STOPPED)

expect(@miq_server.miq_workers.length).to eq(2)
ids = @miq_server.clean_worker_records
@miq_server.clean_worker_records
expect(@miq_server.miq_workers.length).to eq(1)
expect(ids).to eq([@worker.id])
end

it "MiqServer#check_not_responding" do
Expand All @@ -36,9 +35,8 @@
@worker.update(:status => MiqWorker::STATUS_STOPPING)

expect(@miq_server.miq_workers.length).to eq(2)
ids = @miq_server.check_not_responding
@miq_server.check_not_responding
expect(@miq_server.miq_workers.length).to eq(1)
expect(ids).to eq([@worker.id])
end

describe "#do_system_limit_exceeded" do
Expand Down Expand Up @@ -147,61 +145,6 @@
end

context "A WorkerMonitor" do
context "with active messages without worker" do
before do
@actives = []
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes)
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 5.minutes)
end

it "should timeout the right active messages" do
actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length)

Timecop.travel 5.minutes do
@miq_server.validate_active_messages
end

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length - 1)
end
end

context "with expired active messages assigned to workers from multiple" do
before do
@miq_server2 = FactoryBot.create(:miq_server, :zone => @miq_server.zone)
@worker1 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id)
@worker2 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server2.id)

@actives = []
end

it "should timeout messages on my server or servers that are down" do
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes, :handler => @worker1)
@actives << FactoryBot.create(:miq_queue, :state => 'dequeue', :msg_timeout => 4.minutes, :handler => @worker2)

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length)

Timecop.travel 5.minutes do
@miq_server.validate_active_messages
end

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(@actives.length - 1)
expect(actives.first.handler).to eq(@worker2)

@miq_server2.update_attribute(:status, 'stopped')

Timecop.travel 5.minutes do
@miq_server.validate_active_messages
end

actives = MiqQueue.where(:state => 'dequeue')
expect(actives.length).to eq(0)
end
end

context "with vanilla generic worker" do
before do
@worker1 = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id, :pid => 42, :type => 'MiqGenericWorker')
Expand Down
52 changes: 0 additions & 52 deletions spec/models/miq_server_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -318,58 +318,6 @@
expect(@miq_server.quiesce_workers_loop_timeout?).not_to be_truthy
end

context "with an active messsage and a second server" do
before do
@msg = FactoryBot.create(:miq_queue, :state => 'dequeue')
@miq_server2 = FactoryBot.create(:miq_server, :is_master => true, :zone => @zone)
end

it "will validate the 'started' first server's active message when called on it" do
@msg.handler = @miq_server.reload
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will validate the 'not responding' first server's active message when called on it" do
@miq_server.update_attribute(:status, 'not responding')
@msg.handler = @miq_server.reload
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will validate the 'not resonding' second server's active message when called on first server" do
@miq_server2.update_attribute(:status, 'not responding')
@msg.handler = @miq_server2
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will NOT validate the 'started' second server's active message when called on first server" do
@miq_server2.update_attribute(:status, 'started')
@msg.handler = @miq_server2
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout).never
@miq_server.validate_active_messages
end

it "will validate a worker's active message when called on the worker's server" do
@msg.handler = @worker
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout)
@miq_server.validate_active_messages
end

it "will not validate a worker's active message when called on the worker's server if already processed" do
@msg.handler = @worker
@msg.save
expect_any_instance_of(MiqQueue).to receive(:check_for_timeout).never
@miq_server.validate_active_messages([@worker.id])
end
end

context "#server_timezone" do
it "utc with no system default" do
stub_settings(:server => {:timezone => nil})
Expand Down