Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions common/filesystem/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"syscall"
"time"

"github.com/pkg/xattr"
"golang.org/x/sys/unix"
)

Expand Down Expand Up @@ -75,6 +76,10 @@ type Provider interface {
// lexicographical order. Note WalkDir may return an absolute path, thus the paths it returns
// should be sanitized with GetRelativePathWithinMount() if needed.
WalkDir(path string, fn fs.WalkDirFunc, walkOpts ...WalkOption) error
// Return all user extended attributes.
GetUserXattrs(path string) (map[string]string, error)
// Set all user extended attributes.
SetUserXattrs(path string, userXattrs map[string]string) error
// CopyXAttrsToFile copies the xattrs from srcPath to dstPath. If there are no xattrs or if the BeeGFS
// instance does not have user xattrs enabled, no error is returned.
CopyXAttrsToFile(srcPath, dstPath string) error
Expand Down Expand Up @@ -273,6 +278,40 @@ func (fs BeeGFS) WalkDir(path string, fn fs.WalkDirFunc, opts ...WalkOption) err
return filepath.WalkDir(root, fn)
}

func (fs BeeGFS) GetUserXattrs(path string) (map[string]string, error) {
return fs.getXattrs(path, "user.")
}

func (fs BeeGFS) SetUserXattrs(path string, userXattrs map[string]string) error {
absPath := filepath.Join(fs.MountPoint, path)
for key, value := range userXattrs {
xattr.Set(absPath, key, []byte(value))
}
return nil
}

func (fs BeeGFS) getXattrs(path string, prefix string) (map[string]string, error) {
absPath := filepath.Join(fs.MountPoint, path)
names, err := xattr.List(absPath)
if err != nil {
return nil, err
}

xattrs := map[string]string{}
for _, name := range names {
if !strings.HasPrefix(name, prefix) {
continue
}
value, err := xattr.Get(absPath, name)
if err != nil {
return nil, err
}
xattrs[name] = string(value)
}

return xattrs, nil
}

func (fs BeeGFS) CopyXAttrsToFile(srcPath, dstPath string) error {
srcPath = filepath.Join(fs.MountPoint, srcPath)
dstPath = filepath.Join(fs.MountPoint, dstPath)
Expand Down
8 changes: 8 additions & 0 deletions common/filesystem/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ func (fs MockFS) WalkDir(path string, fn fs.WalkDirFunc, opts ...WalkOption) err
return fmt.Errorf("not implemented")
}

func (fs MockFS) GetUserXattrs(path string) (map[string]string, error) {
return nil, fmt.Errorf("not implemented")
}

func (fs MockFS) SetUserXattrs(path string, userXattrs map[string]string) error {
return fmt.Errorf("not implemented")
}

func (fs MockFS) CopyXAttrsToFile(srcPath, dstPath string) error {
return fmt.Errorf("not implemented")
}
Expand Down
8 changes: 8 additions & 0 deletions common/filesystem/unmounted.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ func (fs UnmountedFS) WalkDir(path string, fn fs.WalkDirFunc, opts ...WalkOption
return ErrUnmounted
}

func (fs UnmountedFS) GetUserXattrs(path string) (map[string]string, error) {
return nil, ErrUnmounted
}

func (fs UnmountedFS) SetUserXattrs(path string, userXattrs map[string]string) error {
return ErrUnmounted
}

func (fs UnmountedFS) CopyXAttrsToFile(srcPath, dstPath string) error {
return ErrUnmounted
}
Expand Down
4 changes: 2 additions & 2 deletions common/rst/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (c *JobBuilderClient) SanitizeRemotePath(remotePath string) string {
}

// GetRemotePathInfo is not implemented and should never be called.
func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, error) {
return 0, time.Time{}, ErrUnsupportedOpForRST
func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, map[string]string, error) {
return 0, time.Time{}, nil, ErrUnsupportedOpForRST
}

// GenerateExternalId is not implemented and should never be called.
Expand Down
4 changes: 2 additions & 2 deletions common/rst/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (r *MockClient) SanitizeRemotePath(remotePath string) string {
return remotePath
}

