Skip to content

Commit

Permalink
fix: replace helpers.Retry with go-retry and adjust delay (#2856)
Browse files Browse the repository at this point in the history
Signed-off-by: Philip Laine <philip.laine@gmail.com>
  • Loading branch information
phillebaba committed Aug 12, 2024
1 parent 9f7b178 commit 3ee850e
Show file tree
Hide file tree
Showing 14 changed files with 154 additions and 229 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/anchore/clio v0.0.0-20240705045624-ac88e09ad9d0
github.com/anchore/stereoscope v0.0.1
github.com/anchore/syft v0.100.0
github.com/avast/retry-go/v4 v4.6.0
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1
github.com/defenseunicorns/pkg/kubernetes v0.2.0
github.com/defenseunicorns/pkg/oci v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.54.9 h1:e0Czh9AhrCVPuyaIUnibYmih3cYexJKlqlHSJ2eMKbI=
github.com/aws/aws-sdk-go v1.54.9/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
Expand Down
10 changes: 4 additions & 6 deletions src/internal/packager/helm/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/Masterminds/semver/v3"
"github.com/avast/retry-go/v4"
plutoversionsfile "github.com/fairwindsops/pluto/v5"
plutoapi "github.com/fairwindsops/pluto/v5/pkg/api"
goyaml "github.com/goccy/go-yaml"
Expand All @@ -23,8 +24,6 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"

"github.com/defenseunicorns/pkg/helpers/v2"

"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/types"
Expand Down Expand Up @@ -59,7 +58,8 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
}

