diff --git a/pkg/flowaggregator/flowaggregator.go b/pkg/flowaggregator/flowaggregator.go index e11ea72945b..695ff19e209 100644 --- a/pkg/flowaggregator/flowaggregator.go +++ b/pkg/flowaggregator/flowaggregator.go @@ -698,4 +698,27 @@ func (fa *flowAggregator) updateFlowAggregator(opt *options.Options) { klog.InfoS("Disabled FlowLogger") } } + if opt.Config.RecordContents.PodLabels != fa.includePodLabels { + fa.includePodLabels = opt.Config.RecordContents.PodLabels + klog.InfoS("Updated RecordContents.PodLabels configuration", "value", fa.includePodLabels) + } + var unsupportedUpdates []string + if opt.Config.APIServer != fa.APIServer { + unsupportedUpdates = append(unsupportedUpdates, "apiServer") + } + if opt.ActiveFlowRecordTimeout != fa.activeFlowRecordTimeout { + unsupportedUpdates = append(unsupportedUpdates, "activeFlowRecordTimeout") + } + if opt.InactiveFlowRecordTimeout != fa.inactiveFlowRecordTimeout { + unsupportedUpdates = append(unsupportedUpdates, "inactiveFlowRecordTimeout") + } + if opt.AggregatorTransportProtocol != fa.aggregatorTransportProtocol { + unsupportedUpdates = append(unsupportedUpdates, "aggregatorTransportProtocol") + } + if opt.Config.FlowAggregatorAddress != fa.flowAggregatorAddress { + unsupportedUpdates = append(unsupportedUpdates, "flowAggregatorAddress") + } + if len(unsupportedUpdates) > 0 { + klog.ErrorS(nil, "Ignoring unsupported configuration updates, please restart FlowAggregator", "keys", unsupportedUpdates) + } } diff --git a/pkg/flowaggregator/flowaggregator_test.go b/pkg/flowaggregator/flowaggregator_test.go index 99e1951a50a..81b8c504d6a 100644 --- a/pkg/flowaggregator/flowaggregator_test.go +++ b/pkg/flowaggregator/flowaggregator_test.go @@ -36,6 +36,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2" flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator" "antrea.io/antrea/pkg/flowaggregator/exporter" @@ -440,6 +441,35 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) { mockLogExporter.EXPECT().UpdateOptions(opt) flowAggregator.updateFlowAggregator(opt) }) + t.Run("includePodLables", func(t *testing.T) { + flowAggregator := &flowAggregator{} + require.False(t, flowAggregator.includePodLabels) + opt := &options.Options{ + Config: &flowaggregatorconfig.FlowAggregatorConfig{ + RecordContents: flowaggregatorconfig.RecordContentsConfig{ + PodLabels: true, + }, + }, + } + flowAggregator.updateFlowAggregator(opt) + assert.True(t, flowAggregator.includePodLabels) + }) + t.Run("unsupportedUpdate", func(t *testing.T) { + flowAggregator := &flowAggregator{} + var b bytes.Buffer + klog.SetOutput(&b) + klog.LogToStderr(false) + defer func() { + klog.SetOutput(os.Stderr) + klog.LogToStderr(true) + }() + opt := &options.Options{ + ActiveFlowRecordTimeout: 30 * time.Second, + Config: &flowaggregatorconfig.FlowAggregatorConfig{}, + } + flowAggregator.updateFlowAggregator(opt) + assert.Contains(t, b.String(), "Ignoring unsupported configuration updates, please restart FlowAggregator\" keys=[\"activeFlowRecordTimeout\"]") + }) } func TestFlowAggregator_Run(t *testing.T) { @@ -492,6 +522,7 @@ func TestFlowAggregator_Run(t *testing.T) { } flowAggregator := &flowAggregator{ + // must be large enough to avoid a call to ForAllExpiredFlowRecordsDo activeFlowRecordTimeout: 1 * time.Hour, logTickerDuration: 1 * time.Hour, collectingProcess: mockCollectingProcess, @@ -534,62 +565,53 @@ func TestFlowAggregator_Run(t *testing.T) { flowAggregator.Run(stopCh) }() - disableIPFIXOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - FlowCollector: flowaggregatorconfig.FlowCollectorConfig{ - Enable: false, - }, - }, + makeOptions := func(config *flowaggregatorconfig.FlowAggregatorConfig) *options.Options { + return &options.Options{ + ActiveFlowRecordTimeout: flowAggregator.activeFlowRecordTimeout, + Config: config, + } } - enableIPFIXOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - FlowCollector: flowaggregatorconfig.FlowCollectorConfig{ - Enable: true, - }, + + disableIPFIXOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + FlowCollector: flowaggregatorconfig.FlowCollectorConfig{ + Enable: false, }, - } - enableClickHouseOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - ClickHouse: flowaggregatorconfig.ClickHouseConfig{ - Enable: true, - }, + }) + enableIPFIXOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + FlowCollector: flowaggregatorconfig.FlowCollectorConfig{ + Enable: true, }, - } - disableClickHouseOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - ClickHouse: flowaggregatorconfig.ClickHouseConfig{ - Enable: false, - }, + }) + enableClickHouseOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + ClickHouse: flowaggregatorconfig.ClickHouseConfig{ + Enable: true, }, - } - enableS3UploaderOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - S3Uploader: flowaggregatorconfig.S3UploaderConfig{ - Enable: true, - }, + }) + disableClickHouseOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + ClickHouse: flowaggregatorconfig.ClickHouseConfig{ + Enable: false, }, - } - disableS3UploaderOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - S3Uploader: flowaggregatorconfig.S3UploaderConfig{ - Enable: false, - }, + }) + enableS3UploaderOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + S3Uploader: flowaggregatorconfig.S3UploaderConfig{ + Enable: true, }, - } - enableFlowLoggerOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - FlowLogger: flowaggregatorconfig.FlowLoggerConfig{ - Enable: true, - }, + }) + disableS3UploaderOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + S3Uploader: flowaggregatorconfig.S3UploaderConfig{ + Enable: false, }, - } - disableFlowLoggerOptions := &options.Options{ - Config: &flowaggregatorconfig.FlowAggregatorConfig{ - FlowLogger: flowaggregatorconfig.FlowLoggerConfig{ - Enable: false, - }, + }) + enableFlowLoggerOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + FlowLogger: flowaggregatorconfig.FlowLoggerConfig{ + Enable: true, }, - } + }) + disableFlowLoggerOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{ + FlowLogger: flowaggregatorconfig.FlowLoggerConfig{ + Enable: false, + }, + }) // we do a few operations: the main purpose is to ensure that cleanup // (i.e., stopping the exporters) is done properly.