func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, error) {
return 0, time.Time{}, ErrUnsupportedOpForRST
func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, map[string]string, error) {
return 0, time.Time{}, nil, ErrUnsupportedOpForRST
}

func (r *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) {
Expand Down
81 changes: 79 additions & 2 deletions common/rst/rst.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
// It is important for providers to maintain beegfs-mtime which is the file's last modification
// time of the prior upload operation. Beegfs-mtime is used in conjunction with the file's size
// to determine whether the file is sync.
GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (remoteSize int64, remoteMtime time.Time, err error)
GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (remoteSize int64, remoteMtime time.Time, remoteUserXattr map[string]string, err error)
// GenerateExternalId can be used to generate an identifier for remote operations.
GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (externalId string, err error)
}
Expand Down Expand Up @@ -344,9 +344,47 @@

// IsFileAlreadySynced returns whether the file is already synced with remote storage target
func IsFileAlreadySynced(lockedInfo *flex.JobLockedInfo) bool {
return lockedInfo.Size == lockedInfo.RemoteSize && lockedInfo.Mtime.AsTime().Equal(lockedInfo.RemoteMtime.AsTime()) && IsFileUserXattrsSynced(lockedInfo)
}

func IsFileContentSynced(lockedInfo *flex.JobLockedInfo) bool {
return lockedInfo.Size == lockedInfo.RemoteSize && lockedInfo.Mtime.AsTime().Equal(lockedInfo.RemoteMtime.AsTime())
}

// IsFileUserXattrsSynced returns whether the file's user extended attributes match remote's metadata.
func IsFileUserXattrsSynced(lockedInfo *flex.JobLockedInfo) bool {
userXattrsIsEmpty := lockedInfo.UserXattrs == nil || len(lockedInfo.UserXattrs) == 0

Check failure on line 356 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.UserXattrs undefined (type *flex.JobLockedInfo has no field or method UserXattrs)
remoteUserXattrsIsEmpty := lockedInfo.RemoteUserXattrs == nil || len(lockedInfo.RemoteUserXattrs) == 0

Check failure on line 357 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.RemoteUserXattrs undefined (type *flex.JobLockedInfo has no field or method RemoteUserXattrs)
if userXattrsIsEmpty && remoteUserXattrsIsEmpty {
return true
}
if userXattrsIsEmpty != remoteUserXattrsIsEmpty {
return false
}
if len(lockedInfo.UserXattrs) != len(lockedInfo.RemoteUserXattrs) {

Check failure on line 364 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.RemoteUserXattrs undefined (type *flex.JobLockedInfo has no field or method RemoteUserXattrs)

Check failure on line 364 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.UserXattrs undefined (type *flex.JobLockedInfo has no field or method UserXattrs)
return false
}

for key, value := range lockedInfo.UserXattrs {

Check failure on line 368 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.UserXattrs undefined (type *flex.JobLockedInfo has no field or method UserXattrs)
key_found := false
for remote_key, remote_value := range lockedInfo.RemoteUserXattrs {

Check failure on line 370 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.RemoteUserXattrs undefined (type *flex.JobLockedInfo has no field or method RemoteUserXattrs)
if key == remote_key {
key_found = true
if value != remote_value {
return false
}
break
}
}

if !key_found {
return false
}
}

return true
}

// IsFileSizeMatched returns whether the lockedInfo local and remote file sizes match. It is the
// responsibility of the caller to ensure lockedInfo is already populated and locked.
func IsFileSizeMatched(lockedInfo *flex.JobLockedInfo) bool {
Expand Down Expand Up @@ -424,12 +462,13 @@
}
}

remoteSize, remoteMtime, err := client.GetRemotePathInfo(ctx, cfg)
remoteSize, remoteMtime, remoteUserXattrs, err := client.GetRemotePathInfo(ctx, cfg)
if err != nil && (cfg.Download || !errors.Is(err, os.ErrNotExist)) {
return getRequestWithFailedPrecondition(fmt.Sprintf("unable to retrieve remote path information: %s", err.Error()))
}
lockedInfo.SetRemoteSize(remoteSize)
lockedInfo.SetRemoteMtime(timestamppb.New(remoteMtime))
lockedInfo.SetRemoteUserXattrs(remoteUserXattrs)

