Skip to content

Commit

Permalink
Merge pull request #288 from avalluri/release-1.2
Browse files Browse the repository at this point in the history
[release-1.2]  Introduce new flag - strict-topology
  • Loading branch information
k8s-ci-robot authored Jun 14, 2019
2 parents e5b482a + 45805aa commit ecb1191
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 159 deletions.
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Note that the external-provisioner does not scale with more replicas. Only one e

### Command line options

#### Recommended optional arguments"
#### Recommended optional arguments
* `--csi-address <path to CSI socket>`: This is the path to the CSI driver socket inside the pod that the external-provisioner container will use to issue CSI operations (`/run/csi/socket` is used by default).

* `--enable-leader-election`: Enables leader election. This is mandatory when there are multiple replicas of the same external-provisioner running for one CSI driver. Only one of them may be active (=leader). A new leader will be re-elected when current leader dies or becomes unresponsive for ~15 seconds.
Expand All @@ -64,6 +64,8 @@ Note that the external-provisioner does not scale with more replicas. Only one e
#### Other recognized arguments
* `--feature-gates <gates>`: A set of comma separated `<feature-name>=<true|false>` pairs that describe feature gates for alpha/experimental features. See [list of features](#feature-status) or `--help` output for list of recognized features. Example: `--feature-gates Topology=true` to enable Topology feature that's disabled by default.

* `--strict-topology`: This controls what topology information is passed to `CreateVolumeRequest.AccessibilityRequirements` in case of delayed binding. See [the table below](#topology-support) for an explanation how this option changes the result. This option has no effect if either `Topology` feature is disabled or `Immediate` volume binding mode is used.

* `--kubeconfig <path>`: Path to Kubernetes client configuration that the external-provisioner uses to connect to Kubernetes API server. When omitted, default token provided by Kubernetes will be used. This option is useful only when the external-provisioner does not run as a Kubernetes pod, e.g. for debugging. Either this or `--master` needs to be set if the external-provisioner is being run out of cluster.

* `--master <url>`: Master URL to build a client config from. When omitted, default token provided by Kubernetes will be used. This option is useful only when the external-provisioner does not run as a Kubernetes pod, e.g. for debugging. Either this or `--kubeconfig` needs to be set if the external-provisioner is being run out of cluster.
Expand All @@ -83,6 +85,17 @@ Note that the external-provisioner does not scale with more replicas. Only one e

* `--leader-election-type`: This option was used to choose which leader election resource type to use. Currently, the option defaults to `endpoints`, but will be removed in the future to only support `Lease` based leader election.

### Topology support
When `Topology` feature is enabled and the driver specifies `VOLUME_ACCESSIBILITY_CONSTRAINTS` in its plugin capabilities, external-provisioner prepares `CreateVolumeRequest.AccessibilityRequirements` while calling `Controller.CreateVolume`. The driver has to consider these topology constraints while creating the volume. Below table shows how these `AccessibilityRequirements` are prepared:

[Delayed binding](https://kubernetes.io/docs/concepts/storage/storage-classes/#volume-binding-mode) | Strict topology | [Allowed topologies](https://kubernetes.io/docs/concepts/storage/storage-classes/#allowed-topologies) | [Resulting accessability requirements](https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume)
:---: |:---:|:---:|:---|
Yes | Yes | Irrelevant | `Requisite` = `Preferred` = Selected node topology
Yes | No | No | `Requisite` = Aggregated cluster topology<br>`Preferred` = `Requisite` with selected node topology as first element
Yes | No | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` with selected node topology as first element
No | Irrelevant | No | `Requisite` = Aggregated cluster topology<br>`Preferred` = `Requisite` with randomly selected node topology as first element
No | Irrelevant | Yes | `Requisite` = Allowed topologies<br>`Preferred` = `Requisite` with randomly selected node topology as first element

### CSI error and timeout handling
The external-provisioner invokes all gRPC calls to CSI driver with timeout provided by `--timeout` command line argument (15 seconds by default).

Expand Down
3 changes: 2 additions & 1 deletion cmd/csi-provisioner/csi-provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ var (

enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
leaderElectionType = flag.String("leader-election-type", "endpoints", "the type of leader election, options are 'endpoints' (default) or 'leases' (strongly recommended). The 'endpoints' option is deprecated in favor of 'leases'.")
strictTopology = flag.Bool("strict-topology", false, "Passes only selected node topology to CreateVolume Request, unlike default behavior of passing aggregated cluster topologies that match with topology keys of the selected node.")

featureGates map[string]bool
provisionController *controller.ProvisionController
Expand Down Expand Up @@ -178,7 +179,7 @@ func main() {

// Create the provisioner: it implements the Provisioner interface expected by
// the controller
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName)
csiProvisioner := ctrl.NewCSIProvisioner(clientset, *operationTimeout, identity, *volumeNamePrefix, *volumeNameUUIDLength, grpcClient, snapClient, provisionerName, pluginCapabilities, controllerCapabilities, supportsMigrationFromInTreePluginName, *strictTopology)
provisionController = controller.NewProvisionController(
clientset,
provisionerName,
Expand Down
8 changes: 6 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ type csiProvisioner struct {
pluginCapabilities connection.PluginCapabilitySet
controllerCapabilities connection.ControllerCapabilitySet
supportsMigrationFromInTreePluginName string
strictTopology bool
}

var _ controller.Provisioner = &csiProvisioner{}
Expand Down Expand Up @@ -216,7 +217,8 @@ func NewCSIProvisioner(client kubernetes.Interface,
driverName string,
pluginCapabilities connection.PluginCapabilitySet,
controllerCapabilities connection.ControllerCapabilitySet,
supportsMigrationFromInTreePluginName string) controller.Provisioner {
supportsMigrationFromInTreePluginName string,
strictTopology bool) controller.Provisioner {

csiClient := csi.NewControllerClient(grpcClient)
provisioner := &csiProvisioner{
Expand All @@ -232,6 +234,7 @@ func NewCSIProvisioner(client kubernetes.Interface,
pluginCapabilities: pluginCapabilities,
controllerCapabilities: controllerCapabilities,
supportsMigrationFromInTreePluginName: supportsMigrationFromInTreePluginName,
strictTopology: strictTopology,
}
return provisioner
}
Expand Down Expand Up @@ -432,7 +435,8 @@ func (p *csiProvisioner) Provision(options controller.ProvisionOptions) (*v1.Per
p.driverName,
options.PVC.Name,
options.StorageClass.AllowedTopologies,
options.SelectedNode)
options.SelectedNode,
p.strictTopology)
if err != nil {
return nil, fmt.Errorf("error generating accessibility requirements: %v", err)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func TestCreateDriverReturnsInvalidCapacityDuringProvision(t *testing.T) {
defer driver.Stop()

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(nil, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

// Requested PVC with requestedBytes storage
deletePolicy := v1.PersistentVolumeReclaimDelete
Expand Down Expand Up @@ -1287,7 +1287,7 @@ func runProvisionTest(t *testing.T, k string, tc provisioningTestcase, requested
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1650,7 +1650,7 @@ func TestProvisionFromSnapshot(t *testing.T) {
})

pluginCaps, controllerCaps := provisionFromSnapshotCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, client, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1801,7 +1801,7 @@ func TestProvisionWithTopologyEnabled(t *testing.T) {
}

clientSet := fakeclientset.NewSimpleClientset(nodes, nodeInfos)
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

pv, err := csiProvisioner.Provision(controller.ProvisionOptions{
StorageClass: &storagev1.StorageClass{},
Expand Down Expand Up @@ -1855,7 +1855,7 @@ func TestProvisionWithTopologyDisabled(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionWithTopologyCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -1902,7 +1902,7 @@ func TestProvisionWithMountOptions(t *testing.T) {

clientSet := fakeclientset.NewSimpleClientset()
pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

out := &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Expand Down Expand Up @@ -2076,7 +2076,7 @@ func runDeleteTest(t *testing.T, k string, tc deleteTestcase) {
}

pluginCaps, controllerCaps := provisionCapabilities()
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "")
csiProvisioner := NewCSIProvisioner(clientSet, 5*time.Second, "test-provisioner", "test", 5, csiConn.conn, nil, driverName, pluginCaps, controllerCaps, "", false)

err = csiProvisioner.Delete(tc.persistentVolume)
if tc.expectErr && err == nil {
Expand Down
93 changes: 66 additions & 27 deletions pkg/controller/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,15 @@ func GenerateAccessibilityRequirements(
driverName string,
pvcName string,
allowedTopologies []v1.TopologySelectorTerm,
selectedNode *v1.Node) (*csi.TopologyRequirement, error) {
selectedNode *v1.Node,
strictTopology bool) (*csi.TopologyRequirement, error) {
requirement := &csi.TopologyRequirement{}

var (
selectedCSINode *storage.CSINode
err error
selectedCSINode *storage.CSINode
selectedTopology topologyTerm
requisiteTerms []topologyTerm
err error
)

// 1. Get CSINode for the selected node
Expand All @@ -158,20 +161,57 @@ func GenerateAccessibilityRequirements(
// This should only happen if the Node is on a pre-1.14 version
return nil, nil
}
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
if len(topologyKeys) == 0 {
// The scheduler selected a node with no topology information.
// This can happen if:
//
// * the node driver is not deployed on all nodes.
// * the node driver is being restarted and has not re-registered yet. This should be
// temporary and a retry should eventually succeed.
//
// Returning an error in provisioning will cause the scheduler to retry and potentially
// (but not guaranteed) pick a different node.
return nil, fmt.Errorf("no topology key found on CSINode %s", selectedCSINode.Name)
}
var isMissingKey bool
selectedTopology, isMissingKey = getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}

if strictTopology {
// Make sure that selected node topology is in allowed topologies list
if len(allowedTopologies) != 0 {
allowedTopologiesFlatten := flatten(allowedTopologies)
found := false
for _, t := range allowedTopologiesFlatten {
if t.equal(selectedTopology) {
found = true
break
}
}
if !found {
return nil, fmt.Errorf("selected node '%q' topology '%v' is not in allowed topologies: %v", selectedNode.Name, selectedTopology, allowedTopologiesFlatten)
}
}
// Only pass topology of selected node.
requisiteTerms = append(requisiteTerms, selectedTopology)
}
}

// 2. Generate CSI Requisite Terms
var requisiteTerms []topologyTerm
if len(allowedTopologies) == 0 {
// Aggregate existing topologies in nodes across the entire cluster.
var err error
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
if len(requisiteTerms) == 0 {
if len(allowedTopologies) != 0 {
// Distribute out one of the OR layers in allowedTopologies
requisiteTerms = flatten(allowedTopologies)
} else {
// Aggregate existing topologies in nodes across the entire cluster.
requisiteTerms, err = aggregateTopologies(kubeClient, driverName, selectedCSINode)
if err != nil {
return nil, err
}
}
} else {
// Distribute out one of the OR layers in allowedTopologies
requisiteTerms = flatten(allowedTopologies)
}

// It might be possible to reach here if:
Expand Down Expand Up @@ -202,20 +242,19 @@ func GenerateAccessibilityRequirements(
preferredTerms = sortAndShift(requisiteTerms, nil, i)
} else {
// Delayed binding, use topology from that node to populate preferredTerms
topologyKeys := getTopologyKeys(selectedCSINode, driverName)
selectedTopology, isMissingKey := getTopologyFromNode(selectedNode, topologyKeys)
if isMissingKey {
return nil, fmt.Errorf("topology labels from selected node %v does not match topology keys from CSINode %v", selectedNode.Labels, topologyKeys)
}

preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
// constraint.
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite: %v", selectedTopology, selectedNode.Name, requisiteTerms)
if strictTopology {
// In case of strict topology, preferred = requisite
preferredTerms = requisiteTerms
} else {
preferredTerms = sortAndShift(requisiteTerms, selectedTopology, 0)
if preferredTerms == nil {
// Topology from selected node is not in requisite. This case should never be hit:
// - If AllowedTopologies is specified, the scheduler should choose a node satisfying the
// constraint.
// - Otherwise, the aggregated topology is guaranteed to contain topology information from the
// selected node.
return nil, fmt.Errorf("topology %v from selected node %q is not in requisite: %v", selectedTopology, selectedNode.Name, requisiteTerms)
}
}
}
requirement.Preferred = toCSITopology(preferredTerms)
Expand Down
Loading

0 comments on commit ecb1191

Please sign in to comment.