[receiver/k8scluster] Check resource support before setting up watchers (open-telemetry#9523)

This change adds additional validation using the discovery API before setting up watchers on particular k8s resources. Previously, if the k8s API server did not support a resource, setting up a watcher for it broke the receiver.

This change also adds support for batch/v1beta1 CronJob resources, which lets the receiver work with k8s server versions 1.19 and 1.20, still offered by some cloud providers.
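
For illustration, here is a minimal, hedged sketch of that check built only on the client-go discovery API. The package and function names (`discoverycheck`, `ResourceSupported`, `PickCronJobGVK`) are assumptions for this example rather than the receiver's actual code; only `Discovery().ServerResourcesForGroupVersion` and the apimachinery error helper are real library API.

```go
// Package discoverycheck is a self-contained sketch of the idea in this
// change: ask the discovery API whether a resource is served before wiring
// a watcher for it. All names here are illustrative, not the receiver's.
package discoverycheck

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/kubernetes"
)

// ResourceSupported reports whether the API server advertises the given
// GroupVersionKind. A 404 for the whole group/version (e.g. batch/v1 CronJob
// on Kubernetes 1.19/1.20) is treated as "not supported" rather than an error.
func ResourceSupported(client kubernetes.Interface, gvk schema.GroupVersionKind) (bool, error) {
	list, err := client.Discovery().ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		if apierrors.IsNotFound(err) {
			return false, nil
		}
		return false, err
	}
	for _, r := range list.APIResources {
		if r.Kind == gvk.Kind {
			return true, nil
		}
	}
	return false, nil
}

// PickCronJobGVK mirrors the fallback described above: prefer batch/v1
// CronJob and fall back to batch/v1beta1 on older clusters.
func PickCronJobGVK(client kubernetes.Interface) (schema.GroupVersionKind, error) {
	v1CronJob := schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "CronJob"}
	betaCronJob := schema.GroupVersionKind{Group: "batch", Version: "v1beta1", Kind: "CronJob"}

	ok, err := ResourceSupported(client, v1CronJob)
	if err != nil {
		return schema.GroupVersionKind{}, err
	}
	if ok {
		return v1CronJob, nil
	}
	return betaCronJob, nil
}
```

In the receiver itself, the equivalent check runs before the resource watcher registers its informers, which is what the changes below wire up.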
dmitryax authored and djaglowski committed May 10, 2022
1 parent 0d9669d commit 604fdec
Showing 10 changed files with 421 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -44,6 +44,7 @@
- `attributesprocessor`: Support filter by severity (#9132)
- `processor/transform`: Add transformation of logs (#9368)
- `datadogexporter`: Add `metrics::summaries::mode` to specify export mode for summaries (#8846)
- `k8sclusterreceiver`: Validate that k8s API supports a resource before setting up a watcher for it (#9523)

### 🧰 Bug fixes 🧰

@@ -52,6 +53,7 @@
- `resourcedetectionprocessor`: Wire docker detector (#9372)
- `kafkametricsreceiver`: The kafkametricsreceiver was changed to connect to Kafka during scrape, rather than startup. If Kafka is unavailable the receiver will attempt to connect during subsequent scrapes until successful (#8817).
- `datadogexporter`: Update Kubernetes example manifest to the new executable name (#9425).
- `k8sclusterreceiver`: Fix the receiver to work with 1.19 and 1.20 k8s API versions (#9523)

## v0.49.0

5 changes: 3 additions & 2 deletions receiver/k8sclusterreceiver/factory_test.go
@@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
)
@@ -66,7 +67,7 @@ func TestFactory(t *testing.T) {

// Override for tests.
rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) {
return nil, nil
return fake.NewSimpleClientset(), nil
}
r, err = f.CreateMetricsReceiver(
context.Background(), componenttest.NewNopReceiverCreateSettings(),
@@ -92,7 +93,7 @@ func TestFactoryDistributions(t *testing.T) {
require.True(t, ok)

rCfg.makeClient = func(apiConf k8sconfig.APIConfig) (kubernetes.Interface, error) {
return nil, nil
return fake.NewSimpleClientset(), nil
}
rCfg.makeOpenShiftQuotaClient = func(apiConf k8sconfig.APIConfig) (quotaclientset.Interface, error) {
return fakeQuota.NewSimpleClientset(), nil
12 changes: 9 additions & 3 deletions receiver/k8sclusterreceiver/internal/collection/collector.go
@@ -26,8 +26,10 @@ import (
appsv1 "k8s.io/api/apps/v1"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"

@@ -120,9 +122,9 @@ func NewDataCollector(logger *zap.Logger, nodeConditionsToReport, allocatableTyp
}
}

// SetupMetadataStore initializes a metadata store for the kubernetes object.
func (dc *DataCollector) SetupMetadataStore(o runtime.Object, store cache.Store) {
dc.metadataStore.setupStore(o, store)
// SetupMetadataStore initializes a metadata store for the kubernetes kind.
func (dc *DataCollector) SetupMetadataStore(gvk schema.GroupVersionKind, store cache.Store) {
dc.metadataStore.setupStore(gvk, store)
}

func (dc *DataCollector) RemoveFromMetricsStore(obj interface{}) {
@@ -176,6 +178,8 @@ func (dc *DataCollector) SyncMetrics(obj interface{}) {
rm = getMetricsForJob(o)
case *batchv1.CronJob:
rm = getMetricsForCronJob(o)
case *batchv1beta1.CronJob:
rm = getMetricsForCronJobBeta(o)
case *autoscalingv2beta2.HorizontalPodAutoscaler:
rm = getMetricsForHPA(o)
case *quotav1.ClusterResourceQuota:
@@ -213,6 +217,8 @@ func (dc *DataCollector) SyncMetadata(obj interface{}) map[metadata.ResourceID]*
km = getMetadataForJob(o)
case *batchv1.CronJob:
km = getMetadataForCronJob(o)
case *batchv1beta1.CronJob:
km = getMetadataForCronJobBeta(o)
case *autoscalingv2beta2.HorizontalPodAutoscaler:
km = getMetadataForHPA(o)
}
40 changes: 40 additions & 0 deletions receiver/k8sclusterreceiver/internal/collection/cronjobs.go
@@ -19,6 +19,7 @@ import (
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"

metadata "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/utils"
@@ -37,6 +38,8 @@ var activeJobs = &metricspb.MetricDescriptor{
Type: metricspb.MetricDescriptor_GAUGE_INT64,
}

// TODO: All the CronJob related functions below can be de-duplicated using generics from go 1.18

func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics {
metrics := []*metricspb.Metric{
{
@@ -55,6 +58,24 @@ func getMetricsForCronJob(cj *batchv1.CronJob) []*resourceMetrics {
}
}

func getMetricsForCronJobBeta(cj *batchv1beta1.CronJob) []*resourceMetrics {
metrics := []*metricspb.Metric{
{
MetricDescriptor: activeJobs,
Timeseries: []*metricspb.TimeSeries{
utils.GetInt64TimeSeries(int64(len(cj.Status.Active))),
},
},
}

return []*resourceMetrics{
{
resource: getResourceForCronJobBeta(cj),
metrics: metrics,
},
}
}

func getResourceForCronJob(cj *batchv1.CronJob) *resourcepb.Resource {
return &resourcepb.Resource{
Type: k8sType,
@@ -67,9 +88,28 @@ func getResourceForCronJob(cj *batchv1.CronJob) *resourcepb.Resource {
}
}

func getResourceForCronJobBeta(cj *batchv1beta1.CronJob) *resourcepb.Resource {
return &resourcepb.Resource{
Type: k8sType,
Labels: map[string]string{
conventions.AttributeK8SCronJobUID: string(cj.UID),
conventions.AttributeK8SCronJobName: cj.Name,
conventions.AttributeK8SNamespaceName: cj.Namespace,
conventions.AttributeK8SClusterName: cj.ClusterName,
},
}
}

func getMetadataForCronJob(cj *batchv1.CronJob) map[metadata.ResourceID]*KubernetesMetadata {
rm := getGenericMetadata(&cj.ObjectMeta, k8sKindCronJob)
rm.metadata[cronJobKeySchedule] = cj.Spec.Schedule
rm.metadata[cronJobKeyConcurrencyPolicy] = string(cj.Spec.ConcurrencyPolicy)
return map[metadata.ResourceID]*KubernetesMetadata{metadata.ResourceID(cj.UID): rm}
}

func getMetadataForCronJobBeta(cj *batchv1beta1.CronJob) map[metadata.ResourceID]*KubernetesMetadata {
rm := getGenericMetadata(&cj.ObjectMeta, k8sKindCronJob)
rm.metadata[cronJobKeySchedule] = cj.Spec.Schedule
rm.metadata[cronJobKeyConcurrencyPolicy] = string(cj.Spec.ConcurrencyPolicy)
return map[metadata.ResourceID]*KubernetesMetadata{metadata.ResourceID(cj.UID): rm}
}
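
As the TODO near the top of this file notes, the per-version CronJob helpers are candidates for de-duplication once Go 1.18 generics are available. A hedged sketch of one possible shape follows; Go 1.18 does not allow field access through a union constraint, so the version-specific reads are funneled into a single extractor, and every name below (`cronjobsketch`, `cronJobVersions`, `cronJobCommon`, `extractCronJob`) is hypothetical.

```go
// Hypothetical sketch only; not part of this commit.
package cronjobsketch

import (
	batchv1 "k8s.io/api/batch/v1"
	batchv1beta1 "k8s.io/api/batch/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// cronJobVersions is a union of the CronJob API versions the receiver handles.
type cronJobVersions interface {
	*batchv1.CronJob | *batchv1beta1.CronJob
}

// cronJobCommon holds the handful of fields the metric and metadata builders
// read; they are identical across both API versions.
type cronJobCommon struct {
	meta              metav1.ObjectMeta
	activeJobs        int64
	schedule          string
	concurrencyPolicy string
}

// extractCronJob narrows the generic value back to a concrete version to read
// its fields, since the union constraint alone does not permit field access.
func extractCronJob[T cronJobVersions](cj T) cronJobCommon {
	switch c := any(cj).(type) {
	case *batchv1.CronJob:
		return cronJobCommon{
			meta:              c.ObjectMeta,
			activeJobs:        int64(len(c.Status.Active)),
			schedule:          c.Spec.Schedule,
			concurrencyPolicy: string(c.Spec.ConcurrencyPolicy),
		}
	case *batchv1beta1.CronJob:
		return cronJobCommon{
			meta:              c.ObjectMeta,
			activeJobs:        int64(len(c.Status.Active)),
			schedule:          c.Spec.Schedule,
			concurrencyPolicy: string(c.Spec.ConcurrencyPolicy),
		}
	}
	return cronJobCommon{}
}
```

The shared metric and metadata builders could then accept `cronJobCommon` instead of a concrete CronJob type, leaving only the two thin `case` arms version-specific.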
17 changes: 8 additions & 9 deletions receiver/k8sclusterreceiver/internal/collection/metadatastore.go
@@ -15,11 +15,10 @@
package collection // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/collection"

import (
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
)

// metadataStore keeps track of required caches exposed by informers.
@@ -32,13 +31,13 @@ type metadataStore struct {
}

// setupStore tracks metadata of services, jobs and replicasets.
func (ms *metadataStore) setupStore(o runtime.Object, store cache.Store) {
switch o.(type) {
case *corev1.Service:
func (ms *metadataStore) setupStore(kind schema.GroupVersionKind, store cache.Store) {
switch kind {
case gvk.Service:
ms.services = store
case *batchv1.Job:
case gvk.Job:
ms.jobs = store
case *appsv1.ReplicaSet:
case gvk.ReplicaSet:
ms.replicaSets = store
}
}
36 changes: 36 additions & 0 deletions receiver/k8sclusterreceiver/internal/gvk/gvk.go
@@ -0,0 +1,36 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package gvk // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"

import "k8s.io/apimachinery/pkg/runtime/schema"

// Kubernetes group version kinds
var (
Pod = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
Node = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Node"}
Namespace = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}
ReplicationController = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ReplicationController"}
ResourceQuota = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ResourceQuota"}
Service = schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Service"}
DaemonSet = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "DaemonSet"}
Deployment = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "Deployment"}
ReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "ReplicaSet"}
StatefulSet = schema.GroupVersionKind{Group: "apps", Version: "v1", Kind: "StatefulSet"}
Job = schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "Job"}
CronJob = schema.GroupVersionKind{Group: "batch", Version: "v1", Kind: "CronJob"}
CronJobBeta = schema.GroupVersionKind{Group: "batch", Version: "v1beta1", Kind: "CronJob"}
HorizontalPodAutoscaler = schema.GroupVersionKind{Group: "autoscaling", Version: "v2beta2", Kind: "HorizontalPodAutoscaler"}
ClusterResourceQuota = schema.GroupVersionKind{Group: "quota", Version: "v1", Kind: "ClusterResourceQuota"}
)
7 changes: 6 additions & 1 deletion receiver/k8sclusterreceiver/receiver.go
@@ -20,6 +20,7 @@ import (
"time"

quotaclientset "github.com/openshift/client-go/quota/clientset/versioned"
"github.com/pkg/errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
@@ -115,7 +116,11 @@ func (kr *kubernetesReceiver) dispatchMetrics(ctx context.Context) {
func newReceiver(
set component.ReceiverCreateSettings, config *Config, consumer consumer.Metrics,
client kubernetes.Interface, osQuotaClient quotaclientset.Interface) (component.MetricsReceiver, error) {
resourceWatcher := newResourceWatcher(set.Logger, client, osQuotaClient, config.NodeConditionTypesToReport, config.AllocatableTypesToReport, defaultInitialSyncTimeout)
resourceWatcher, err := newResourceWatcher(set.Logger, client, osQuotaClient, config.NodeConditionTypesToReport,
config.AllocatableTypesToReport, defaultInitialSyncTimeout)
if err != nil {
return nil, errors.Wrap(err, "Failed to setup the receiver")
}

return &kubernetesReceiver{
resourceWatcher: resourceWatcher,
80 changes: 69 additions & 11 deletions receiver/k8sclusterreceiver/receiver_test.go
@@ -31,8 +31,10 @@ import (
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/gvk"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/k8sclusterreceiver/internal/testutils"
)

@@ -41,11 +43,12 @@ func TestReceiver(t *testing.T) {
require.NoError(t, err)
defer tt.Shutdown(context.Background())

client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()
osQuotaClient := fakeQuota.NewSimpleClientset()
sink := new(consumertest.MetricsSink)

r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
r, err := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
require.NoError(t, err)

// Setup k8s resources.
numPods := 2
@@ -88,10 +91,11 @@ func TestReceiverTimesOutAfterStartup(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
defer tt.Shutdown(context.Background())
client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()

// Mock initial cache sync timing out, using a small timeout.
r := setupReceiver(client, nil, consumertest.NewNop(), 1*time.Millisecond, tt)
r, err := setupReceiver(client, nil, consumertest.NewNop(), 1*time.Millisecond, tt)
require.NoError(t, err)

createPods(t, client, 1)

@@ -108,11 +112,12 @@ func TestReceiverWithManyResources(t *testing.T) {
require.NoError(t, err)
defer tt.Shutdown(context.Background())

client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()
osQuotaClient := fakeQuota.NewSimpleClientset()
sink := new(consumertest.MetricsSink)

r := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
r, err := setupReceiver(client, osQuotaClient, sink, 10*time.Second, tt)
require.NoError(t, err)

numPods := 1000
numQuotas := 2
@@ -144,11 +149,12 @@ func TestReceiverWithMetadata(t *testing.T) {
require.NoError(t, err)
defer tt.Shutdown(context.Background())

client := fake.NewSimpleClientset()
client := newFakeClientWithAllResources()
next := &mockExporterWithK8sMetadata{MetricsSink: new(consumertest.MetricsSink)}
numCalls = atomic.NewInt32(0)

r := setupReceiver(client, nil, next, 10*time.Second, tt)
r, err := setupReceiver(client, nil, next, 10*time.Second, tt)
require.NoError(t, err)
r.config.MetadataExporters = []string{"nop/withmetadata"}

// Setup k8s resources.
@@ -196,7 +202,7 @@ func setupReceiver(
osQuotaClient quotaclientset.Interface,
consumer consumer.Metrics,
initialSyncTimeout time.Duration,
tt obsreporttest.TestTelemetry) *kubernetesReceiver {
tt obsreporttest.TestTelemetry) (*kubernetesReceiver, error) {

distribution := distributionKubernetes
if osQuotaClient != nil {
@@ -211,8 +217,12 @@
Distribution: distribution,
}

rw := newResourceWatcher(logger, client, osQuotaClient, config.NodeConditionTypesToReport, config.AllocatableTypesToReport, initialSyncTimeout)
rw.dataCollector.SetupMetadataStore(&corev1.Service{}, &testutils.MockStore{})
rw, err := newResourceWatcher(logger, client, osQuotaClient, config.NodeConditionTypesToReport,
config.AllocatableTypesToReport, initialSyncTimeout)
if err != nil {
return nil, err
}
rw.dataCollector.SetupMetadataStore(gvk.Service, &testutils.MockStore{})

return &kubernetesReceiver{
resourceWatcher: rw,
@@ -224,5 +234,53 @@
Transport: "http",
ReceiverCreateSettings: tt.ToReceiverCreateSettings(),
}),
}, nil
}

func newFakeClientWithAllResources() *fake.Clientset {
client := fake.NewSimpleClientset()
client.Resources = []*v1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []v1.APIResource{
gvkToAPIResource(gvk.Pod),
gvkToAPIResource(gvk.Node),
gvkToAPIResource(gvk.Namespace),
gvkToAPIResource(gvk.ReplicationController),
gvkToAPIResource(gvk.ResourceQuota),
gvkToAPIResource(gvk.Service),
},
},
{
GroupVersion: "apps/v1",
APIResources: []v1.APIResource{
gvkToAPIResource(gvk.DaemonSet),
gvkToAPIResource(gvk.Deployment),
gvkToAPIResource(gvk.ReplicaSet),
gvkToAPIResource(gvk.StatefulSet),
},
},
{
GroupVersion: "batch/v1",
APIResources: []v1.APIResource{
gvkToAPIResource(gvk.Job),
gvkToAPIResource(gvk.CronJob),
},
},
{
GroupVersion: "autoscaling/v2beta2",
APIResources: []v1.APIResource{
gvkToAPIResource(gvk.HorizontalPodAutoscaler),
},
},
}
return client
}

func gvkToAPIResource(gvk schema.GroupVersionKind) v1.APIResource {
return v1.APIResource{
Group: gvk.Group,
Version: gvk.Version,
Kind: gvk.Kind,
}
}