Skip to content

Support namespace volume #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions charts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ The following table lists the configurable parameters of the latest Simplyblock
| `logicalVolume.numDataChunks` | The number of Erasure coding schema parameter k (distributed raid) | `1` | |
| `logicalVolume.numParityChunks` | The number of Erasure coding schema parameter n (distributed raid) | `1` | |
| `logicalVolume.lvol_priority_class` | the value of lvol parameter lvol_priority_class | `0` | |
| `podAnnotations` | Annotations to apply to all pods in the chart | `{}` | |
| `simplyBlockAnnotations` | Annotations to apply to Simplyblock kubernetes resources like DaemonSets, Deployments, or StatefulSets | `{}` | |
| `logicalVolume.nspv` | the maximum namespace per subsystem | `1` | |
| `podAnnotations` | Annotations to apply to all pods in the chart | `{}` | |
| `simplyBlockAnnotations` | Annotations to apply to Simplyblock kubernetes resources like DaemonSets, Deployments, or StatefulSets | `{}` | |
| `benchmarks` | the number of benchmarks to run | `0` | |
| `node.tolerations.create` | Whether to create tolerations for the csi node | `false` | |
| `node.tolerations.effect` | The effect of tolerations on the csi node | `<empty>` | |
Expand Down
2 changes: 2 additions & 0 deletions charts/spdk-csi/latest/spdk-csi/templates/storageclass.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ parameters:
distr_ndcs: "{{ .Values.logicalVolume.numDataChunks }}"
distr_npcs: "{{ .Values.logicalVolume.numParityChunks }}"
lvol_priority_class: "{{ .Values.logicalVolume.lvol_priority_class }}"
nspv: "{{ .Values.logicalVolume.nspv }}"
tune2fs_reserved_blocks: "{{ .Values.logicalVolume.tune2fs_reserved_blocks }}"
cluster_id: "{{ .Values.csiConfig.simplybk.uuid }}"
reclaimPolicy: Delete
Expand All @@ -46,6 +47,7 @@ parameters:
distr_ndcs: "{{ .Values.logicalVolume.numDataChunks }}"
distr_npcs: "{{ .Values.logicalVolume.numParityChunks }}"
lvol_priority_class: "{{ .Values.logicalVolume.lvol_priority_class }}"
nspv: "{{ .Values.logicalVolume.nspv }}"
tune2fs_reserved_blocks: "{{ .Values.logicalVolume.tune2fs_reserved_blocks }}"
cluster_id: "{{ .Values.csiConfig.simplybk.uuid }}"
reclaimPolicy: Delete
Expand Down
1 change: 1 addition & 0 deletions charts/spdk-csi/latest/spdk-csi/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ logicalVolume:
numDataChunks: "1"
numParityChunks: "1"
lvol_priority_class: "0"
nspv: "1"
tune2fs_reserved_blocks: "0"

podAnnotations: {}
Expand Down
75 changes: 57 additions & 18 deletions pkg/spdk/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,11 @@ func prepareCreateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest, s
return nil, err
}

maxNamespace, err := getIntParameter(params, "nspv", 1)
if err != nil {
return nil, err
}

compression := getBoolParameter(params, "compression")
encryption := getBoolParameter(params, "encryption")

Expand Down Expand Up @@ -311,25 +316,32 @@ func prepareCreateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest, s
return nil, err
}

modelID, err := getNvmfModelIDAnnotation(ctx, pvcName, pvcNamespace)
if err != nil {
return nil, err
}

