csi: add setup for external mode for subvolume cleanup
This commit adds the extra setup that is required for
subvolume cleanup in case of external mode.
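
In external mode the ceph/rados commands run in the operator pod are
given explicit connection details, read from the
rook-csi-cephfs-provisioner secret and the rook-ceph-mon-endpoints
configmap. Roughly (placeholder values), a subvolume listing becomes:

    ceph fs subvolume ls <fsname> <svg> --format json -m <mon-endpoint> --id <adminID> --key <adminKey>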

Signed-off-by: yati1998 <ypadia@redhat.com>
yati1998 committed Mar 26, 2024
1 parent 45bb076 commit 4a9ae3a
Showing 1 changed file with 110 additions and 10 deletions.
120 changes: 110 additions & 10 deletions pkg/filesystem/subvolume.go
@@ -40,6 +40,11 @@ type subVolumeInfo struct {
state string
}

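// monitor mirrors one entry of the csi-cluster-config-json payload in the rook-ceph-mon-endpoints configmap.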
type monitor struct {
ClusterID string
Monitors []string
}

const (
inUse = "in-use"
stale = "stale"
@@ -52,6 +57,65 @@ func List(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace
listCephFSSubvolumes(ctx, clientsets, operatorNamespace, clusterNamespace, includeStaleOnly, subvolumeNames)
}

// checkForExternalStorage checks if the external mode is enabled.
func checkForExternalStorage(ctx context.Context, clientsets *k8sutil.Clientsets, clusterNamespace string) bool {
enable := false
cephclusters, err := clientsets.Rook.CephV1().CephClusters(clusterNamespace).List(ctx, v1.ListOptions{})
if err != nil {
logging.Fatal(fmt.Errorf("failed to list CephClusters: %q", err))
}
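// Any CephCluster in the namespace with external mode enabled is enough; stop at the first match.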
for i := range cephclusters.Items {
enable = cephclusters.Items[i].Spec.External.Enable
if enable {
break
}
}
return enable
}

// getExternalClusterDetails gets the mon endpoint, admin ID, and admin key
// required to connect to the external ceph cluster.
func getExternalClusterDetails(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace string) (string, string, string) {
var adminID, adminKey, monEndpoint string

scList, err := clientsets.Kube.CoreV1().Secrets(clusterNamespace).List(ctx, v1.ListOptions{})
if err != nil {
logging.Fatal(fmt.Errorf("Error fetching secrets: %v\n", err))
}
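// The rook-csi-cephfs-provisioner secret holds the admin ID and key for the external cluster.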
for i := range scList.Items {
if strings.HasPrefix(scList.Items[i].ObjectMeta.Name, "rook-csi-cephfs-provisioner") {
data := scList.Items[i].Data
if data == nil {
logging.Fatal(fmt.Errorf("Secret data is empty"))
}
adminID = string(data["adminID"])
adminKey = string(data["adminKey"])
break
}
}

cm, err := clientsets.Kube.CoreV1().ConfigMaps(clusterNamespace).Get(ctx, "rook-ceph-mon-endpoints", v1.GetOptions{})
if err != nil {
logging.Fatal(fmt.Errorf("Error fetching configmaps: %v\n", err))
}

if len(cm.Data) == 0 {
logging.Fatal(fmt.Errorf("configmap data is empty"))
}
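// csi-cluster-config-json holds a JSON array of {clusterID, monitors} entries (see the monitor struct above).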
monJSON := cm.Data["csi-cluster-config-json"]
var monEntries []monitor
if err := json.Unmarshal([]byte(monJSON), &monEntries); err != nil {
logging.Fatal(fmt.Errorf("failed to unmarshal csi-cluster-config-json: %v", err))
}
for _, mp := range monEntries {
if len(mp.Monitors) == 0 || mp.Monitors[0] == "" {
logging.Fatal(fmt.Errorf("mon endpoint is empty"))
}
monEndpoint = mp.Monitors[0]
}

return monEndpoint, adminID, adminKey
}

// getK8sRefSubvolume returns the k8s references for the subvolumes
func getK8sRefSubvolume(ctx context.Context, clientsets *k8sutil.Clientsets) map[string]subVolumeInfo {
pvList, err := clientsets.Kube.CoreV1().PersistentVolumes().List(ctx, v1.ListOptions{})
@@ -67,6 +131,17 @@ func getK8sRefSubvolume(ctx context.Context, clientsets *k8sutil.Clientsets) map
return subvolumeNames
}

// runCommand checks for the presence of an external cluster and runs the command accordingly.
func runCommand(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace, cmd string, args []string) (string, error) {
if checkForExternalStorage(ctx, clientsets, clusterNamespace) {
monEndpoint, adminID, adminKey := getExternalClusterDetails(ctx, clientsets, operatorNamespace, clusterNamespace)
args = append(args, "-m", monEndpoint, "--id", adminID, "--key", adminKey)
}
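// The command always runs in the operator pod; external mode only adds explicit connection flags.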
return exec.RunCommandInOperatorPod(ctx, clientsets, cmd, args, operatorNamespace, clusterNamespace, true)
}

// listCephFSSubvolumes lists all the subvolumes
func listCephFSSubvolumes(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace string, includeStaleOnly bool, subvolumeNames map[string]subVolumeInfo) {

@@ -87,7 +162,9 @@ func listCephFSSubvolumes(ctx context.Context, clientsets *k8sutil.Clientsets, o
continue
}
for _, svg := range subvolg {
svList, err := exec.RunCommandInOperatorPod(ctx, clientsets, "ceph", []string{"fs", "subvolume", "ls", fs.Name, svg.Name}, operatorNamespace, clusterNamespace, true)
cmd := "ceph"
args := []string{"fs", "subvolume", "ls", fs.Name, svg.Name, "--format", "json"}
svList, err := runCommand(ctx, clientsets, operatorNamespace, clusterNamespace, cmd, args)
if err != nil {
logging.Error(err, "failed to get subvolumes of %q", fs.Name)
continue
@@ -136,7 +213,10 @@ func listCephFSSubvolumes(ctx context.Context, clientsets *k8sutil.Clientsets, o

// getSubvolumeState returns the state of the subvolume
func getSubvolumeState(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace, fsName, SubVol, SubvolumeGroup string) string {
subVolumeInfo, errvol := exec.RunCommandInOperatorPod(ctx, clientsets, "ceph", []string{"fs", "subvolume", "info", fsName, SubVol, SubvolumeGroup}, operatorNamespace, clusterNamespace, true)
cmd := "ceph"
args := []string{"fs", "subvolume", "info", fsName, SubVol, SubvolumeGroup, "--format", "json"}

subVolumeInfo, errvol := runCommand(ctx, clientsets, operatorNamespace, clusterNamespace, cmd, args)
if errvol != nil {
logging.Error(errvol, "failed to get subvolume info")
return ""
@@ -155,7 +235,11 @@ func getSubvolumeState(ctx context.Context, clientsets *k8sutil.Clientsets, oper

// gets the list of filesystems
func getFileSystem(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace string) ([]fsStruct, error) {
fsList, err := exec.RunCommandInOperatorPod(ctx, clientsets, "ceph", []string{"fs", "ls", "--format", "json"}, operatorNamespace, clusterNamespace, true)

cmd := "ceph"
args := []string{"fs", "ls", "--format", "json"}

fsList, err := runCommand(ctx, clientsets, operatorNamespace, clusterNamespace, cmd, args)
if err != nil {
logging.Error(err, "failed to get filesystems")
return []fsStruct{}, err
@@ -170,7 +254,10 @@ func getFileSystem(ctx context.Context, clientsets *k8sutil.Clientsets, operator
// checkSnapshot checks if there are any snapshots in the subvolume
func checkSnapshot(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace, fs, sv, svg string) bool {

snapList, err := exec.RunCommandInOperatorPod(ctx, clientsets, "ceph", []string{"fs", "subvolume", "snapshot", "ls", fs, sv, svg}, operatorNamespace, clusterNamespace, true)
cmd := "ceph"
args := []string{"fs", "subvolume", "snapshot", "ls", fs, sv, svg, "--format", "json"}

snapList, err := runCommand(ctx, clientsets, operatorNamespace, clusterNamespace, cmd, args)
if err != nil {
logging.Error(err, "failed to get subvolume snapshots of %q/%q/%q", fs, sv, svg)
return false
@@ -185,7 +272,10 @@ func checkSnapshot(ctx context.Context, clientsets *k8sutil.Clientsets, operator

// gets the list of subvolumegroups for the specified filesystem
func getSubvolumeGroup(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace, fs string) ([]fsStruct, error) {
svgList, err := exec.RunCommandInOperatorPod(ctx, clientsets, "ceph", []string{"fs", "subvolumegroup", "ls", fs, "--format", "json"}, operatorNamespace, clusterNamespace, true)
cmd := "ceph"
args := []string{"fs", "subvolumegroup", "ls", fs, "--format", "json"}

svgList, err := runCommand(ctx, clientsets, operatorNamespace, clusterNamespace, cmd, args)
if err != nil {
logging.Error(err, "failed to get subvolume groups for filesystem %q", fs)
return []fsStruct{}, err
@@ -211,7 +301,10 @@ func Delete(ctx context.Context, clientsets *k8sutil.Clientsets, OperatorNamespa
_, check := k8sSubvolume[subvol]
if !check {
deleteOmapForSubvolume(ctx, clientsets, OperatorNamespace, CephClusterNamespace, subvol, fs)
_, err := exec.RunCommandInOperatorPod(ctx, clientsets, "ceph", []string{"fs", "subvolume", "rm", fs, subvol, svg, "--retain-snapshots"}, OperatorNamespace, CephClusterNamespace, true)
cmd := "ceph"
args := []string{"fs", "subvolume", "rm", fs, subvol, svg, "--retain-snapshots"}

_, err := runCommand(ctx, clientsets, OperatorNamespace, CephClusterNamespace, cmd, args)
if err != nil {
logging.Fatal(err, "failed to delete subvolume of %s/%s/%s", fs, svg, subvol)
}
@@ -246,17 +339,23 @@ func deleteOmapForSubvolume(ctx context.Context, clientsets *k8sutil.Clientsets,
logging.Fatal(fmt.Errorf("pool name not found: %q", err))
}
if omapval != "" {
cmd := "rados"
args := []string{"rm", omapval, "-p", poolName, "--namespace", "csi"}

// remove omap object.
_, err := exec.RunCommandInOperatorPod(ctx, clientsets, "rados", []string{"rm", omapval, "-p", poolName, "--namespace", "csi"}, OperatorNamespace, CephClusterNamespace, true)
_, err := runCommand(ctx, clientsets, OperatorNamespace, CephClusterNamespace, cmd, args)
if err != nil {
logging.Fatal(err, "failed to remove omap object for subvolume %q", subVol)
}
logging.Info("omap object:%q deleted", omapval)

}
if omapkey != "" {
cmd := "rados"
args := []string{"rmomapkey", "csi.volumes.default", omapkey, "-p", poolName, "--namespace", "csi"}

// remove omap key.
_, err := exec.RunCommandInOperatorPod(ctx, clientsets, "rados", []string{"rmomapkey", "csi.volumes.default", omapkey, "-p", poolName, "--namespace", "csi"}, OperatorNamespace, CephClusterNamespace, true)
_, err := runCommand(ctx, clientsets, OperatorNamespace, CephClusterNamespace, cmd, args)
if err != nil {
logging.Fatal(err, "failed to remove omap key for subvolume %q", subVol)
}
@@ -279,8 +378,9 @@ func getOmapKey(ctx context.Context, clientsets *k8sutil.Clientsets, OperatorNam
}
omapval := getOmapVal(subVol)

cmd := []string{"getomapval", omapval, "csi.volname", "-p", poolName, "--namespace", "csi", "/dev/stdout"}
pvname, err := exec.RunCommandInOperatorPod(ctx, clientsets, "rados", cmd, OperatorNamespace, CephClusterNamespace, true)
args := []string{"getomapval", omapval, "csi.volname", "-p", poolName, "--namespace", "csi", "/dev/stdout"}
cmd := "rados"
pvname, err := runCommand(ctx, clientsets, OperatorNamespace, CephClusterNamespace, cmd, args)
if err != nil || pvname == "" {
logging.Info("No PV found for subvolume %s: %s", subVol, err)
return ""