histClient := action.NewHistory(h.actionConfig)
tryHelm := func() error {

err = retry.Do(func() error {
var err error

releases, histErr := histClient.Run(h.chart.ReleaseName)
Expand Down Expand Up @@ -89,9 +89,7 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,

spinner.Success()
return nil
}

err = helpers.Retry(tryHelm, h.retries, 5*time.Second, message.Warnf)
}, retry.Context(ctx), retry.Attempts(uint(h.retries)), retry.Delay(500*time.Millisecond))
if err != nil {
releases, _ := histClient.Run(h.chart.ReleaseName)
previouslyDeployedVersion := 0
Expand Down
27 changes: 12 additions & 15 deletions src/internal/packager/images/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
Expand Down Expand Up @@ -229,26 +229,23 @@ func Pull(ctx context.Context, cfg PullConfig) (map[transform.Image]v1.Image, er

toPull := maps.Clone(fetched)

sc := func() error {
err = retry.Do(func() error {
saved, err := SaveConcurrent(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}

ss := func() error {
saved, err := SaveSequential(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}

if err := helpers.RetryWithContext(ctx, sc, 2, 5*time.Second, message.Warnf); err != nil {
}, retry.Context(ctx), retry.Attempts(2))
if err != nil {
message.Warnf("Failed to save images in parallel, falling back to sequential save: %s", err.Error())

if err := helpers.RetryWithContext(ctx, ss, 2, 5*time.Second, message.Warnf); err != nil {
err = retry.Do(func() error {
saved, err := SaveSequential(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}, retry.Context(ctx), retry.Attempts(2))
if err != nil {
return nil, err
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/internal/packager/images/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"fmt"
"time"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
v1 "github.com/google/go-containerregistry/pkg/v1"

"github.com/zarf-dev/zarf/src/pkg/cluster"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/pkg/transform"
Expand Down Expand Up @@ -54,7 +56,7 @@ func Push(ctx context.Context, cfg PushConfig) error {
progress := message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images", len(toPush)))
defer progress.Close()

if err := helpers.Retry(func() error {
err = retry.Do(func() error {
c, _ := cluster.NewCluster()
if c != nil {
registryURL, tunnel, err = c.ConnectToZarfRegistryEndpoint(ctx, cfg.RegInfo)
Expand Down Expand Up @@ -123,7 +125,8 @@ func Push(ctx context.Context, cfg PushConfig) error {
totalSize -= size
}
return nil
}, cfg.Retries, 5*time.Second, message.Warnf); err != nil {
}, retry.Context(ctx), retry.Attempts(uint(cfg.Retries)), retry.Delay(500*time.Millisecond))
if err != nil {
return err
}

Expand Down
62 changes: 20 additions & 42 deletions src/pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"

"github.com/avast/retry-go/v4"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -44,7 +45,25 @@ func NewClusterWithWait(ctx context.Context) (*Cluster, error) {
if err != nil {
return nil, err
}
err = waitForHealthyCluster(ctx, c.Clientset)
err = retry.Do(func() error {
nodeList, err := c.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
if len(nodeList.Items) < 1 {
return fmt.Errorf("cluster does not have any nodes")
}
pods, err := c.Clientset.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return nil
}
}
return fmt.Errorf("no pods are in succeeded or running state")
}, retry.Context(ctx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -77,44 +96,3 @@ func NewCluster() (*Cluster, error) {
}
return c, nil
}

// WaitForHealthyCluster checks for an available K8s cluster every second until timeout.
func waitForHealthyCluster(ctx context.Context, client kubernetes.Interface) error {
const waitDuration = 1 * time.Second

timer := time.NewTimer(0)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return fmt.Errorf("error waiting for cluster to report healthy: %w", ctx.Err())
case <-timer.C:
// Make sure there is at least one running Node
nodeList, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil || len(nodeList.Items) < 1 {
message.Debugf("No nodes reporting healthy yet: %v\n", err)
timer.Reset(waitDuration)
continue
}

// Get the cluster pod list
pods, err := client.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
message.Debugf("Could not get the pod list: %v", err)
timer.Reset(waitDuration)
continue
}

// Check that at least one pod is in the 'succeeded' or 'running' state
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return nil
}
}

message.Debug("No pods reported 'succeeded' or 'running' state yet.")
timer.Reset(waitDuration)
}
}
}
114 changes: 54 additions & 60 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, data v1alpha1.ZarfDat
return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err)
}

// TODO: Refactor to use retry.
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -180,76 +182,68 @@ type podFilter func(pod corev1.Pod) bool
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) {
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()

timer := time.NewTimer(0)
defer timer.Stop()

for {
select {
case <-waitCtx.Done():
return nil, ctx.Err()
case <-timer.C:
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
return nil, err
readyPods, err := retry.DoWithData(func() ([]corev1.Pod, error) {
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(waitCtx, listOpts)
if err != nil {
return nil, err
}
message.Debugf("Found %d pods for target %#v", len(podList.Items), target)
// Sort the pods from newest to oldest
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
})

readyPods := []corev1.Pod{}
for _, pod := range podList.Items {
message.Debugf("Testing pod %q", pod.Name)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

message.Debugf("Found %d pods for target %#v", len(podList.Items), target)

var readyPods = []corev1.Pod{}

// Sort the pods from newest to oldest
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
})
// Handle container targeting
if target.Container != "" {
message.Debugf("Testing pod %q for container %q", pod.Name, target.Container)

for _, pod := range podList.Items {
message.Debugf("Testing pod %q", pod.Name)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Handle container targeting
if target.Container != "" {
message.Debugf("Testing pod %q for container %q", pod.Name, target.Container)

// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if initContainer.Name == target.Container && isRunning {
// On running match in initContainer break this loop
readyPods = append(readyPods, pod)
break
}
// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if initContainer.Name == target.Container && isRunning {
// On running match in initContainer break this loop
readyPods = append(readyPods, pod)
break
}
}

// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if container.Name == target.Container && isRunning {
readyPods = append(readyPods, pod)
break
}
}
} else {
status := pod.Status.Phase
message.Debugf("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if container.Name == target.Container && isRunning {
readyPods = append(readyPods, pod)
break
}
}
} else {
status := pod.Status.Phase
message.Debugf("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
readyPods = append(readyPods, pod)
break
}
}
if len(readyPods) > 0 {
return readyPods, nil
}
timer.Reset(3 * time.Second)
}
if len(readyPods) == 0 {
return nil, fmt.Errorf("no ready pods found")
}
return readyPods, nil
}, retry.Context(waitCtx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
if err != nil {
return nil, err
}
return readyPods, nil
}
Loading

0 comments on commit 3ee850e

Please sign in to comment.