Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Agent's dependency on proxy to access Antrea Service #6361

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/antrea-agent-simulator/simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent"
"antrea.io/antrea/pkg/agent/client"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/env"
"antrea.io/antrea/pkg/util/k8s"
Expand All @@ -49,7 +49,10 @@ func run() error {
}

// Create Antrea Clientset for the given config.
antreaClientProvider := agent.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient)
antreaClientProvider, err := client.NewAntreaClientProvider(componentbaseconfig.ClientConnectionConfiguration{}, k8sClient)
if err != nil {
return err
}

if err = antreaClientProvider.RunOnce(); err != nil {
return err
Expand Down
8 changes: 5 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/agent"
"antrea.io/antrea/pkg/agent/apiserver"
"antrea.io/antrea/pkg/agent/client"
"antrea.io/antrea/pkg/agent/cniserver"
"antrea.io/antrea/pkg/agent/cniserver/ipam"
"antrea.io/antrea/pkg/agent/config"
Expand Down Expand Up @@ -125,7 +126,10 @@ func run(o *Options) error {
namespaceInformer := informerFactory.Core().V1().Namespaces()

// Create Antrea Clientset for the given config.
antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
antreaClientProvider, err := client.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient)
if err != nil {
return fmt.Errorf("failed to create Antrea client provider: %w", err)
}

// Register Antrea Agent metrics if EnablePrometheusMetrics is set
if *o.config.EnablePrometheusMetrics {
Expand Down Expand Up @@ -792,8 +796,6 @@ func run(o *Options) error {
}
}

// NetworkPolicyController and EgressController accesses the "antrea" Service via its ClusterIP.
// Run them after AntreaProxy is ready.
go networkPolicyController.Run(stopCh)
if o.enableEgress {
go egressController.Run(stopCh)
Expand Down
88 changes: 64 additions & 24 deletions pkg/agent/client.go → pkg/agent/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package agent
package client

import (
"context"
"fmt"
"net"
"os"
"strconv"
"sync"

"k8s.io/apiserver/pkg/server/dynamiccertificates"
Expand All @@ -40,7 +40,9 @@ type AntreaClientProvider interface {
GetAntreaClient() (versioned.Interface, error)
}

// antreaClientProvider provides an AntreaClientProvider that can dynamically react to ConfigMap changes.
// antreaClientProvider provides an AntreaClientProvider that can dynamically react to CA bundle
// ConfigMap changes, as well as directly resolve the Antrea Service Endpoint when running inside a K8s cluster.
// The consumers of antreaClientProvider are supposed to always call GetAntreaClient() to get a client and not cache it.
type antreaClientProvider struct {
config config.ClientConnectionConfiguration
// mutex protects client.
Expand All @@ -49,27 +51,62 @@ type antreaClientProvider struct {
client versioned.Interface
// caContentProvider provides the very latest content of the ca bundle.
caContentProvider *dynamiccertificates.ConfigMapCAController
// endpointResolver provides a known Endpoint for the Antrea Service. There is usually a
// single Endpoint at any given time, given that the Antrea Controller runs as a
// single-replica Deployment. By resolving the Endpoint manually and accessing it directly,
// instead of depending on the ClusterIP functionality provided by the K8s proxy, we get
// more flexibility when initializing the Antrea Agent. For example, we can retrieve
// NetworkPolicies from the Controller even if the proxy is not (yet) available.
// endpointResolver is only used when no kubeconfig is provided (otherwise we honor the
// provided config).
endpointResolver *EndpointResolver
}

// antreaClientProvider must implement the dynamiccertificates.Listener interface to be notified of
// CA bundle updates.
var _ dynamiccertificates.Listener = &antreaClientProvider{}

func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) *antreaClientProvider {
// The key "ca.crt" may not exist at the beginning, no need to fail as the CA provider will watch the ConfigMap
// and notify antreaClientProvider of any update. The consumers of antreaClientProvider are supposed to always
// call GetAntreaClient() to get a client and not cache it.
antreaCAProvider, _ := dynamiccertificates.NewDynamicCAFromConfigMapController(
// antreaClientProvider must implement the Listener interface to be notified of an Endpoint change
// for the Antrea Service.
var _ Listener = &antreaClientProvider{}

func NewAntreaClientProvider(config config.ClientConnectionConfiguration, kubeClient kubernetes.Interface) (*antreaClientProvider, error) {
antreaCAProvider, err := dynamiccertificates.NewDynamicCAFromConfigMapController(
"antrea-ca",
cert.GetCAConfigMapNamespace(),
apis.AntreaCAConfigMapName,
apis.CAConfigMapKey,
kubeClient)
if err != nil {
return nil, err
}

var endpointResolver *EndpointResolver
if len(config.Kubeconfig) == 0 {
klog.InfoS("No Antrea kubeconfig file was specified. Falling back to in-cluster config")
port := os.Getenv("ANTREA_SERVICE_PORT")
if len(port) == 0 {
return nil, fmt.Errorf("unable to create Endpoint resolver for Antrea Service, ANTREA_SERVICE_PORT must be defined for in-cluster config")
}
servicePort, err := strconv.ParseInt(port, 10, 32)
if err != nil {
return nil, fmt.Errorf("invalid port number stored in ANTREA_SERVICE_PORT: %w", err)
}
endpointResolver = NewEndpointResolver(kubeClient, env.GetAntreaNamespace(), apis.AntreaServiceName, int32(servicePort))
}

antreaClientProvider := &antreaClientProvider{
config: config,
caContentProvider: antreaCAProvider,
endpointResolver: endpointResolver,
}

antreaCAProvider.AddListener(antreaClientProvider)
return antreaClientProvider
if endpointResolver != nil {
endpointResolver.AddListener(antreaClientProvider)
}

return antreaClientProvider, nil
}

// RunOnce runs the task a single time synchronously, ensuring client is initialized if kubeconfig is specified.
Expand All @@ -80,14 +117,18 @@ func (p *antreaClientProvider) RunOnce() error {
// Run starts the caContentProvider, which watches the ConfigMap and notifies changes
// by calling Enqueue.
func (p *antreaClientProvider) Run(ctx context.Context) {
p.caContentProvider.Run(ctx, 1)
go p.caContentProvider.Run(ctx, 1)
if p.endpointResolver != nil {
go p.endpointResolver.Run(ctx)
}
<-ctx.Done()
}

// Enqueue implements dynamiccertificates.Listener. It will be called by caContentProvider
// when caBundle is updated.
func (p *antreaClientProvider) Enqueue() {
if err := p.updateAntreaClient(); err != nil {
klog.Errorf("Failed to update Antrea client: %v", err)
klog.ErrorS(err, "Failed to update Antrea client")
}
}

Expand All @@ -105,13 +146,17 @@ func (p *antreaClientProvider) updateAntreaClient() error {
var kubeConfig *rest.Config
var err error
if len(p.config.Kubeconfig) == 0 {
klog.Info("No antrea kubeconfig file was specified. Falling back to in-cluster config")
caBundle := p.caContentProvider.CurrentCABundleContent()
if caBundle == nil {
klog.Info("Didn't get CA certificate, skip updating Antrea Client")
klog.InfoS("Didn't get CA certificate, skip updating Antrea Client")
return nil
}
kubeConfig, err = inClusterConfig(caBundle)
endpointURL := p.endpointResolver.CurrentEndpointURL()
if endpointURL == nil {
klog.InfoS("Didn't get Endpoint URL for Antrea Service, skip updating Antrea Client")
return nil
}
kubeConfig, err = inClusterConfig(caBundle, endpointURL.String())
} else {
kubeConfig, err = clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
&clientcmd.ClientConfigLoadingRules{ExplicitPath: p.config.Kubeconfig},
Expand All @@ -138,17 +183,12 @@ func (p *antreaClientProvider) updateAntreaClient() error {
return nil
}

// inClusterConfig returns a config object which uses the service account
// kubernetes gives to pods. It's intended for clients that expect to be
// running inside a pod running on kubernetes. It will return error
// if called from a process not running in a kubernetes environment.
func inClusterConfig(caBundle []byte) (*rest.Config, error) {
// inClusterConfig returns a config object which uses the service account Kubernetes gives to
// Pods. It's intended for clients that expect to be running inside a Pod running on Kubernetes. It
// will return error if called from a process not running in a Kubernetes environment.
func inClusterConfig(caBundle []byte, endpoint string) (*rest.Config, error) {
// #nosec G101: false positive triggered by variable name which includes "token"
const tokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token"
host, port := os.Getenv("ANTREA_SERVICE_HOST"), os.Getenv("ANTREA_SERVICE_PORT")
if len(host) == 0 || len(port) == 0 {
return nil, fmt.Errorf("unable to load in-cluster configuration, ANTREA_SERVICE_HOST and ANTREA_SERVICE_PORT must be defined")
}

token, err := os.ReadFile(tokenFile)
if err != nil {
Expand All @@ -161,7 +201,7 @@ func inClusterConfig(caBundle []byte) (*rest.Config, error) {
}

return &rest.Config{
Host: "https://" + net.JoinHostPort(host, port),
Host: endpoint,
TLSClientConfig: tlsClientConfig,
BearerToken: string(token),
BearerTokenFile: tokenFile,
Expand Down
Loading
Loading