Skip to content

Commit

Permalink
Merge pull request #17015 from yrudman/activate-task-when-deliver-fro…
Browse files Browse the repository at this point in the history
…m-queue

Activate miq_task when deliver from miq_queue
  • Loading branch information
Fryguy authored Mar 16, 2018
2 parents 450526b + f7653b0 commit 2ead6ad
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 5 deletions.
11 changes: 10 additions & 1 deletion app/models/miq_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#
class MiqQueue < ApplicationRecord
belongs_to :handler, :polymorphic => true
belongs_to :miq_task

attr_accessor :last_exception

Expand Down Expand Up @@ -446,6 +447,7 @@ def deliver(requester = nil)

def dispatch_method(obj, args)
Timeout.timeout(msg_timeout) do
args = activate_miq_task(args)
obj.send(method_name, *args)
end
end
Expand Down Expand Up @@ -524,7 +526,7 @@ def unfinished?
end

def self.format_full_log_msg(msg)
"Message id: [#{msg.id}], #{msg.handler_type} id: [#{msg.handler_id}], Zone: [#{msg.zone}], Role: [#{msg.role}], Server: [#{msg.server_guid}], Ident: [#{msg.queue_name}], Target id: [#{msg.target_id}], Instance id: [#{msg.instance_id}], Task id: [#{msg.task_id}], Command: [#{msg.class_name}.#{msg.method_name}], Timeout: [#{msg.msg_timeout}], Priority: [#{msg.priority}], State: [#{msg.state}], Deliver On: [#{msg.deliver_on}], Data: [#{msg.data.nil? ? "" : "#{msg.data.length} bytes"}], Args: #{MiqPassword.sanitize_string(msg.args.inspect)}"
"Message id: [#{msg.id}], #{msg.handler_type} id: [#{msg.handler_id}], Zone: [#{msg.zone}], Role: [#{msg.role}], Server: [#{msg.server_guid}], MiqTask id: [#{msg.miq_task_id}], Ident: [#{msg.queue_name}], Target id: [#{msg.target_id}], Instance id: [#{msg.instance_id}], Task id: [#{msg.task_id}], Command: [#{msg.class_name}.#{msg.method_name}], Timeout: [#{msg.msg_timeout}], Priority: [#{msg.priority}], State: [#{msg.state}], Deliver On: [#{msg.deliver_on}], Data: [#{msg.data.nil? ? "" : "#{msg.data.length} bytes"}], Args: #{MiqPassword.sanitize_string(msg.args.inspect)}"
end

def self.format_short_log_msg(msg)
Expand All @@ -541,6 +543,13 @@ def self.get_worker(task_id)

private

def activate_miq_task(args)
MiqTask.update_status(miq_task_id, MiqTask::STATE_ACTIVE, MiqTask::STATUS_OK, "Task starting") if miq_task_id
params = args.first
params[:miq_task_id] = miq_task_id if params.kind_of?(Hash)
args
end

