Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Move MiqWorker record creation out of run_single_worker.rb on podified #23110

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ class MiqWorker < ApplicationRecord
include SystemdCommon
include UuidMixin

after_initialize :set_system_uid
before_destroy :error_out_tasks_with_active_queue_message, :log_destroy_of_worker_messages

belongs_to :miq_server
Expand Down Expand Up @@ -310,9 +309,7 @@ def self.create_worker_record(*params)
end

def self.start_worker(*params)
w = containerized_worker? ? init_worker_object(*params) : create_worker_record(*params)
w.start
w
create_worker_record(*params).tap(&:start)
end

cache_with_timeout(:my_worker) { server_scope.find_by(:guid => my_guid) }
Expand Down Expand Up @@ -353,6 +350,7 @@ def start_runner

def start_runner_via_container
create_container_objects
nil # We don't know the pod name/system_uid until after it is started
end

def self.build_command_line(guid, ems_id = nil)
Expand Down Expand Up @@ -390,8 +388,8 @@ def start_runner_via_spawn
end

def start
self.pid = start_runner
save if !containerized_worker? && !systemd_worker?
self.system_uid = start_runner
save!

msg = "Worker started: ID [#{id}], PID [#{pid}], GUID [#{guid}]"
MiqEvent.raise_evm_event_queue(miq_server || MiqServer.my_server, "evm_worker_start", :event_details => msg, :type => self.class.name)
Expand Down Expand Up @@ -497,10 +495,6 @@ def clean_active_messages
end
end

private def set_system_uid
self.system_uid = unit_name if systemd_worker?
end

private def error_out_tasks_with_active_queue_message
message = "Task Handler: [#{friendly_name}] ID [#{id}] has been deleted!"
processed_messages.includes(:miq_task).where.not(:miq_task_id => nil).each do |m|
Expand Down
13 changes: 10 additions & 3 deletions app/models/miq_worker/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,16 @@ def log_prefix
#

def find_worker_record
@worker = self.class.corresponding_model.find_by(:guid => @cfg[:guid])
do_exit("Unable to find instance for worker GUID [#{@cfg[:guid]}].", 1) if @worker.nil?
MiqWorker.my_guid = @cfg[:guid]
worker_find_opts = {}
if @cfg[:system_uid]
worker_find_opts[:system_uid] = @cfg[:system_uid]
else
worker_find_opts[:guid] = @cfg[:guid]
end

@worker = self.class.corresponding_model.find_by(worker_find_opts)
do_exit("Unable to find instance for worker [#{worker_find_opts.values.first}].", 1) if @worker.nil?
MiqWorker.my_guid = @worker.guid
end

def starting_worker_record
Expand Down
2 changes: 2 additions & 0 deletions app/models/miq_worker/systemd_common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def start_systemd_worker
enable_systemd_unit
write_unit_settings_file
start_systemd_unit

unit_name
end

def stop_systemd_worker
Expand Down
41 changes: 9 additions & 32 deletions lib/workers/bin/run_single_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def all_role_names
ENV["DISABLE_MIQ_WORKER_HEARTBEAT"] ||= options[:heartbeat] ? nil : '1'

options[:ems_id] ||= ENV.fetch("EMS_ID", nil)
options[:guid] ||= SecureRandom.uuid

if options[:roles].present?
MiqServer.my_server.server_role_names += options[:roles]
Expand All @@ -96,45 +97,21 @@ def all_role_names

worker_class.preload_for_worker_role if worker_class.respond_to?(:preload_for_worker_role)
unless options[:dry_run]
create_options = {:pid => Process.pid}
runner_options = {}

create_options[:system_uid] = options[:system_uid] if options[:system_uid]

if options[:ems_id]
create_options[:queue_name] = "ems_#{options[:ems_id]}"
runner_options[:ems_id] = options[:ems_id]
end

update_options = create_options.dup
# If a guid is provided, raise if it's not found, update otherwise
# Because podified needs to create on the first run_single_worker and update after:
# If system_uid is provided, update if found, create if not found.
# TODO: This is really inconsistent and confusing. Why can't GUID follow the same rules?
worker = if options[:guid]
worker_class.find_by!(:guid => options[:guid]).tap do |wrkr|
wrkr.update(update_options)
end
elsif options[:system_uid] && worker = worker_class.find_by(:system_uid => options[:system_uid])
worker.update(update_options)
worker
else
worker_class.create_worker_record(create_options)
end
runner_options = {:guid => options[:guid]}
runner_options[:ems_id] = options[:ems_id] if options[:ems_id]

begin
runner_options[:guid] = worker.guid
$log.info("Starting #{worker.class.name} with runner options #{runner_options}")
worker.class::Runner.new(runner_options).tap(&:setup_sigterm_trap).start
$log.info("Starting #{worker_class.name} with runner options #{runner_options}")
worker_class::Runner.new(runner_options).tap(&:setup_sigterm_trap).start
rescue SystemExit
raise
rescue Exception => err
MiqWorker::Runner.safe_log(worker, "An unhandled error has occurred: #{err}\n#{err.backtrace.join("\n")}", :error)
#MiqWorker::Runner.safe_log(worker, "An unhandled error has occurred: #{err}\n#{err.backtrace.join("\n")}", :error)
STDERR.puts("ERROR: An unhandled error has occurred: #{err}. See log for details.") rescue nil
exit 1
ensure
FileUtils.rm_f(worker.heartbeat_file)
$log.info("Deleting worker record for #{worker.class.name}, id #{worker.id}")
worker.delete
#FileUtils.rm_f(worker.heartbeat_file)
$log.info("Deleting worker record for #{worker_class.name}, GUID #{options[:guid]}")
#worker.delete
end
end