Skip to content

Commit

Permalink
Using ClusterCacheTracker instead of remote.NewClusterClient
Browse files Browse the repository at this point in the history
  • Loading branch information
laozc committed Aug 1, 2023
1 parent c6341d3 commit 2eacd20
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 25 deletions.
20 changes: 17 additions & 3 deletions controllers/controllers_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes/scheme"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/remote"
ctrl "sigs.k8s.io/controller-runtime"

infrav1 "sigs.k8s.io/cluster-api-provider-vsphere/apis/v1beta1"
Expand Down Expand Up @@ -62,13 +63,26 @@ func setup() {

testEnv = helpers.NewTestEnvironment()

tracker, err := remote.NewClusterCacheTracker(
testEnv.Manager,
remote.ClusterCacheTrackerOptions{
SecretCachingClient: nil,
ControllerName: "",
Log: nil,
Indexes: []remote.Index{remote.NodeProviderIDIndex},
},
)
if err != nil {
panic(fmt.Sprintf("unable to setup ClusterCacheTracker: %v", err))
}

if err := AddClusterControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereCluster{}); err != nil {
panic(fmt.Sprintf("unable to setup VsphereCluster controller: %v", err))
}
if err := AddMachineControllerToManager(testEnv.GetContext(), testEnv.Manager, &infrav1.VSphereMachine{}); err != nil {
panic(fmt.Sprintf("unable to setup VsphereMachine controller: %v", err))
}
if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
if err := AddVMControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker); err != nil {
panic(fmt.Sprintf("unable to setup VsphereVM controller: %v", err))
}
if err := AddVsphereClusterIdentityControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
Expand All @@ -77,10 +91,10 @@ func setup() {
if err := AddVSphereDeploymentZoneControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
panic(fmt.Sprintf("unable to setup VSphereDeploymentZone controller: %v", err))
}
if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
if err := AddServiceAccountProviderControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker); err != nil {
panic(fmt.Sprintf("unable to setup ServiceAccount controller: %v", err))
}
if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager); err != nil {
if err := AddServiceDiscoveryControllerToManager(testEnv.GetContext(), testEnv.Manager, tracker); err != nil {
panic(fmt.Sprintf("unable to setup SvcDiscovery controller: %v", err))
}

Expand Down
10 changes: 5 additions & 5 deletions controllers/serviceaccount_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
)

