From 281736e46c67e5a5d65b5d4fee4cf96d66825187 Mon Sep 17 00:00:00 2001
From: Adam Grare
Date: Thu, 25 Jul 2024 16:15:40 -0400
Subject: [PATCH 1/3] Use rails_worker? in find_all_starting/stopping query

---
 app/models/miq_server/worker_management/kubernetes.rb | 8 +++-----
 app/models/miq_server/worker_management/systemd.rb    | 6 ++----
 app/models/miq_worker.rb                              | 1 +
 3 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/app/models/miq_server/worker_management/kubernetes.rb b/app/models/miq_server/worker_management/kubernetes.rb
index a5682c8d157..85b2ee88e8b 100644
--- a/app/models/miq_server/worker_management/kubernetes.rb
+++ b/app/models/miq_server/worker_management/kubernetes.rb
@@ -21,10 +21,8 @@ def sync_from_system
   end
 
   def sync_starting_workers
-    MiqWorker.find_all_starting.each do |worker|
-      next if worker.class.rails_worker?
-
-      worker_pod = current_pods[worker[:system_uid]]
+    MiqWorker.find_all_starting.reject(&:rails_worker?).each do |worker|
+      worker_pod = current_pods[worker.system_uid]
       next if worker_pod.nil?
 
       container_status = worker_pod.status.containerStatuses.find { |container| container.name == worker.worker_deployment_name }
@@ -35,7 +33,7 @@ def sync_starting_workers
   end
 
   def sync_stopping_workers
-    MiqWorker.find_all_stopping.reject { |w| w.class.rails_worker? }.each do |worker|
+    MiqWorker.find_all_stopping.reject(&:rails_worker?).each do |worker|
       next if current_pods.key?(worker[:system_uid])
 
       worker.update!(:status => MiqWorker::STATUS_STOPPED)
diff --git a/app/models/miq_server/worker_management/systemd.rb b/app/models/miq_server/worker_management/systemd.rb
index abfd50310b7..e392c57fa49 100644
--- a/app/models/miq_server/worker_management/systemd.rb
+++ b/app/models/miq_server/worker_management/systemd.rb
@@ -6,9 +6,7 @@ def sync_from_system
   def sync_starting_workers
     sync_from_system
 
-    MiqWorker.find_all_starting.each do |worker|
-      next if worker.class.rails_worker?
-
+    MiqWorker.find_all_starting.reject(&:rails_worker?).each do |worker|
       systemd_worker = miq_services_by_unit[worker[:system_uid]]
       next if systemd_worker.nil?
 
@@ -20,7 +18,7 @@ def sync_starting_workers
   def sync_stopping_workers
     sync_from_system
 
-    MiqWorker.find_all_stopping.reject { |w| w.class.rails_worker? }.each do |worker|
+    MiqWorker.find_all_stopping.reject(&:rails_worker?).each do |worker|
       # If the worker record is "stopping" and the systemd unit is gone then the
       # worker has successfully exited.
       next if miq_services_by_unit[worker[:system_uid]].present?
diff --git a/app/models/miq_worker.rb b/app/models/miq_worker.rb
index 964fc4ca03e..79e7da503e3 100644
--- a/app/models/miq_worker.rb
+++ b/app/models/miq_worker.rb
@@ -71,6 +71,7 @@ def self.rails_worker?
     true
   end
 
+  delegate :rails_worker?, :to => :class
 
   def self.scalable?
     maximum_workers_count.nil? || maximum_workers_count > 1
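Note on the patch above: the one-line `delegate :rails_worker?, :to => :class` addition in miq_worker.rb is what allows both worker managers to replace `reject { |w| w.class.rails_worker? }` with the shorter `reject(&:rails_worker?)`, since the predicate itself is defined on the class. A minimal sketch of the pattern, assuming ActiveSupport's `delegate` macro is available; the class names below are illustrative, not the real MiqWorker hierarchy:

    require 'active_support/core_ext/module/delegation'

    # Illustrative stand-in for MiqWorker (not the real class hierarchy).
    class ExampleWorker
      # The predicate lives on the class; worker subclasses override it.
      def self.rails_worker?
        true
      end

      # Expose the class-level predicate on instances, so a mixed collection
      # of workers can be filtered with reject(&:rails_worker?).
      delegate :rails_worker?, :to => :class
    end

    class ExamplePodWorker < ExampleWorker
      def self.rails_worker?
        false
      end
    end

    workers = [ExampleWorker.new, ExamplePodWorker.new]
    workers.reject(&:rails_worker?).size # => 1

Without the delegation, `reject(&:rails_worker?)` would raise NoMethodError on the instances, which is why the refactor and the delegate line land in the same commit.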
From f179952e9cbfbe9e1d9dd7d5ff15ec58255e Mon Sep 17 00:00:00 2001
From: Adam Grare
Date: Fri, 26 Jul 2024 11:10:02 -0400
Subject: [PATCH 2/3] Fix non-rails worker startup on kubernetes

---
 .../worker_management/kubernetes.rb      |  6 +-
 .../worker_management/kubernetes_spec.rb | 84 ++++++++++++++++++-
 2 files changed, 85 insertions(+), 5 deletions(-)

diff --git a/app/models/miq_server/worker_management/kubernetes.rb b/app/models/miq_server/worker_management/kubernetes.rb
index 85b2ee88e8b..4a87de6822b 100644
--- a/app/models/miq_server/worker_management/kubernetes.rb
+++ b/app/models/miq_server/worker_management/kubernetes.rb
@@ -25,10 +25,7 @@ def sync_starting_workers
       worker_pod = current_pods[worker.system_uid]
       next if worker_pod.nil?
 
-      container_status = worker_pod.status.containerStatuses.find { |container| container.name == worker.worker_deployment_name }
-      if worker_pod.status.phase == "Running" && container_status.ready && container_status.started
-        worker.update!(:status => "started")
-      end
+      worker.update!(:status => "started") if worker_pod[:running]
     end
   end
 
@@ -253,6 +250,7 @@ def save_pod(pod)
     ch[:label_name] = pod.metadata.labels.name
     ch[:last_state_terminated] = pod.status.containerStatuses.any? { |cs| cs.lastState.terminated }
     ch[:container_restarts] = pod.status.containerStatuses.sum { |cs| cs.restartCount.to_i }
+    ch[:running] = pod.status.phase == "Running" && pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }
 
     name = pod.metadata.name
     current_pods[name] ||= ch
diff --git a/spec/models/miq_server/worker_management/kubernetes_spec.rb b/spec/models/miq_server/worker_management/kubernetes_spec.rb
index 02f450e1406..f6532587722 100644
--- a/spec/models/miq_server/worker_management/kubernetes_spec.rb
+++ b/spec/models/miq_server/worker_management/kubernetes_spec.rb
@@ -118,6 +118,88 @@
     end
   end
 
+  context "#save_deployment (private)" do
+    it "saves the deployment in #current_deployments" do
+      require 'kubeclient/resource'
+
+      deployment = Kubeclient::Resource.new(
+        :metadata => {
+          :name => "1-automation",
+          :namespace => "manageiq",
+          :uid => "3f4aeff1-e1a5-45bf-8221-2627bfff38c6",
+          :resourceVersion => "1557249",
+          :generation => 2,
+          :creationTimestamp => "2024-07-29T19:50:23Z",
+          :labels => {:app => "manageiq", :"manageiq-orchestrated-by" => "orchestrator-5474db46d9-rfjmn"},
+          :annotations => {:"deployment.kubernetes.io/revision" => "1"}
+        },
+        :spec => {:replicas => 1, :selector => {:matchLabels => {:name => "1-automation"}}},
+        :status => {:observedGeneration => 2, :replicas => 1, :updatedReplicas => 1, :readyReplicas => 1, :availableReplicas => 1}
+      )
+
+      server.worker_manager.send(:save_deployment, deployment)
+
+      expect(server.worker_manager.current_deployments).to have_key("1-automation")
+      expect(server.worker_manager.current_deployments["1-automation"]).to include(:spec => deployment.spec.to_h)
+    end
+  end
+
+  context "#save_pod (private)" do
+    let(:phase) { "Running" }
+    let(:ready) { true }
+    let(:started) { true }
+    let(:restart_count) { 0 }
+    let(:last_state) { {} }
+    let(:pod) do
+      require 'kubeclient/resource'
+      Kubeclient::Resource.new(
+        :metadata => {:name => "1-automation-545fbd845-s2pjt", :labels => {:app => "manageiq", :"manageiq-orchestrated-by" => "orchestrator-5474db46d9-rfjmn", :name => "1-automation", :"pod-template-hash" => "545fbd845"}},
+        :spec => {},
+        :status => {:phase => phase, :containerStatuses => [{:name => "1-automation", :state => {:running => {:startedAt => "2024-07-29T19:50:25Z"}}, :lastState => last_state, :ready => ready, :restartCount => restart_count, :started => started}]}
+      )
+    end
+
+    it "saves the pod in #current_pods" do
+      server.worker_manager.send(:save_pod, pod)
+
+      expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+      expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:label_name => "1-automation", :last_state_terminated => false, :container_restarts => 0, :running => true)
+    end
+
+    context "with a pod that is not running" do
+      let(:phase) { "ContainerCreating" }
+
+      it "marks the pod as not running" do
+        server.worker_manager.send(:save_pod, pod)
+
+        expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+        expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:running => false)
+      end
+    end
+
+    context "with a pod that has a restart count" do
+      let(:restart_count) { 2 }
+
+      it "sets container_restarts" do
+        server.worker_manager.send(:save_pod, pod)
+
+        expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+        expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:container_restarts => 2)
+      end
+    end
+
+    context "with a lastState of terminated" do
+      let(:last_state) { {:terminated => true} }
+
+      it "sets last_state_terminated to true" do
+        server.worker_manager.send(:save_pod, pod)
+
+        expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+        expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:last_state_terminated => true)
+      end
+    end
+  end
+
   context "#sync_from_system" do
     context "#ensure_kube_monitors_started" do
       it "podified, ensures pod monitor started and orphaned rows are removed" do
@@ -210,7 +292,7 @@
       metadata = double(:name => deployment_name, :labels => double(:name => pod_label))
       state = double(:running => double(:startedAt => started_at))
       last_state = double(:terminated => nil)
-      status = double(:containerStatuses => [double(:state => state, :lastState => last_state, :restartCount => 0)])
+      status = double(:phase => "Running", :containerStatuses => [double(:state => state, :ready => true, :started => true, :lastState => last_state, :restartCount => 0)])
       pods = [double(:metadata => metadata, :status => status)]
       allow(pods).to receive(:resourceVersion).and_return(resource_version)
       pods
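Note on the patch above: the readiness test that used to live inline in sync_starting_workers is now computed once per pod in save_pod and cached under the `:running` key, so the sync loop only consults the hash. A rough sketch of that rule, assuming a Kubeclient::Resource-shaped pod; OpenStruct stands in for the real object here and `pod_running?` is a hypothetical helper, not a method from the patch:

    require 'ostruct'

    # Mirrors the :running flag computed in save_pod: a pod counts as
    # running only when its phase is "Running" and every container
    # reports both ready and started.
    def pod_running?(pod)
      pod.status.phase == "Running" &&
        pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }
    end

    pod = OpenStruct.new(
      :status => OpenStruct.new(
        :phase             => "Running",
        :containerStatuses => [OpenStruct.new(:ready => true, :started => true)]
      )
    )

    pod_running?(pod) # => true

One subtle behavior change worth noting: the old inline check inspected only the container whose name matched the worker deployment, while the cached flag requires every container in the pod to be ready and started.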
From ed8d21a27afbed9483a66fc65bfb24be87249036 Mon Sep 17 00:00:00 2001
From: Adam Grare
Date: Tue, 30 Jul 2024 10:27:31 -0400
Subject: [PATCH 3/3] Add specs for sync_starting/stopping_workers

---
 .../worker_management/kubernetes.rb      |  17 ++-
 .../worker_management/kubernetes_spec.rb | 122 ++++++++++++++++++
 2 files changed, 136 insertions(+), 3 deletions(-)

diff --git a/app/models/miq_server/worker_management/kubernetes.rb b/app/models/miq_server/worker_management/kubernetes.rb
index 4a87de6822b..bb9fb528cac 100644
--- a/app/models/miq_server/worker_management/kubernetes.rb
+++ b/app/models/miq_server/worker_management/kubernetes.rb
@@ -21,20 +21,31 @@ def sync_from_system
   end
 
   def sync_starting_workers
-    MiqWorker.find_all_starting.reject(&:rails_worker?).each do |worker|
+    starting = MiqWorker.find_all_starting
+
+    # Non-rails workers cannot set their own miq_worker record to started once they
+    # have finished initializing. Check for any starting non-rails workers whose
+    # pod is running and mark the miq_worker as started.
+    starting.reject(&:rails_worker?).each do |worker|
       worker_pod = current_pods[worker.system_uid]
       next if worker_pod.nil?
 
-      worker.update!(:status => "started") if worker_pod[:running]
+      worker.update!(:status => MiqWorker::STATUS_STARTED) if worker_pod[:running]
     end
+
+    starting.reload
   end
 
   def sync_stopping_workers
-    MiqWorker.find_all_stopping.reject(&:rails_worker?).each do |worker|
+    stopping = MiqWorker.find_all_stopping
+
+    stopping.reject(&:rails_worker?).each do |worker|
       next if current_pods.key?(worker[:system_uid])
 
       worker.update!(:status => MiqWorker::STATUS_STOPPED)
     end
+
+    stopping.reload
   end
 
   def enough_resource_to_start_worker?(_worker_class)
diff --git a/spec/models/miq_server/worker_management/kubernetes_spec.rb b/spec/models/miq_server/worker_management/kubernetes_spec.rb
index f6532587722..407dde8cf19 100644
--- a/spec/models/miq_server/worker_management/kubernetes_spec.rb
+++ b/spec/models/miq_server/worker_management/kubernetes_spec.rb
@@ -210,6 +210,128 @@
     end
   end
 
+  describe "#sync_starting_workers" do
+    before { MiqWorkerType.seed }
+
+    context "podified" do
+      let(:rails_worker) { true }
+      let(:pod_name) { "1-generic-abcd" }
+      let(:pod_running) { true }
+      let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
+      let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
+
+      before do
+        allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
+        server.worker_manager.current_pods = current_pods
+      end
+
+      after do
+        server.worker_manager.current_pods.clear
+      end
+
+      context "with no starting workers" do
+        let(:status) { MiqWorker::STATUS_STARTED }
+
+        it "returns an empty array" do
+          expect(server.worker_manager.sync_starting_workers).to be_empty
+        end
+      end
+
+      context "with a starting worker" do
+        let(:status) { MiqWorker::STATUS_STARTING }
+
+        it "returns the starting worker" do
+          expect(server.worker_manager.sync_starting_workers).to include(worker)
+        end
+
+        context "non-rails worker" do
+          let(:rails_worker) { false }
+
+          context "without a pod" do
+            let(:current_pods) { {} }
+
+            it "returns the worker as still starting" do
+              expect(server.worker_manager.sync_starting_workers).to include(worker)
+            end
+          end
+
+          context "with a pod that is running" do
+            let(:pod_running) { true }
+
+            it "marks the worker as started and returns an empty array" do
+              expect(server.worker_manager.sync_starting_workers).to be_empty
+            end
+          end
+
+          context "with a pod that isn't running yet" do
+            let(:pod_running) { false }
+
+            it "returns the starting worker" do
+              expect(server.worker_manager.sync_starting_workers).to include(worker)
+            end
+          end
+        end
+      end
+    end
+  end
+
+  describe "#sync_stopping_workers" do
+    before { MiqWorkerType.seed }
+
+    context "podified" do
+      let(:rails_worker) { true }
+      let(:pod_name) { "1-generic-abcd" }
+      let(:pod_running) { true }
+      let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
+      let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
+
+      before do
+        allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
+        server.worker_manager.current_pods = current_pods
+      end
+
+      after do
+        server.worker_manager.current_pods.clear
+      end
+
+      context "with no stopping workers" do
+        let(:status) { MiqWorker::STATUS_STARTED }
+ it "returns an empty array" do + expect(server.worker_manager.sync_stopping_workers).to be_empty + end + end + + context "with a stopping worker" do + let(:status) { MiqWorker::STATUS_STOPPING } + + it "returns the stopping worker" do + expect(server.worker_manager.sync_stopping_workers).to include(worker) + end + + context "non-rails worker" do + let(:rails_worker) { false } + + context "without a pod" do + let(:current_pods) { {} } + + it "marks the worker as stopped and returns an empty array" do + expect(server.worker_manager.sync_stopping_workers).to be_empty + end + end + + context "with a pod that is still running" do + let(:pod_running) { true } + + it "returns the stopping worker" do + expect(server.worker_manager.sync_stopping_workers).to include(worker) + end + end + end + end + end + end + context "#cleanup_orphaned_worker_rows" do context "podified" do let(:server2) { EvmSpecHelper.remote_miq_server }