diff --git a/common/filesystem/fs.go b/common/filesystem/fs.go index fd882cff9..0c3bc1fe4 100644 --- a/common/filesystem/fs.go +++ b/common/filesystem/fs.go @@ -12,6 +12,7 @@ import ( "syscall" "time" + "github.com/pkg/xattr" "golang.org/x/sys/unix" ) @@ -75,6 +76,10 @@ type Provider interface { // lexicographical order. Note WalkDir may return an absolute path, thus the paths it returns // should be sanitized with GetRelativePathWithinMount() if needed. WalkDir(path string, fn fs.WalkDirFunc, walkOpts ...WalkOption) error + // Return all user extended attributes. + GetUserXattrs(path string) (map[string]string, error) + // Set all user extended attributes. + SetUserXattrs(path string, userXattrs map[string]string) error // CopyXAttrsToFile copies the xattrs from srcPath to dstPath. If there are no xattrs or if the BeeGFS // instance does not have user xattrs enabled, no error is returned. CopyXAttrsToFile(srcPath, dstPath string) error @@ -273,6 +278,40 @@ func (fs BeeGFS) WalkDir(path string, fn fs.WalkDirFunc, opts ...WalkOption) err return filepath.WalkDir(root, fn) } +func (fs BeeGFS) GetUserXattrs(path string) (map[string]string, error) { + return fs.getXattrs(path, "user.") +} + +func (fs BeeGFS) SetUserXattrs(path string, userXattrs map[string]string) error { + absPath := filepath.Join(fs.MountPoint, path) + for key, value := range userXattrs { + xattr.Set(absPath, key, []byte(value)) + } + return nil +} + +func (fs BeeGFS) getXattrs(path string, prefix string) (map[string]string, error) { + absPath := filepath.Join(fs.MountPoint, path) + names, err := xattr.List(absPath) + if err != nil { + return nil, err + } + + xattrs := map[string]string{} + for _, name := range names { + if !strings.HasPrefix(name, prefix) { + continue + } + value, err := xattr.Get(absPath, name) + if err != nil { + return nil, err + } + xattrs[name] = string(value) + } + + return xattrs, nil +} + func (fs BeeGFS) CopyXAttrsToFile(srcPath, dstPath string) error { srcPath = filepath.Join(fs.MountPoint, srcPath) dstPath = filepath.Join(fs.MountPoint, dstPath) diff --git a/common/filesystem/mock.go b/common/filesystem/mock.go index f0d41bc28..d66564ff2 100644 --- a/common/filesystem/mock.go +++ b/common/filesystem/mock.go @@ -101,6 +101,14 @@ func (fs MockFS) WalkDir(path string, fn fs.WalkDirFunc, opts ...WalkOption) err return fmt.Errorf("not implemented") } +func (fs MockFS) GetUserXattrs(path string) (map[string]string, error) { + return nil, fmt.Errorf("not implemented") +} + +func (fs MockFS) SetUserXattrs(path string, userXattrs map[string]string) error { + return fmt.Errorf("not implemented") +} + func (fs MockFS) CopyXAttrsToFile(srcPath, dstPath string) error { return fmt.Errorf("not implemented") } diff --git a/common/filesystem/unmounted.go b/common/filesystem/unmounted.go index ec0c44326..6ed2937b4 100644 --- a/common/filesystem/unmounted.go +++ b/common/filesystem/unmounted.go @@ -69,6 +69,14 @@ func (fs UnmountedFS) WalkDir(path string, fn fs.WalkDirFunc, opts ...WalkOption return ErrUnmounted } +func (fs UnmountedFS) GetUserXattrs(path string) (map[string]string, error) { + return nil, ErrUnmounted +} + +func (fs UnmountedFS) SetUserXattrs(path string, userXattrs map[string]string) error { + return ErrUnmounted +} + func (fs UnmountedFS) CopyXAttrsToFile(srcPath, dstPath string) error { return ErrUnmounted } diff --git a/common/rst/builder.go b/common/rst/builder.go index 744ed3dc9..917b5318f 100644 --- a/common/rst/builder.go +++ b/common/rst/builder.go @@ -117,8 +117,8 @@ func (c *JobBuilderClient) SanitizeRemotePath(remotePath string) string { } // GetRemotePathInfo is not implemented and should never be called. -func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, error) { - return 0, time.Time{}, ErrUnsupportedOpForRST +func (c *JobBuilderClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, map[string]string, error) { + return 0, time.Time{}, nil, ErrUnsupportedOpForRST } // GenerateExternalId is not implemented and should never be called. diff --git a/common/rst/mock.go b/common/rst/mock.go index ebbfdd467..19aa84c46 100644 --- a/common/rst/mock.go +++ b/common/rst/mock.go @@ -119,8 +119,8 @@ func (r *MockClient) SanitizeRemotePath(remotePath string) string { return remotePath } -func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, error) { - return 0, time.Time{}, ErrUnsupportedOpForRST +func (r *MockClient) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, map[string]string, error) { + return 0, time.Time{}, nil, ErrUnsupportedOpForRST } func (r *MockClient) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (string, error) { diff --git a/common/rst/rst.go b/common/rst/rst.go index c213001cb..19441c4a8 100644 --- a/common/rst/rst.go +++ b/common/rst/rst.go @@ -107,7 +107,7 @@ type Provider interface { // It is important for providers to maintain beegfs-mtime which is the file's last modification // time of the prior upload operation. Beegfs-mtime is used in conjunction with the file's size // to determine whether the file is sync. - GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (remoteSize int64, remoteMtime time.Time, err error) + GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (remoteSize int64, remoteMtime time.Time, remoteUserXattr map[string]string, err error) // GenerateExternalId can be used to generate an identifier for remote operations. GenerateExternalId(ctx context.Context, cfg *flex.JobRequestCfg) (externalId string, err error) } @@ -344,9 +344,47 @@ func FileExists(lockedInfo *flex.JobLockedInfo) bool { // IsFileAlreadySynced returns whether the file is already synced with remote storage target func IsFileAlreadySynced(lockedInfo *flex.JobLockedInfo) bool { + return lockedInfo.Size == lockedInfo.RemoteSize && lockedInfo.Mtime.AsTime().Equal(lockedInfo.RemoteMtime.AsTime()) && IsFileUserXattrsSynced(lockedInfo) +} + +func IsFileContentSynced(lockedInfo *flex.JobLockedInfo) bool { return lockedInfo.Size == lockedInfo.RemoteSize && lockedInfo.Mtime.AsTime().Equal(lockedInfo.RemoteMtime.AsTime()) } +// IsFileUserXattrsSynced returns whether the file's user extended attributes match remote's metadata. +func IsFileUserXattrsSynced(lockedInfo *flex.JobLockedInfo) bool { + userXattrsIsEmpty := lockedInfo.UserXattrs == nil || len(lockedInfo.UserXattrs) == 0 + remoteUserXattrsIsEmpty := lockedInfo.RemoteUserXattrs == nil || len(lockedInfo.RemoteUserXattrs) == 0 + if userXattrsIsEmpty && remoteUserXattrsIsEmpty { + return true + } + if userXattrsIsEmpty != remoteUserXattrsIsEmpty { + return false + } + if len(lockedInfo.UserXattrs) != len(lockedInfo.RemoteUserXattrs) { + return false + } + + for key, value := range lockedInfo.UserXattrs { + key_found := false + for remote_key, remote_value := range lockedInfo.RemoteUserXattrs { + if key == remote_key { + key_found = true + if value != remote_value { + return false + } + break + } + } + + if !key_found { + return false + } + } + + return true +} + // IsFileSizeMatched returns whether the lockedInfo local and remote file sizes match. It is the // responsibility of the caller to ensure lockedInfo is already populated and locked. func IsFileSizeMatched(lockedInfo *flex.JobLockedInfo) bool { @@ -424,12 +462,13 @@ func BuildJobRequest(ctx context.Context, client Provider, mountPoint filesystem } } - remoteSize, remoteMtime, err := client.GetRemotePathInfo(ctx, cfg) + remoteSize, remoteMtime, remoteUserXattrs, err := client.GetRemotePathInfo(ctx, cfg) if err != nil && (cfg.Download || !errors.Is(err, os.ErrNotExist)) { return getRequestWithFailedPrecondition(fmt.Sprintf("unable to retrieve remote path information: %s", err.Error())) } lockedInfo.SetRemoteSize(remoteSize) lockedInfo.SetRemoteMtime(timestamppb.New(remoteMtime)) + lockedInfo.SetRemoteUserXattrs(remoteUserXattrs) return client.GetJobRequest(cfg) } @@ -464,6 +503,7 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount fileDataStateCleared := false filePreallocated := false fileCreated := false + fileUserXattrChanged := false defer func() { if err != nil { if IsFileOffloaded(lockedInfo) { @@ -479,6 +519,12 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount err = fmt.Errorf("%w: unable to restore stub file rst url: %s", err, restoreErr.Error()) } } + if fileUserXattrChanged { + // Attempt to roll back the changed user extended attributes. + if restoreErr := mountPoint.SetUserXattrs(cfg.Path, lockedInfo.UserXattrs); restoreErr != nil { + err = fmt.Errorf("%w: unable to restore user's extended attributes: %s", err, restoreErr.Error()) + } + } } else if fileCreated { // Remove preallocated file since it previously did not exist. if restoreErr := mountPoint.Remove(cfg.Path); restoreErr != nil { @@ -511,6 +557,13 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount if err != nil { return } + + if cfg.EnableXattr != nil && *cfg.EnableXattr { + if err = mountPoint.SetUserXattrs(cfg.Path, lockedInfo.RemoteUserXattrs); err != nil { + return + } + } + lockedInfo.SetReadWriteLocked(true) return updateRstCfg(ErrJobAlreadyOffloaded) @@ -559,6 +612,14 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount } filePreallocated = true } + + if cfg.EnableXattr != nil && *cfg.EnableXattr && !IsFileUserXattrsSynced(cfg.LockedInfo) { + if err = mountPoint.SetUserXattrs(cfg.Path, lockedInfo.RemoteUserXattrs); err != nil { + return + } + fileUserXattrChanged = true + } + } else if IsFileOffloaded(lockedInfo) { err = fmt.Errorf("unable to upload stub file: %w", ErrUnsupportedOpForRST) return @@ -571,6 +632,13 @@ func PrepareFileStateForWorkRequests(ctx context.Context, client Provider, mount fileCreated = true filePreallocated = true + if cfg.EnableXattr != nil && *cfg.EnableXattr { + if err = mountPoint.SetUserXattrs(cfg.Path, lockedInfo.RemoteUserXattrs); err != nil { + return + } + } + fileUserXattrChanged = true + var info *flex.JobLockedInfo if info, _, _, entryInfo, ownerNode, err = GetLockedInfo(ctx, mountPoint, cfg, cfg.Path, false); err != nil { err = fmt.Errorf("failed to collect information for new file: %w", err) @@ -659,6 +727,15 @@ func GetLockedInfo( lockedInfo.Mtime = timestamppb.New(stat.ModTime()) lockedInfo.Mode = uint32(stat.Mode()) + if cfg.EnableXattr != nil && *cfg.EnableXattr { + var xattrs map[string]string + xattrs, err = mountPoint.GetUserXattrs(inMountPath) + if err != nil { + return + } + lockedInfo.SetUserXattrs(xattrs) + } + if entryInfo.Entry.FileState.GetDataState() == DataStateOffloaded { if lockedInfo.StubUrlRstId, lockedInfo.StubUrlPath, err = GetOffloadedUrlPartsFromFile(mountPoint, inMountPath); err != nil { if errors.Is(err, syscall.EWOULDBLOCK) { diff --git a/common/rst/s3.go b/common/rst/s3.go index 5588a9f6c..5651e0e2b 100644 --- a/common/rst/s3.go +++ b/common/rst/s3.go @@ -26,6 +26,8 @@ import ( "github.com/thinkparq/beegfs-go/common/filesystem" "github.com/thinkparq/beegfs-go/ctl/pkg/ctl/entry" + "maps" + "github.com/thinkparq/protobuf/go/beeremote" "github.com/thinkparq/protobuf/go/flex" "google.golang.org/protobuf/proto" @@ -104,7 +106,8 @@ func (r *S3Client) GetJobRequest(cfg *flex.JobRequestCfg) *beeremote.JobRequest Tagging: cfg.Tagging, }, }, - Update: cfg.Update, + Update: cfg.Update, + EnableXattr: cfg.EnableXattr, } } @@ -123,6 +126,7 @@ func (r *S3Client) getJobRequestCfg(request *beeremote.JobRequest) *flex.JobRequ Update: request.Update, Metadata: sync.Metadata, Tagging: sync.Tagging, + EnableXattr: request.EnableXattr, } } @@ -188,7 +192,7 @@ func (r *S3Client) ExecuteWorkRequestPart(ctx context.Context, request *flex.Wor var err error switch sync.Operation { case flex.SyncJob_UPLOAD: - err = r.upload(ctx, request.Path, sync.RemotePath, request.ExternalId, part, sync.LockedInfo.Mtime.AsTime(), sync.Metadata, sync.Tagging) + err = r.upload(ctx, request.Path, sync.RemotePath, request.ExternalId, part, sync.LockedInfo.Mtime.AsTime(), sync.Metadata, sync.Tagging, sync.LockedInfo.UserXattrs) case flex.SyncJob_DOWNLOAD: err = r.download(ctx, request.Path, sync.RemotePath, part) } @@ -296,7 +300,7 @@ func (r *S3Client) GetWalk(ctx context.Context, prefix string, chanSize int) (<- return walkChan, nil } -func (r *S3Client) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, error) { +func (r *S3Client) GetRemotePathInfo(ctx context.Context, cfg *flex.JobRequestCfg) (int64, time.Time, map[string]string, error) { return r.getObjectMetadata(ctx, cfg.RemotePath, cfg.Download) } @@ -304,7 +308,7 @@ func (r *S3Client) GenerateExternalId(ctx context.Context, cfg *flex.JobRequestC if !cfg.Download { segCount, _ := r.recommendedSegments(cfg.LockedInfo.Size) if segCount > 1 { - return r.createUpload(ctx, cfg.RemotePath, cfg.LockedInfo.Mtime.AsTime(), cfg.Metadata, cfg.Tagging) + return r.createUpload(ctx, cfg.RemotePath, cfg.LockedInfo.Mtime.AsTime(), cfg.Metadata, cfg.Tagging, cfg.LockedInfo.UserXattrs) } } return "", nil @@ -412,7 +416,7 @@ func (r *S3Client) completeSyncWorkRequests_Download(ctx context.Context, job *b request := job.GetRequest() sync := request.GetSync() - _, mtime, err := r.getObjectMetadata(ctx, sync.RemotePath, false) + _, mtime, _, err := r.getObjectMetadata(ctx, sync.RemotePath, false) if err != nil { return fmt.Errorf("unable to verify the remote object has not changed: %w", err) } @@ -494,12 +498,12 @@ func (r *S3Client) prepareJobRequest(ctx context.Context, cfg *flex.JobRequestCf } // getObjectMetadata returns the object's size in bytes, modification time if it exists. -func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExist bool) (int64, time.Time, error) { +func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExist bool) (int64, time.Time, map[string]string, error) { if key == "" { if keyMustExist { - return 0, time.Time{}, fmt.Errorf("unable to retrieve object metadata! --remote-path must be specified") + return 0, time.Time{}, nil, fmt.Errorf("unable to retrieve object metadata! --remote-path must be specified") } - return 0, time.Time{}, nil + return 0, time.Time{}, nil, nil } headObjectInput := &s3.HeadObjectInput{ @@ -512,26 +516,36 @@ func (r *S3Client) getObjectMetadata(ctx context.Context, key string, keyMustExi var apiErr smithy.APIError if errors.As(err, &apiErr) { if apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey" { - return 0, time.Time{}, os.ErrNotExist + return 0, time.Time{}, nil, os.ErrNotExist } } - return 0, time.Time{}, err + return 0, time.Time{}, nil, err + } + + xattrs := map[string]string{} + var beegfsMtime string + var ok bool + for key, value := range resp.Metadata { + if !ok && key == "beegfs-mtime" { + beegfsMtime = value + ok = true + } else if strings.HasPrefix(key, "user.") { + xattrs[key] = value + } } - - beegfsMtime, ok := resp.Metadata["beegfs-mtime"] if !ok { - return *resp.ContentLength, *resp.LastModified, nil + return *resp.ContentLength, *resp.LastModified, xattrs, nil } mtime, err := time.Parse(time.RFC3339, beegfsMtime) if err != nil { - return *resp.ContentLength, *resp.LastModified, fmt.Errorf("unable to parse remote object's beegfs-mtime") + return *resp.ContentLength, *resp.LastModified, xattrs, fmt.Errorf("unable to parse remote object's beegfs-mtime") } - return *resp.ContentLength, mtime, nil + return *resp.ContentLength, mtime, xattrs, nil } -func (r *S3Client) createUpload(ctx context.Context, path string, mtime time.Time, metadata map[string]string, tagging *string) (uploadID string, err error) { +func (r *S3Client) createUpload(ctx context.Context, path string, mtime time.Time, metadata map[string]string, tagging *string, userXattrs map[string]string) (uploadID string, err error) { beegfsMtime := mtime.Format(time.RFC3339) if metadata == nil { metadata = map[string]string{"beegfs-mtime": beegfsMtime} @@ -540,6 +554,7 @@ func (r *S3Client) createUpload(ctx context.Context, path string, mtime time.Tim } else { metadata["beegfs-mtime"] = beegfsMtime } + maps.Copy(metadata, userXattrs) createMultipartUploadInput := &s3.CreateMultipartUploadInput{ Bucket: aws.String(r.config.GetS3().Bucket), @@ -611,6 +626,7 @@ func (r *S3Client) upload( mtime time.Time, metadata map[string]string, tagging *string, + userXattrs map[string]string, ) error { filePart, sha256sum, err := r.mountPoint.ReadFilePart(path, part.OffsetStart, part.OffsetStop) @@ -641,6 +657,7 @@ func (r *S3Client) upload( } else { metadata["beegfs-mtime"] = beegfsMtime } + maps.Copy(metadata, userXattrs) resp, err := r.client.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(r.config.GetS3().Bucket), diff --git a/ctl/internal/cmd/rst/pushpull.go b/ctl/internal/cmd/rst/pushpull.go index 29e040a7b..66c025bdf 100644 --- a/ctl/internal/cmd/rst/pushpull.go +++ b/ctl/internal/cmd/rst/pushpull.go @@ -23,7 +23,8 @@ type pushPullCfg struct { func newPushCmd() *cobra.Command { frontendCfg := pushPullCfg{} backendCfg := flex.JobRequestCfg{ - Update: new(bool), + Update: new(bool), + EnableXattr: new(bool), } var metadata map[string]string @@ -78,8 +79,10 @@ WARNING: Files are always uploaded and existing files overwritten unless the rem cmd.Flags().BoolVar(backendCfg.Update, "update", false, "Set the file's persistent remote target. Requires --remote-target.") cmd.Flags().StringToStringVar(&metadata, "metadata", nil, "Include optional metadata specified as 'key=value,[key=value]'.") cmd.Flags().StringToStringVar(&tagging, "tagging", nil, "Include optional tag-set specified as 'key=value,[key=value]'.") + cmd.Flags().BoolVar(backendCfg.EnableXattr, "with-xattr", false, "Push user defined extended attributes as tags.") cmd.Flags().MarkHidden("metadata") cmd.Flags().MarkHidden("tagging") + cmd.Flags().MarkHidden("xattr") return cmd } @@ -87,8 +90,9 @@ WARNING: Files are always uploaded and existing files overwritten unless the rem func newPullCmd() *cobra.Command { frontendCfg := pushPullCfg{} backendCfg := flex.JobRequestCfg{ - Download: true, - Update: new(bool), + Download: true, + Update: new(bool), + EnableXattr: new(bool), } cmd := &cobra.Command{ Use: "pull --remote-target= --remote-path= ", @@ -117,6 +121,7 @@ func newPullCmd() *cobra.Command { cmd.Flags().BoolVarP(&frontendCfg.verbose, "verbose", "v", false, "Print additional details about each job (use --debug) to also print work requests and results.") cmd.Flags().IntVar(&frontendCfg.width, "column-width", 35, "Set the maximum width of some columns before they overflow.") cmd.Flags().BoolVar(backendCfg.Update, "update", false, "Set the file's persistent remote target. Requires --remote-target.") + cmd.Flags().BoolVar(backendCfg.EnableXattr, "with-xattr", false, "Pull user defined tags as extended attributes.") return cmd } diff --git a/ctl/internal/cmd/rst/status.go b/ctl/internal/cmd/rst/status.go index c43c2012e..8eb01b481 100644 --- a/ctl/internal/cmd/rst/status.go +++ b/ctl/internal/cmd/rst/status.go @@ -50,6 +50,9 @@ Specifying Paths: if len(args) < 1 { return fmt.Errorf("missing argument. Usage: %s", cmd.Use) } + if backendCfg.EnableXattr && !backendCfg.VerifyRemote { + return fmt.Errorf("unable to verify user extended attributes without --verify-remote flag. Usage: %s", cmd.Use) + } return nil }, RunE: func(cmd *cobra.Command, args []string) error { @@ -66,6 +69,7 @@ Specifying Paths: cmd.Flags().BoolVar(&frontendCfg.summarize, "summarize", false, "Don't print results for individual paths and only print a summary.") cmd.Flags().StringVar(&backendCfg.FilterExpr, "filter-files", "", util.FilterFilesHelp) cmd.Flags().BoolVar(&backendCfg.VerifyRemote, "verify-remote", false, "Also queries the remote storage target(s) to detect changes not tracked by BeeGFS Remote (slower than local-only verification).") + cmd.Flags().BoolVar(&backendCfg.EnableXattr, "verify-xattrs", false, "Verify tags on the remote file are set as user extended attributes on the local file (--verify-remote must be specified).") cmd.MarkFlagsMutuallyExclusive("verbose", "summarize") return cmd } diff --git a/ctl/pkg/ctl/rst/status.go b/ctl/pkg/ctl/rst/status.go index ea1806284..00fccce9f 100644 --- a/ctl/pkg/ctl/rst/status.go +++ b/ctl/pkg/ctl/rst/status.go @@ -38,6 +38,8 @@ type GetStatusCfg struct { // Usually set based on viper.GetBool(config.DebugKey). This is passed in using the GetStatusCfg // to avoid an expensive call to Viper for every path. Debug bool + // Verify user extended attributes + EnableXattr bool } type GetStatusResult struct { @@ -412,7 +414,7 @@ func getPathStatusFromTarget( ) (*GetStatusResult, error) { // Default to any specified targets specified in cfg otherwise attempt to use rstIds returned // from GetLockedInfo. - lockedInfo, _, rstIds, _, _, err := rst.GetLockedInfo(ctx, mountPoint, &flex.JobRequestCfg{}, fsPath, true) + lockedInfo, _, rstIds, _, _, err := rst.GetLockedInfo(ctx, mountPoint, &flex.JobRequestCfg{EnableXattr: &cfg.EnableXattr}, fsPath, true) if len(cfg.RemoteTargets) != 0 { rstIds = cfg.RemoteTargets } @@ -491,7 +493,11 @@ func getPathStatusFromTarget( continue } - remoteSize, remoteMtime, err := client.GetRemotePathInfo(ctx, &flex.JobRequestCfg{Path: fsPath, RemotePath: client.SanitizeRemotePath(fsPath)}) + remoteSize, remoteMtime, remoteUserXattrs, err := client.GetRemotePathInfo(ctx, &flex.JobRequestCfg{ + Path: fsPath, + RemotePath: client.SanitizeRemotePath(fsPath), + }) + if err != nil { if errors.Is(err, os.ErrNotExist) { result.SyncStatus = NotAttempted @@ -502,12 +508,19 @@ func getPathStatusFromTarget( } lockedInfo.SetRemoteSize(remoteSize) lockedInfo.SetRemoteMtime(timestamppb.New(remoteMtime)) + lockedInfo.SetRemoteUserXattrs(remoteUserXattrs) - if rst.IsFileAlreadySynced(lockedInfo) { + contentSynced := rst.IsFileContentSynced(lockedInfo) + userXattrsSynced := rst.IsFileUserXattrsSynced(lockedInfo) + if contentSynced && userXattrsSynced { syncReason.WriteString(fmt.Sprintf("Target %d: File is synced based on the remote storage target.\n", tgt)) } else { result.SyncStatus = Unsynchronized - syncReason.WriteString(fmt.Sprintf("Target %d: File is not synced with remote storage target.\n", tgt)) + if contentSynced && !userXattrsSynced { + syncReason.WriteString(fmt.Sprintf("Target %d: File user extended attributes are not synced with remote storage target.\n", tgt)) + } else { + syncReason.WriteString(fmt.Sprintf("Target %d: File is not synced with remote storage target.\n", tgt)) + } } } diff --git a/go.mod b/go.mod index 2150fd1c6..9c99b5eec 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.2.1 ) +require github.com/pkg/xattr v0.4.12 // indirect + require ( github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.31 // indirect diff --git a/go.sum b/go.sum index a8a1c5086..ed06a6499 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,8 @@ github.com/mitchellh/go-wordwrap v1.0.1 h1:TLuKupo69TCn6TQSyGxwI1EblZZEsQ0vMlAFQ github.com/mitchellh/go-wordwrap v1.0.1/go.mod h1:R62XHJLzvMFRBbcrT7m7WgmE1eOyTSsCt+hzestvNj0= github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pkg/xattr v0.4.12 h1:rRTkSyFNTRElv6pkA3zpjHpQ90p/OdHQC1GmGh1aTjM= +github.com/pkg/xattr v0.4.12/go.mod h1:di8WF84zAKk8jzR1UBTEWh9AUlIZZ7M/JNt8e9B6ktU= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= @@ -141,6 +143,7 @@ golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg=