Skip to content

[test] ref grant support + e2e test #4258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 174 additions & 0 deletions controllers/gateway/eventhandlers/reference_grant_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package eventhandlers

import (
"context"
"fmt"
"github.com/go-logr/logr"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/pkg/gateway/routeutils"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
gatewayv1 "sigs.k8s.io/gateway-api/apis/v1"
gwalpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwbeta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
)

// NewEnqueueRequestsForReferenceGrantEvent creates handler for ReferenceGrant resources
func NewEnqueueRequestsForReferenceGrantEvent(httpRouteEventChan chan<- event.TypedGenericEvent[*gatewayv1.HTTPRoute],
grpcRouteEventChan chan<- event.TypedGenericEvent[*gatewayv1.GRPCRoute],
tcpRouteEventChan chan<- event.TypedGenericEvent[*gwalpha2.TCPRoute],
udpRouteEventChan chan<- event.TypedGenericEvent[*gwalpha2.UDPRoute],
tlsRouteEventChan chan<- event.TypedGenericEvent[*gwalpha2.TLSRoute],
k8sClient client.Client, eventRecorder record.EventRecorder, logger logr.Logger) handler.TypedEventHandler[*gwbeta1.ReferenceGrant, reconcile.Request] {
return &enqueueRequestsForReferenceGrantEvent{
httpRouteEventChan: httpRouteEventChan,
grpcRouteEventChan: grpcRouteEventChan,
tcpRouteEventChan: tcpRouteEventChan,
udpRouteEventChan: udpRouteEventChan,
tlsRouteEventChan: tlsRouteEventChan,
k8sClient: k8sClient,
eventRecorder: eventRecorder,
logger: logger,
}
}

var _ handler.TypedEventHandler[*gwbeta1.ReferenceGrant, reconcile.Request] = (*enqueueRequestsForReferenceGrantEvent)(nil)

// enqueueRequestsForReferenceGrantEvent handles ReferenceGrant events
type enqueueRequestsForReferenceGrantEvent struct {
httpRouteEventChan chan<- event.TypedGenericEvent[*gatewayv1.HTTPRoute]
grpcRouteEventChan chan<- event.TypedGenericEvent[*gatewayv1.GRPCRoute]
tcpRouteEventChan chan<- event.TypedGenericEvent[*gwalpha2.TCPRoute]
udpRouteEventChan chan<- event.TypedGenericEvent[*gwalpha2.UDPRoute]
tlsRouteEventChan chan<- event.TypedGenericEvent[*gwalpha2.TLSRoute]
k8sClient client.Client
eventRecorder record.EventRecorder
logger logr.Logger
}

func (h *enqueueRequestsForReferenceGrantEvent) Create(ctx context.Context, e event.TypedCreateEvent[*gwbeta1.ReferenceGrant], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
referenceGrantNew := e.Object
h.logger.V(1).Info("enqueue reference grant create event", "reference grant", referenceGrantNew.Name)
h.enqueueImpactedRoutes(ctx, referenceGrantNew, nil)
}

func (h *enqueueRequestsForReferenceGrantEvent) Update(ctx context.Context, e event.TypedUpdateEvent[*gwbeta1.ReferenceGrant], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
referenceGrantNew := e.ObjectNew
referenceGrantOld := e.ObjectOld
h.logger.V(1).Info("enqueue reference grant update event", "reference grant", referenceGrantNew.Name)
h.enqueueImpactedRoutes(ctx, referenceGrantNew, referenceGrantOld)
}

func (h *enqueueRequestsForReferenceGrantEvent) Delete(ctx context.Context, e event.TypedDeleteEvent[*gwbeta1.ReferenceGrant], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
refgrant := e.Object
h.logger.V(1).Info("enqueue reference grant delete event", "reference grant", refgrant.Name)
h.enqueueImpactedRoutes(ctx, refgrant, nil)
}

func (h *enqueueRequestsForReferenceGrantEvent) Generic(ctx context.Context, e event.TypedGenericEvent[*gwbeta1.ReferenceGrant], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
refgrant := e.Object
h.logger.V(1).Info("enqueue reference grant generic event", "reference grant", refgrant.Name)
h.enqueueImpactedRoutes(ctx, refgrant, nil)
}

func (h *enqueueRequestsForReferenceGrantEvent) enqueueImpactedRoutes(ctx context.Context, newRefGrant *gwbeta1.ReferenceGrant, oldRefGrant *gwbeta1.ReferenceGrant) {

impactedRoutes := make(map[string]gwbeta1.ReferenceGrantFrom)

for i, from := range newRefGrant.Spec.From {
impactedRoutes[generateGrantFromKey(from)] = newRefGrant.Spec.From[i]
}

if oldRefGrant != nil {
for i, from := range oldRefGrant.Spec.From {
impactedRoutes[generateGrantFromKey(from)] = oldRefGrant.Spec.From[i]
}
}

for _, impactedFrom := range impactedRoutes {
switch string(impactedFrom.Kind) {
case string(routeutils.HTTPRouteKind):
if h.httpRouteEventChan == nil {
continue
}
routes, err := routeutils.ListHTTPRoutes(ctx, h.k8sClient, &client.ListOptions{Namespace: string(impactedFrom.Namespace)})
if err == nil {
for _, route := range routes {
h.httpRouteEventChan <- event.TypedGenericEvent[*gatewayv1.HTTPRoute]{
Object: route.GetRawRoute().(*gatewayv1.HTTPRoute),
}
}

} else {
h.logger.Error(err, "Unable to list impacted http routes for reference grant event handler")
}
case string(routeutils.GRPCRouteKind):
if h.grpcRouteEventChan == nil {
continue
}
routes, err := routeutils.ListGRPCRoutes(ctx, h.k8sClient, &client.ListOptions{Namespace: string(impactedFrom.Namespace)})
if err == nil {
for _, route := range routes {
h.grpcRouteEventChan <- event.TypedGenericEvent[*gatewayv1.GRPCRoute]{
Object: route.GetRawRoute().(*gatewayv1.GRPCRoute),
}
}

} else {
h.logger.Error(err, "Unable to list impacted grpc routes for reference grant event handler")
}
case string(routeutils.TCPRouteKind):
if h.tcpRouteEventChan == nil {
continue
}
routes, err := routeutils.ListTCPRoutes(ctx, h.k8sClient, &client.ListOptions{Namespace: string(impactedFrom.Namespace)})
if err == nil {
for _, route := range routes {
h.tcpRouteEventChan <- event.TypedGenericEvent[*gwalpha2.TCPRoute]{
Object: route.GetRawRoute().(*gwalpha2.TCPRoute),
}
}

} else {
h.logger.Error(err, "Unable to list impacted grpc routes for reference grant event handler")
}
case string(routeutils.UDPRouteKind):
if h.udpRouteEventChan == nil {
continue
}
routes, err := routeutils.ListUDPRoutes(ctx, h.k8sClient, &client.ListOptions{Namespace: string(impactedFrom.Namespace)})
if err == nil {
for _, route := range routes {
h.udpRouteEventChan <- event.TypedGenericEvent[*gwalpha2.UDPRoute]{
Object: route.GetRawRoute().(*gwalpha2.UDPRoute),
}
}

} else {
h.logger.Error(err, "Unable to list impacted grpc routes for reference grant event handler")
}
case string(routeutils.TLSRouteKind):
if h.tlsRouteEventChan == nil {
continue
}
routes, err := routeutils.ListTLSRoutes(ctx, h.k8sClient, &client.ListOptions{Namespace: string(impactedFrom.Namespace)})
if err == nil {
for _, route := range routes {
h.tlsRouteEventChan <- event.TypedGenericEvent[*gwalpha2.TLSRoute]{
Object: route.GetRawRoute().(*gwalpha2.TLSRoute),
}
}

} else {
h.logger.Error(err, "Unable to list impacted grpc routes for reference grant event handler")
}
}
}
}

func generateGrantFromKey(from gwbeta1.ReferenceGrantFrom) string {
return fmt.Sprintf("%s-%s", from.Kind, from.Namespace)
}
11 changes: 11 additions & 0 deletions controllers/gateway/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
gwalpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwbeta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
"time"
)