# default values for get operations
def self.default_get_options(options)
options.reverse_merge(
Expand Down
2 changes: 2 additions & 0 deletions app/models/miq_task.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class MiqTask < ApplicationRecord
has_one :binary_blob, :as => :resource, :dependent => :destroy
has_one :miq_report_result
has_one :job, :dependent => :destroy
has_one :miq_queue

belongs_to :miq_server

Expand Down Expand Up @@ -271,6 +272,7 @@ def self.generic_action_with_callback(options, queue_options)

# Set the callback for this task to set the status based on the results of the actions
queue_options[:miq_callback] = {:class_name => task.class.name, :instance_id => task.id, :method_name => :queue_callback, :args => ['Finished']}
queue_options[:miq_task_id] = task.id
method_opts = queue_options[:args].first
method_opts[:task_id] = task.id if method_opts.kind_of?(Hash)
MiqQueue.put(queue_options)
Expand Down
7 changes: 5 additions & 2 deletions spec/models/log_file_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@
end

it "delivering MiqServer._request_logs message should call _post_my_logs with correct args" do
expected_options = {:timeout => described_class::LOG_REQUEST_TIMEOUT, :taskid => @task.id, :callback => {:instance_id => @task.id, :class_name => 'MiqTask', :method_name => :queue_callback_on_exceptions, :args => ['Finished']}}
expected_options = {:timeout => described_class::LOG_REQUEST_TIMEOUT,
:taskid => @task.id, :miq_task_id => nil,
:callback => {:instance_id => @task.id, :class_name => 'MiqTask',
:method_name => :queue_callback_on_exceptions, :args => ['Finished']}}
expect_any_instance_of(MiqServer).to receive(:_post_my_logs).with(expected_options)

@message.delivered(*@message.deliver)
Expand All @@ -88,7 +91,7 @@
end

it "MiqServer#post_logs message should have correct args" do
expect(message.args).to eq([{:taskid => @task.id}])
expect(message.args).to eq([{:taskid => @task.id, :miq_task_id => nil}])
expect(message.priority).to eq(MiqQueue::HIGH_PRIORITY)
expect(message.miq_callback).to eq(:instance_id => @task.id, :class_name => 'MiqTask', :method_name => :queue_callback_on_exceptions, :args => ['Finished'])
expect(message.msg_timeout).to eq(described_class::LOG_REQUEST_TIMEOUT)
Expand Down
4 changes: 2 additions & 2 deletions spec/models/miq_queue_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@
msg = FactoryGirl.create(:miq_queue)
mparms.each { |k, v| msg.send("#{k}=", v) }
expect(MiqQueue.format_short_log_msg(msg)).to eq("Message id: [#{msg.id}]")
expect(MiqQueue.format_full_log_msg(msg)).to eq("Message id: [#{msg.id}], #{msg.handler_type} id: [#{msg.handler_id}], Zone: [#{msg.zone}], Role: [#{msg.role}], Server: [#{msg.server_guid}], Ident: [#{msg.queue_name}], Target id: [#{msg.target_id}], Instance id: [#{msg.instance_id}], Task id: [#{msg.task_id}], Command: [#{msg.class_name}.#{msg.method_name}], Timeout: [#{msg.msg_timeout}], Priority: [#{msg.priority}], State: [#{msg.state}], Deliver On: [#{msg.deliver_on}], Data: [#{msg.data.nil? ? "" : "#{msg.data.length} bytes"}], Args: #{msg.args.inspect}")
expect(MiqQueue.format_full_log_msg(msg)).to eq("Message id: [#{msg.id}], #{msg.handler_type} id: [#{msg.handler_id}], Zone: [#{msg.zone}], Role: [#{msg.role}], Server: [#{msg.server_guid}], MiqTask id: [#{msg.miq_task_id}], Ident: [#{msg.queue_name}], Target id: [#{msg.target_id}], Instance id: [#{msg.instance_id}], Task id: [#{msg.task_id}], Command: [#{msg.class_name}.#{msg.method_name}], Timeout: [#{msg.msg_timeout}], Priority: [#{msg.priority}], State: [#{msg.state}], Deliver On: [#{msg.deliver_on}], Data: [#{msg.data.nil? ? "" : "#{msg.data.length} bytes"}], Args: #{msg.args.inspect}")
end
end

Expand Down Expand Up @@ -242,7 +242,7 @@
message_parms.each do |mparms|
msg = FactoryGirl.create(:miq_queue, mparms)
expect(MiqQueue.format_short_log_msg(msg)).to eq("Message id: [#{msg.id}]")
expect(MiqQueue.format_full_log_msg(msg)).to eq("Message id: [#{msg.id}], #{msg.handler_type} id: [#{msg.handler_id}], Zone: [#{msg.zone}], Role: [#{msg.role}], Server: [#{msg.server_guid}], Ident: [#{msg.queue_name}], Target id: [#{msg.target_id}], Instance id: [#{msg.instance_id}], Task id: [#{msg.task_id}], Command: [#{msg.class_name}.#{msg.method_name}], Timeout: [#{msg.msg_timeout}], Priority: [#{msg.priority}], State: [#{msg.state}], Deliver On: [#{msg.deliver_on}], Data: [#{msg.data.nil? ? "" : "#{msg.data.length} bytes"}], Args: #{args_cleaned_password.inspect}")
expect(MiqQueue.format_full_log_msg(msg)).to eq("Message id: [#{msg.id}], #{msg.handler_type} id: [#{msg.handler_id}], Zone: [#{msg.zone}], Role: [#{msg.role}], Server: [#{msg.server_guid}], MiqTask id: [#{msg.miq_task_id}], Ident: [#{msg.queue_name}], Target id: [#{msg.target_id}], Instance id: [#{msg.instance_id}], Task id: [#{msg.task_id}], Command: [#{msg.class_name}.#{msg.method_name}], Timeout: [#{msg.msg_timeout}], Priority: [#{msg.priority}], State: [#{msg.state}], Deliver On: [#{msg.deliver_on}], Data: [#{msg.data.nil? ? "" : "#{msg.data.length} bytes"}], Args: #{args_cleaned_password.inspect}")
end
end

Expand Down
1 change: 1 addition & 0 deletions spec/models/miq_task_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
expect(message.method_name).to eq("my_method")
expect(message.args).to eq([1, 2, 3])
expect(message.zone).to eq(zone)
expect(message.miq_task_id).to eq(tid)
end
end

Expand Down

0 comments on commit 2ead6ad

Please sign in to comment.