Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions common/pkg/libartifact/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//go:build !remote

package libartifact

import (
"time"

"github.com/sirupsen/logrus"
)

// EventType indicates the type of an event.
type EventType int

const (
// EventTypeUnknown is an uninitialized EventType.
EventTypeUnknown EventType = iota
// EventTypeArtifactPull represents an artifact pull.
EventTypeArtifactPull
// EventTypeArtifactPullError represents a failed artifact pull.
EventTypeArtifactPullError
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this new? Do we report errors as events?

Copy link
Copy Markdown
Author

@nimdrak nimdrak Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, some of these are new.
Our podman/libpod and container-libs/common/libimage only handles PullError

I thought it would be beneficial to notify clients about other errors as well, such as AddError, RemoveError, and PushError. And I also add

However, if there's a specific reason why we only track PullError, I am completely open to removing these new additions.

What do you think?

Copy link
Copy Markdown
Author

@nimdrak nimdrak Apr 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I introduced AddEvent, I intentionally left out other events like Extract and Inspect. These are new too, but I only added AddEvent since it's the only one that actually modifies the artifact store.

// EventTypeArtifactPush represents an artifact push.
EventTypeArtifactPush
// EventTypeArtifactPushError represents a failed artifact push.
EventTypeArtifactPushError
// EventTypeArtifactRemove represents an artifact removal.
EventTypeArtifactRemove
// EventTypeArtifactRemoveError represents a failed artifact removal.
EventTypeArtifactRemoveError
// EventTypeArtifactAdd represents an artifact being added.
EventTypeArtifactAdd
// EventTypeArtifactAddError represents a failed artifact add.
EventTypeArtifactAddError
)

// Event represents an event such as an artifact pull or push.
type Event struct {
// ID of the object (e.g., artifact digest).
ID string
// Name of the object (e.g., artifact name "quay.io/foobar/artifact:special")
Name string
// Time of the event.
Time time.Time
// Type of the event.
Type EventType
// Error in case of failure.
Error error
}

// writeEvent writes the specified event to the store's event channel. The
// event is discarded if no event channel has been registered (yet).
func (as *ArtifactStore) writeEvent(event *Event) {
select {
case as.eventChannel <- event:
// Done
case <-time.After(2 * time.Second):
// The store's event channel has a buffer of size 100 which
// should be enough even under high load. However, we
// shouldn't block too long in case the buffer runs full (could
// be an honest user error or bug).
logrus.Warnf("Discarding libartifact event which was not read within 2 seconds: %v", event)
}
}
144 changes: 104 additions & 40 deletions common/pkg/libartifact/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type ArtifactStore struct {
SystemContext *types.SystemContext
storePath string
lock *lockfile.LockFile
eventChannel chan *Event
}

// NewArtifactStore is a constructor for artifact stores. Most artifact dealings depend on this. Store path is
Expand Down Expand Up @@ -81,10 +82,22 @@ func NewArtifactStore(storePath string, sc *types.SystemContext) (*ArtifactStore
return artifactStore, nil
}

// EventChannel creates a buffered channel for events that the ArtifactStore will use
// to write events to. Callers are expected to read from the channel in a
// timely manner.
// Can be called once for a given ArtifactStore.
func (as *ArtifactStore) EventChannel() chan *Event {
if as.eventChannel != nil {
return as.eventChannel
}
as.eventChannel = make(chan *Event, 100)
return as.eventChannel
}

