Skip to content

Commit

Permalink
Merge pull request #23116 from agrare/fix_non_rails_worker_starting
Browse files Browse the repository at this point in the history
Fix non rails worker starting on podified
  • Loading branch information
jrafanie authored Aug 1, 2024
2 parents 7b5817d + ed8d21a commit 89cb15b
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 13 deletions.
23 changes: 15 additions & 8 deletions app/models/miq_server/worker_management/kubernetes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,31 @@ def sync_from_system
end

def sync_starting_workers
MiqWorker.find_all_starting.each do |worker|
next if worker.class.rails_worker?
starting = MiqWorker.find_all_starting

worker_pod = current_pods[worker[:system_uid]]
# Non-rails workers cannot set their own miq_worker record to started once they
# have finished initializing. Check for any starting non-rails workers whose
# pod is running and mark the miq_worker as started.
starting.reject(&:rails_worker?).each do |worker|
worker_pod = current_pods[worker.system_uid]
next if worker_pod.nil?

container_status = worker_pod.status.containerStatuses.find { |container| container.name == worker.worker_deployment_name }
if worker_pod.status.phase == "Running" && container_status.ready && container_status.started
worker.update!(:status => "started")
end
worker.update!(:status => MiqWorker::STATUS_STARTED) if worker_pod[:running]
end

starting.reload
end

def sync_stopping_workers
MiqWorker.find_all_stopping.reject { |w| w.class.rails_worker? }.each do |worker|
stopping = MiqWorker.find_all_stopping

stopping.reject(&:rails_worker?).each do |worker|
next if current_pods.key?(worker[:system_uid])

worker.update!(:status => MiqWorker::STATUS_STOPPED)
end

stopping.reload
end

def enough_resource_to_start_worker?(_worker_class)
Expand Down Expand Up @@ -255,6 +261,7 @@ def save_pod(pod)
ch[:label_name] = pod.metadata.labels.name
ch[:last_state_terminated] = pod.status.containerStatuses.any? { |cs| cs.lastState.terminated }
ch[:container_restarts] = pod.status.containerStatuses.sum { |cs| cs.restartCount.to_i }
ch[:running] = pod.status.phase == "Running" && pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }

name = pod.metadata.name
current_pods[name] ||= ch
Expand Down
6 changes: 2 additions & 4 deletions app/models/miq_server/worker_management/systemd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ def sync_from_system

def sync_starting_workers
sync_from_system
MiqWorker.find_all_starting.each do |worker|
next if worker.class.rails_worker?

MiqWorker.find_all_starting.reject(&:rails_worker?).each do |worker|
systemd_worker = miq_services_by_unit[worker[:system_uid]]
next if systemd_worker.nil?

Expand All @@ -20,7 +18,7 @@ def sync_starting_workers

def sync_stopping_workers
sync_from_system
MiqWorker.find_all_stopping.reject { |w| w.class.rails_worker? }.each do |worker|
MiqWorker.find_all_stopping.reject(&:rails_worker?).each do |worker|
# If the worker record is "stopping" and the systemd unit is gone then the
# worker has successfully exited.
next if miq_services_by_unit[worker[:system_uid]].present?
Expand Down
1 change: 1 addition & 0 deletions app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def self.rails_worker?

true
end
delegate :rails_worker?, :to => :class

def self.scalable?
maximum_workers_count.nil? || maximum_workers_count > 1
Expand Down
206 changes: 205 additions & 1 deletion spec/models/miq_server/worker_management/kubernetes_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,88 @@
end
end