createVolReq := util.CreateLVolData{
LvolName: req.GetName(),
Size: fmt.Sprintf("%dM", sizeMiB),
LvsName: params["pool_name"],
MaxRWIOPS: params["qos_rw_iops"],
MaxRWmBytes: params["qos_rw_mbytes"],
MaxRmBytes: params["qos_r_mbytes"],
MaxWmBytes: params["qos_w_mbytes"],
MaxSize: params["max_size"],
PriorClass: priorClass,
Compression: compression,
Encryption: encryption,
DistNdcs: distrNdcs,
DistNpcs: distrNpcs,
CryptoKey1: cryptoKey1,
CryptoKey2: cryptoKey2,
HostID: hostID,
LvolID: lvolID,
PvcName: pvcFullName,
LvolName: req.GetName(),
Size: fmt.Sprintf("%dM", sizeMiB),
LvsName: params["pool_name"],
MaxRWIOPS: params["qos_rw_iops"],
MaxRWmBytes: params["qos_rw_mbytes"],
MaxRmBytes: params["qos_r_mbytes"],
MaxWmBytes: params["qos_w_mbytes"],
MaxSize: params["max_size"],
MaxNamespace: maxNamespace,
PriorClass: priorClass,
Compression: compression,
Encryption: encryption,
DistNdcs: distrNdcs,
DistNpcs: distrNpcs,
CryptoKey1: cryptoKey1,
CryptoKey2: cryptoKey2,
HostID: hostID,
LvolID: lvolID,
ModelID: modelID,
PvcName: pvcFullName,
}
return &createVolReq, nil
}
Expand Down Expand Up @@ -831,3 +843,30 @@ func getLvolIDAnnotation(ctx context.Context, pvcName, pvcNamespace string) (str

return lvolID, nil
}

func getNvmfModelIDAnnotation(ctx context.Context, pvcName, pvcNamespace string) (string, error) {
config, err := rest.InClusterConfig()
if err != nil {
klog.Errorf("failed to get in-cluster config: %v", err)
return "", fmt.Errorf("could not get in-cluster config: %w", err)
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Errorf("failed to create clientset: %v", err)
return "", fmt.Errorf("could not create clientset: %w", err)
}

pvc, err := clientset.CoreV1().PersistentVolumeClaims(pvcNamespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
klog.Errorf("failed to get PVC %s in namespace %s: %v", pvcName, pvcNamespace, err)
return "", fmt.Errorf("could not get PVC %s in namespace %s: %w", pvcName, pvcNamespace, err)
}

modelID, ok := pvc.ObjectMeta.Annotations["simplybk/nvmf-model-id"]
if !ok {
return "", nil
}

return modelID, nil
}
106 changes: 70 additions & 36 deletions pkg/util/initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type initiatorNVMf struct {
nrIoQueues string
ctrlLossTmo string
model string
nsId string
}

// initiatorCache is an implementation of NVMf cache initiator
Expand Down Expand Up @@ -118,7 +119,7 @@ type ClusterConfig struct {
}

type ClustersInfo struct {
Clusters []ClusterConfig `json:"clusters"`
Clusters []ClusterConfig `json:"clusters"`
}

// NewsimplyBlockClient create a new Simplyblock client
Expand All @@ -138,7 +139,7 @@ func NewsimplyBlockClient(clusterID string) (*NodeNVMf, error) {
break
}
}

if clusterConfig == nil {
return nil, fmt.Errorf("failed to find secret for clusterID %s", clusterID)
}
Expand All @@ -148,9 +149,9 @@ func NewsimplyBlockClient(clusterID string) (*NodeNVMf, error) {
}

// Log and return the newly created Simplyblock client.
klog.Infof("Simplyblock client created for ClusterID:%s, Endpoint:%s",
clusterConfig.ClusterID,
clusterConfig.ClusterEndpoint,
klog.Infof("Simplyblock client created for ClusterID:%s, Endpoint:%s",
clusterConfig.ClusterID,
clusterConfig.ClusterEndpoint,
)
return NewNVMf(clusterID, clusterConfig.ClusterEndpoint, clusterConfig.ClusterSecret), nil
}
Expand All @@ -175,6 +176,7 @@ func NewSpdkCsiInitiator(volumeContext map[string]string) (SpdkCsiInitiator, err
nrIoQueues: volumeContext["nrIoQueues"],
ctrlLossTmo: volumeContext["ctrlLossTmo"],
model: volumeContext["model"],
nsId: volumeContext["nsId"],
}, nil

