Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always use file based heartbeat #19666

Merged
merged 8 commits into from
Jan 14, 2020
2 changes: 1 addition & 1 deletion app/models/miq_queue_worker_base/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def do_work

# Only for file based heartbeating
def heartbeat_message_timeout(message)
if ENV["WORKER_HEARTBEAT_METHOD"] == "file" && message.msg_timeout
if message.msg_timeout
timeout = worker_settings[:poll] + message.msg_timeout
heartbeat_to_file(timeout)
end
Expand Down
1 change: 0 additions & 1 deletion app/models/miq_server/worker_management/dequeue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def get_queue_message_for_worker(w)
end

def get_queue_message(pid)
update_worker_last_heartbeat(pid)
@workers_lock.synchronize(:SH) do
w = @workers[pid]

Expand Down
17 changes: 0 additions & 17 deletions app/models/miq_server/worker_management/heartbeat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,6 @@ def worker_add_message(pid, item)
end unless @workers_lock.nil?
end

def update_worker_last_heartbeat(worker_pid)
@workers_lock.synchronize(:EX) do
@workers[worker_pid][:last_heartbeat] = Time.now.utc if @workers.key?(worker_pid)
end unless @workers_lock.nil?
end

def register_worker(worker_pid, worker_class, queue_name)
worker_class = worker_class.constantize if worker_class.kind_of?(String)

Expand Down Expand Up @@ -52,7 +46,6 @@ def message_for_worker(wid, message, *args)
worker_set_message(w, message, *args) unless w.nil?
end

# Get the latest heartbeat between the SQL and memory (updated via DRb)
def persist_last_heartbeat(w)
last_heartbeat = workers_last_heartbeat(w)

Expand All @@ -71,16 +64,6 @@ def clean_heartbeat_files
private

def workers_last_heartbeat(w)
ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? workers_last_heartbeat_to_file(w) : workers_last_heartbeat_to_drb(w)
end

def workers_last_heartbeat_to_drb(w)
@workers_lock.synchronize(:SH) do
@workers.fetch_path(w.pid, :last_heartbeat)
end
end

def workers_last_heartbeat_to_file(w)
File.mtime(w.heartbeat_file).utc if File.exist?(w.heartbeat_file)
end
end
8 changes: 2 additions & 6 deletions app/models/miq_server/worker_management/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def monitor_workers
# Clear the my_server cache so we can detect role and possibly other changes faster
self.class.my_server_clear_cache

resync_needed = sync_needed?
sync_monitor

# Sync the workers after sync'ing the child worker settings
sync_workers
Expand All @@ -32,8 +32,6 @@ def monitor_workers
persist_last_heartbeat(worker)
# Check the worker record for heartbeat timeouts
next unless validate_worker(worker)
# Tell the valid workers to sync config if needed
worker_set_message(worker, "sync_config") if resync_needed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😮 Awesome

Copy link
Member

@jrafanie jrafanie Jan 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

though, how do the worker know when config is changed now?

end

do_system_limit_exceeded if self.kill_workers_due_to_resources_exhausted?
Expand Down Expand Up @@ -122,7 +120,7 @@ def do_system_limit_exceeded
end
end

def sync_needed?
def sync_monitor
@last_sync ||= Time.now.utc
sync_interval = @worker_monitor_settings[:sync_interval] || 30.minutes
sync_interval_reached = sync_interval.seconds.ago.utc > @last_sync
Expand Down Expand Up @@ -150,8 +148,6 @@ def sync_needed?

update_sync_timestamp(@last_sync)
end

resync_needed
end

def set_last_change(key, value)
Expand Down
41 changes: 19 additions & 22 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,6 @@ def do_exit(message = nil, exit_code = 0)
exit exit_code
end

def message_sync_config(*_args)
_log.info("#{log_prefix} Synchronizing configuration...")
sync_config
_log.info("#{log_prefix} Synchronizing configuration complete...")
end

def sync_config
# Sync roles
@active_roles = MiqServer.my_active_roles(true)
Expand Down Expand Up @@ -326,7 +320,16 @@ def heartbeat
# Heartbeats can be expensive, so do them only when needed
return if @last_hb.kind_of?(Time) && (@last_hb + worker_settings[:heartbeat_freq]) >= now

ENV["WORKER_HEARTBEAT_METHOD"] == "file" ? heartbeat_to_file : heartbeat_to_drb
heartbeat_to_file

