Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: replace helpers.Retry with go-retry and adjust delay #2856

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/anchore/clio v0.0.0-20240705045624-ac88e09ad9d0
github.com/anchore/stereoscope v0.0.1
github.com/anchore/syft v0.100.0
github.com/avast/retry-go/v4 v4.6.0
github.com/defenseunicorns/pkg/helpers/v2 v2.0.1
github.com/defenseunicorns/pkg/kubernetes v0.2.0
github.com/defenseunicorns/pkg/oci v1.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W
github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk=
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
github.com/atotto/clipboard v0.1.4/go.mod h1:ZY9tmq7sm5xIbd9bOK4onWV4S6X0u6GY7Vn0Yu86PYI=
github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA=
github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE=
github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.54.9 h1:e0Czh9AhrCVPuyaIUnibYmih3cYexJKlqlHSJ2eMKbI=
github.com/aws/aws-sdk-go v1.54.9/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
Expand Down
10 changes: 4 additions & 6 deletions src/internal/packager/helm/chart.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/Masterminds/semver/v3"
"github.com/avast/retry-go/v4"
plutoversionsfile "github.com/fairwindsops/pluto/v5"
plutoapi "github.com/fairwindsops/pluto/v5/pkg/api"
goyaml "github.com/goccy/go-yaml"
Expand All @@ -23,8 +24,6 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/yaml"

"github.com/defenseunicorns/pkg/helpers/v2"

"github.com/zarf-dev/zarf/src/config"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/types"
Expand Down Expand Up @@ -59,7 +58,8 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,
}