// AddServiceAccountProviderControllerToManager adds this controller to the provided manager.
func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker) error {
var (
controlledType = &vmwarev1.ProviderServiceAccount{}
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
Expand All @@ -80,8 +80,8 @@ func AddServiceAccountProviderControllerToManager(ctx *context.ControllerManager
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := ServiceAccountReconciler{
ControllerContext: controllerContext,
remoteClientGetter: remote.NewClusterClient,
ControllerContext: controllerContext,
RemoteClusterCacheTracker: tracker,
}

clusterToInfraFn := clusterToSupervisorInfrastructureMapFunc(ctx)
Expand Down Expand Up @@ -130,7 +130,7 @@ func clusterToSupervisorInfrastructureMapFunc(managerContext *context.Controller
type ServiceAccountReconciler struct {
*context.ControllerContext

remoteClientGetter remote.ClusterClientGetter
RemoteClusterCacheTracker *remote.ClusterCacheTracker
}

func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
Expand Down Expand Up @@ -198,7 +198,7 @@ func (r ServiceAccountReconciler) Reconcile(_ goctx.Context, req reconcile.Reque
// then just return a no-op and wait for the next sync. This will occur when
// the Cluster's status is updated with a reference to the secret that has
// the Kubeconfig data used to access the target cluster.
guestClient, err := r.remoteClientGetter(clusterContext, ProviderServiceAccountControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster))
guestClient, err := r.RemoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster))
if err != nil {
clusterContext.Logger.Info("The control plane is not ready yet", "err", err)
return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil
Expand Down
10 changes: 5 additions & 5 deletions controllers/servicediscovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ const (
// +kubebuilder:rbac:groups="",resources=configmaps/status,verbs=get

// AddServiceDiscoveryControllerToManager adds the ServiceDiscovery controller to the provided manager.
func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker) error {
var (
controllerNameShort = ServiceDiscoveryControllerName
controllerNameLong = fmt.Sprintf("%s/%s/%s", ctx.Namespace, ctx.Name, ServiceDiscoveryControllerName)
Expand All @@ -82,8 +82,8 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := serviceDiscoveryReconciler{
ControllerContext: controllerContext,
remoteClientGetter: remote.NewClusterClient,
ControllerContext: controllerContext,
RemoteClusterCacheTracker: tracker,
}

configMapCache, err := cache.New(mgr.GetConfig(), cache.Options{
Expand Down Expand Up @@ -124,7 +124,7 @@ func AddServiceDiscoveryControllerToManager(ctx *context.ControllerManagerContex
type serviceDiscoveryReconciler struct {
*context.ControllerContext

remoteClientGetter remote.ClusterClientGetter
RemoteClusterCacheTracker *remote.ClusterCacheTracker
}

func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Request) (_ reconcile.Result, reterr error) {
Expand Down Expand Up @@ -185,7 +185,7 @@ func (r serviceDiscoveryReconciler) Reconcile(_ goctx.Context, req reconcile.Req

// We cannot proceed until we are able to access the target cluster. Until
// then just return a no-op and wait for the next sync.
guestClient, err := r.remoteClientGetter(clusterContext, ServiceDiscoveryControllerName, clusterContext.Client, client.ObjectKeyFromObject(cluster))
guestClient, err := r.RemoteClusterCacheTracker.GetClient(clusterContext, client.ObjectKeyFromObject(cluster))
if err != nil {
logger.Info("The control plane is not ready yet", "err", err)
return reconcile.Result{RequeueAfter: clusterNotReadyRequeueTime}, nil
Expand Down
12 changes: 7 additions & 5 deletions controllers/vspherevm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import (
// AddVMControllerToManager adds the VM controller to the provided manager.
//
//nolint:forcetypeassert
func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager) error {
func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager.Manager, tracker *remote.ClusterCacheTracker) error {
var (
controlledType = &infrav1.VSphereVM{}
controlledTypeName = reflect.TypeOf(controlledType).Elem().Name()
Expand All @@ -87,8 +87,9 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
Logger: ctx.Logger.WithName(controllerNameShort),
}
r := vmReconciler{
ControllerContext: controllerContext,
VMService: &govmomi.VMService{},
ControllerContext: controllerContext,
VMService: &govmomi.VMService{},
RemoteClusterCacheTracker: tracker,
}
controller, err := ctrl.NewControllerManagedBy(mgr).
// Watch the controlled, infrastructure resource.
Expand Down Expand Up @@ -156,7 +157,8 @@ func AddVMControllerToManager(ctx *context.ControllerManagerContext, mgr manager
type vmReconciler struct {
*context.ControllerContext

VMService services.VirtualMachineService
VMService services.VirtualMachineService
RemoteClusterCacheTracker *remote.ClusterCacheTracker
}

// Reconcile ensures the back-end state reflects the Kubernetes resource state intent.
Expand Down Expand Up @@ -366,7 +368,7 @@ func (r vmReconciler) deleteNode(ctx *context.VMContext, name string) error {
if err != nil {
return err
}
clusterClient, err := remote.NewClusterClient(ctx, r.ControllerContext.Name, r.Client, ctrlclient.ObjectKeyFromObject(cluster))
clusterClient, err := r.RemoteClusterCacheTracker.GetClient(ctx, ctrlclient.ObjectKeyFromObject(cluster))
if err != nil {
return err
}
Expand Down
56 changes: 49 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"sigs.k8s.io/cluster-api/controllers/remote"
"sigs.k8s.io/cluster-api/util/flags"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
ctrlmgr "sigs.k8s.io/controller-runtime/pkg/manager"
ctrlsig "sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/webhook"
Expand Down Expand Up @@ -222,14 +224,54 @@ func main() {

// Create a function that adds all the controllers and webhooks to the manager.
addToManager := func(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
secretCachingClient, err := client.New(mgr.GetConfig(), client.Options{
HTTPClient: mgr.GetHTTPClient(),
Cache: &client.CacheOptions{
Reader: mgr.GetCache(),
},
})
if err != nil {
setupLog.Error(err, "unable to create secret caching client")
return err
}

// Set up a ClusterCacheTracker and ClusterCacheReconciler to provide to controllers
// requiring a connection to a remote cluster
log := ctrl.Log.WithName("remote").WithName("ClusterCacheTracker")
tracker, err := remote.NewClusterCacheTracker(
mgr,
remote.ClusterCacheTrackerOptions{
SecretCachingClient: secretCachingClient,
ControllerName: controllerName,
Log: &log,
Indexes: []remote.Index{remote.NodeProviderIDIndex},
},
)
if err != nil {
setupLog.Error(err, "unable to create cluster cache tracker")
return err
}

// TODO: waiting for other PR to add related flags first
if err := (&remote.ClusterCacheReconciler{
Client: mgr.GetClient(),
Tracker: tracker,
WatchFilterValue: "",
}).SetupWithManager(ctx, mgr, controller.Options{
MaxConcurrentReconciles: managerOpts.MaxConcurrentReconciles,
}); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCacheReconciler")
os.Exit(1)
}

// Check for non-supervisor VSphereCluster and start controller if found
gvr := v1beta1.GroupVersion.WithResource(reflect.TypeOf(&v1beta1.VSphereCluster{}).Elem().Name())
isLoaded, err := isCRDDeployed(mgr, gvr)
if err != nil {
return err
}
if isLoaded {
if err := setupVAPIControllers(ctx, mgr); err != nil {
if err := setupVAPIControllers(ctx, mgr, tracker); err != nil {
return fmt.Errorf("setupVAPIControllers: %w", err)
}
} else {
Expand All @@ -243,7 +285,7 @@ func main() {
return err
}
if isLoaded {
if err := setupSupervisorControllers(ctx, mgr); err != nil {
if err := setupSupervisorControllers(ctx, mgr, tracker); err != nil {
return fmt.Errorf("setupSupervisorControllers: %w", err)
}
} else {
Expand Down Expand Up @@ -290,7 +332,7 @@ func main() {
defer session.Clear()
}

func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error {
if err := (&v1beta1.VSphereClusterTemplate{}).SetupWebhookWithManager(mgr); err != nil {
return err
}
Expand Down Expand Up @@ -321,7 +363,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man
if err := controllers.AddMachineControllerToManager(ctx, mgr, &v1beta1.VSphereMachine{}); err != nil {
return err
}
if err := controllers.AddVMControllerToManager(ctx, mgr); err != nil {
if err := controllers.AddVMControllerToManager(ctx, mgr, tracker); err != nil {
return err
}
if err := controllers.AddVsphereClusterIdentityControllerToManager(ctx, mgr); err != nil {
Expand All @@ -331,7 +373,7 @@ func setupVAPIControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Man
return controllers.AddVSphereDeploymentZoneControllerToManager(ctx, mgr)
}

func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager) error {
func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlmgr.Manager, tracker *remote.ClusterCacheTracker) error {
if err := controllers.AddClusterControllerToManager(ctx, mgr, &vmwarev1b1.VSphereCluster{}); err != nil {
return err
}
Expand All @@ -340,11 +382,11 @@ func setupSupervisorControllers(ctx *context.ControllerManagerContext, mgr ctrlm
return err
}

if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr); err != nil {
if err := controllers.AddServiceAccountProviderControllerToManager(ctx, mgr, tracker); err != nil {
return err
}

return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr)
return controllers.AddServiceDiscoveryControllerToManager(ctx, mgr, tracker)
}

func setupChecks(mgr ctrlmgr.Manager) {
Expand Down

0 comments on commit 2eacd20

Please sign in to comment.