# Fixed: this context was previously described as "#save_pod (private)", but
# every example in it exercises save_deployment / current_deployments — the
# description had been swapped with the neighboring pod context.
context "#save_deployment (private)" do
  it "saves the deployment in #current_deployments" do
    require 'kubeclient/resource'

    # Minimal Deployment resource mirroring what the orchestrator creates for
    # a worker deployment in the "manageiq" namespace.
    deployment = Kubeclient::Resource.new(
      :metadata => {
        :name              => "1-automation",
        :namespace         => "manageiq",
        :uid               => "3f4aeff1-e1a5-45bf-8221-2627bfff38c6",
        :resourceVersion   => "1557249",
        :generation        => 2,
        :creationTimestamp => "2024-07-29T19:50:23Z",
        :labels            => {:app => "manageiq", :"manageiq-orchestrated-by" => "orchestrator-5474db46d9-rfjmn"},
        :annotations       => {:"deployment.kubernetes.io/revision" => "1"}
      },
      :spec   => {:replicas => 1, :selector => {:matchLabels => {:name => "1-automation"}}},
      :status => {:observedGeneration => 2, :replicas => 1, :updatedReplicas => 1, :readyReplicas => 1, :availableReplicas => 1}
    )

    server.worker_manager.send(:save_deployment, deployment)

    # The deployment is cached under its metadata.name with its spec captured.
    expect(server.worker_manager.current_deployments).to have_key("1-automation")
    expect(server.worker_manager.current_deployments["1-automation"]).to include(:spec => deployment.spec.to_h)
  end
end

# Fixed: this context was previously described as "#save_deployment (private)",
# but every example in it exercises save_pod / current_pods — the description
# had been swapped with the neighboring deployment context.
context "#save_pod (private)" do
  # Tunable pod attributes; each nested context overrides the one it probes.
  let(:phase)         { "Running" }
  let(:ready)         { true }
  let(:started)       { true }
  let(:restart_count) { 0 }
  let(:last_state)    { {} }
  let(:pod) do
    require 'kubeclient/resource'
    Kubeclient::Resource.new(
      :metadata => {:name => "1-automation-545fbd845-s2pjt", :labels => {:app => "manageiq", :"manageiq-orchestrated-by" => "orchestrator-5474db46d9-rfjmn", :name => "1-automation", :"pod-template-hash" => "545fbd845"}},
      :spec     => {},
      :status   => {:phase => phase, :containerStatuses => [{:name => "1-automation", :state => {:running => {:startedAt => "2024-07-29T19:50:25Z"}}, :lastState => last_state, :ready => ready, :restartCount => restart_count, :started => started}]}
    )
  end

  it "saves the pod in #current_pods" do
    server.worker_manager.send(:save_pod, pod)

    expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
    expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:label_name => "1-automation", :last_state_terminated => false, :container_restarts => 0, :running => true)
  end

  context "with a pod that is not running" do
    let(:phase) { "ContainerCreating" }

    it "marks the pod as not running" do
      server.worker_manager.send(:save_pod, pod)

      expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
      expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:running => false)
    end
  end

  context "with a pod that has a restart count" do
    let(:restart_count) { 2 }

    it "sets container_restarts" do
      server.worker_manager.send(:save_pod, pod)

      expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
      expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:container_restarts => 2)
    end
  end

  context "with a lastState of terminated" do
    let(:last_state) { {:terminated => true} }

    it "sets last_state_terminated to true" do
      server.worker_manager.send(:save_pod, pod)

      expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
      expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:last_state_terminated => true)
    end
  end
end

context "#sync_from_system" do
context "#ensure_kube_monitors_started" do
it "podified, ensures pod monitor started and orphaned rows are removed" do
Expand All @@ -128,6 +210,128 @@
end
end

