Skip to content

Commit

Permalink
[receiver/k8scluster] Check resource support before setting up watchers
Browse files Browse the repository at this point in the history
This change adds validation using the discovery API before setting up watchers on particular k8s resources. Otherwise, if the k8s API doesn't support a k8s resource, setting up watchers breaks the receiver.

This change also adds support for batch/v1beta1 CronJob resources. This makes the receiver able to work with k8s server versions 1.19 and 1.20, which are still available on some cloud providers.
  • Loading branch information
dmitryax committed Apr 28, 2022
1 parent b9915f7 commit 8d159e3
Show file tree
Hide file tree
Showing 10 changed files with 421 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- `attributesprocessor`: Support filter by severity (#9132)
- `processor/transform`: Add transformation of logs (#9368)
- `datadogexporter`: Add `metrics::summaries::mode` to specify export mode for summaries (#8846)
- `k8sclusterreceiver`: Validate that k8s API supports a resource before setting up a watcher for it (#9523)

### 🧰 Bug fixes 🧰

Expand All @@ -40,6 +41,7 @@
- `resourcedetectionprocessor`: Wire docker detector (#9372)
- `kafkametricsreceiver`: The kafkametricsreceiver was changed to connect to kafka during scrape, rather than startup. If kafka is unavailable the receiver will attempt to connect during subsequent scrapes until successful (#8817).
- `datadogexporter`: Update Kubernetes example manifest to new executable name. (#9425).
- `k8sclusterreceiver`: Fix the receiver to work with 1.19 and 1.20 k8s API versions (#9523)

## v0.49.0

Expand Down
5 changes: 3 additions & 2 deletions receiver/k8sclusterreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ func TestFactory(t *testing.T) {

// Override for tests.
rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) {
return nil, nil
return fake.NewSimpleClientset(), nil
}
r, err = f.CreateMetricsReceiver(
context.Background(), componenttest.NewNopReceiverCreateSettings(),
Expand All @@ -92,7 +93,7 @@ func TestFactoryDistributions(t *testing.T) {
require.True(t, ok)

rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) {
return nil, nil
return fake.NewSimpleClientset(), nil
}
rCfg.makeOpenShiftQuotaClient = func(apiConf k8sconfig.APIConfig) (quotaclientset.Interface, error) {
return fakeQuota.NewSimpleClientset(), nil
Expand Down
12 changes: 9 additions & 3 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -120,9 +122,9 @@ func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTyp
}
}

// SetupMetadataStore initializes a metadata store for the kubernetes object.
func (dc *DataCollector) SetupMetadataStore(o runtime.Object, store cache.Store) {
dc.metadataStore.setupStore(o, store)
// SetupMetadataStore initializes a metadata store for the kubernetes kind.
// The store is registered under its GroupVersionKind so that cached objects
// (services, jobs, replicasets) can be looked up later during metadata sync.
func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) {
	dc.metadataStore.setupStore(gvk, store)
}

func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {
Expand Down Expand Up @@ -176,6 +178,8 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) {
rm = getMetricsForJob(o)
case *batchv1.CronJob:
rm = getMetricsForCronJob(o)
case *batchv1beta1.CronJob:
rm = getMetricsForCronJobBeta(o)
case *autoscalingv2beta2.HorizontalPodAutoscaler:
rm = getMetricsForHPA(o)
case *quotav1.ClusterResourceQuota:
Expand Down Expand Up @@ -213,6 +217,8 @@ func (dc *DataCollector) SyncMetadata(obj interface{}) map[metadata.ResourceID]*
km = getMetadataForJob(o)
case *batchv1.CronJob:
km = getMetadataForCronJob(o)
case *batchv1beta1.CronJob:
km = getMetadataForCronJobBeta(o)
case *autoscalingv2beta2.HorizontalPodAutoscaler:
km = getMetadataForHPA(o)
}
Expand Down
40 changes: 40 additions & 0 deletions receiver/k8sclusterreceiver/internal/collection/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"

metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
Expand All @@ -37,6 +38,8 @@ var activeJobs = &metricspb.MetricDescriptor{
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

// TODO: All the CronJob related functions below can be de-duplicated using generics from go 1.18

func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics {
metrics := []*metricspb.Metric{
{
Expand All @@ -55,6 +58,24 @@ func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics {
}
}

// getMetricsForCronJobBeta builds resource metrics for a batch/v1beta1
// CronJob. It mirrors getMetricsForCronJob for clusters (k8s 1.19/1.20)
// that only serve the v1beta1 CronJob API.
func getMetricsForCronJobBeta(cj *batchv1beta1.CronJob) []*resourceMetrics {
	activeJobsMetric := &metricspb.Metric{
		MetricDescriptor: activeJobs,
		Timeseries: []*metricspb.TimeSeries{
			utils.GetInt64TimeSeries(int64(len(cj.Status.Active))),
		},
	}

	return []*resourceMetrics{
		{
			resource: getResourceForCronJobBeta(cj),
			metrics:  []*metricspb.Metric{activeJobsMetric},
		},
	}
}

func getResourceForCronJob(cj *batchv1.CronJob) *resourcepb.Resource {
return &resourcepb.Resource{
Type: k8sType,
Expand All @@ -67,9 +88,28 @@ func getResourceForCronJob(cj *batchv1.CronJob) *resourcepb.Resource {
}
}

// getResourceForCronJobBeta returns the resource descriptor for a
// batch/v1beta1 CronJob, carrying its UID, name, namespace and cluster name.
func getResourceForCronJobBeta(cj *batchv1beta1.CronJob) *resourcepb.Resource {
	labels := map[string]string{
		conventions.AttributeK8SCronJobUID:    string(cj.UID),
		conventions.AttributeK8SCronJobName:   cj.Name,
		conventions.AttributeK8SNamespaceName: cj.Namespace,
		conventions.AttributeK8SClusterName:   cj.ClusterName,
	}
	return &resourcepb.Resource{
		Type:   k8sType,
		Labels: labels,
	}
}

// getMetadataForCronJob extracts metadata (schedule and concurrency policy)
// from a batch/v1 CronJob, keyed by the object's UID.
func getMetadataForCronJob(cj *batchv1.CronJob) map[metadata.ResourceID]*KubernetesMetadata {
	km := getGenericMetadata(&cj.ObjectMeta, k8sKindCronJob)
	km.metadata[cronJobKeySchedule] = cj.Spec.Schedule
	km.metadata[cronJobKeyConcurrencyPolicy] = string(cj.Spec.ConcurrencyPolicy)
	return map[metadata.ResourceID]*KubernetesMetadata{
		metadata.ResourceID(cj.UID): km,
	}
}

// getMetadataForCronJobBeta extracts metadata (schedule and concurrency
// policy) from a batch/v1beta1 CronJob, keyed by the object's UID.
func getMetadataForCronJobBeta(cj *batchv1beta1.CronJob) map[metadata.ResourceID]*KubernetesMetadata {
	km := getGenericMetadata(&cj.ObjectMeta, k8sKindCronJob)
	km.metadata[cronJobKeySchedule] = cj.Spec.Schedule
	km.metadata[cronJobKeyConcurrencyPolicy] = string(cj.Spec.ConcurrencyPolicy)
	return map[metadata.ResourceID]*KubernetesMetadata{
		metadata.ResourceID(cj.UID): km,
	}
}
17 changes: 8 additions & 9 deletions receiver/k8sclusterreceiver/internal/collection/metadatastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@
package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"

import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
)

// metadataStore keeps track of required caches exposed by informers.
Expand All @@ -32,13 +31,13 @@ type metadataStore struct {
}

// setupStore tracks metadata of services, jobs and replicasets.
func (ms *metadataStore) setupStore(o runtime.Object, store cache.Store) {
switch o.(type) {
case *corev1.Service:
func (ms *metadataStore) setupStore(kind schema.GroupVersionKind, store cache.Store) {
switch kind {
case gvk.Service:
ms.services = store
case *batchv1.Job:
case gvk.Job:
ms.jobs = store
case *appsv1.ReplicaSet:
case gvk.ReplicaSet:
ms.replicaSets = store
}
}
36 changes: 36 additions & 0 deletions receiver/k8sclusterreceiver/internal/gvk/gvk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gvk // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"

import "k8s.io/apimachinery/pkg/runtime/schema"

// Kubernetes group version kinds supported by the receiver. Watcher setup
// validates these against the discovery API before subscribing, so kinds a
// given server does not serve (e.g. batch/v1 CronJob on 1.19/1.20) can be
// skipped or substituted (CronJobBeta).
var (
	Pod                     = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
	Node                    = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Node"}
	Namespace               = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}
	ReplicationController   = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ReplicationController"}
	ResourceQuota           = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ResourceQuota"}
	Service                 = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"}
	DaemonSet               = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "DaemonSet"}
	Deployment              = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}
	ReplicaSet              = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}
	StatefulSet             = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "StatefulSet"}
	Job                     = schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}
	CronJob                 = schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "CronJob"}
	CronJobBeta             = schema.GroupVersionKind{Group: "batch", Version: "v1beta1", Kind: "CronJob"}
	HorizontalPodAutoscaler = schema.GroupVersionKind{Group: "autoscaling", Version: "v2beta2", Kind: "HorizontalPodAutoscaler"}
	ClusterResourceQuota    = schema.GroupVersionKind{Group: "quota", Version: "v1", Kind: "ClusterResourceQuota"}
)
7 changes: 6 additions & 1 deletion receiver/k8sclusterreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
Expand Down Expand Up @@ -115,7 +116,11 @@ func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) {
func newReceiver(
set component.ReceiverCreateSettings, config *Config, consumer consumer.Metrics,
client kubernetes.Interface, osQuotaClient quotaclientset.Interface) (component.MetricsReceiver, error) {
resourceWatcher := newResourceWatcher(set.Logger, client, osQuotaClient, config.NodeConditionTypesToReport, config.AllocatableTypesToReport, defaultInitialSyncTimeout)
resourceWatcher, err := newResourceWatcher(set.Logger, client, osQuotaClient, config.NodeConditionTypesToReport,
config.AllocatableTypesToReport, defaultInitialSyncTimeout)
if err != nil {
return nil, errors.Wrap(err, "Failed to setup the receiver")
}

return &kubernetesReceiver{
resourceWatcher: resourceWatcher,
Expand Down
80 changes: 69 additions & 11 deletions receiver/k8sclusterreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)

Expand All @@ -41,11 +43,12 @@ func TestReceiver(t *testing.T) {
require.NoError(t, err)
defer tt.Shutdown(context.Background())

client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()
osQuotaClient := fakeQuota.NewSimpleClientset()
sink := new(consumertest.MetricsSink)

r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
r, err := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
require.NoError(t, err)

// Setup k8s resources.
numPods := 2
Expand Down Expand Up @@ -88,10 +91,11 @@ func TestReceiverTimesOutAfterStartup(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
defer tt.Shutdown(context.Background())
client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()

// Mock initial cache sync timing out, using a small timeout.
r := setupReceiver(client, nil, consumertest.NewNop(), 1*time.Millisecond, tt)
r, err := setupReceiver(client, nil, consumertest.NewNop(), 1*time.Millisecond, tt)
require.NoError(t, err)

createPods(t, client, 1)

Expand All @@ -108,11 +112,12 @@ func TestReceiverWithManyResources(t *testing.T) {
require.NoError(t, err)
defer tt.Shutdown(context.Background())

client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()
osQuotaClient := fakeQuota.NewSimpleClientset()
sink := new(consumertest.MetricsSink)

r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
r, err := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
require.NoError(t, err)

numPods := 1000
numQuotas := 2
Expand Down Expand Up @@ -144,11 +149,12 @@ func TestReceiverWithMetadata(t *testing.T) {
require.NoError(t, err)
defer tt.Shutdown(context.Background())

client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()
next := &mockExporterWithK8sMetadata{MetricsSink: new(consumertest.MetricsSink)}
numCalls = atomic.NewInt32(0)

r := setupReceiver(client, nil, next, 10*time.Second, tt)
r, err := setupReceiver(client, nil, next, 10*time.Second, tt)
require.NoError(t, err)
r.config.MetadataExporters = []string{"nop/withmetadata"}

// Setup k8s resources.
Expand Down Expand Up @@ -196,7 +202,7 @@ func setupReceiver(
osQuotaClient quotaclientset.Interface,
consumer consumer.Metrics,
initialSyncTimeout time.Duration,
tt obsreporttest.TestTelemetry) *kubernetesReceiver {
tt obsreporttest.TestTelemetry) (*kubernetesReceiver, error) {

distribution := distributionKubernetes
if osQuotaClient != nil {
Expand All @@ -211,8 +217,12 @@ func setupReceiver(
Distribution: distribution,
}

rw := newResourceWatcher(logger, client, osQuotaClient, config.NodeConditionTypesToReport, config.AllocatableTypesToReport, initialSyncTimeout)
rw.dataCollector.SetupMetadataStore(&corev1.Service{}, &testutils.MockStore{})
rw, err := newResourceWatcher(logger, client, osQuotaClient, config.NodeConditionTypesToReport,
config.AllocatableTypesToReport, initialSyncTimeout)
if err != nil {
return nil, err
}
rw.dataCollector.SetupMetadataStore(gvk.Service, &testutils.MockStore{})

return &kubernetesReceiver{
resourceWatcher: rw,
Expand All @@ -224,5 +234,53 @@ func setupReceiver(
Transport: "http",
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
}),
}, nil
}

// newFakeClientWithAllResources returns a fake clientset whose discovery
// information advertises every resource kind the receiver watches, so tests
// exercise all watcher setup paths.
func newFakeClientWithAllResources() *fake.Clientset {
	coreResources := []v1.APIResource{
		gvkToAPIResource(gvk.Pod),
		gvkToAPIResource(gvk.Node),
		gvkToAPIResource(gvk.Namespace),
		gvkToAPIResource(gvk.ReplicationController),
		gvkToAPIResource(gvk.ResourceQuota),
		gvkToAPIResource(gvk.Service),
	}
	appsResources := []v1.APIResource{
		gvkToAPIResource(gvk.DaemonSet),
		gvkToAPIResource(gvk.Deployment),
		gvkToAPIResource(gvk.ReplicaSet),
		gvkToAPIResource(gvk.StatefulSet),
	}
	batchResources := []v1.APIResource{
		gvkToAPIResource(gvk.Job),
		gvkToAPIResource(gvk.CronJob),
	}
	autoscalingResources := []v1.APIResource{
		gvkToAPIResource(gvk.HorizontalPodAutoscaler),
	}

	client := fake.NewSimpleClientset()
	client.Resources = []*v1.APIResourceList{
		{GroupVersion: "v1", APIResources: coreResources},
		{GroupVersion: "apps/v1", APIResources: appsResources},
		{GroupVersion: "batch/v1", APIResources: batchResources},
		{GroupVersion: "autoscaling/v2beta2", APIResources: autoscalingResources},
	}
	return client
}

// gvkToAPIResource converts a GroupVersionKind into the APIResource form
// expected in a fake clientset's discovery resource list.
// The parameter is deliberately not named "gvk": that would shadow the
// imported gvk package inside this function.
func gvkToAPIResource(kind schema.GroupVersionKind) v1.APIResource {
	return v1.APIResource{
		Group:   kind.Group,
		Version: kind.Version,
		Kind:    kind.Kind,
	}
}
Loading

0 comments on commit 8d159e3

Please sign in to comment.