if config_out_of_date?
_log.info("#{log_prefix} Synchronizing configuration...")
sync_config
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, there it is. Ignore my above question.

_log.info("#{log_prefix} Synchronizing configuration complete...")
end

process_messages_from_server unless MiqEnvironment::Command.is_podified?

@last_hb = now
do_heartbeat_work
rescue SystemExit, SignalException
Expand All @@ -335,41 +338,35 @@ def heartbeat
do_exit("Error heartbeating because #{err.class.name}: #{err.message}\n#{err.backtrace.join('\n')}", 1)
end

def heartbeat_to_drb
# Disable heartbeat check. Useful if a worker is running in isolation
# without the oversight of MiqServer::WorkerManagement
return if skip_heartbeat?

def process_messages_from_server
worker_monitor_drb.register_worker(@worker.pid, @worker.class.name, @worker.queue_name)
worker_monitor_drb.update_worker_last_heartbeat(@worker.pid)

worker_monitor_drb.worker_get_messages(@worker.pid).each do |msg, *args|
process_message(msg, *args)
end
rescue DRb::DRbError => err
do_exit("Error heartbeating to MiqServer because #{err.class.name}: #{err.message}", 1)
do_exit("Error processing messages from MiqServer because #{err.class.name}: #{err.message}", 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this PR is about removing DRb-based heartbeating, why do we have rescue DRb::DRbError here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method in particular is not for heartbeat. This fetches messages set for the worker in the @workers hash over drb.

The vim broker is the only worker that still uses this functionality (previously it was also used for config sync and stopping workers), so this can all be removed once the vim broker is gone.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify, processing messages and heartbeating are two different operations that we happen to execute at the same time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the clarification @carbonin

end

def heartbeat_to_file(timeout = nil)
# Disable heartbeat check. Useful if a worker is running in isolation
# without the oversight of MiqServer::WorkerManagement
return if skip_heartbeat?

timeout ||= worker_settings[:heartbeat_timeout] || Workers::MiqDefaults.heartbeat_timeout
File.write(@worker.heartbeat_file, (Time.now.utc + timeout).to_s)

get_messages.each { |msg, *args| process_message(msg, *args) }
end

def get_messages
messages = []
def config_out_of_date?
@my_last_config_change ||= Time.now.utc

last_config_change = server_last_change(:last_config_change)
if last_config_change && last_config_change > @my_last_config_change
_log.info("#{log_prefix} Configuration has changed, New TS: #{last_config_change}, Old TS: #{@my_last_config_change}")
messages << ["sync_config"]

@my_last_config_change = last_config_change
return true
end

messages
false
end

def key_store
Expand Down
1 change: 1 addition & 0 deletions lib/workers/bin/run_single_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def all_role_names
$log.info("Starting #{worker.class.name} with runner options #{runner_options}")
worker.class::Runner.new(runner_options).tap(&:setup_sigterm_trap).start
ensure
FileUtils.rm_f(worker.heartbeat_file)
$log.info("Deleting worker record for #{worker.class.name}, id #{worker.id}")
worker.delete
end
Expand Down
109 changes: 47 additions & 62 deletions spec/models/miq_server/worker_management/heartbeat_spec.rb
Original file line number Diff line number Diff line change
@@ -1,84 +1,69 @@
describe MiqServer::WorkerManagement::Heartbeat do
context "#persist_last_heartbeat" do
let(:miq_server) { EvmSpecHelper.local_miq_server.tap(&:setup_drb_variables) }
let(:pid) { 1234 }
let(:worker) { FactoryBot.create(:miq_worker, :miq_server_id => miq_server.id, :pid => pid) }

it "sets initial and subsequent heartbeats" do
2.times do
t = Time.now.utc
Timecop.freeze(t) do
miq_server.update_worker_last_heartbeat(pid)
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(t)
end
end
let(:miq_server) { EvmSpecHelper.local_miq_server }
let(:worker) { FactoryBot.create(:miq_worker, :miq_server_id => miq_server.id) }

# Iterating by 5 each time to allow enough spacing to be more than 1 second
# apart when using be_within(x).of(t)
context "when using file based heartbeating" do
let!(:first_heartbeat) { Time.now.utc }
let!(:heartbeat_file) { "/path/to/worker.hb" }

around do |example|
ENV["WORKER_HEARTBEAT_METHOD"] = "file"
ENV["WORKER_HEARTBEAT_FILE"] = heartbeat_file
example.run
ENV.delete("WORKER_HEARTBEAT_METHOD")
ENV.delete("WORKER_HEARTBEAT_FILE")
end
let!(:first_heartbeat) { Time.now.utc }
let!(:heartbeat_file) { "/path/to/worker.hb" }

context "with an existing heartbeat file" do
it "sets initial and subsequent heartbeats" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(true, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat, first_heartbeat + 5)
around do |example|
ENV["WORKER_HEARTBEAT_METHOD"] = "file"
ENV["WORKER_HEARTBEAT_FILE"] = heartbeat_file
example.run
ENV.delete("WORKER_HEARTBEAT_METHOD")
ENV.delete("WORKER_HEARTBEAT_FILE")
end

[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end
context "with an existing heartbeat file" do
it "sets initial and subsequent heartbeats" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(true, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat, first_heartbeat + 5)

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
end
end
end

context "with a missing heartbeat file" do
it "sets initial heartbeat only" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false).exactly(4).times
expect(File).to receive(:mtime).with(heartbeat_file).never

# This has different results first iteration of the loop compared to
# the rest:
# 1. Sets the initial heartbeat
# 2. Doesn't update the worker's last_heartbeat value after that
#
# So the result from the database should not change after the first
# iteration of the loop
[0, 5, 10, 15].each do |i|
Timecop.freeze(first_heartbeat + i) do
miq_server.persist_last_heartbeat(worker)
end
context "with a missing heartbeat file" do
it "sets initial heartbeat only" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false).exactly(4).times
expect(File).to receive(:mtime).with(heartbeat_file).never

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat)
# This has different results first iteration of the loop compared to
# the rest:
# 1. Sets the initial heartbeat
# 2. Doesn't update the worker's last_heartbeat value after that
#
# So the result from the database should not change after the first
# iteration of the loop
[0, 5, 10, 15].each do |i|
Timecop.freeze(first_heartbeat + i) do
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat)
end
end
end

