Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xattrs: use unix flocks when writing multiple xattrs #2528

Merged
merged 9 commits into from
Feb 18, 2022
5 changes: 5 additions & 0 deletions changelog/unreleased/flock-xattr-set-multiple.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Enhancement: use an exclcusive write lock when writing multiple attributes

The xattr package can use an exclusive write lock when writing multiple extended attributes

https://github.com/cs3org/reva/pull/2528
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/go-openapi/errors v0.20.1 // indirect
github.com/go-openapi/strfmt v0.20.2 // indirect
github.com/go-sql-driver/mysql v1.6.0
github.com/gofrs/flock v0.8.1
github.com/golang-jwt/jwt v3.2.2+incompatible
github.com/golang/protobuf v1.5.2
github.com/gomodule/redigo v1.8.8
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6Wezm
github.com/gobwas/ws v1.0.4/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/flock v0.8.1 h1:+gYjHKf32LDeiEEFhQaotPbLuUXjY5ZqxKgXy7n59aw=
github.com/gofrs/flock v0.8.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
Expand Down
156 changes: 129 additions & 27 deletions pkg/storage/utils/decomposedfs/xattrs/xattrs.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"strings"

provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1"
"github.com/cs3org/reva/pkg/storage/utils/filelocks"
"github.com/gofrs/flock"
"github.com/pkg/errors"
"github.com/pkg/xattr"
)

