From 890c3891721bc04144a46b0f418d731f9d9b8c7e Mon Sep 17 00:00:00 2001 From: jokestax Date: Tue, 13 Aug 2024 00:58:23 +0530 Subject: [PATCH 1/5] fix: deployment watch fix --- internal/k8s/exec.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/internal/k8s/exec.go b/internal/k8s/exec.go index 6b364c88..ad0709a8 100644 --- a/internal/k8s/exec.go +++ b/internal/k8s/exec.go @@ -18,6 +18,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/remotecommand" @@ -170,14 +171,33 @@ func ReturnDeploymentObject(clientset *kubernetes.Clientset, matchLabel string, log.Info().Msgf("waiting for %s Deployment to be created", matchLabelValue) - // Create watch operation - objWatch, err := clientset. - AppsV1(). - Deployments(namespace). - Watch(context.Background(), deploymentListOptions) - if err != nil { - log.Error().Msgf("error when attempting to search for Deployment: %s", err) - return nil, err + var objWatch watch.Interface + var err error + + timeout := 20 * time.Second + startTime := time.Now() + + // Create watch operation with retries + for { + // Attempt to watch the deployment + objWatch, err = clientset.AppsV1().Deployments(namespace).Watch(context.Background(), deploymentListOptions) + + // Check if the timeout has been reached + if time.Since(startTime) > timeout { + if err != nil { + log.Printf("Error when attempting to watch Deployment: %s", err) + } else { + log.Printf("Timeout reached while watching for Deployment") + } + return nil, fmt.Errorf("timeout reached while watching for Deployment") + } + + if err == nil { + break + } + + log.Printf("Error when attempting to watch Deployment: %s. Retrying...", err) + time.Sleep(1 * time.Second) } objChan := objWatch.ResultChan() From e4af23a1a0816b2aa97bae6bd77f2f39175751e0 Mon Sep 17 00:00:00 2001 From: jokestax Date: Tue, 13 Aug 2024 01:01:05 +0530 Subject: [PATCH 2/5] return error format --- internal/k8s/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/k8s/exec.go b/internal/k8s/exec.go index ad0709a8..921558f0 100644 --- a/internal/k8s/exec.go +++ b/internal/k8s/exec.go @@ -215,7 +215,7 @@ func ReturnDeploymentObject(clientset *kubernetes.Clientset, matchLabel string, spec, err := clientset.AppsV1().Deployments(namespace).List(context.Background(), deploymentListOptions) if err != nil { log.Error().Msgf("Error when searching for Deployment: %s", err) - return nil, err + return nil, fmt.Errorf("Error when searching for Deployment: %w", err) } return &spec.Items[0], nil } From 00f063bfefbf6b2d1ff28c4ed7cc5d7fe21c0baf Mon Sep 17 00:00:00 2001 From: jokestax Date: Tue, 13 Aug 2024 01:28:47 +0530 Subject: [PATCH 3/5] changed logic --- internal/k8s/exec.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/internal/k8s/exec.go b/internal/k8s/exec.go index 921558f0..6cb7ef1e 100644 --- a/internal/k8s/exec.go +++ b/internal/k8s/exec.go @@ -182,20 +182,16 @@ func ReturnDeploymentObject(clientset *kubernetes.Clientset, matchLabel string, // Attempt to watch the deployment objWatch, err = clientset.AppsV1().Deployments(namespace).Watch(context.Background(), deploymentListOptions) + if err == nil { + break + } + // Check if the timeout has been reached if time.Since(startTime) > timeout { - if err != nil { - log.Printf("Error when attempting to watch Deployment: %s", err) - } else { - log.Printf("Timeout reached while watching for Deployment") - } + log.Printf("Error when attempting to watch Deployment: %s", err) return nil, fmt.Errorf("timeout reached while watching for Deployment") } - if err == nil { - break - } - log.Printf("Error when attempting to watch Deployment: %s. Retrying...", err) time.Sleep(1 * time.Second) } From 6625d68ff098990a6d5bf39735d6b274a022d265 Mon Sep 17 00:00:00 2001 From: jokestax Date: Tue, 13 Aug 2024 03:57:25 +0530 Subject: [PATCH 4/5] refactor return-deployment --- internal/k8s/exec.go | 84 ++++++++++++++++++-------------------------- 1 file changed, 35 insertions(+), 49 deletions(-) diff --git a/internal/k8s/exec.go b/internal/k8s/exec.go index 6cb7ef1e..16588c79 100644 --- a/internal/k8s/exec.go +++ b/internal/k8s/exec.go @@ -8,9 +8,11 @@ package k8s import ( "context" + "errors" "fmt" "io" "os" + "syscall" "time" "github.com/rs/zerolog/log" @@ -18,7 +20,7 @@ import ( appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/remotecommand" @@ -161,65 +163,49 @@ func podExec(kubeConfigPath string, ps *PodSessionOptions, pe v1.PodExecOptions, return nil } -// ReturnDeploymentObject returns a matching appsv1.Deployment object based on the filters -func ReturnDeploymentObject(clientset *kubernetes.Clientset, matchLabel string, matchLabelValue string, namespace string, timeoutSeconds int) (*appsv1.Deployment, error) { +func ReturnDeploymentObject(client kubernetes.Interface, matchLabel string, matchLabelValue string, namespace string, timeoutSeconds int) (*appsv1.Deployment, error) { - // Filter - deploymentListOptions := metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s", matchLabel, matchLabelValue), + timeout := time.Duration(timeoutSeconds) * time.Second + var deployment *appsv1.Deployment + labelSelector := map[string]string{ + matchLabel: matchLabelValue, } - log.Info().Msgf("waiting for %s Deployment to be created", matchLabelValue) - - var objWatch watch.Interface - var err error - - timeout := 20 * time.Second - startTime := time.Now() - - // Create watch operation with retries - for { - // Attempt to watch the deployment - objWatch, err = clientset.AppsV1().Deployments(namespace).Watch(context.Background(), deploymentListOptions) + err := wait.PollImmediate(15*time.Second, timeout, func() (bool, error) { + deployments, err := client.AppsV1().Deployments(namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(metav1.SetAsLabelSelector(labelSelector)), + }) + if err != nil { + // if we couldn't connect, ask to try again + if errors.Is(err, syscall.ECONNREFUSED) { + return false, nil + } - if err == nil { - break + // if we got an error, return it + return false, fmt.Errorf("error getting Deployment: %w", err) } - // Check if the timeout has been reached - if time.Since(startTime) > timeout { - log.Printf("Error when attempting to watch Deployment: %s", err) - return nil, fmt.Errorf("timeout reached while watching for Deployment") + // if we couldn't find any deployments, ask to try again + if len(deployments.Items) == 0 { + return false, nil } - log.Printf("Error when attempting to watch Deployment: %s. Retrying...", err) - time.Sleep(1 * time.Second) - } + // fetch the first item from the list matching the labels + deployment = &deployments.Items[0] - objChan := objWatch.ResultChan() - for { - select { - case event, ok := <-objChan: - time.Sleep(time.Second * 15) - if !ok { - // Error if the channel closes - log.Error().Msgf("error waiting for %s Deployment to be created: %s", matchLabelValue, err) - return nil, fmt.Errorf("error waiting for %s Deployment to be created: %s", matchLabelValue, err) - } - if event. - Object.(*appsv1.Deployment).Status.Replicas > 0 { - spec, err := clientset.AppsV1().Deployments(namespace).List(context.Background(), deploymentListOptions) - if err != nil { - log.Error().Msgf("Error when searching for Deployment: %s", err) - return nil, fmt.Errorf("Error when searching for Deployment: %w", err) - } - return &spec.Items[0], nil - } - case <-time.After(time.Duration(timeoutSeconds) * time.Second): - log.Error().Msg("the Deployment was not created within the timeout period") - return nil, fmt.Errorf("the Deployment was not created within the timeout period") + // Check if it has at least one replica, if not, ask to try again + if deployment.Status.Replicas == 0 { + return false, nil } + + // if we found a deployment, return it + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("error waiting for Deployment: %w", err) } + + return deployment, nil } // ReturnPodObject returns a matching v1.Pod object based on the filters From dd6d61f3f7b4766102ef3c696972f23154a56235 Mon Sep 17 00:00:00 2001 From: jokestax Date: Tue, 13 Aug 2024 04:11:06 +0530 Subject: [PATCH 5/5] edit label-selector --- internal/k8s/exec.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/k8s/exec.go b/internal/k8s/exec.go index 16588c79..ea0dddfe 100644 --- a/internal/k8s/exec.go +++ b/internal/k8s/exec.go @@ -167,13 +167,10 @@ func ReturnDeploymentObject(client kubernetes.Interface, matchLabel string, matc timeout := time.Duration(timeoutSeconds) * time.Second var deployment *appsv1.Deployment - labelSelector := map[string]string{ - matchLabel: matchLabelValue, - } err := wait.PollImmediate(15*time.Second, timeout, func() (bool, error) { deployments, err := client.AppsV1().Deployments(namespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: metav1.FormatLabelSelector(metav1.SetAsLabelSelector(labelSelector)), + LabelSelector: fmt.Sprintf("%s=%s", matchLabel, matchLabelValue), }) if err != nil { // if we couldn't connect, ask to try again