diff --git a/common/pkg/libartifact/events.go b/common/pkg/libartifact/events.go new file mode 100644 index 0000000000..09a7da3663 --- /dev/null +++ b/common/pkg/libartifact/events.go @@ -0,0 +1,62 @@ +//go:build !remote + +package libartifact + +import ( + "time" + + "github.com/sirupsen/logrus" +) + +// EventType indicates the type of an event. +type EventType int + +const ( + // EventTypeUnknown is an uninitialized EventType. + EventTypeUnknown EventType = iota + // EventTypeArtifactPull represents an artifact pull. + EventTypeArtifactPull + // EventTypeArtifactPullError represents a failed artifact pull. + EventTypeArtifactPullError + // EventTypeArtifactPush represents an artifact push. + EventTypeArtifactPush + // EventTypeArtifactPushError represents a failed artifact push. + EventTypeArtifactPushError + // EventTypeArtifactRemove represents an artifact removal. + EventTypeArtifactRemove + // EventTypeArtifactRemoveError represents a failed artifact removal. + EventTypeArtifactRemoveError + // EventTypeArtifactAdd represents an artifact being added. + EventTypeArtifactAdd + // EventTypeArtifactAddError represents a failed artifact add. + EventTypeArtifactAddError +) + +// Event represents an event such as an artifact pull or push. +type Event struct { + // ID of the object (e.g., artifact digest). + ID string + // Name of the object (e.g., artifact name "quay.io/foobar/artifact:special") + Name string + // Time of the event. + Time time.Time + // Type of the event. + Type EventType + // Error in case of failure. + Error error +} + +// writeEvent writes the specified event to the store's event channel. The +// event is discarded if no event channel has been registered (yet). +func (as *ArtifactStore) writeEvent(event *Event) { + select { + case as.eventChannel <- event: + // Done + case <-time.After(2 * time.Second): + // The store's event channel has a buffer of size 100 which + // should be enough even under high load. However, we + // shouldn't block too long in case the buffer runs full (could + // be an honest user error or bug). + logrus.Warnf("Discarding libartifact event which was not read within 2 seconds: %v", event) + } +} diff --git a/common/pkg/libartifact/store.go b/common/pkg/libartifact/store.go index 70520873da..0cea395b15 100644 --- a/common/pkg/libartifact/store.go +++ b/common/pkg/libartifact/store.go @@ -41,6 +41,7 @@ type ArtifactStore struct { SystemContext *types.SystemContext storePath string lock *lockfile.LockFile + eventChannel chan *Event } // NewArtifactStore is a constructor for artifact stores. Most artifact dealings depend on this. Store path is @@ -81,10 +82,22 @@ func NewArtifactStore(storePath string, sc *types.SystemContext) (*ArtifactStore return artifactStore, nil } +// EventChannel creates a buffered channel for events that the ArtifactStore will use +// to write events to. Callers are expected to read from the channel in a +// timely manner. +// Can be called once for a given ArtifactStore. +func (as *ArtifactStore) EventChannel() chan *Event { + if as.eventChannel != nil { + return as.eventChannel + } + as.eventChannel = make(chan *Event, 100) + return as.eventChannel +} + // lookupArtifactLocked looks up an artifact by fully qualified name, // or name@digest, full ID, or partial ID. // note: lookupArtifactLocked must be called while under a store lock -func (as ArtifactStore) lookupArtifactLocked(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) { +func (as *ArtifactStore) lookupArtifactLocked(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) { artifacts, err := as.getArtifacts(ctx, nil) if err != nil { return nil, err @@ -146,7 +159,7 @@ func (as ArtifactStore) lookupArtifactLocked(ctx context.Context, asr ArtifactSt } // Remove an artifact from the local artifact store. -func (as ArtifactStore) Remove(ctx context.Context, asr ArtifactStoreReference) (*digest.Digest, error) { +func (as *ArtifactStore) Remove(ctx context.Context, asr ArtifactStoreReference) (_ *digest.Digest, removeErr error) { as.lock.Lock() defer as.lock.Unlock() @@ -154,15 +167,31 @@ func (as ArtifactStore) Remove(ctx context.Context, asr ArtifactStoreReference) if err != nil { return nil, err } + + if as.eventChannel != nil { + defer func() { + if removeErr != nil { + as.writeEvent(&Event{ID: arty.Digest.String(), Name: arty.Name, Time: time.Now(), Type: EventTypeArtifactRemoveError, Error: removeErr}) + } + }() + } + ir, err := layout.NewReference(as.storePath, arty.Name) if err != nil { return nil, err } - return &arty.Digest, ir.DeleteImage(ctx, as.SystemContext) + if err := ir.DeleteImage(ctx, as.SystemContext); err != nil { + return nil, err + } + + if as.eventChannel != nil { + as.writeEvent(&Event{ID: arty.Digest.String(), Name: arty.Name, Time: time.Now(), Type: EventTypeArtifactRemove}) + } + return &arty.Digest, nil } // Inspect an artifact in a local store. -func (as ArtifactStore) Inspect(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) { +func (as *ArtifactStore) Inspect(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) { as.lock.RLock() defer as.lock.Unlock() @@ -170,69 +199,92 @@ func (as ArtifactStore) Inspect(ctx context.Context, asr ArtifactStoreReference) } // List artifacts in the local store. -func (as ArtifactStore) List(ctx context.Context) (ArtifactList, error) { +func (as *ArtifactStore) List(ctx context.Context) (ArtifactList, error) { as.lock.RLock() defer as.lock.Unlock() return as.getArtifacts(ctx, nil) } -// Pull an artifact from an image registry to a local store. -func (as ArtifactStore) Pull(ctx context.Context, ref ArtifactReference, opts libimage.CopyOptions) (digest.Digest, error) { - srcRef, err := docker.NewReference(ref.ref) - if err != nil { - return "", err - } +// Execute fn while holding the store lock and providing a reference to the local layout. +func (as *ArtifactStore) withLockedLayout(localName string, fn func(localRef types.ImageReference) (digest.Digest, error)) (digest.Digest, error) { as.lock.Lock() defer as.lock.Unlock() - destRef, err := layout.NewReference(as.storePath, ref.String()) + ref, err := layout.NewReference(as.storePath, localName) if err != nil { return "", err } + + return fn(ref) +} + +// Copy artifact from srcRef to destRef and return the digest of the copied artifact. +func (as *ArtifactStore) copyArtifact(ctx context.Context, srcRef types.ImageReference, destRef types.ImageReference, opts libimage.CopyOptions) (_ digest.Digest, err error) { copyer, err := libimage.NewCopier(&opts, as.SystemContext) if err != nil { return "", err } + defer func() { + closeErr := copyer.Close() + if err == nil { + err = closeErr + } + }() artifactBytes, err := copyer.Copy(ctx, srcRef, destRef) if err != nil { return "", err } - err = copyer.Close() - if err != nil { - return "", err - } return digest.FromBytes(artifactBytes), nil } -// Push an artifact to an image registry. -func (as ArtifactStore) Push(ctx context.Context, src, dest ArtifactReference, opts libimage.CopyOptions) (digest.Digest, error) { - destRef, err := docker.NewReference(dest.ref) - if err != nil { - return "", err +// Pull an artifact from an image registry to a local store. +func (as *ArtifactStore) Pull(ctx context.Context, ref ArtifactReference, opts libimage.CopyOptions) (_ digest.Digest, pullErr error) { + if as.eventChannel != nil { + defer func() { + if pullErr != nil { + as.writeEvent(&Event{Name: ref.String(), Time: time.Now(), Type: EventTypeArtifactPullError, Error: pullErr}) + } + }() } - - as.lock.Lock() - defer as.lock.Unlock() - - srcRef, err := layout.NewReference(as.storePath, src.String()) + srcRef, err := docker.NewReference(ref.ref) if err != nil { return "", err } - copyer, err := libimage.NewCopier(&opts, as.SystemContext) + artifactDigest, err := as.withLockedLayout(ref.String(), func(localRef types.ImageReference) (digest.Digest, error) { + return as.copyArtifact(ctx, srcRef, localRef, opts) + }) if err != nil { return "", err } - artifactBytes, err := copyer.Copy(ctx, srcRef, destRef) + if as.eventChannel != nil { + as.writeEvent(&Event{ID: artifactDigest.String(), Name: ref.String(), Time: time.Now(), Type: EventTypeArtifactPull}) + } + return artifactDigest, nil +} + +// Push an artifact to an image registry. +func (as *ArtifactStore) Push(ctx context.Context, src, dest ArtifactReference, opts libimage.CopyOptions) (_ digest.Digest, pushErr error) { + if as.eventChannel != nil { + defer func() { + if pushErr != nil { + as.writeEvent(&Event{Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactPushError, Error: pushErr}) + } + }() + } + destRef, err := docker.NewReference(dest.ref) if err != nil { return "", err } - - err = copyer.Close() + artifactDigest, err := as.withLockedLayout(src.String(), func(localRef types.ImageReference) (digest.Digest, error) { + return as.copyArtifact(ctx, localRef, destRef, opts) + }) if err != nil { return "", err } - artifactDigest := digest.FromBytes(artifactBytes) + if as.eventChannel != nil { + as.writeEvent(&Event{ID: artifactDigest.String(), Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactPush}) + } return artifactDigest, nil } @@ -256,7 +308,7 @@ func createNewArtifactManifest(options *libartTypes.AddOptions) specV1.Manifest } // cleanupAfterAppend removes previous image when doing an append. -func cleanupAfterAppend(ctx context.Context, oldDigest digest.Digest, as ArtifactStore) error { +func cleanupAfterAppend(ctx context.Context, oldDigest digest.Digest, as *ArtifactStore) error { lrs, err := layout.List(as.storePath) if err != nil { return err @@ -279,7 +331,15 @@ func cleanupAfterAppend(ctx context.Context, oldDigest digest.Digest, as Artifac // Add takes one or more artifact blobs and add them to the local artifact store. The empty // string input is for possible custom artifact types. -func (as ArtifactStore) Add(ctx context.Context, dest ArtifactReference, artifactBlobs []libartTypes.ArtifactBlob, options *libartTypes.AddOptions) (*digest.Digest, error) { +func (as *ArtifactStore) Add(ctx context.Context, dest ArtifactReference, artifactBlobs []libartTypes.ArtifactBlob, options *libartTypes.AddOptions) (_ *digest.Digest, addErr error) { + if as.eventChannel != nil { + defer func() { + if addErr != nil { + as.writeEvent(&Event{Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactAddError, Error: addErr}) + } + }() + } + if options.Append && len(options.ArtifactMIMEType) > 0 { return nil, errors.New("append option is not compatible with type option") } @@ -447,10 +507,14 @@ func (as ArtifactStore) Add(ctx context.Context, dest ArtifactReference, artifac return nil, err } } + + if as.eventChannel != nil { + as.writeEvent(&Event{ID: artifactManifestDigest.String(), Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactAdd}) + } return &artifactManifestDigest, nil } -func getArtifactAndImageSource(ctx context.Context, as ArtifactStore, asr ArtifactStoreReference, options *libartTypes.FilterBlobOptions) (*Artifact, types.ImageSource, error) { +func getArtifactAndImageSource(ctx context.Context, as *ArtifactStore, asr ArtifactStoreReference, options *libartTypes.FilterBlobOptions) (*Artifact, types.ImageSource, error) { if len(options.Digest) > 0 && len(options.Title) > 0 { return nil, nil, errors.New("cannot specify both digest and title") } @@ -471,7 +535,7 @@ func getArtifactAndImageSource(ctx context.Context, as ArtifactStore, asr Artifa } // BlobMountPaths allows the caller to access the file names from the store and how they should be mounted. -func (as ArtifactStore) BlobMountPaths(ctx context.Context, asr ArtifactStoreReference, options *libartTypes.BlobMountPathOptions) ([]libartTypes.BlobMountPath, error) { +func (as *ArtifactStore) BlobMountPaths(ctx context.Context, asr ArtifactStoreReference, options *libartTypes.BlobMountPathOptions) ([]libartTypes.BlobMountPath, error) { // FIX ME // LOCKING BUG: getArtifactAndImageSource assumes a locked ArtifactStore arty, imgSrc, err := getArtifactAndImageSource(ctx, as, asr, &options.FilterBlobOptions) @@ -530,7 +594,7 @@ func (as ArtifactStore) BlobMountPaths(ctx context.Context, asr ArtifactStoreRef } // Extract an artifact to local file or directory. -func (as ArtifactStore) Extract(ctx context.Context, nameOrDigest ArtifactStoreReference, target string, options *libartTypes.ExtractOptions) error { +func (as *ArtifactStore) Extract(ctx context.Context, nameOrDigest ArtifactStoreReference, target string, options *libartTypes.ExtractOptions) error { // FIX ME // LOCKING BUG: getArtifactAndImageSource assumes a locked ArtifactStore arty, imgSrc, err := getArtifactAndImageSource(ctx, as, nameOrDigest, &options.FilterBlobOptions) @@ -599,7 +663,7 @@ func (as ArtifactStore) Extract(ctx context.Context, nameOrDigest ArtifactStoreR } // Extract an artifact to tar stream. -func (as ArtifactStore) ExtractTarStream(ctx context.Context, w io.Writer, asr ArtifactStoreReference, options *libartTypes.ExtractOptions) error { +func (as *ArtifactStore) ExtractTarStream(ctx context.Context, w io.Writer, asr ArtifactStoreReference, options *libartTypes.ExtractOptions) error { if options == nil { options = &libartTypes.ExtractOptions{} } @@ -798,7 +862,7 @@ func copyTrustedImageBlobToTarStream(ctx context.Context, imgSrc types.ImageSour return nil } -func (as ArtifactStore) createEmptyManifest() error { +func (as *ArtifactStore) createEmptyManifest() error { as.lock.Lock() defer as.lock.Unlock() index := specV1.Index{ @@ -813,13 +877,13 @@ func (as ArtifactStore) createEmptyManifest() error { return os.WriteFile(as.indexPath(), rawData, 0o644) } -func (as ArtifactStore) indexPath() string { +func (as *ArtifactStore) indexPath() string { return filepath.Join(as.storePath, specV1.ImageIndexFile) } // getArtifacts returns an ArtifactList based on the artifact's store. The return error and // unused opts is meant for future growth like filters, etc so the API does not change. -func (as ArtifactStore) getArtifacts(ctx context.Context, _ *libartTypes.GetArtifactOptions) (ArtifactList, error) { +func (as *ArtifactStore) getArtifacts(ctx context.Context, _ *libartTypes.GetArtifactOptions) (ArtifactList, error) { var al ArtifactList lrs, err := layout.List(as.storePath) diff --git a/common/pkg/libartifact/store_test.go b/common/pkg/libartifact/store_test.go index cd77639f1b..c84ff5ef02 100644 --- a/common/pkg/libartifact/store_test.go +++ b/common/pkg/libartifact/store_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/rand" "crypto/sha256" + "errors" "io" "os" "path/filepath" @@ -15,7 +16,9 @@ import ( specV1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.podman.io/common/libimage" libartTypes "go.podman.io/common/pkg/libartifact/types" + "go.podman.io/image/v5/oci/layout" "go.podman.io/image/v5/types" ) @@ -877,3 +880,184 @@ func TestDetermineBlobMIMEType(t *testing.T) { }) } } + +func TestArtifactStore_copyArtifact(t *testing.T) { + // Setup source and destination stores + asSrc, ctx := setupTestStore(t) + asDest, _ := setupTestStore(t) + + // Add an artifact to the source store + fileNames := map[string]int{"file1.txt": 128} + refName := "quay.io/test/copy-artifact:v1" + originalDigest, _ := helperAddArtifact(t, asSrc, refName, fileNames, nil) + require.NotEmpty(t, originalDigest.String()) + + // Create references for source and destination + srcRef, err := layout.NewReference(asSrc.storePath, refName) + require.NoError(t, err) + destRef, err := layout.NewReference(asDest.storePath, refName) + require.NoError(t, err) + + // Copy the artifact + opts := libimage.CopyOptions{} + copiedDigest, err := asSrc.copyArtifact(ctx, srcRef, destRef, opts) + require.NoError(t, err) + + // The manifest digest should be the same + assert.Equal(t, originalDigest.String(), copiedDigest.String()) + + // Verify the artifact exists in the destination store + artifacts, err := asDest.List(ctx) + require.NoError(t, err) + require.Len(t, artifacts, 1) + + copiedArtifact := artifacts[0] + assert.Equal(t, refName, copiedArtifact.Name) + assert.Equal(t, originalDigest.String(), copiedArtifact.Digest.String()) + assert.Len(t, copiedArtifact.Manifest.Layers, 1) + assert.Equal(t, "file1.txt", copiedArtifact.Manifest.Layers[0].Annotations[specV1.AnnotationTitle]) +} + +func TestArtifactStore_withLockedLayout(t *testing.T) { + as, _ := setupTestStore(t) + localName := "quay.io/test/locked-layout:v1" + + t.Run("successful execution", func(t *testing.T) { + var fnCalled bool + expectedDigest := digest.FromString("test-digest") + fn := func(localRef types.ImageReference) (digest.Digest, error) { + fnCalled = true + require.NotNil(t, localRef) + imageName := strings.TrimPrefix(localRef.StringWithinTransport(), as.storePath+":") + assert.Equal(t, localName, imageName) + return expectedDigest, nil + } + + d, err := as.withLockedLayout(localName, fn) + require.NoError(t, err) + assert.True(t, fnCalled, "fn should have been called") + assert.Equal(t, expectedDigest, d) + }) + + t.Run("error from callback", func(t *testing.T) { + var fnCalled bool + expectedErr := errors.New("test-error") + fn := func(localRef types.ImageReference) (digest.Digest, error) { + fnCalled = true + require.NotNil(t, localRef) + imageName := strings.TrimPrefix(localRef.StringWithinTransport(), as.storePath+":") + assert.Equal(t, localName, imageName) + return "", expectedErr + } + + d, err := as.withLockedLayout(localName, fn) + require.Error(t, err) + assert.ErrorIs(t, err, expectedErr) + assert.True(t, fnCalled, "fn should have been called") + assert.Empty(t, d) + }) + + t.Run("error creating reference", func(t *testing.T) { + invalidName := "invalid'image!value@" + fn := func(localRef types.ImageReference) (digest.Digest, error) { + t.Fatal("callback should not be called on reference creation error") + return "", nil + } + + d, err := as.withLockedLayout(invalidName, fn) + require.Error(t, err) + assert.Empty(t, d) + }) +} + +func TestArtifactStore_EventChannel(t *testing.T) { + as, _ := setupTestStore(t) + ch := as.EventChannel() + require.NotNil(t, ch) + + t.Run("AddSuccessAndAddError", func(t *testing.T) { + // AddSuccess + refName := "quay.io/test/event-add:v1" + fileNames := map[string]int{"add.txt": 64} + + digest, _ := helperAddArtifact(t, as, refName, fileNames, nil) + + event := <-as.eventChannel + require.NotNil(t, event) + assert.Equal(t, EventTypeArtifactAdd, event.Type) + assert.Equal(t, refName, event.Name) + assert.Equal(t, digest.String(), event.ID) + assert.NoError(t, event.Error) + + // AddError + ref, err := NewArtifactReference(refName) // Same name to cause conflict + require.NoError(t, err) + blob, _ := createTestBlob(t, "add-error.txt", 32) + _, err = as.Add(context.TODO(), ref, []libartTypes.ArtifactBlob{blob}, &libartTypes.AddOptions{ArtifactMIMEType: "test"}) + require.Error(t, err) + assert.ErrorIs(t, err, libartTypes.ErrArtifactAlreadyExists) + + event = <-as.eventChannel + require.NotNil(t, event) + assert.Equal(t, EventTypeArtifactAddError, event.Type) + assert.Equal(t, refName, event.Name) + assert.ErrorIs(t, event.Error, libartTypes.ErrArtifactAlreadyExists) + }) + + t.Run("RemoveSuccess", func(t *testing.T) { + refName := "quay.io/test/event-remove:v1" + fileNames := map[string]int{"remove.txt": 32} + digest, _ := helperAddArtifact(t, as, refName, fileNames, nil) + <-as.eventChannel // consume AddEvent + + asr, err := NewArtifactStorageReference(refName) + require.NoError(t, err) + + removedDigest, err := as.Remove(context.TODO(), asr) + assert.Equal(t, digest, removedDigest) + require.NoError(t, err) + + event := <-as.eventChannel + require.NotNil(t, event) + assert.Equal(t, EventTypeArtifactRemove, event.Type) + assert.Equal(t, refName, event.Name) + assert.Equal(t, digest.String(), event.ID) + assert.NoError(t, event.Error) + }) + + t.Run("PushError", func(t *testing.T) { + srcRefName := "quay.io/test/event-push:v1" + fileNames := map[string]int{"push.txt": 64} + helperAddArtifact(t, as, srcRefName, fileNames, nil) + <-as.eventChannel // consume AddEvent + srcRef, _ := NewArtifactReference(srcRefName) + + refName := "invalid-registry.invalid/test/push-error:v1" + pushRef, err := NewArtifactReference(refName) + require.NoError(t, err) + + _, err = as.Push(context.TODO(), srcRef, pushRef, libimage.CopyOptions{}) + require.Error(t, err) + + event := <-as.eventChannel + require.NotNil(t, event) + assert.Equal(t, EventTypeArtifactPushError, event.Type) + assert.Equal(t, refName, event.Name) + assert.Error(t, event.Error) + }) + + t.Run("PullError", func(t *testing.T) { + refName := "invalid-registry.invalid/test/pull-error:v1" + pullRef, err := NewArtifactReference(refName) + require.NoError(t, err) + + _, err = as.Pull(context.TODO(), pullRef, libimage.CopyOptions{}) + require.Error(t, err) + + event := <-as.eventChannel + require.NotNil(t, event) + assert.Equal(t, EventTypeArtifactPullError, event.Type) + assert.Equal(t, refName, event.Name) + assert.Error(t, event.Error) + }) +}