Skip to content

Commit

Permalink
Imporve handling of config changes in FlowAggregator
Browse files Browse the repository at this point in the history
Prior to this change, only updates to the configuration of exporters
(sinks) were handled gracefully, without requiring a FlowAggregator
restart.
After this change, we also support updating recordContents.podLabels at
runtime.
For all other unsupported config changes, we print an error log, asking
users to restart the FlowAggregator.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas committed May 28, 2024
1 parent 2801ee2 commit 30c98b0
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 48 deletions.
23 changes: 23 additions & 0 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,4 +698,27 @@ func (fa *flowAggregator) updateFlowAggregator(opt *options.Options) {
klog.InfoS("Disabled FlowLogger")
}
}
if opt.Config.RecordContents.PodLabels != fa.includePodLabels {
fa.includePodLabels = opt.Config.RecordContents.PodLabels
klog.InfoS("Updated RecordContents.PodLabels configuration", "value", fa.includePodLabels)
}
var unsupportedUpdates []string
if opt.Config.APIServer != fa.APIServer {
unsupportedUpdates = append(unsupportedUpdates, "apiServer")
}
if opt.ActiveFlowRecordTimeout != fa.activeFlowRecordTimeout {
unsupportedUpdates = append(unsupportedUpdates, "activeFlowRecordTimeout")
}
if opt.InactiveFlowRecordTimeout != fa.inactiveFlowRecordTimeout {
unsupportedUpdates = append(unsupportedUpdates, "inactiveFlowRecordTimeout")
}
if opt.AggregatorTransportProtocol != fa.aggregatorTransportProtocol {
unsupportedUpdates = append(unsupportedUpdates, "aggregatorTransportProtocol")
}
if opt.Config.FlowAggregatorAddress != fa.flowAggregatorAddress {
unsupportedUpdates = append(unsupportedUpdates, "flowAggregatorAddress")
}
if len(unsupportedUpdates) > 0 {
klog.ErrorS(nil, "Ignoring unsupported configuration updates, please restart FlowAggregator", "keys", unsupportedUpdates)
}
}
118 changes: 70 additions & 48 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2"

flowaggregatorconfig "antrea.io/antrea/pkg/config/flowaggregator"
"antrea.io/antrea/pkg/flowaggregator/exporter"
Expand Down Expand Up @@ -440,6 +441,35 @@ func TestFlowAggregator_updateFlowAggregator(t *testing.T) {
mockLogExporter.EXPECT().UpdateOptions(opt)
flowAggregator.updateFlowAggregator(opt)
})
t.Run("includePodLables", func(t *testing.T) {
flowAggregator := &flowAggregator{}
require.False(t, flowAggregator.includePodLabels)
opt := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
RecordContents: flowaggregatorconfig.RecordContentsConfig{
PodLabels: true,
},
},
}
flowAggregator.updateFlowAggregator(opt)
assert.True(t, flowAggregator.includePodLabels)
})
t.Run("unsupportedUpdate", func(t *testing.T) {
flowAggregator := &flowAggregator{}
var b bytes.Buffer
klog.SetOutput(&b)
klog.LogToStderr(false)
defer func() {
klog.SetOutput(os.Stderr)
klog.LogToStderr(true)
}()
opt := &options.Options{
ActiveFlowRecordTimeout: 30 * time.Second,
Config: &flowaggregatorconfig.FlowAggregatorConfig{},
}
flowAggregator.updateFlowAggregator(opt)
assert.Contains(t, b.String(), "Ignoring unsupported configuration updates, please restart FlowAggregator\" keys=[\"activeFlowRecordTimeout\"]")
})
}

func TestFlowAggregator_Run(t *testing.T) {
Expand Down Expand Up @@ -492,6 +522,7 @@ func TestFlowAggregator_Run(t *testing.T) {
}

flowAggregator := &flowAggregator{
// must be large enough to avoid a call to ForAllExpiredFlowRecordsDo
activeFlowRecordTimeout: 1 * time.Hour,
logTickerDuration: 1 * time.Hour,
collectingProcess: mockCollectingProcess,
Expand Down Expand Up @@ -534,62 +565,53 @@ func TestFlowAggregator_Run(t *testing.T) {
flowAggregator.Run(stopCh)
}()

disableIPFIXOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
FlowCollector: flowaggregatorconfig.FlowCollectorConfig{
Enable: false,
},
},
makeOptions := func(config *flowaggregatorconfig.FlowAggregatorConfig) *options.Options {
return &options.Options{
ActiveFlowRecordTimeout: flowAggregator.activeFlowRecordTimeout,
Config: config,
}
}
enableIPFIXOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
FlowCollector: flowaggregatorconfig.FlowCollectorConfig{
Enable: true,
},

disableIPFIXOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
FlowCollector: flowaggregatorconfig.FlowCollectorConfig{
Enable: false,
},
}
enableClickHouseOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
ClickHouse: flowaggregatorconfig.ClickHouseConfig{
Enable: true,
},
})
enableIPFIXOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
FlowCollector: flowaggregatorconfig.FlowCollectorConfig{
Enable: true,
},
}
disableClickHouseOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
ClickHouse: flowaggregatorconfig.ClickHouseConfig{
Enable: false,
},
})
enableClickHouseOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
ClickHouse: flowaggregatorconfig.ClickHouseConfig{
Enable: true,
},
}
enableS3UploaderOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
S3Uploader: flowaggregatorconfig.S3UploaderConfig{
Enable: true,
},
})
disableClickHouseOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
ClickHouse: flowaggregatorconfig.ClickHouseConfig{
Enable: false,
},
}
disableS3UploaderOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
S3Uploader: flowaggregatorconfig.S3UploaderConfig{
Enable: false,
},
})
enableS3UploaderOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
S3Uploader: flowaggregatorconfig.S3UploaderConfig{
Enable: true,
},
}
enableFlowLoggerOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
FlowLogger: flowaggregatorconfig.FlowLoggerConfig{
Enable: true,
},
})
disableS3UploaderOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
S3Uploader: flowaggregatorconfig.S3UploaderConfig{
Enable: false,
},
}
disableFlowLoggerOptions := &options.Options{
Config: &flowaggregatorconfig.FlowAggregatorConfig{
FlowLogger: flowaggregatorconfig.FlowLoggerConfig{
Enable: false,
},
})
enableFlowLoggerOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
FlowLogger: flowaggregatorconfig.FlowLoggerConfig{
Enable: true,
},
}
})
disableFlowLoggerOptions := makeOptions(&flowaggregatorconfig.FlowAggregatorConfig{
FlowLogger: flowaggregatorconfig.FlowLoggerConfig{
Enable: false,
},
})

// we do a few operations: the main purpose is to ensure that cleanup
// (i.e., stopping the exporters) is done properly.
Expand Down

0 comments on commit 30c98b0

Please sign in to comment.