Expand Down Expand Up @@ -457,6 +458,8 @@ func (r *gatewayReconciler) setupALBGatewayControllerWatches(ctrl controller.Con
loggerPrefix.WithName("HTTPRoute"))
svcEventHandler := eventhandlers.NewEnqueueRequestsForServiceEvent(httpRouteEventChan, grpcRouteEventChan, nil, nil, nil, r.k8sClient, r.eventRecorder,
loggerPrefix.WithName("Service"), constants.ALBGatewayController)
refGrantHandler := eventhandlers.NewEnqueueRequestsForReferenceGrantEvent(httpRouteEventChan, grpcRouteEventChan, nil, nil, nil, r.k8sClient, r.eventRecorder,
loggerPrefix.WithName("ReferenceGrant"))
if err := ctrl.Watch(source.Channel(tbConfigEventChan, tgConfigEventHandler)); err != nil {
return err
}
Expand All @@ -475,6 +478,9 @@ func (r *gatewayReconciler) setupALBGatewayControllerWatches(ctrl controller.Con
if err := ctrl.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}, svcEventHandler)); err != nil {
return err
}
if err := ctrl.Watch(source.Kind(mgr.GetCache(), &gwbeta1.ReferenceGrant{}, refGrantHandler)); err != nil {
return err
}
if err := ctrl.Watch(source.Kind(mgr.GetCache(), &gwv1.HTTPRoute{}, httpRouteEventHandler)); err != nil {
return err
}
Expand All @@ -501,6 +507,8 @@ func (r *gatewayReconciler) setupNLBGatewayControllerWatches(ctrl controller.Con
loggerPrefix.WithName("TLSRoute"))
svcEventHandler := eventhandlers.NewEnqueueRequestsForServiceEvent(nil, nil, tcpRouteEventChan, udpRouteEventChan, tlsRouteEventChan, r.k8sClient, r.eventRecorder,
loggerPrefix.WithName("Service"), constants.NLBGatewayController)
refGrantHandler := eventhandlers.NewEnqueueRequestsForReferenceGrantEvent(nil, nil, tcpRouteEventChan, udpRouteEventChan, tlsRouteEventChan, r.k8sClient, r.eventRecorder,
loggerPrefix.WithName("ReferenceGrant"))
if err := ctrl.Watch(source.Channel(tbConfigEventChan, tgConfigEventHandler)); err != nil {
return err
}
Expand All @@ -522,6 +530,9 @@ func (r *gatewayReconciler) setupNLBGatewayControllerWatches(ctrl controller.Con
if err := ctrl.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}, svcEventHandler)); err != nil {
return err
}
if err := ctrl.Watch(source.Kind(mgr.GetCache(), &gwbeta1.ReferenceGrant{}, refGrantHandler)); err != nil {
return err
}
if err := ctrl.Watch(source.Kind(mgr.GetCache(), &gwalpha2.TCPRoute{}, tcpRouteEventHandler)); err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
gwv1 "sigs.k8s.io/gateway-api/apis/v1"
gwalpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gwbeta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
"sync"

"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -82,6 +83,7 @@ func init() {
_ = elbv2gw.AddToScheme(scheme)
_ = gwv1.AddToScheme(scheme)
_ = gwalpha2.AddToScheme(scheme)
_ = gwbeta1.AddToScheme(scheme)
// +kubebuilder:scaffold:scheme
}