context "with a missing heartbeat file on the first validate" do
it "sets initial heartbeat default, and updates the heartbeat from the file second" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat + 5)

[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end
context "with a missing heartbeat file on the first validate" do
it "sets initial heartbeat default, and updates the heartbeat from the file second" do
expect(File).to receive(:exist?).with(heartbeat_file).and_return(false, true)
expect(File).to receive(:mtime).with(heartbeat_file).and_return(first_heartbeat + 5)

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
[0, 5].each do |i|
Timecop.freeze(first_heartbeat) do
miq_server.persist_last_heartbeat(worker)
end

expect(worker.reload.last_heartbeat).to be_within(1.second).of(first_heartbeat + i)
end
end
end
Expand Down
30 changes: 10 additions & 20 deletions spec/models/miq_worker/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,24 @@
end
end

context "#get_messages" do
context "#config_out_of_date?" do
before do
allow_any_instance_of(MiqWorker::Runner).to receive(:worker_initialization)
@worker_base = MiqWorker::Runner.new
end

it "gets sync_config when last config change was recent" do
allow(@worker_base).to receive(:server_last_change).with(:last_config_change).and_return(1.minute.from_now.utc)

expect(@worker_base.get_messages).to eq([["sync_config"]])
end
end

context "#process_message" do
before do
allow(MiqServer).to receive(:my_zone).and_return("default")
@miq_server = EvmSpecHelper.local_miq_server
allow(@miq_server).to receive(:active_role).and_return("automate)")

@worker = FactoryBot.create(:miq_worker, :miq_server_id => @miq_server.id, :type => "MiqGenericWorker")
@worker_base = MiqWorker::Runner.new(:guid => @worker.guid)
it "returns true for the first call and false for subsequent calls" do
expect(@worker_base).to receive(:server_last_change).with(:last_config_change).thrice.and_return(1.minute.from_now.utc)
expect(@worker_base.config_out_of_date?).to be_truthy
expect(@worker_base.config_out_of_date?).to be_falsey
expect(@worker_base.config_out_of_date?).to be_falsey
end

it "syncs roles and configuration" do
expect(@worker_base).to receive(:after_sync_active_roles)
expect(@worker_base).to receive(:after_sync_config)
it "returns true when last config change was updated" do
expect(@worker_base).to receive(:server_last_change).with(:last_config_change).twice.and_return(1.minute.ago.utc, 1.minute.from_now.utc)

@worker_base.send(:process_message, "sync_config")
expect(@worker_base.config_out_of_date?).to be_falsey
expect(@worker_base.config_out_of_date?).to be_truthy
end
end

Expand Down