case "cache":
Expand Down Expand Up @@ -322,40 +324,49 @@ func (nvmf *initiatorNVMf) Connect() (string, error) {
if len(nvmf.connections) == 1 {
ctrlLossTmo *= 15
}
for i, conn := range nvmf.connections {
cmdLine := []string{
"nvme", "connect", "-t", strings.ToLower(nvmf.targetType),
"-a", conn.IP, "-s", strconv.Itoa(conn.Port), "-n", nvmf.nqn, "-l", strconv.Itoa(ctrlLossTmo),
"-c", nvmf.reconnectDelay, "-i", nvmf.nrIoQueues,
}
err := execWithTimeoutRetry(cmdLine, 40, len(nvmf.connections))
if err != nil {
// go on checking device status in case caused by duplicated request
klog.Errorf("command %v failed: %s", cmdLine, err)

// disconnect the primary connection if secondary connection fails
if i == 1 {
klog.Warning("Secondary connection failed, disconnecting primary...")
alreadyConnected, err := isNqnConnected(nvmf.nqn)
if err != nil {
klog.Errorf("Failed to check existing connections: %v", err)
return "", err
}

deviceGlob := fmt.Sprintf(DevDiskByID, nvmf.model)
devicePath, err := waitForDeviceReady(deviceGlob, 20)
if err != nil {
return "", err
}
err = disconnectDevicePath(devicePath)
if err != nil {
klog.Errorf("Failed to disconnect primary: %v", err)
return "", err
} else {
klog.Infof("Primary connection disconnected due to secondary failure")
}
if !alreadyConnected {
for i, conn := range nvmf.connections {
cmdLine := []string{
"nvme", "connect", "-t", strings.ToLower(nvmf.targetType),
"-a", conn.IP, "-s", strconv.Itoa(conn.Port), "-n", nvmf.nqn, "-l", strconv.Itoa(ctrlLossTmo),
"-c", nvmf.reconnectDelay, "-i", nvmf.nrIoQueues,
}
err := execWithTimeoutRetry(cmdLine, 40, len(nvmf.connections))
if err != nil {
// go on checking device status in case caused by duplicated request
klog.Errorf("command %v failed: %s", cmdLine, err)

// disconnect the primary connection if secondary connection fails
if i == 1 {
klog.Warning("Secondary connection failed, disconnecting primary...")

deviceGlob := fmt.Sprintf(DevDiskByID, fmt.Sprintf("%s*_%s", nvmf.model, nvmf.nsId))
devicePath, err := waitForDeviceReady(deviceGlob, 20)
if err != nil {
return "", err
}
err = disconnectDevicePath(devicePath)
if err != nil {
klog.Errorf("Failed to disconnect primary: %v", err)
return "", err
} else {
klog.Infof("Primary connection disconnected due to secondary failure")
}
}

return "", err
return "", err
}
}
}

deviceGlob := fmt.Sprintf(DevDiskByID, nvmf.model)
deviceGlob := fmt.Sprintf(DevDiskByID, fmt.Sprintf("%s*_%s", nvmf.model, nvmf.nsId))
devicePath, err := waitForDeviceReady(deviceGlob, 20)
if err != nil {
return "", err
Expand All @@ -364,13 +375,17 @@ func (nvmf *initiatorNVMf) Connect() (string, error) {
}

func (nvmf *initiatorNVMf) Disconnect() error {
deviceGlob := fmt.Sprintf(DevDiskByID, nvmf.model)
//deviceGlob := fmt.Sprintf(DevDiskByID, nvmf.model)
deviceGlob := fmt.Sprintf(DevDiskByID, fmt.Sprintf("%s*_[0-9]*", nvmf.model))
devicePath, err := filepath.Glob(deviceGlob)
if err != nil {
return fmt.Errorf("failed to find device paths matching %s: %v", deviceGlob, err)
}

if len(devicePath) > 0 {
if len(devicePath) > 1 {
return nil

} else if len(devicePath) == 1 {
err = disconnectDevicePath(devicePath[0])

if err != nil {
Expand Down Expand Up @@ -504,6 +519,25 @@ func getNVMeDeviceInfos() ([]nvmeDeviceInfo, error) {
return devices, nil
}

func isNqnConnected(nqn string) (bool, error) {
cmd := exec.Command("nvme", "list-subsys")
output, err := cmd.Output()
if err != nil {
return false, fmt.Errorf("failed to execute nvme list-subsys: %v", err)
}

lines := strings.Split(string(output), "\n")
for _, line := range lines {
if strings.Contains(line, nqn) {
parts := strings.Fields(line)
if len(parts) > 0 {
return true, nil
}
}
}
return false, nil
}

func getSubsystemsForDevice(devicePath string) ([]subsystemResponse, error) {
cmd := exec.Command("nvme", "list-subsys", "-o", "json", devicePath)
output, err := cmd.Output()
Expand Down Expand Up @@ -567,8 +601,8 @@ func reconnectSubsystems() error {
continue
}
for _, path := range subsystem.Paths {
if path.State == "connecting" && device.serialNumber == "single" ||
((path.ANAState == "optimized" || path.ANAState == "non-optimized") && device.serialNumber == "ha") {
if path.State == "connecting" && device.serialNumber == "single" ||
((path.ANAState == "optimized" || path.ANAState == "non-optimized") && device.serialNumber == "ha") {
if err := checkOnlineNode(clusterID, lvolID, path); err != nil {
klog.Errorf("failed to reconnect subsystem for lvolID %s: %v", lvolID, err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type LvolConnectResp struct {
Port int `json:"port"`
IP string `json:"ip"`
Connect string `json:"connect"`
NSID int `json:"ns_id"`
}

type connectionInfo struct {
Expand Down Expand Up @@ -294,6 +295,7 @@ func (client *RPCClient) getVolumeInfo(lvolID string) (map[string]string, error)
connections = append(connections, connectionInfo{IP: r.IP, Port: r.Port})
}

_, model := getLvolIDFromNQN(result[0].Nqn)
connectionsData, err := json.Marshal(connections)
if err != nil {
klog.Error(err)
Expand All @@ -307,9 +309,10 @@ func (client *RPCClient) getVolumeInfo(lvolID string) (map[string]string, error)
"reconnectDelay": strconv.Itoa(result[0].ReconnectDelay),
"nrIoQueues": strconv.Itoa(result[0].NrIoQueues),
"ctrlLossTmo": strconv.Itoa(result[0].CtrlLossTmo),
"model": lvolID,
"model": model,
"targetType": "tcp",
"connections": string(connectionsData),
"nsId": strconv.Itoa(result[0].NSID),
}, nil
}

Expand Down
38 changes: 20 additions & 18 deletions pkg/util/nvmf.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,24 +63,26 @@ func (node *NodeNVMf) VolumeInfo(lvolID string) (map[string]string, error) {

// CreateLVolData is the data structure for creating a logical volume
type CreateLVolData struct {
LvolName string `json:"name"`
Size string `json:"size"`
LvsName string `json:"pool"`
Compression bool `json:"comp"`
Encryption bool `json:"crypto"`
MaxRWIOPS string `json:"max_rw_iops"`
MaxRWmBytes string `json:"max_rw_mbytes"`
MaxRmBytes string `json:"max_r_mbytes"`
MaxWmBytes string `json:"max_w_mbytes"`
MaxSize string `json:"max_size"`
DistNdcs int `json:"distr_ndcs"`
DistNpcs int `json:"distr_npcs"`
PriorClass int `json:"lvol_priority_class"`
CryptoKey1 string `json:"crypto_key1"`
CryptoKey2 string `json:"crypto_key2"`
HostID string `json:"host_id"`
LvolID string `json:"uid"`
PvcName string `json:"pvc_name"`
LvolName string `json:"name"`
Size string `json:"size"`
LvsName string `json:"pool"`
Compression bool `json:"comp"`
Encryption bool `json:"crypto"`
MaxRWIOPS string `json:"max_rw_iops"`
MaxRWmBytes string `json:"max_rw_mbytes"`
MaxRmBytes string `json:"max_r_mbytes"`
MaxWmBytes string `json:"max_w_mbytes"`
MaxSize string `json:"max_size"`
MaxNamespace int `json:"nspv"`
DistNdcs int `json:"distr_ndcs"`
DistNpcs int `json:"distr_npcs"`
PriorClass int `json:"lvol_priority_class"`
CryptoKey1 string `json:"crypto_key1"`
CryptoKey2 string `json:"crypto_key2"`
HostID string `json:"host_id"`
LvolID string `json:"uid"`
ModelID string `json:"namespace"`
PvcName string `json:"pvc_name"`
}

// CreateVolume creates a logical volume and returns volume ID
Expand Down
Loading