diff --git a/app/models/miq_server/worker_management/kubernetes.rb b/app/models/miq_server/worker_management/kubernetes.rb index 224ebd2501e..841ad198549 100644 --- a/app/models/miq_server/worker_management/kubernetes.rb +++ b/app/models/miq_server/worker_management/kubernetes.rb @@ -34,10 +34,21 @@ def sync_starting_workers # pod available for our worker type and link them up. if worker.system_uid.nil? system_uid = pods_without_workers.detect { |pod_name| pod_name.start_with?(worker.worker_deployment_name) } - next if system_uid.nil? - - pods_without_workers.delete(system_uid) - worker.update!(:system_uid => system_uid) + if system_uid + # We have found a pod for the current worker record so remove the pod from + # the list of pods without workers and set the pod name as the system_uid + # for the current worker record. + pods_without_workers.delete(system_uid) + worker.update!(:system_uid => system_uid) + else + # If we haven't found a pod for this worker record then we need to check + # whether it has been starting for too long and should be marked as + # not responding. + stop_worker(worker, MiqServer::WorkerManagement::NOT_RESPONDING) if exceeded_heartbeat_threshold?(worker) + # Without a valid system_uid we cannot run any further logic in this + # loop. + next + end end worker_pod = current_pods[worker.system_uid] @@ -67,7 +78,11 @@ def enough_resource_to_start_worker?(_worker_class) def cleanup_orphaned_worker_rows unless current_pods.empty? + # Any worker rows which have a system_uid that is not in the list of + # current pod names, and is not starting (aka hasn't had a system_uid set + # yet) should be deleted. orphaned_rows = miq_workers.where.not(:system_uid => current_pods.keys) + .where.not(:status => MiqWorker::STATUSES_STARTING) unless orphaned_rows.empty? _log.warn("Removing orphaned worker rows without corresponding pods: #{orphaned_rows.collect(&:system_uid).inspect}") orphaned_rows.destroy_all diff --git a/spec/models/miq_server/worker_management/kubernetes_spec.rb b/spec/models/miq_server/worker_management/kubernetes_spec.rb index 47af29a7ee2..359706d7fba 100644 --- a/spec/models/miq_server/worker_management/kubernetes_spec.rb +++ b/spec/models/miq_server/worker_management/kubernetes_spec.rb @@ -214,14 +214,16 @@ before { MiqWorkerType.seed } context "podified" do - let(:rails_worker) { true } - let(:pod_name) { "#{server.compressed_id}-generic-abcd" } - let(:pod_running) { true } - let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => system_uid) } - let(:system_uid) { pod_name } - let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} } + let(:rails_worker) { true } + let(:pod_name) { "#{server.compressed_id}-generic-abcd" } + let(:pod_running) { true } + let(:last_heartbeat) { Time.now.utc } + let(:worker) { FactoryBot.create(:miq_generic_worker, :miq_server => server, :status => status, :system_uid => system_uid, :last_heartbeat => last_heartbeat) } + let(:system_uid) { pod_name } + let(:current_pods) { {pod_name => {:label_name => pod_label, :last_state_terminated => false, :container_restarts => 0, :running => pod_running}} } before do + allow(worker.class).to receive(:containerized_worker?).and_return(true) allow(worker.class).to receive(:rails_worker?).and_return(rails_worker) server.worker_manager.current_pods = current_pods end @@ -275,12 +277,28 @@ context "with a worker that doesn't have a system_uid yet" do let(:system_uid) { nil } + before { server.worker_manager.sync_config } + context "without a pod" do let(:current_pods) { {} } it "returns the worker as still starting" do expect(server.worker_manager.sync_starting_workers).to include(worker) end + + context "with a worker that has been starting longer than the starting_timeout" do + let(:last_heartbeat) { 20.minutes.ago.utc } + + it "marks the worker as not responding" do + # Make sure that #find_worker returns our instance of worker that + # that stubs the #stop_container method. + expect(server.worker_manager).to receive(:find_worker).with(worker).and_return(worker) + expect(worker).to receive(:stop_container) + + server.worker_manager.sync_starting_workers + expect(worker.reload.status).to eq("stopping") + end + end end context "with a pod that is running" do