// lookupArtifactLocked looks up an artifact by fully qualified name,
// or name@digest, full ID, or partial ID.
// note: lookupArtifactLocked must be called while under a store lock
func (as ArtifactStore) lookupArtifactLocked(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) {
func (as *ArtifactStore) lookupArtifactLocked(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) {
artifacts, err := as.getArtifacts(ctx, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -146,93 +159,132 @@ func (as ArtifactStore) lookupArtifactLocked(ctx context.Context, asr ArtifactSt
}

// Remove an artifact from the local artifact store.
func (as ArtifactStore) Remove(ctx context.Context, asr ArtifactStoreReference) (*digest.Digest, error) {
func (as *ArtifactStore) Remove(ctx context.Context, asr ArtifactStoreReference) (_ *digest.Digest, removeErr error) {
as.lock.Lock()
defer as.lock.Unlock()

arty, err := as.lookupArtifactLocked(ctx, asr)
if err != nil {
return nil, err
}

if as.eventChannel != nil {
defer func() {
if removeErr != nil {
as.writeEvent(&Event{ID: arty.Digest.String(), Name: arty.Name, Time: time.Now(), Type: EventTypeArtifactRemoveError, Error: removeErr})
}
}()
}

ir, err := layout.NewReference(as.storePath, arty.Name)
if err != nil {
return nil, err
}
return &arty.Digest, ir.DeleteImage(ctx, as.SystemContext)
if err := ir.DeleteImage(ctx, as.SystemContext); err != nil {
return nil, err
}

if as.eventChannel != nil {
as.writeEvent(&Event{ID: arty.Digest.String(), Name: arty.Name, Time: time.Now(), Type: EventTypeArtifactRemove})
}
return &arty.Digest, nil
}

// Inspect an artifact in a local store.
func (as ArtifactStore) Inspect(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) {
func (as *ArtifactStore) Inspect(ctx context.Context, asr ArtifactStoreReference) (*Artifact, error) {
as.lock.RLock()
defer as.lock.Unlock()

return as.lookupArtifactLocked(ctx, asr)
}

// List artifacts in the local store.
func (as ArtifactStore) List(ctx context.Context) (ArtifactList, error) {
func (as *ArtifactStore) List(ctx context.Context) (ArtifactList, error) {
as.lock.RLock()
defer as.lock.Unlock()

return as.getArtifacts(ctx, nil)
}

// Pull an artifact from an image registry to a local store.
func (as ArtifactStore) Pull(ctx context.Context, ref ArtifactReference, opts libimage.CopyOptions) (digest.Digest, error) {
srcRef, err := docker.NewReference(ref.ref)
if err != nil {
return "", err
}
// Execute fn while holding the store lock and providing a reference to the local layout.
func (as *ArtifactStore) withLockedLayout(localName string, fn func(localRef types.ImageReference) (digest.Digest, error)) (digest.Digest, error) {
as.lock.Lock()
defer as.lock.Unlock()

destRef, err := layout.NewReference(as.storePath, ref.String())
ref, err := layout.NewReference(as.storePath, localName)
if err != nil {
return "", err
}

return fn(ref)
}

// Copy artifact from srcRef to destRef and return the digest of the copied artifact.
func (as *ArtifactStore) copyArtifact(ctx context.Context, srcRef types.ImageReference, destRef types.ImageReference, opts libimage.CopyOptions) (_ digest.Digest, err error) {
copyer, err := libimage.NewCopier(&opts, as.SystemContext)
if err != nil {
return "", err
}
defer func() {
closeErr := copyer.Close()
if err == nil {
err = closeErr
}
}()
artifactBytes, err := copyer.Copy(ctx, srcRef, destRef)
if err != nil {
return "", err
}
err = copyer.Close()
if err != nil {
return "", err
}
return digest.FromBytes(artifactBytes), nil
}

// Push an artifact to an image registry.
func (as ArtifactStore) Push(ctx context.Context, src, dest ArtifactReference, opts libimage.CopyOptions) (digest.Digest, error) {
destRef, err := docker.NewReference(dest.ref)
if err != nil {
return "", err
// Pull an artifact from an image registry to a local store.
func (as *ArtifactStore) Pull(ctx context.Context, ref ArtifactReference, opts libimage.CopyOptions) (_ digest.Digest, pullErr error) {
if as.eventChannel != nil {
defer func() {
if pullErr != nil {
as.writeEvent(&Event{Name: ref.String(), Time: time.Now(), Type: EventTypeArtifactPullError, Error: pullErr})
}
}()
}

as.lock.Lock()
defer as.lock.Unlock()

srcRef, err := layout.NewReference(as.storePath, src.String())
srcRef, err := docker.NewReference(ref.ref)
if err != nil {
return "", err
}
copyer, err := libimage.NewCopier(&opts, as.SystemContext)
artifactDigest, err := as.withLockedLayout(ref.String(), func(localRef types.ImageReference) (digest.Digest, error) {
return as.copyArtifact(ctx, srcRef, localRef, opts)
})
if err != nil {
return "", err
}
artifactBytes, err := copyer.Copy(ctx, srcRef, destRef)
if as.eventChannel != nil {
as.writeEvent(&Event{ID: artifactDigest.String(), Name: ref.String(), Time: time.Now(), Type: EventTypeArtifactPull})
}
return artifactDigest, nil
}

// Push an artifact to an image registry.
func (as *ArtifactStore) Push(ctx context.Context, src, dest ArtifactReference, opts libimage.CopyOptions) (_ digest.Digest, pushErr error) {
if as.eventChannel != nil {
defer func() {
if pushErr != nil {
as.writeEvent(&Event{Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactPushError, Error: pushErr})
}
}()
}
destRef, err := docker.NewReference(dest.ref)
if err != nil {
return "", err
}

err = copyer.Close()
artifactDigest, err := as.withLockedLayout(src.String(), func(localRef types.ImageReference) (digest.Digest, error) {
return as.copyArtifact(ctx, localRef, destRef, opts)
})
if err != nil {
return "", err
}
artifactDigest := digest.FromBytes(artifactBytes)
if as.eventChannel != nil {
as.writeEvent(&Event{ID: artifactDigest.String(), Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactPush})
}
return artifactDigest, nil
}

Expand All @@ -256,7 +308,7 @@ func createNewArtifactManifest(options *libartTypes.AddOptions) specV1.Manifest
}

// cleanupAfterAppend removes previous image when doing an append.
func cleanupAfterAppend(ctx context.Context, oldDigest digest.Digest, as ArtifactStore) error {
func cleanupAfterAppend(ctx context.Context, oldDigest digest.Digest, as *ArtifactStore) error {
lrs, err := layout.List(as.storePath)
if err != nil {
return err
Expand All @@ -279,7 +331,15 @@ func cleanupAfterAppend(ctx context.Context, oldDigest digest.Digest, as Artifac

// Add takes one or more artifact blobs and add them to the local artifact store. The empty
// string input is for possible custom artifact types.
func (as ArtifactStore) Add(ctx context.Context, dest ArtifactReference, artifactBlobs []libartTypes.ArtifactBlob, options *libartTypes.AddOptions) (*digest.Digest, error) {
func (as *ArtifactStore) Add(ctx context.Context, dest ArtifactReference, artifactBlobs []libartTypes.ArtifactBlob, options *libartTypes.AddOptions) (_ *digest.Digest, addErr error) {
if as.eventChannel != nil {
defer func() {
if addErr != nil {
as.writeEvent(&Event{Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactAddError, Error: addErr})
}
}()
}

if options.Append && len(options.ArtifactMIMEType) > 0 {
return nil, errors.New("append option is not compatible with type option")
}
Expand Down Expand Up @@ -447,10 +507,14 @@ func (as ArtifactStore) Add(ctx context.Context, dest ArtifactReference, artifac
return nil, err
}
}

if as.eventChannel != nil {
as.writeEvent(&Event{ID: artifactManifestDigest.String(), Name: dest.String(), Time: time.Now(), Type: EventTypeArtifactAdd})
}
return &artifactManifestDigest, nil
}

func getArtifactAndImageSource(ctx context.Context, as ArtifactStore, asr ArtifactStoreReference, options *libartTypes.FilterBlobOptions) (*Artifact, types.ImageSource, error) {
func getArtifactAndImageSource(ctx context.Context, as *ArtifactStore, asr ArtifactStoreReference, options *libartTypes.FilterBlobOptions) (*Artifact, types.ImageSource, error) {
if len(options.Digest) > 0 && len(options.Title) > 0 {
return nil, nil, errors.New("cannot specify both digest and title")
}
Expand All @@ -471,7 +535,7 @@ func getArtifactAndImageSource(ctx context.Context, as ArtifactStore, asr Artifa
}

// BlobMountPaths allows the caller to access the file names from the store and how they should be mounted.
func (as ArtifactStore) BlobMountPaths(ctx context.Context, asr ArtifactStoreReference, options *libartTypes.BlobMountPathOptions) ([]libartTypes.BlobMountPath, error) {
func (as *ArtifactStore) BlobMountPaths(ctx context.Context, asr ArtifactStoreReference, options *libartTypes.BlobMountPathOptions) ([]libartTypes.BlobMountPath, error) {
// FIX ME
// LOCKING BUG: getArtifactAndImageSource assumes a locked ArtifactStore
arty, imgSrc, err := getArtifactAndImageSource(ctx, as, asr, &options.FilterBlobOptions)
Expand Down Expand Up @@ -530,7 +594,7 @@ func (as ArtifactStore) BlobMountPaths(ctx context.Context, asr ArtifactStoreRef
}

// Extract an artifact to local file or directory.
func (as ArtifactStore) Extract(ctx context.Context, nameOrDigest ArtifactStoreReference, target string, options *libartTypes.ExtractOptions) error {
func (as *ArtifactStore) Extract(ctx context.Context, nameOrDigest ArtifactStoreReference, target string, options *libartTypes.ExtractOptions) error {
// FIX ME
// LOCKING BUG: getArtifactAndImageSource assumes a locked ArtifactStore
arty, imgSrc, err := getArtifactAndImageSource(ctx, as, nameOrDigest, &options.FilterBlobOptions)
Expand Down Expand Up @@ -599,7 +663,7 @@ func (as ArtifactStore) Extract(ctx context.Context, nameOrDigest ArtifactStoreR
}

// Extract an artifact to tar stream.
func (as ArtifactStore) ExtractTarStream(ctx context.Context, w io.Writer, asr ArtifactStoreReference, options *libartTypes.ExtractOptions) error {
func (as *ArtifactStore) ExtractTarStream(ctx context.Context, w io.Writer, asr ArtifactStoreReference, options *libartTypes.ExtractOptions) error {
if options == nil {
options = &libartTypes.ExtractOptions{}
}
Expand Down Expand Up @@ -798,7 +862,7 @@ func copyTrustedImageBlobToTarStream(ctx context.Context, imgSrc types.ImageSour
return nil
}

func (as ArtifactStore) createEmptyManifest() error {
func (as *ArtifactStore) createEmptyManifest() error {
as.lock.Lock()
defer as.lock.Unlock()
index := specV1.Index{
Expand All @@ -813,13 +877,13 @@ func (as ArtifactStore) createEmptyManifest() error {
return os.WriteFile(as.indexPath(), rawData, 0o644)
}

func (as ArtifactStore) indexPath() string {
func (as *ArtifactStore) indexPath() string {
return filepath.Join(as.storePath, specV1.ImageIndexFile)
}

// getArtifacts returns an ArtifactList based on the artifact's store. The return error and
// unused opts is meant for future growth like filters, etc so the API does not change.
func (as ArtifactStore) getArtifacts(ctx context.Context, _ *libartTypes.GetArtifactOptions) (ArtifactList, error) {
func (as *ArtifactStore) getArtifacts(ctx context.Context, _ *libartTypes.GetArtifactOptions) (ArtifactList, error) {
var al ArtifactList

lrs, err := layout.List(as.storePath)
Expand Down
Loading
Loading