From 8d159e39a641c6351ff6fb836e83507b143066df Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 25 Apr 2022 19:05:15 -0700 Subject: [PATCH] [receiver/k8scluster] Check resource support before setting up watchers This change adds additional validation using discovery API before setting up watchers on particular k8s resources. Otherwise, if k8s API doesn't support a k8s resource, setting up watchers breaks the receiver. This change also adds support of batch/v1beta1 CronJob resources. This makes the receiver able to work with k8s server versions 1.19 and 1.20 which are still available on some cloud providers. --- CHANGELOG.md | 2 + receiver/k8sclusterreceiver/factory_test.go | 5 +- .../internal/collection/collector.go | 12 +- .../internal/collection/cronjobs.go | 40 +++++ .../internal/collection/metadatastore.go | 17 +-- .../k8sclusterreceiver/internal/gvk/gvk.go | 36 +++++ receiver/k8sclusterreceiver/receiver.go | 7 +- receiver/k8sclusterreceiver/receiver_test.go | 80 ++++++++-- receiver/k8sclusterreceiver/watcher.go | 141 ++++++++++++++---- receiver/k8sclusterreceiver/watcher_test.go | 139 +++++++++++++++++ 10 files changed, 421 insertions(+), 58 deletions(-) create mode 100644 receiver/k8sclusterreceiver/internal/gvk/gvk.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f82252df639..a04880ffc327 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ - `attributesprocessor`: Support filter by severity (#9132) - `processor/transform`: Add transformation of logs (#9368) - `datadogexporter`: Add `metrics::summaries::mode` to specify export mode for summaries (#8846) +- `k8sclusterreceiver`: Validate that k8s API supports a resource before setting up a watcher for it (#9523) ### 🧰 Bug fixes 🧰 @@ -40,6 +41,7 @@ - `resourcedetectionprocessor`: Wire docker detector (#9372) - `kafkametricsreceiver`: The kafkametricsreceiver was changed to connect to kafka during scrape, rather than startup. If kafka is unavailable the receiver will attempt to connect during subsequent scrapes until succcessful (#8817). - `datadogexporter`: Update Kubernetes example manifest to new executable name. (#9425). +- `k8sclusterreceiver`: Fix the receiver to work with 1.19 and 1.20 k8s API versions (#9523) ## v0.49.0 diff --git a/receiver/k8sclusterreceiver/factory_test.go b/receiver/k8sclusterreceiver/factory_test.go index d5c6fc38b58c..ddbc6e1f71d5 100644 --- a/receiver/k8sclusterreceiver/factory_test.go +++ b/receiver/k8sclusterreceiver/factory_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer/consumertest" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" ) @@ -66,7 +67,7 @@ func TestFactory(t *testing.T) { // Override for tests. 
rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) { - return nil, nil + return fake.NewSimpleClientset(), nil } r, err = f.CreateMetricsReceiver( context.Background(), componenttest.NewNopReceiverCreateSettings(), @@ -92,7 +93,7 @@ func TestFactoryDistributions(t *testing.T) { require.True(t, ok) rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) { - return nil, nil + return fake.NewSimpleClientset(), nil } rCfg.makeOpenShiftQuotaClient = func(apiConf k8sconfig.APIConfig) (quotaclientset.Interface, error) { return fakeQuota.NewSimpleClientset(), nil diff --git a/receiver/k8sclusterreceiver/internal/collection/collector.go b/receiver/k8sclusterreceiver/internal/collection/collector.go index bba8827eb3a7..f806654ffafb 100644 --- a/receiver/k8sclusterreceiver/internal/collection/collector.go +++ b/receiver/k8sclusterreceiver/internal/collection/collector.go @@ -26,8 +26,10 @@ import ( appsv1 "k8s.io/api/apps/v1" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" @@ -120,9 +122,9 @@ func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTyp } } -// SetupMetadataStore initializes a metadata store for the kubernetes object. -func (dc *DataCollector) SetupMetadataStore(o runtime.Object, store cache.Store) { - dc.metadataStore.setupStore(o, store) +// SetupMetadataStore initializes a metadata store for the kubernetes kind. +func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) { + dc.metadataStore.setupStore(gvk, store) } func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) { @@ -176,6 +178,8 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) { rm = getMetricsForJob(o) case *batchv1.CronJob: rm = getMetricsForCronJob(o) + case *batchv1beta1.CronJob: + rm = getMetricsForCronJobBeta(o) case *autoscalingv2beta2.HorizontalPodAutoscaler: rm = getMetricsForHPA(o) case *quotav1.ClusterResourceQuota: @@ -213,6 +217,8 @@ func (dc *DataCollector) SyncMetadata(obj interface{}) map[metadata.ResourceID]* km = getMetadataForJob(o) case *batchv1.CronJob: km = getMetadataForCronJob(o) + case *batchv1beta1.CronJob: + km = getMetadataForCronJobBeta(o) case *autoscalingv2beta2.HorizontalPodAutoscaler: km = getMetadataForHPA(o) } diff --git a/receiver/k8sclusterreceiver/internal/collection/cronjobs.go b/receiver/k8sclusterreceiver/internal/collection/cronjobs.go index cf73a81f9f58..ea045d8ecda2 100644 --- a/receiver/k8sclusterreceiver/internal/collection/cronjobs.go +++ b/receiver/k8sclusterreceiver/internal/collection/cronjobs.go @@ -19,6 +19,7 @@ import ( resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1" conventions "go.opentelemetry.io/collector/semconv/v1.6.1" batchv1 "k8s.io/api/batch/v1" + batchv1beta1 "k8s.io/api/batch/v1beta1" metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" @@ -37,6 +38,8 @@ var activeJobs = &metricspb.MetricDescriptor{ Type: metricspb.MetricDescriptor_GAUGE_INT64, } +// TODO: All the CronJob related functions below can be de-duplicated using generics from go 1.18 + func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics { 
metrics := []*metricspb.Metric{ { @@ -55,6 +58,24 @@ func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics { } } +func getMetricsForCronJobBeta(cj *batchv1beta1.CronJob) []*resourceMetrics { + metrics := []*metricspb.Metric{ + { + MetricDescriptor: activeJobs, + Timeseries: []*metricspb.TimeSeries{ + utils.GetInt64TimeSeries(int64(len(cj.Status.Active))), + }, + }, + } + + return []*resourceMetrics{ + { + resource: getResourceForCronJobBeta(cj), + metrics: metrics, + }, + } +} + func getResourceForCronJob(cj *batchv1.CronJob) *resourcepb.Resource { return &resourcepb.Resource{ Type: k8sType, @@ -67,9 +88,28 @@ func getResourceForCronJob(cj *batchv1.CronJob) *resourcepb.Resource { } } +func getResourceForCronJobBeta(cj *batchv1beta1.CronJob) *resourcepb.Resource { + return &resourcepb.Resource{ + Type: k8sType, + Labels: map[string]string{ + conventions.AttributeK8SCronJobUID: string(cj.UID), + conventions.AttributeK8SCronJobName: cj.Name, + conventions.AttributeK8SNamespaceName: cj.Namespace, + conventions.AttributeK8SClusterName: cj.ClusterName, + }, + } +} + func getMetadataForCronJob(cj *batchv1.CronJob) map[metadata.ResourceID]*KubernetesMetadata { rm := getGenericMetadata(&cj.ObjectMeta, k8sKindCronJob) rm.metadata[cronJobKeySchedule] = cj.Spec.Schedule rm.metadata[cronJobKeyConcurrencyPolicy] = string(cj.Spec.ConcurrencyPolicy) return map[metadata.ResourceID]*KubernetesMetadata{metadata.ResourceID(cj.UID): rm} } + +func getMetadataForCronJobBeta(cj *batchv1beta1.CronJob) map[metadata.ResourceID]*KubernetesMetadata { + rm := getGenericMetadata(&cj.ObjectMeta, k8sKindCronJob) + rm.metadata[cronJobKeySchedule] = cj.Spec.Schedule + rm.metadata[cronJobKeyConcurrencyPolicy] = string(cj.Spec.ConcurrencyPolicy) + return map[metadata.ResourceID]*KubernetesMetadata{metadata.ResourceID(cj.UID): rm} +} diff --git a/receiver/k8sclusterreceiver/internal/collection/metadatastore.go b/receiver/k8sclusterreceiver/internal/collection/metadatastore.go index 1dcd193dead3..f8de325e95ce 100644 --- a/receiver/k8sclusterreceiver/internal/collection/metadatastore.go +++ b/receiver/k8sclusterreceiver/internal/collection/metadatastore.go @@ -15,11 +15,10 @@ package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" import ( - appsv1 "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" ) // metadataStore keeps track of required caches exposed by informers. @@ -32,13 +31,13 @@ type metadataStore struct { } // setupStore tracks metadata of services, jobs and replicasets. 
-func (ms *metadataStore) setupStore(o runtime.Object, store cache.Store) { - switch o.(type) { - case *corev1.Service: +func (ms *metadataStore) setupStore(kind schema.GroupVersionKind, store cache.Store) { + switch kind { + case gvk.Service: ms.services = store - case *batchv1.Job: + case gvk.Job: ms.jobs = store - case *appsv1.ReplicaSet: + case gvk.ReplicaSet: ms.replicaSets = store } } diff --git a/receiver/k8sclusterreceiver/internal/gvk/gvk.go b/receiver/k8sclusterreceiver/internal/gvk/gvk.go new file mode 100644 index 000000000000..87faa9961dc4 --- /dev/null +++ b/receiver/k8sclusterreceiver/internal/gvk/gvk.go @@ -0,0 +1,36 @@ +// Copyright 2020, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gvk // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" + +import "k8s.io/apimachinery/pkg/runtime/schema" + +// Kubernetes group version kinds +var ( + Pod = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + Node = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Node"} + Namespace = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"} + ReplicationController = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ReplicationController"} + ResourceQuota = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ResourceQuota"} + Service = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"} + DaemonSet = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "DaemonSet"} + Deployment = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"} + ReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"} + StatefulSet = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "StatefulSet"} + Job = schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"} + CronJob = schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "CronJob"} + CronJobBeta = schema.GroupVersionKind{Group: "batch", Version: "v1beta1", Kind: "CronJob"} + HorizontalPodAutoscaler = schema.GroupVersionKind{Group: "autoscaling", Version: "v2beta2", Kind: "HorizontalPodAutoscaler"} + ClusterResourceQuota = schema.GroupVersionKind{Group: "quota", Version: "v1", Kind: "ClusterResourceQuota"} +) diff --git a/receiver/k8sclusterreceiver/receiver.go b/receiver/k8sclusterreceiver/receiver.go index ec072a64f228..45ecad046875 100644 --- a/receiver/k8sclusterreceiver/receiver.go +++ b/receiver/k8sclusterreceiver/receiver.go @@ -20,6 +20,7 @@ import ( "time" quotaclientset "github.com/openshift/client-go/quota/clientset/versioned" + "github.com/pkg/errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.opentelemetry.io/collector/consumer" @@ -115,7 +116,11 @@ func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) { func newReceiver( set component.ReceiverCreateSettings, config *Config, consumer consumer.Metrics, client kubernetes.Interface, osQuotaClient 
quotaclientset.Interface) (component.MetricsReceiver, error) { - resourceWatcher := newResourceWatcher(set.Logger, client, osQuotaClient, config.NodeConditionTypesToReport, config.AllocatableTypesToReport, defaultInitialSyncTimeout) + resourceWatcher, err := newResourceWatcher(set.Logger, client, osQuotaClient, config.NodeConditionTypesToReport, + config.AllocatableTypesToReport, defaultInitialSyncTimeout) + if err != nil { + return nil, errors.Wrap(err, "Failed to setup the receiver") + } return &kubernetesReceiver{ resourceWatcher: resourceWatcher, diff --git a/receiver/k8sclusterreceiver/receiver_test.go b/receiver/k8sclusterreceiver/receiver_test.go index b09b1eccad68..5d44ee76dc11 100644 --- a/receiver/k8sclusterreceiver/receiver_test.go +++ b/receiver/k8sclusterreceiver/receiver_test.go @@ -31,8 +31,10 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/fake" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils" ) @@ -41,11 +43,12 @@ func TestReceiver(t *testing.T) { require.NoError(t, err) defer tt.Shutdown(context.Background()) - client := fake.NewSimpleClientset() + client := newFakeClientWithAllResources() osQuotaClient := fakeQuota.NewSimpleClientset() sink := new(consumertest.MetricsSink) - r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt) + r, err := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt) + require.NoError(t, err) // Setup k8s resources. numPods := 2 @@ -88,10 +91,11 @@ func TestReceiverTimesOutAfterStartup(t *testing.T) { tt, err := obsreporttest.SetupTelemetry() require.NoError(t, err) defer tt.Shutdown(context.Background()) - client := fake.NewSimpleClientset() + client := newFakeClientWithAllResources() // Mock initial cache sync timing out, using a small timeout. - r := setupReceiver(client, nil, consumertest.NewNop(), 1*time.Millisecond, tt) + r, err := setupReceiver(client, nil, consumertest.NewNop(), 1*time.Millisecond, tt) + require.NoError(t, err) createPods(t, client, 1) @@ -108,11 +112,12 @@ func TestReceiverWithManyResources(t *testing.T) { require.NoError(t, err) defer tt.Shutdown(context.Background()) - client := fake.NewSimpleClientset() + client := newFakeClientWithAllResources() osQuotaClient := fakeQuota.NewSimpleClientset() sink := new(consumertest.MetricsSink) - r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt) + r, err := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt) + require.NoError(t, err) numPods := 1000 numQuotas := 2 @@ -144,11 +149,12 @@ func TestReceiverWithMetadata(t *testing.T) { require.NoError(t, err) defer tt.Shutdown(context.Background()) - client := fake.NewSimpleClientset() + client := newFakeClientWithAllResources() next := &mockExporterWithK8sMetadata{MetricsSink: new(consumertest.MetricsSink)} numCalls = atomic.NewInt32(0) - r := setupReceiver(client, nil, next, 10*time.Second, tt) + r, err := setupReceiver(client, nil, next, 10*time.Second, tt) + require.NoError(t, err) r.config.MetadataExporters = []string{"nop/withmetadata"} // Setup k8s resources. 
@@ -196,7 +202,7 @@ func setupReceiver( osQuotaClient quotaclientset.Interface, consumer consumer.Metrics, initialSyncTimeout time.Duration, - tt obsreporttest.TestTelemetry) *kubernetesReceiver { + tt obsreporttest.TestTelemetry) (*kubernetesReceiver, error) { distribution := distributionKubernetes if osQuotaClient != nil { @@ -211,8 +217,12 @@ func setupReceiver( Distribution: distribution, } - rw := newResourceWatcher(logger, client, osQuotaClient, config.NodeConditionTypesToReport, config.AllocatableTypesToReport, initialSyncTimeout) - rw.dataCollector.SetupMetadataStore(&corev1.Service{}, &testutils.MockStore{}) + rw, err := newResourceWatcher(logger, client, osQuotaClient, config.NodeConditionTypesToReport, + config.AllocatableTypesToReport, initialSyncTimeout) + if err != nil { + return nil, err + } + rw.dataCollector.SetupMetadataStore(gvk.Service, &testutils.MockStore{}) return &kubernetesReceiver{ resourceWatcher: rw, @@ -224,5 +234,53 @@ func setupReceiver( Transport: "http", ReceiverCreateSettings: tt.ToReceiverCreateSettings(), }), + }, nil +} + +func newFakeClientWithAllResources() *fake.Clientset { + client := fake.NewSimpleClientset() + client.Resources = []*v1.APIResourceList{ + { + GroupVersion: "v1", + APIResources: []v1.APIResource{ + gvkToAPIResource(gvk.Pod), + gvkToAPIResource(gvk.Node), + gvkToAPIResource(gvk.Namespace), + gvkToAPIResource(gvk.ReplicationController), + gvkToAPIResource(gvk.ResourceQuota), + gvkToAPIResource(gvk.Service), + }, + }, + { + GroupVersion: "apps/v1", + APIResources: []v1.APIResource{ + gvkToAPIResource(gvk.DaemonSet), + gvkToAPIResource(gvk.Deployment), + gvkToAPIResource(gvk.ReplicaSet), + gvkToAPIResource(gvk.StatefulSet), + }, + }, + { + GroupVersion: "batch/v1", + APIResources: []v1.APIResource{ + gvkToAPIResource(gvk.Job), + gvkToAPIResource(gvk.CronJob), + }, + }, + { + GroupVersion: "autoscaling/v2beta2", + APIResources: []v1.APIResource{ + gvkToAPIResource(gvk.HorizontalPodAutoscaler), + }, + }, + } + return client +} + +func gvkToAPIResource(gvk schema.GroupVersionKind) v1.APIResource { + return v1.APIResource{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind, } } diff --git a/receiver/k8sclusterreceiver/watcher.go b/receiver/k8sclusterreceiver/watcher.go index b1c7cb89c6db..6d7cd25599fd 100644 --- a/receiver/k8sclusterreceiver/watcher.go +++ b/receiver/k8sclusterreceiver/watcher.go @@ -20,24 +20,22 @@ import ( "reflect" "time" - quotav1 "github.com/openshift/api/quota/v1" quotaclientset "github.com/openshift/client-go/quota/clientset/versioned" quotainformersv1 "github.com/openshift/client-go/quota/informers/externalversions" + "github.com/pkg/errors" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.uber.org/atomic" "go.uber.org/zap" - appsv1 "k8s.io/api/apps/v1" - autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" - batchv1 "k8s.io/api/batch/v1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils" ) @@ -63,7 +61,8 @@ type metadataConsumer func(metadata []*metadata.MetadataUpdate) error // newResourceWatcher creates a Kubernetes resource watcher. func newResourceWatcher( logger *zap.Logger, client kubernetes.Interface, osQuotaClient quotaclientset.Interface, - nodeConditionTypesToReport, allocatableTypesToReport []string, initialSyncTimeout time.Duration) *resourceWatcher { + nodeConditionTypesToReport, allocatableTypesToReport []string, + initialSyncTimeout time.Duration) (*resourceWatcher, error) { rw := &resourceWatcher{ client: client, osQuotaClient: osQuotaClient, @@ -75,39 +74,117 @@ func newResourceWatcher( initialTimeout: initialSyncTimeout, } - rw.prepareSharedInformerFactory() + err := rw.prepareSharedInformerFactory() + if err != nil { + return nil, err + } - return rw + return rw, nil } -func (rw *resourceWatcher) prepareSharedInformerFactory() { +func (rw *resourceWatcher) prepareSharedInformerFactory() error { factory := informers.NewSharedInformerFactoryWithOptions(rw.client, 0) - // Add shared informers for each resource type that has to be watched. - rw.setupInformers(&corev1.Pod{}, factory.Core().V1().Pods().Informer()) - rw.setupInformers(&corev1.Node{}, factory.Core().V1().Nodes().Informer()) - rw.setupInformers(&corev1.Namespace{}, factory.Core().V1().Namespaces().Informer()) - rw.setupInformers(&corev1.ReplicationController{}, - factory.Core().V1().ReplicationControllers().Informer(), - ) - rw.setupInformers(&corev1.ResourceQuota{}, factory.Core().V1().ResourceQuotas().Informer()) - rw.setupInformers(&corev1.Service{}, factory.Core().V1().Services().Informer()) - rw.setupInformers(&appsv1.DaemonSet{}, factory.Apps().V1().DaemonSets().Informer()) - rw.setupInformers(&appsv1.Deployment{}, factory.Apps().V1().Deployments().Informer()) - rw.setupInformers(&appsv1.ReplicaSet{}, factory.Apps().V1().ReplicaSets().Informer()) - rw.setupInformers(&appsv1.StatefulSet{}, factory.Apps().V1().StatefulSets().Informer()) - rw.setupInformers(&batchv1.Job{}, factory.Batch().V1().Jobs().Informer()) - rw.setupInformers(&batchv1.CronJob{}, factory.Batch().V1().CronJobs().Informer()) - rw.setupInformers(&autoscalingv2beta2.HorizontalPodAutoscaler{}, - factory.Autoscaling().V2beta2().HorizontalPodAutoscalers().Informer(), - ) + // Map of supported group version kinds by name of a kind. + // If none of the group versions are supported by k8s server for a specific kind, + // informer for that kind won't be set and a warning message is thrown. + // This map should be kept in sync with what can be provided by the supported k8s server versions. 
+ supportedKinds := map[string][]schema.GroupVersionKind{
+ "Pod": {gvk.Pod},
+ "Node": {gvk.Node},
+ "Namespace": {gvk.Namespace},
+ "ReplicationController": {gvk.ReplicationController},
+ "ResourceQuota": {gvk.ResourceQuota},
+ "Service": {gvk.Service},
+ "DaemonSet": {gvk.DaemonSet},
+ "Deployment": {gvk.Deployment},
+ "ReplicaSet": {gvk.ReplicaSet},
+ "StatefulSet": {gvk.StatefulSet},
+ "Job": {gvk.Job},
+ "CronJob": {gvk.CronJob, gvk.CronJobBeta},
+ "HorizontalPodAutoscaler": {gvk.HorizontalPodAutoscaler},
+ }
+
+ for kind, gvks := range supportedKinds {
+ anySupported := false
+ for _, gvk := range gvks {
+ supported, err := rw.isKindSupported(gvk)
+ if err != nil {
+ return err
+ }
+ if supported {
+ anySupported = true
+ rw.setupInformerForKind(gvk, factory)
+ }
+ }
+ if !anySupported {
+ rw.logger.Warn("Server doesn't support any of the group versions defined for the kind",
+ zap.String("kind", kind))
+ }
+ }

 if rw.osQuotaClient != nil {
 quotaFactory := quotainformersv1.NewSharedInformerFactory(rw.osQuotaClient, 0)
- rw.setupInformers(&quotav1.ClusterResourceQuota{}, quotaFactory.Quota().V1().ClusterResourceQuotas().Informer())
+ rw.setupInformer(gvk.ClusterResourceQuota, quotaFactory.Quota().V1().ClusterResourceQuotas().Informer())
 rw.informerFactories = append(rw.informerFactories, quotaFactory)
 }
 rw.informerFactories = append(rw.informerFactories, factory)
+
+ return nil
+}
+
+func (rw *resourceWatcher) isKindSupported(gvk schema.GroupVersionKind) (bool, error) {
+ resources, err := rw.client.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String())
+ if err != nil {
+ if apierrors.IsNotFound(err) { // if the discovery endpoint isn't present, assume group version is not supported
+ rw.logger.Debug("Group version is not supported", zap.String("group", gvk.GroupVersion().String()))
+ return false, nil
+ }
+ return false, errors.Wrap(err, "Failed to fetch group version details")
+ }
+
+ for _, r := range resources.APIResources {
+ if r.Kind == gvk.Kind {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+func (rw *resourceWatcher) setupInformerForKind(kind schema.GroupVersionKind, factory informers.SharedInformerFactory) {
+ switch kind {
+ case gvk.Pod:
+ rw.setupInformer(kind, factory.Core().V1().Pods().Informer())
+ case gvk.Node:
+ rw.setupInformer(kind, factory.Core().V1().Nodes().Informer())
+ case gvk.Namespace:
+ rw.setupInformer(kind, factory.Core().V1().Namespaces().Informer())
+ case gvk.ReplicationController:
+ rw.setupInformer(kind, factory.Core().V1().ReplicationControllers().Informer())
+ case gvk.ResourceQuota:
+ rw.setupInformer(kind, factory.Core().V1().ResourceQuotas().Informer())
+ case gvk.Service:
+ rw.setupInformer(kind, factory.Core().V1().Services().Informer())
+ case gvk.DaemonSet:
+ rw.setupInformer(kind, factory.Apps().V1().DaemonSets().Informer())
+ case gvk.Deployment:
+ rw.setupInformer(kind, factory.Apps().V1().Deployments().Informer())
+ case gvk.ReplicaSet:
+ rw.setupInformer(kind, factory.Apps().V1().ReplicaSets().Informer())
+ case gvk.StatefulSet:
+ rw.setupInformer(kind, factory.Apps().V1().StatefulSets().Informer())
+ case gvk.Job:
+ rw.setupInformer(kind, factory.Batch().V1().Jobs().Informer())
+ case gvk.CronJob:
+ rw.setupInformer(kind, factory.Batch().V1().CronJobs().Informer())
+ case gvk.CronJobBeta:
+ rw.setupInformer(kind, factory.Batch().V1beta1().CronJobs().Informer())
+ case gvk.HorizontalPodAutoscaler:
+ rw.setupInformer(kind, factory.Autoscaling().V2beta2().HorizontalPodAutoscalers().Informer())
+ default:
+ 
rw.logger.Error("Could not setup an informer for provided group version kind", + zap.String("group version kind", kind.String())) + } } // startWatchingResources starts up all informers. @@ -129,14 +206,14 @@ func (rw *resourceWatcher) startWatchingResources(ctx context.Context, inf share return timedContextForInitialSync } -// setupInformers adds event handlers to informers and setups a metadataStore. -func (rw *resourceWatcher) setupInformers(o runtime.Object, informer cache.SharedIndexInformer) { +// setupInformer adds event handlers to informers and setups a metadataStore. +func (rw *resourceWatcher) setupInformer(gvk schema.GroupVersionKind, informer cache.SharedIndexInformer) { informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: rw.onAdd, UpdateFunc: rw.onUpdate, DeleteFunc: rw.onDelete, }) - rw.dataCollector.SetupMetadataStore(o, informer.GetStore()) + rw.dataCollector.SetupMetadataStore(gvk, informer.GetStore()) } func (rw *resourceWatcher) onAdd(obj interface{}) { diff --git a/receiver/k8sclusterreceiver/watcher_test.go b/receiver/k8sclusterreceiver/watcher_test.go index 09b6095d2632..d359f509d1b0 100644 --- a/receiver/k8sclusterreceiver/watcher_test.go +++ b/receiver/k8sclusterreceiver/watcher_test.go @@ -17,10 +17,19 @@ package k8sclusterreceiver import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config" "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk" ) func TestSetupMetadataExporters(t *testing.T) { @@ -86,3 +95,133 @@ func TestSetupMetadataExporters(t *testing.T) { }) } } + +func TestIsKindSupported(t *testing.T) { + var tests = []struct { + name string + client *fake.Clientset + gvk schema.GroupVersionKind + expected bool + }{ + { + name: "nothing_supported", + client: fake.NewSimpleClientset(), + gvk: gvk.Pod, + expected: false, + }, + { + name: "all_kinds_supported", + client: newFakeClientWithAllResources(), + gvk: gvk.Pod, + expected: true, + }, + { + name: "unsupported_kind", + client: fake.NewSimpleClientset(), + gvk: gvk.CronJobBeta, + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + rw := &resourceWatcher{ + client: tt.client, + logger: zap.NewNop(), + } + supported, err := rw.isKindSupported(tt.gvk) + assert.NoError(t, err) + assert.Equal(t, tt.expected, supported) + }) + } +} + +func TestPrepareSharedInformerFactory(t *testing.T) { + var tests = []struct { + name string + client *fake.Clientset + }{ + { + name: "new_server_version", + client: newFakeClientWithAllResources(), + }, + { + name: "old_server_version", // With no batch/v1.CronJob support. 
+ client: func() *fake.Clientset {
+ client := fake.NewSimpleClientset()
+ client.Resources = []*metav1.APIResourceList{
+ {
+ GroupVersion: "v1",
+ APIResources: []metav1.APIResource{
+ gvkToAPIResource(gvk.Pod),
+ gvkToAPIResource(gvk.Node),
+ gvkToAPIResource(gvk.Namespace),
+ gvkToAPIResource(gvk.ReplicationController),
+ gvkToAPIResource(gvk.ResourceQuota),
+ gvkToAPIResource(gvk.Service),
+ },
+ },
+ {
+ GroupVersion: "apps/v1",
+ APIResources: []metav1.APIResource{
+ gvkToAPIResource(gvk.DaemonSet),
+ gvkToAPIResource(gvk.Deployment),
+ gvkToAPIResource(gvk.ReplicaSet),
+ gvkToAPIResource(gvk.StatefulSet),
+ },
+ },
+ {
+ GroupVersion: "batch/v1",
+ APIResources: []metav1.APIResource{
+ gvkToAPIResource(gvk.Job),
+ },
+ },
+ {
+ GroupVersion: "batch/v1beta1",
+ APIResources: []metav1.APIResource{
+ gvkToAPIResource(gvk.CronJobBeta),
+ },
+ },
+ {
+ GroupVersion: "autoscaling/v2beta2",
+ APIResources: []metav1.APIResource{
+ gvkToAPIResource(gvk.HorizontalPodAutoscaler),
+ },
+ },
+ }
+ return client
+ }(),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+
+ obs, logs := observer.New(zap.WarnLevel)
+ obsLogger := zap.New(obs)
+ rw := &resourceWatcher{
+ client: tt.client,
+ logger: obsLogger,
+ dataCollector: collection.NewDataCollector(zap.NewNop(), []string{}, []string{}),
+ }
+
+ assert.NoError(t, rw.prepareSharedInformerFactory())
+
+ // Make sure no warning or error logs are raised
+ assert.Equal(t, 0, logs.Len())
+ })
+ }
+}
+
+func TestSetupInformerForKind(t *testing.T) {
+ obs, logs := observer.New(zap.WarnLevel)
+ obsLogger := zap.New(obs)
+ rw := &resourceWatcher{
+ client: newFakeClientWithAllResources(),
+ logger: obsLogger,
+ }
+
+ factory := informers.NewSharedInformerFactoryWithOptions(rw.client, 0)
+ rw.setupInformerForKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "WrongKind"}, factory)
+
+ assert.Equal(t, 1, logs.Len())
+ assert.Equal(t, "Could not setup an informer for provided group version kind", logs.All()[0].Entry.Message)
+}
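
Note: the patch above makes the receiver consult the discovery API before wiring informers. The sketch below is illustrative only and is not part of the patch; it shows how the same capability check can be reproduced with client-go against a live cluster, for example to confirm whether batch/v1 or only batch/v1beta1 CronJobs are served before deploying the collector. The helper name checkKind and the hard-coded kubeconfig path are assumptions made for this example; the discovery call mirrors resourceWatcher.isKindSupported.

package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// checkKind reports whether the API server serves the given group/version/kind,
// using the same discovery endpoint the receiver now queries before creating informers.
func checkKind(client kubernetes.Interface, gvk schema.GroupVersionKind) (bool, error) {
	resources, err := client.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		// A NotFound error means the whole group/version is not served by this cluster.
		if apierrors.IsNotFound(err) {
			return false, nil
		}
		return false, err
	}
	for _, r := range resources.APIResources {
		if r.Kind == gvk.Kind {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Assumption: kubeconfig in the default location; adjust as needed.
	kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		log.Fatal(err)
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// Check both CronJob API versions the receiver now understands.
	cronJobV1 := schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "CronJob"}
	cronJobV1beta1 := schema.GroupVersionKind{Group: "batch", Version: "v1beta1", Kind: "CronJob"}

	for _, gvk := range []schema.GroupVersionKind{cronJobV1, cronJobV1beta1} {
		ok, err := checkKind(client, gvk)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s supported: %v\n", gvk, ok)
	}
}

On a 1.19 or 1.20 cluster only the batch/v1beta1 variant reports true, which is why prepareSharedInformerFactory falls back to factory.Batch().V1beta1().CronJobs() for those servers instead of failing to start.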