Check failure on line 471 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.SetRemoteUserXattrs undefined (type *flex.JobLockedInfo has no field or method SetRemoteUserXattrs)

return client.GetJobRequest(cfg)
}
Expand Down Expand Up @@ -464,6 +503,7 @@
fileDataStateCleared := false
filePreallocated := false
fileCreated := false
fileUserXattrChanged := false
defer func() {
if err != nil {
if IsFileOffloaded(lockedInfo) {
Expand All @@ -479,6 +519,12 @@
err = fmt.Errorf("%w: unable to restore stub file rst url: %s", err, restoreErr.Error())
}
}
if fileUserXattrChanged {
// Attempt to roll back the changed user extended attributes.
if restoreErr := mountPoint.SetUserXattrs(cfg.Path, lockedInfo.UserXattrs); restoreErr != nil {

Check failure on line 524 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.UserXattrs undefined (type *flex.JobLockedInfo has no field or method UserXattrs)
err = fmt.Errorf("%w: unable to restore user's extended attributes: %s", err, restoreErr.Error())
}
}
} else if fileCreated {
// Remove preallocated file since it previously did not exist.
if restoreErr := mountPoint.Remove(cfg.Path); restoreErr != nil {
Expand Down Expand Up @@ -511,6 +557,13 @@
if err != nil {
return
}

if cfg.EnableXattr != nil && *cfg.EnableXattr {

Check failure on line 561 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

cfg.EnableXattr undefined (type *flex.JobRequestCfg has no field or method EnableXattr)
if err = mountPoint.SetUserXattrs(cfg.Path, lockedInfo.RemoteUserXattrs); err != nil {

Check failure on line 562 in common/rst/rst.go

View workflow job for this annotation

GitHub Actions / checks

lockedInfo.RemoteUserXattrs undefined (type *flex.JobLockedInfo has no field or method RemoteUserXattrs)
return
}
}

lockedInfo.SetReadWriteLocked(true)

return updateRstCfg(ErrJobAlreadyOffloaded)
Expand Down Expand Up @@ -559,6 +612,14 @@
}
filePreallocated = true
}

if cfg.EnableXattr != nil && *cfg.EnableXattr && !IsFileUserXattrsSynced(cfg.LockedInfo) {
if err = mountPoint.SetUserXattrs(cfg.Path, lockedInfo.RemoteUserXattrs); err != nil {
return
}
fileUserXattrChanged = true
}

} else if IsFileOffloaded(lockedInfo) {
err = fmt.Errorf("unable to upload stub file: %w", ErrUnsupportedOpForRST)
return
Expand All @@ -571,6 +632,13 @@
fileCreated = true
filePreallocated = true

if cfg.EnableXattr != nil && *cfg.EnableXattr {
if err = mountPoint.SetUserXattrs(cfg.Path, lockedInfo.RemoteUserXattrs); err != nil {
return
}
}
fileUserXattrChanged = true

var info *flex.JobLockedInfo
if info, _, _, entryInfo, ownerNode, err = GetLockedInfo(ctx, mountPoint, cfg, cfg.Path, false); err != nil {
err = fmt.Errorf("failed to collect information for new file: %w", err)
Expand Down Expand Up @@ -659,6 +727,15 @@
lockedInfo.Mtime = timestamppb.New(stat.ModTime())
lockedInfo.Mode = uint32(stat.Mode())

if cfg.EnableXattr != nil && *cfg.EnableXattr {
var xattrs map[string]string
xattrs, err = mountPoint.GetUserXattrs(inMountPath)
if err != nil {
return
}
lockedInfo.SetUserXattrs(xattrs)
}

if entryInfo.Entry.FileState.GetDataState() == DataStateOffloaded {
if lockedInfo.StubUrlRstId, lockedInfo.StubUrlPath, err = GetOffloadedUrlPartsFromFile(mountPoint, inMountPath); err != nil {
if errors.Is(err, syscall.EWOULDBLOCK) {
Expand Down
Loading
Loading