histClient := action.NewHistory(h.actionConfig)
tryHelm := func() error {

err = retry.Do(func() error {
var err error

releases, histErr := histClient.Run(h.chart.ReleaseName)
Expand Down Expand Up @@ -89,9 +89,7 @@ func (h *Helm) InstallOrUpgradeChart(ctx context.Context) (types.ConnectStrings,

spinner.Success()
return nil
}

err = helpers.Retry(tryHelm, h.retries, 5*time.Second, message.Warnf)
}, retry.Context(ctx), retry.Attempts(uint(h.retries)), retry.Delay(500*time.Millisecond))
if err != nil {
releases, _ := histClient.Run(h.chart.ReleaseName)
previouslyDeployedVersion := 0
Expand Down
27 changes: 12 additions & 15 deletions src/internal/packager/images/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
Expand Down Expand Up @@ -229,26 +229,23 @@ func Pull(ctx context.Context, cfg PullConfig) (map[transform.Image]v1.Image, er

toPull := maps.Clone(fetched)

sc := func() error {
err = retry.Do(func() error {
saved, err := SaveConcurrent(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}

ss := func() error {
saved, err := SaveSequential(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}

if err := helpers.RetryWithContext(ctx, sc, 2, 5*time.Second, message.Warnf); err != nil {
}, retry.Context(ctx), retry.Attempts(2))
if err != nil {
message.Warnf("Failed to save images in parallel, falling back to sequential save: %s", err.Error())

if err := helpers.RetryWithContext(ctx, ss, 2, 5*time.Second, message.Warnf); err != nil {
err = retry.Do(func() error {
saved, err := SaveSequential(ctx, cranePath, toPull)
for k := range saved {
delete(toPull, k)
}
return err
}, retry.Context(ctx), retry.Attempts(2))
if err != nil {
return nil, err
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/internal/packager/images/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (
"fmt"
"time"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"
"github.com/google/go-containerregistry/pkg/crane"
"github.com/google/go-containerregistry/pkg/logs"
v1 "github.com/google/go-containerregistry/pkg/v1"

"github.com/zarf-dev/zarf/src/pkg/cluster"
"github.com/zarf-dev/zarf/src/pkg/message"
"github.com/zarf-dev/zarf/src/pkg/transform"
Expand Down Expand Up @@ -54,7 +56,7 @@ func Push(ctx context.Context, cfg PushConfig) error {
progress := message.NewProgressBar(totalSize, fmt.Sprintf("Pushing %d images", len(toPush)))
defer progress.Close()

if err := helpers.Retry(func() error {
err = retry.Do(func() error {
c, _ := cluster.NewCluster()
if c != nil {
registryURL, tunnel, err = c.ConnectToZarfRegistryEndpoint(ctx, cfg.RegInfo)
Expand Down Expand Up @@ -123,7 +125,8 @@ func Push(ctx context.Context, cfg PushConfig) error {
totalSize -= size
}
return nil
}, cfg.Retries, 5*time.Second, message.Warnf); err != nil {
}, retry.Context(ctx), retry.Attempts(uint(cfg.Retries)), retry.Delay(500*time.Millisecond))
if err != nil {
return err
}

Expand Down
62 changes: 20 additions & 42 deletions src/pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"k8s.io/client-go/rest"
"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"

"github.com/avast/retry-go/v4"
pkgkubernetes "github.com/defenseunicorns/pkg/kubernetes"

"github.com/zarf-dev/zarf/src/pkg/message"
Expand Down Expand Up @@ -44,7 +45,25 @@ func NewClusterWithWait(ctx context.Context) (*Cluster, error) {
if err != nil {
return nil, err
}
err = waitForHealthyCluster(ctx, c.Clientset)
err = retry.Do(func() error {
nodeList, err := c.Clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
if len(nodeList.Items) < 1 {
return fmt.Errorf("cluster does not have any nodes")
}
pods, err := c.Clientset.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return nil
}
}
return fmt.Errorf("no pods are in succeeded or running state")
}, retry.Context(ctx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -77,44 +96,3 @@ func NewCluster() (*Cluster, error) {
}
return c, nil
}

// WaitForHealthyCluster checks for an available K8s cluster every second until timeout.
func waitForHealthyCluster(ctx context.Context, client kubernetes.Interface) error {
const waitDuration = 1 * time.Second

timer := time.NewTimer(0)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return fmt.Errorf("error waiting for cluster to report healthy: %w", ctx.Err())
case <-timer.C:
// Make sure there is at least one running Node
nodeList, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil || len(nodeList.Items) < 1 {
message.Debugf("No nodes reporting healthy yet: %v\n", err)
timer.Reset(waitDuration)
continue
}

// Get the cluster pod list
pods, err := client.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
message.Debugf("Could not get the pod list: %v", err)
timer.Reset(waitDuration)
continue
}

// Check that at least one pod is in the 'succeeded' or 'running' state
for _, pod := range pods.Items {
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodRunning {
return nil
}
}

message.Debug("No pods reported 'succeeded' or 'running' state yet.")
timer.Reset(waitDuration)
}
}
}
114 changes: 54 additions & 60 deletions src/pkg/cluster/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/avast/retry-go/v4"
"github.com/defenseunicorns/pkg/helpers/v2"

"github.com/zarf-dev/zarf/src/api/v1alpha1"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (c *Cluster) HandleDataInjection(ctx context.Context, data v1alpha1.ZarfDat
return fmt.Errorf("unable to execute tar, ensure it is installed in the $PATH: %w", err)
}

// TODO: Refactor to use retry.
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -180,76 +182,68 @@ type podFilter func(pod corev1.Pod) bool
func waitForPodsAndContainers(ctx context.Context, clientset kubernetes.Interface, target podLookup, include podFilter) ([]corev1.Pod, error) {
waitCtx, cancel := context.WithTimeout(ctx, 90*time.Second)
defer cancel()

timer := time.NewTimer(0)
defer timer.Stop()

for {
select {
case <-waitCtx.Done():
return nil, ctx.Err()
case <-timer.C:
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(ctx, listOpts)
if err != nil {
return nil, err
readyPods, err := retry.DoWithData(func() ([]corev1.Pod, error) {
listOpts := metav1.ListOptions{
LabelSelector: target.Selector,
}
podList, err := clientset.CoreV1().Pods(target.Namespace).List(waitCtx, listOpts)
if err != nil {
return nil, err
}
message.Debugf("Found %d pods for target %#v", len(podList.Items), target)
// Sort the pods from newest to oldest
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
})

readyPods := []corev1.Pod{}
for _, pod := range podList.Items {
message.Debugf("Testing pod %q", pod.Name)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

message.Debugf("Found %d pods for target %#v", len(podList.Items), target)

var readyPods = []corev1.Pod{}

// Sort the pods from newest to oldest
sort.Slice(podList.Items, func(i, j int) bool {
return podList.Items[i].CreationTimestamp.After(podList.Items[j].CreationTimestamp.Time)
})
// Handle container targeting
if target.Container != "" {
message.Debugf("Testing pod %q for container %q", pod.Name, target.Container)

for _, pod := range podList.Items {
message.Debugf("Testing pod %q", pod.Name)

// If an include function is provided, only keep pods that return true
if include != nil && !include(pod) {
continue
}

// Handle container targeting
if target.Container != "" {
message.Debugf("Testing pod %q for container %q", pod.Name, target.Container)

// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if initContainer.Name == target.Container && isRunning {
// On running match in initContainer break this loop
readyPods = append(readyPods, pod)
break
}
// Check the status of initContainers for a running match
for _, initContainer := range pod.Status.InitContainerStatuses {
isRunning := initContainer.State.Running != nil
if initContainer.Name == target.Container && isRunning {
// On running match in initContainer break this loop
readyPods = append(readyPods, pod)
break
}
}

// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if container.Name == target.Container && isRunning {
readyPods = append(readyPods, pod)
break
}
}
} else {
status := pod.Status.Phase
message.Debugf("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
// Check the status of regular containers for a running match
for _, container := range pod.Status.ContainerStatuses {
isRunning := container.State.Running != nil
if container.Name == target.Container && isRunning {
readyPods = append(readyPods, pod)
break
}
}
} else {
status := pod.Status.Phase
message.Debugf("Testing pod %q phase, want (%q) got (%q)", pod.Name, corev1.PodRunning, status)
// Regular status checking without a container
if status == corev1.PodRunning {
readyPods = append(readyPods, pod)
break
}
}
if len(readyPods) > 0 {
return readyPods, nil
}
timer.Reset(3 * time.Second)
}
if len(readyPods) == 0 {
return nil, fmt.Errorf("no ready pods found")
}
return readyPods, nil
}, retry.Context(waitCtx), retry.Attempts(0), retry.DelayType(retry.FixedDelay), retry.Delay(time.Second))
if err != nil {
return nil, err
}
return readyPods, nil
}
Loading
Loading