# Covers MiqServer::WorkerManagement::Kubernetes#sync_starting_workers:
# rails workers are left alone, while starting non-rails workers are promoted
# to "started" once their pod reports :running in current_pods.
describe "#sync_starting_workers" do
  before { MiqWorkerType.seed }

  context "podified" do
    # Flipped to false in the "non-rails worker" contexts below to exercise
    # the pod-state-driven promotion path.
    let(:rails_worker) { true }
    let(:pod_name) { "1-generic-abcd" }
    let(:pod_running) { true }
    let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
    # NOTE(review): pod_label is not defined anywhere in this block —
    # presumably a let from an enclosing context; confirm it is in scope.
    let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }

    before do
      allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
      # Seed the manager's pod cache directly rather than talking to kube.
      server.worker_manager.current_pods = current_pods
    end

    after do
      # current_pods is shared manager state; clear it so examples stay isolated.
      server.worker_manager.current_pods.clear
    end

    context "with no starting workers" do
      let(:status) { MiqWorker::STATUS_STARTED }

      it "returns an empty array" do
        expect(server.worker_manager.sync_starting_workers).to be_empty
      end
    end

    context "with a starting worker" do
      let(:status) { MiqWorker::STATUS_STARTING }

      it "returns the starting worker" do
        expect(server.worker_manager.sync_starting_workers).to include(worker)
      end

      context "non-rails worker" do
        let(:rails_worker) { false }

        context "without a pod" do
          let(:current_pods) { {} }

          it "returns the worker as still starting" do
            expect(server.worker_manager.sync_starting_workers).to include(worker)
          end
        end

        context "with a pod that is running" do
          let(:pod_running) { true }

          it "marks the worker as running and returns an empty array" do
            expect(server.worker_manager.sync_starting_workers).to be_empty
          end
        end

        context "with a pod that isn't running yet" do
          let(:pod_running) { false }

          it "returns the starting worker" do
            expect(server.worker_manager.sync_starting_workers).to include(worker)
          end
        end
      end
    end
  end
end

# Covers MiqServer::WorkerManagement::Kubernetes#sync_stopping_workers:
# stopping non-rails workers are marked stopped once their pod is gone from
# current_pods; workers whose pod still exists remain stopping.
describe "#sync_stopping_workers" do
  before { MiqWorkerType.seed }

  context "podified" do
    # Flipped to false in the "non-rails worker" contexts below.
    let(:rails_worker) { true }
    let(:pod_name) { "1-generic-abcd" }
    let(:pod_running) { true }
    let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
    # NOTE(review): pod_label is not defined anywhere in this block —
    # presumably a let from an enclosing context; confirm it is in scope.
    let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }

    before do
      allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
      # Seed the manager's pod cache directly rather than talking to kube.
      server.worker_manager.current_pods = current_pods
    end

    after do
      # current_pods is shared manager state; clear it so examples stay isolated.
      server.worker_manager.current_pods.clear
    end

    context "with no stopping workers" do
      let(:status) { MiqWorker::STATUS_STARTED }

      it "returns an empty array" do
        expect(server.worker_manager.sync_stopping_workers).to be_empty
      end
    end

    context "with a stopping worker" do
      let(:status) { MiqWorker::STATUS_STOPPING }

      it "returns the stopping worker" do
        expect(server.worker_manager.sync_stopping_workers).to include(worker)
      end

      context "non-rails worker" do
        let(:rails_worker) { false }

        context "without a pod" do
          let(:current_pods) { {} }

          it "marks the worker as stopped and returns an empty array" do
            expect(server.worker_manager.sync_stopping_workers).to be_empty
          end
        end

        context "with a pod that is still running" do
          let(:pod_running) { true }

          it "returns the stopping worker" do
            expect(server.worker_manager.sync_stopping_workers).to include(worker)
          end
        end
      end
    end
  end
end

context "#cleanup_orphaned_worker_rows" do
context "podified" do
let(:server2) { EvmSpecHelper.remote_miq_server }
Expand Down Expand Up @@ -210,7 +414,7 @@
metadata = double(:name => deployment_name, :labels => double(:name => pod_label))
state = double(:running => double(:startedAt => started_at))
last_state = double(:terminated => nil)
status = double(:containerStatuses => [double(:state => state, :lastState => last_state, :restartCount => 0)])
status = double(:phase => "Running", :containerStatuses => [double(:state => state, :ready => true, :started => true, :lastState => last_state, :restartCount => 0)])
pods = [double(:metadata => metadata, :status => status)]
allow(pods).to receive(:resourceVersion).and_return(resource_version)
pods
Expand Down

0 comments on commit 89cb15b

Please sign in to comment.