Expand Down
28 changes: 19 additions & 9 deletions pkg/gateway/model/model_build_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
elbv2gw "sigs.k8s.io/aws-load-balancer-controller/apis/gateway/v1beta1"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
certs "sigs.k8s.io/aws-load-balancer-controller/pkg/certs"
"sigs.k8s.io/aws-load-balancer-controller/pkg/certs"
"sigs.k8s.io/aws-load-balancer-controller/pkg/gateway/routeutils"
"sigs.k8s.io/aws-load-balancer-controller/pkg/k8s"
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
Expand All @@ -28,8 +28,8 @@ type gwListenerConfig struct {

type listenerBuilder interface {
buildListeners(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, routes map[int32][]routeutils.RouteDescriptor, lbConf elbv2gw.LoadBalancerConfiguration) error
buildListenerSpec(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, port int32, routes []routeutils.RouteDescriptor, lbCfg elbv2gw.LoadBalancerConfiguration, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error)
buildL7ListenerSpec(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, lbCfg elbv2gw.LoadBalancerConfiguration, port int32, routes []routeutils.RouteDescriptor, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error)
buildListenerSpec(ctx context.Context, lb *elbv2model.LoadBalancer, gw *gwv1.Gateway, port int32, lbCfg elbv2gw.LoadBalancerConfiguration, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error)
buildL7ListenerSpec(ctx context.Context, lb *elbv2model.LoadBalancer, gw *gwv1.Gateway, lbCfg elbv2gw.LoadBalancerConfiguration, port int32, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error)
buildL4ListenerSpec(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, lbCfg elbv2gw.LoadBalancerConfiguration, port int32, routes []routeutils.RouteDescriptor, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error)
}

Expand Down Expand Up @@ -58,6 +58,11 @@ func (l listenerBuilderImpl) buildListeners(ctx context.Context, stack core.Stac
if err != nil {
return err
}

if ls == nil {
continue
}

// build rules only for L7 gateways
if l.loadBalancerType == elbv2model.LoadBalancerTypeApplication {
if err := l.buildListenerRules(stack, ls, lb, securityGroups, gw, port, lbCfg, routes); err != nil {
Expand All @@ -73,7 +78,7 @@ func (l listenerBuilderImpl) buildListeners(ctx context.Context, stack core.Stac
func (l listenerBuilderImpl) buildListener(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, port int32, routes []routeutils.RouteDescriptor, lbCfg elbv2gw.LoadBalancerConfiguration, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.Listener, error) {
var listenerSpec elbv2model.ListenerSpec
if l.loadBalancerType == elbv2model.LoadBalancerTypeApplication {
ls, err := l.buildL7ListenerSpec(ctx, stack, lb, securityGroups, gw, lbCfg, port, routes, gwLsCfg, lbLsCfg)
ls, err := l.buildL7ListenerSpec(ctx, lb, gw, lbCfg, port, gwLsCfg, lbLsCfg)
if err != nil {
return nil, err
}
Expand All @@ -83,13 +88,18 @@ func (l listenerBuilderImpl) buildListener(ctx context.Context, stack core.Stack
if err != nil {
return nil, err
}

if ls == nil {
return nil, nil
}

listenerSpec = *ls
}
lsResID := fmt.Sprintf("%v", port)
return elbv2model.NewListener(stack, lsResID, listenerSpec), nil
}

func (l listenerBuilderImpl) buildListenerSpec(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, port int32, routes []routeutils.RouteDescriptor, lbCfg elbv2gw.LoadBalancerConfiguration, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error) {
func (l listenerBuilderImpl) buildListenerSpec(ctx context.Context, lb *elbv2model.LoadBalancer, gw *gwv1.Gateway, port int32, lbCfg elbv2gw.LoadBalancerConfiguration, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error) {
tags, err := l.buildListenerTags(gw, port, lbCfg, lbLsCfg)
if err != nil {
return &elbv2model.ListenerSpec{}, err
Expand Down Expand Up @@ -118,8 +128,8 @@ func (l listenerBuilderImpl) buildListenerSpec(ctx context.Context, stack core.S
return listenerSpec, nil
}

func (l listenerBuilderImpl) buildL7ListenerSpec(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, lbCfg elbv2gw.LoadBalancerConfiguration, port int32, routes []routeutils.RouteDescriptor, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error) {
listenerSpec, err := l.buildListenerSpec(ctx, stack, lb, securityGroups, gw, port, routes, lbCfg, gwLsCfg, lbLsCfg)
func (l listenerBuilderImpl) buildL7ListenerSpec(ctx context.Context, lb *elbv2model.LoadBalancer, gw *gwv1.Gateway, lbCfg elbv2gw.LoadBalancerConfiguration, port int32, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error) {
listenerSpec, err := l.buildListenerSpec(ctx, lb, gw, port, lbCfg, gwLsCfg, lbLsCfg)
if err != nil {
return &elbv2model.ListenerSpec{}, err
}
Expand All @@ -133,7 +143,7 @@ func (l listenerBuilderImpl) buildL7ListenerSpec(ctx context.Context, stack core
}

func (l listenerBuilderImpl) buildL4ListenerSpec(ctx context.Context, stack core.Stack, lb *elbv2model.LoadBalancer, securityGroups securityGroupOutput, gw *gwv1.Gateway, lbCfg elbv2gw.LoadBalancerConfiguration, port int32, routes []routeutils.RouteDescriptor, gwLsCfg *gwListenerConfig, lbLsCfg *elbv2gw.ListenerConfiguration) (*elbv2model.ListenerSpec, error) {
listenerSpec, err := l.buildListenerSpec(ctx, stack, lb, securityGroups, gw, port, routes, lbCfg, gwLsCfg, lbLsCfg)
listenerSpec, err := l.buildListenerSpec(ctx, lb, gw, port, lbCfg, gwLsCfg, lbLsCfg)
if err != nil {
return &elbv2model.ListenerSpec{}, err
}
Expand All @@ -149,7 +159,7 @@ func (l listenerBuilderImpl) buildL4ListenerSpec(ctx context.Context, stack core
}
routeDescriptor := routes[0]
if routeDescriptor.GetAttachedRules()[0].GetBackends() == nil || len(routeDescriptor.GetAttachedRules()[0].GetBackends()) == 0 {
return &elbv2model.ListenerSpec{}, errors.Errorf("no backend refs found for route %v for gateway %v, one backend ref must be specified", routeDescriptor.GetRouteNamespacedName(), k8s.NamespacedName(gw))
return nil, nil
}
if len(routeDescriptor.GetAttachedRules()[0].GetBackends()) > 1 {
return &elbv2model.ListenerSpec{}, errors.Errorf("multiple backend refs found for route %v for listener on port:protocol %v:%v for gateway %v , only one must be specified", routeDescriptor.GetRouteNamespacedName(), port, listenerSpec.Protocol, k8s.NamespacedName(gw))
Expand Down
Loading