Expand Down Expand Up @@ -112,27 +115,73 @@ func refFromCS3(b []byte) (*provider.Reference, error) {

// CopyMetadata copies all extended attributes from source to target.
// The optional filter function can be used to filter by attribute name, e.g. by checking a prefix
func CopyMetadata(s, t string, filter func(attributeName string) bool) error {
var attrs []string
var err error
if attrs, err = xattr.List(s); err != nil {
return err
// For the source file, a shared lock is acquired. For the target, an exclusive
// write lock is acquired.
func CopyMetadata(src, target string, filter func(attributeName string) bool) (err error) {
var writeLock, readLock *flock.Flock

// Acquire the write log on the target node first.
writeLock, err = filelocks.AcquireWriteLock(target)

if err != nil {
return errors.Wrap(err, "xattrs: Unable to lock target to write")
}
defer func() {
rerr := filelocks.ReleaseLock(writeLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// now try to get a shared lock on the source
readLock, err = filelocks.AcquireReadLock(src)

if err != nil {
return errors.Wrap(err, "xattrs: Unable to lock file for read")
}
defer func() {
rerr := filelocks.ReleaseLock(readLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// both locks are established. Copy.
var attrNameList []string
if attrNameList, err = xattr.List(src); err != nil {
return errors.Wrap(err, "Can not get xattr listing on src")
}
for i := range attrs {
if filter == nil || filter(attrs[i]) {
b, err := xattr.Get(s, attrs[i])
if err != nil {
return err

// error handling: We count errors of reads or writes of xattrs.
// if there were any read or write errors an error is returned.
var xerrs int = 0
var xerr error
for idx := range attrNameList {
attrName := attrNameList[idx]
if filter == nil || filter(attrName) {
var attrVal []byte
if attrVal, xerr = xattr.Get(src, attrName); xerr != nil {
xerrs++
}
if err := xattr.Set(t, attrs[i], b); err != nil {
return err
if xerr = xattr.Set(target, attrName, attrVal); xerr != nil {
xerrs++
}
}
}
return nil
if xerrs > 0 {
err = errors.Wrap(xerr, "Failed to copy all xattrs, last error returned.")
}

return err
}

// Set an extended attribute key to the given value
// No file locking is involved here as writing a single xattr is
// considered to be atomic.
func Set(filePath string, key string, val string) error {

if err := xattr.Set(filePath, key, []byte(val)); err != nil {
Expand All @@ -142,21 +191,48 @@ func Set(filePath string, key string, val string) error {
}

// SetMultiple allows setting multiple key value pairs at once
// TODO the changes are protected with an flock
func SetMultiple(filePath string, attribs map[string]string) error {
// the changes are protected with an file lock
// If the file lock can not be acquired the function returns a
// lock error.
func SetMultiple(filePath string, attribs map[string]string) (err error) {

// FIXME: Lock here
// h, err := lockedfile.OpenFile(filePath, os.O_WRONLY, 0) // 0? Open File only workn for files ... but we want to lock dirs ... or symlinks
// or we append .lock to the file and use https://github.com/gofrs/flock
var fileLock *flock.Flock
fileLock, err = filelocks.AcquireWriteLock(filePath)

if err != nil {
return errors.Wrap(err, "xattrs: Can not acquire write log")
}
defer func() {
rerr := filelocks.ReleaseLock(fileLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// error handling: Count if there are errors while setting the attribs.
// if there were any, return an error.
var xerrs int = 0
var xerr error
for key, val := range attribs {
if err := Set(filePath, key, val); err != nil {
return err
if xerr = xattr.Set(filePath, key, []byte(val)); xerr != nil {
// log
xerrs++
}
}
if xerrs > 0 {
err = errors.Wrap(xerr, "Failed to set all xattrs")
}
return nil
butonic marked this conversation as resolved.
Show resolved Hide resolved
}

// Get an extended attribute value for the given key
// No file locking is involved here as reading a single xattr is
// considered to be atomic.
func Get(filePath, key string) (string, error) {

v, err := xattr.Get(filePath, key)
if err != nil {
return "", err
Expand All @@ -178,21 +254,47 @@ func GetInt64(filePath, key string) (int64, error) {
return v, nil
}

// All reads all extended attributes for a node
func All(filePath string) (map[string]string, error) {
// All reads all extended attributes for a node, protected by a
// shared file lock
func All(filePath string) (attribs map[string]string, err error) {
var fileLock *flock.Flock

fileLock, err = filelocks.AcquireReadLock(filePath)

if err != nil {
return nil, errors.Wrap(err, "xattrs: Unable to lock file for read")
}
defer func() {
rerr := filelocks.ReleaseLock(fileLock)

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

attrNames, err := xattr.List(filePath)
if err != nil {
return nil, err
}

attribs := make(map[string]string, len(attrNames))
var xerrs int = 0
var xerr error
// error handling: Count if there are errors while reading all attribs.
// if there were any, return an error.
attribs = make(map[string]string, len(attrNames))
for _, name := range attrNames {
val, err := xattr.Get(filePath, name)
if err != nil {
return nil, err
var val []byte
if val, xerr = xattr.Get(filePath, name); xerr != nil {
xerrs++
} else {
attribs[name] = string(val)
}
attribs[name] = string(val)
}

return attribs, nil
if xerrs > 0 {
err = errors.Wrap(xerr, "Failed to read all xattrs")
}

return attribs, err
}
101 changes: 101 additions & 0 deletions pkg/storage/utils/filelocks/filelocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2018-2021 CERN
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// In applying this license, CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

package filelocks

import (
"errors"
"os"
"time"

"github.com/gofrs/flock"
)

// FlockFile returns the flock filename for a given file name
// it returns an empty string if the input is empty
func FlockFile(file string) string {
var n string
if len(file) > 0 {
n = file + ".flock"
}
return n
}

// acquireWriteLog acquires a lock on a file or directory.
// if the parameter write is true, it gets an exclusive write lock, otherwise a shared read lock.
// The function returns a Flock object, unlocking has to be done in the calling function.
func acquireLock(file string, write bool) (*flock.Flock, error) {
var err error

// Create a file to carry the log
n := FlockFile(file)
if len(n) == 0 {
return nil, errors.New("lock path is empty")
}
// Acquire the write log on the target node first.
lock := flock.New(n)

var ok bool
for i := 0; i < 10; i++ {
if write {
ok, err = lock.TryLock()
} else {
ok, err = lock.TryRLock()
}

if ok {
break
}

time.Sleep(time.Duration(i*3) * time.Millisecond)
}

if !ok {
err = errors.New("could not acquire lock after wait")
}

if err != nil {
return nil, err
}
return lock, nil
}

// AcquireReadLock tries to acquire a shared lock to read from the
// file and returns a lock object or an error accordingly.
func AcquireReadLock(file string) (*flock.Flock, error) {
return acquireLock(file, false)
}

// AcquireWriteLock tries to acquire a shared lock to write from the
// file and returns a lock object or an error accordingly.
func AcquireWriteLock(file string) (*flock.Flock, error) {
return acquireLock(file, true)
}

// ReleaseLock releases a lock from a file that was previously created
// by AcquireReadLock or AcquireWriteLock.
func ReleaseLock(lock *flock.Flock) error {
// there is a probability that if the file can not be unlocked,
// we also can not remove the file. We will only try to remove if it
// was successfully unlocked.
err := lock.Unlock()
if err == nil {
err = os.Remove(lock.Path())
}
return err
}