Fix non-rails worker starting on podified #23116

Merged 3 commits on Aug 1, 2024
23 changes: 15 additions & 8 deletions app/models/miq_server/worker_management/kubernetes.rb
@@ -21,25 +21,31 @@ def sync_from_system
   end
 
   def sync_starting_workers
-    MiqWorker.find_all_starting.each do |worker|
-      next if worker.class.rails_worker?
+    starting = MiqWorker.find_all_starting
 
-      worker_pod = current_pods[worker[:system_uid]]
+    # Non-rails workers cannot set their own miq_worker record to started once they
+    # have finished initializing. Check for any starting non-rails workers whose
+    # pod is running and mark the miq_worker as started.
+    starting.reject(&:rails_worker?).each do |worker|
+      worker_pod = current_pods[worker.system_uid]
       next if worker_pod.nil?
 
-      container_status = worker_pod.status.containerStatuses.find { |container| container.name == worker.worker_deployment_name }
-      if worker_pod.status.phase == "Running" && container_status.ready && container_status.started
-        worker.update!(:status => "started")
-      end
+      worker.update!(:status => MiqWorker::STATUS_STARTED) if worker_pod[:running]
     end
+
+    starting.reload
   end
 
   def sync_stopping_workers
-    MiqWorker.find_all_stopping.reject { |w| w.class.rails_worker? }.each do |worker|
+    stopping = MiqWorker.find_all_stopping
+
+    stopping.reject(&:rails_worker?).each do |worker|
       next if current_pods.key?(worker[:system_uid])
 
       worker.update!(:status => MiqWorker::STATUS_STOPPED)
     end
+
+    stopping.reload
   end
 
   def enough_resource_to_start_worker?(_worker_class)
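Note: both sync methods now return the reloaded scope, which is what the new specs assert against. A minimal sketch of why `reload` matters here, assuming `MiqWorker.find_all_starting` returns an `ActiveRecord::Relation` (e.g. a scope such as `where(:status => "starting")`) rather than a plain `Array`:

    # Sketch only, not ManageIQ code: illustrates the reload-as-return-value idiom.
    starting = MiqWorker.find_all_starting  # lazy relation; SQL runs on first access
    starting.each { |w| w.update!(:status => MiqWorker::STATUS_STARTED) }
    starting.reload  # re-runs the query, so freshly "started" rows drop out

Callers therefore receive only the workers that are still starting (or stopping) after the sync pass.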
@@ -255,6 +261,7 @@ def save_pod(pod)
     ch[:label_name] = pod.metadata.labels.name
     ch[:last_state_terminated] = pod.status.containerStatuses.any? { |cs| cs.lastState.terminated }
     ch[:container_restarts] = pod.status.containerStatuses.sum { |cs| cs.restartCount.to_i }
+    ch[:running] = pod.status.phase == "Running" && pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }
 
     name = pod.metadata.name
     current_pods[name] ||= ch
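The per-container readiness check that `sync_starting_workers` previously repeated for every starting worker is now computed once per pod here and cached as `ch[:running]`. A standalone sketch of the predicate, using `OpenStruct` stand-ins for the Kubeclient objects:

    require 'ostruct'

    # A pod counts as running only when its phase is "Running" AND every
    # container reports both ready and started (mirrors the ch[:running] line).
    def pod_running?(pod)
      pod.status.phase == "Running" &&
        pod.status.containerStatuses.all? { |cs| cs.ready && cs.started }
    end

    pod = OpenStruct.new(
      :status => OpenStruct.new(
        :phase             => "Running",
        :containerStatuses => [OpenStruct.new(:ready => true, :started => true)]
      )
    )

    puts pod_running?(pod) # => true

Requiring `started` as well as `ready` avoids marking a worker started while a container is still inside its startup-probe window.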
6 changes: 2 additions & 4 deletions app/models/miq_server/worker_management/systemd.rb
@@ -6,9 +6,7 @@ def sync_from_system
 
   def sync_starting_workers
     sync_from_system
-    MiqWorker.find_all_starting.each do |worker|
-      next if worker.class.rails_worker?
-
+    MiqWorker.find_all_starting.reject(&:rails_worker?).each do |worker|
       systemd_worker = miq_services_by_unit[worker[:system_uid]]
       next if systemd_worker.nil?
 
@@ -20,7 +18,7 @@
 
   def sync_stopping_workers
     sync_from_system
-    MiqWorker.find_all_stopping.reject { |w| w.class.rails_worker? }.each do |worker|
+    MiqWorker.find_all_stopping.reject(&:rails_worker?).each do |worker|
       # If the worker record is "stopping" and the systemd unit is gone then the
       # worker has successfully exited.
       next if miq_services_by_unit[worker[:system_uid]].present?
1 change: 1 addition & 0 deletions app/models/miq_worker.rb
@@ -71,6 +71,7 @@ def self.rails_worker?
 
     true
   end
+  delegate :rails_worker?, :to => :class
 
   def self.scalable?
     maximum_workers_count.nil? || maximum_workers_count > 1
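The `delegate` line is what lets the worker-management code above call `reject(&:rails_worker?)` on `MiqWorker` instances instead of `reject { |w| w.class.rails_worker? }`. ActiveSupport's `delegate :rails_worker?, :to => :class` generates roughly this instance method:

    # Roughly what the delegate macro defines on MiqWorker instances:
    def rails_worker?
      self.class.rails_worker?
    end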
206 changes: 205 additions & 1 deletion spec/models/miq_server/worker_management/kubernetes_spec.rb
@@ -118,6 +118,88 @@
     end
   end
 
+  context "#save_deployment (private)" do
+    it "saves the deployment in #current_deployments" do
+      require 'kubeclient/resource'
+
+      deployment = Kubeclient::Resource.new(
+        :metadata => {
+          :name => "1-automation",
+          :namespace => "manageiq",
+          :uid => "3f4aeff1-e1a5-45bf-8221-2627bfff38c6",
+          :resourceVersion => "1557249",
+          :generation => 2,
+          :creationTimestamp => "2024-07-29T19:50:23Z",
+          :labels => {:app => "manageiq", :"manageiq-orchestrated-by" => "orchestrator-5474db46d9-rfjmn"},
+          :annotations => {:"deployment.kubernetes.io/revision" => "1"}
+        },
+        :spec => {:replicas => 1, :selector => {:matchLabels => {:name => "1-automation"}}},
+        :status => {:observedGeneration => 2, :replicas => 1, :updatedReplicas => 1, :readyReplicas => 1, :availableReplicas => 1}
+      )
+
+      server.worker_manager.send(:save_deployment, deployment)
+
+      expect(server.worker_manager.current_deployments).to have_key("1-automation")
+      expect(server.worker_manager.current_deployments["1-automation"]).to include(:spec => deployment.spec.to_h)
+    end
+  end
+
+  context "#save_pod (private)" do
+    let(:phase) { "Running" }
+    let(:ready) { true }
+    let(:started) { true }
+    let(:restart_count) { 0 }
+    let(:last_state) { {} }
+    let(:pod) do
+      require 'kubeclient/resource'
+      Kubeclient::Resource.new(
+        :metadata => {:name => "1-automation-545fbd845-s2pjt", :labels => {:app => "manageiq", :"manageiq-orchestrated-by" => "orchestrator-5474db46d9-rfjmn", :name => "1-automation", :"pod-template-hash" => "545fbd845"}},
+        :spec => {},
+        :status => {:phase => phase, :containerStatuses => [{:name => "1-automation", :state => {:running => {:startedAt => "2024-07-29T19:50:25Z"}}, :lastState => last_state, :ready => ready, :restartCount => restart_count, :started => started}]}
+      )
+    end
+
+    it "saves the pod in #current_pods" do
+      server.worker_manager.send(:save_pod, pod)
+
+      expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+      expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:label_name => "1-automation", :last_state_terminated => false, :container_restarts => 0, :running => true)
+    end
+
+    context "with a pod that is not running" do
+      let(:phase) { "ContainerCreating" }
+
+      it "marks the pod as not running" do
+        server.worker_manager.send(:save_pod, pod)
+
+        expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+        expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:running => false)
+      end
+    end
+
+    context "with a pod that has a restart count" do
+      let(:restart_count) { 2 }
+
+      it "sets container_restarts" do
+        server.worker_manager.send(:save_pod, pod)
+
+        expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+        expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:container_restarts => 2)
+      end
+    end
+
+    context "with a lastState of terminated" do
+      let(:last_state) { {:terminated => true} }
+
+      it "sets last_state_terminated to true" do
+        server.worker_manager.send(:save_pod, pod)
+
+        expect(server.worker_manager.current_pods).to have_key("1-automation-545fbd845-s2pjt")
+        expect(server.worker_manager.current_pods["1-automation-545fbd845-s2pjt"]).to include(:last_state_terminated => true)
+      end
+    end
+  end
+
   context "#sync_from_system" do
     context "#ensure_kube_monitors_started" do
       it "podified, ensures pod monitor started and orphaned rows are removed" do
@@ -128,6 +210,128 @@
     end
   end
 
+  describe "#sync_starting_workers" do
+    before { MiqWorkerType.seed }
+
+    context "podified" do
+      let(:rails_worker) { true }
+      let(:pod_name) { "1-generic-abcd" }
+      let(:pod_running) { true }
+      let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
+      let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
+
+      before do
+        allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
+        server.worker_manager.current_pods = current_pods
+      end
+
+      after do
+        server.worker_manager.current_pods.clear
+      end
+
+      context "with no starting workers" do
+        let(:status) { MiqWorker::STATUS_STARTED }
+
+        it "returns an empty array" do
+          expect(server.worker_manager.sync_starting_workers).to be_empty
+        end
+      end
+
+      context "with a starting worker" do
+        let(:status) { MiqWorker::STATUS_STARTING }
+
+        it "returns the starting worker" do
+          expect(server.worker_manager.sync_starting_workers).to include(worker)
+        end
+
+        context "non-rails worker" do
+          let(:rails_worker) { false }
+
+          context "without a pod" do
+            let(:current_pods) { {} }
+
+            it "returns the worker as still starting" do
+              expect(server.worker_manager.sync_starting_workers).to include(worker)
+            end
+          end
+
+          context "with a pod that is running" do
+            let(:pod_running) { true }
+
+            it "marks the worker as running and returns an empty array" do
+              expect(server.worker_manager.sync_starting_workers).to be_empty
+            end
+          end
+
+          context "with a pod that isn't running yet" do
+            let(:pod_running) { false }
+
+            it "returns the starting worker" do
+              expect(server.worker_manager.sync_starting_workers).to include(worker)
+            end
+          end
+        end
+      end
+    end
+  end
+
+  describe "#sync_stopping_workers" do
+    before { MiqWorkerType.seed }
+
+    context "podified" do
+      let(:rails_worker) { true }
+      let(:pod_name) { "1-generic-abcd" }
+      let(:pod_running) { true }
+      let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => pod_name) }
+      let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} }
+
+      before do
+        allow(worker.class).to receive(:rails_worker?).and_return(rails_worker)
+        server.worker_manager.current_pods = current_pods
+      end
+
+      after do
+        server.worker_manager.current_pods.clear
+      end
+
+      context "with no stopping workers" do
+        let(:status) { MiqWorker::STATUS_STARTED }
+
+        it "returns an empty array" do
+          expect(server.worker_manager.sync_stopping_workers).to be_empty
+        end
+      end
+
+      context "with a stopping worker" do
+        let(:status) { MiqWorker::STATUS_STOPPING }
+
+        it "returns the stopping worker" do
+          expect(server.worker_manager.sync_stopping_workers).to include(worker)
+        end
+
+        context "non-rails worker" do
+          let(:rails_worker) { false }
+
+          context "without a pod" do
+            let(:current_pods) { {} }
+
+            it "marks the worker as stopped and returns an empty array" do
+              expect(server.worker_manager.sync_stopping_workers).to be_empty
+            end
+          end
+
+          context "with a pod that is still running" do
+            let(:pod_running) { true }
+
+            it "returns the stopping worker" do
+              expect(server.worker_manager.sync_stopping_workers).to include(worker)
+            end
+          end
+        end
+      end
+    end
+  end
+
   context "#cleanup_orphaned_worker_rows" do
     context "podified" do
       let(:server2) { EvmSpecHelper.remote_miq_server }
@@ -210,7 +414,7 @@
       metadata = double(:name => deployment_name, :labels => double(:name => pod_label))
       state = double(:running => double(:startedAt => started_at))
       last_state = double(:terminated => nil)
-      status = double(:containerStatuses => [double(:state => state, :lastState => last_state, :restartCount => 0)])
+      status = double(:phase => "Running", :containerStatuses => [double(:state => state, :ready => true, :started => true, :lastState => last_state, :restartCount => 0)])
       pods = [double(:metadata => metadata, :status => status)]
       allow(pods).to receive(:resourceVersion).and_return(resource_version)
       pods