Skip to content

Commit a7806a8

Browse files
giuseppeclaude
andcommitted
json-proxy: add GetSplitFDStreamSocket endpoint
Add a local splitFDStreamStore interface with a single method SplitFDStreamSocket() that returns a socket FD. The client then speaks the jsonrpc-fdpass-go protocol directly over that socket for all splitfdstream operations (GetSplitFDStream, ApplySplitFDStream, etc.). The json-proxy does not interpret the splitfdstream protocol itself — it merely brokers the socket to the client. The store is optional: when not configured via WithSplitFDStreamStore, calling GetSplitFDStreamSocket returns an error; all other proxy APIs remain fully functional. Using a local interface instead of importing storage.SplitFDStreamStore directly avoids adding a hard dependency on go.podman.io/storage for consumers (like skopeo) that do not use splitfdstream. Bump protocolVersion to "0.2.9". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
1 parent b9f6e5e commit a7806a8

File tree

5 files changed

+252
-11
lines changed

5 files changed

+252
-11
lines changed

common/cmd/json-proxy-test-server/main.go

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,59 @@ package main
44

55
import (
66
"context"
7+
"flag"
78
"fmt"
89
"os"
9-
"strconv"
1010

11-
"go.podman.io/common/pkg/json-proxy"
11+
imgcopy "go.podman.io/image/v5/copy"
1212
"go.podman.io/image/v5/signature"
13+
istorage "go.podman.io/image/v5/storage"
14+
"go.podman.io/image/v5/transports/alltransports"
1315
"go.podman.io/image/v5/types"
16+
"go.podman.io/storage"
17+
"go.podman.io/storage/pkg/reexec"
18+
storagetypes "go.podman.io/storage/types"
19+
20+
jsonproxy "go.podman.io/common/pkg/json-proxy"
1421
)
1522

1623
func main() {
24+
if reexec.Init() {
25+
return
26+
}
1727
if err := run(); err != nil {
1828
fmt.Fprintf(os.Stderr, "error: %v\n", err)
1929
os.Exit(1)
2030
}
2131
}
2232

2333
func run() error {
24-
if len(os.Args) < 3 || os.Args[1] != "--sockfd" {
25-
return fmt.Errorf("usage: %s --sockfd <fd>", os.Args[0])
34+
var (
35+
sockfd int
36+
graphRoot string
37+
runRoot string
38+
seedImage string
39+
)
40+
flag.IntVar(&sockfd, "sockfd", -1, "socket file descriptor")
41+
flag.StringVar(&graphRoot, "graph-root", "", "storage graph root")
42+
flag.StringVar(&runRoot, "run-root", "", "storage run root")
43+
flag.StringVar(&seedImage, "seed-image", "", "image to copy into local store")
44+
flag.Parse()
45+
46+
if sockfd < 0 {
47+
return fmt.Errorf("usage: %s --sockfd <fd> [--graph-root <path> --run-root <path> --seed-image <ref>]", os.Args[0])
2648
}
27-
sockfd, err := strconv.Atoi(os.Args[2])
28-
if err != nil {
29-
return fmt.Errorf("invalid sockfd: %v", err)
49+
50+
if graphRoot != "" {
51+
ref, store, err := setupStore(graphRoot, runRoot, seedImage)
52+
if err != nil {
53+
return fmt.Errorf("setting up store: %w", err)
54+
}
55+
defer func() {
56+
_, _ = store.Shutdown(true)
57+
}()
58+
// Print the containers-storage:// reference for the test to read.
59+
fmt.Fprintln(os.Stdout, ref)
3060
}
3161

3262
manager, err := jsonproxy.NewManager(
@@ -47,3 +77,47 @@ func run() error {
4777
defer manager.Close()
4878
return manager.Serve(context.Background(), sockfd)
4979
}
80+
81+
func setupStore(graphRoot, runRoot, seedImage string) (string, storage.Store, error) {
82+
store, err := storage.GetStore(storagetypes.StoreOptions{
83+
GraphRoot: graphRoot,
84+
RunRoot: runRoot,
85+
GraphDriverName: "overlay",
86+
})
87+
if err != nil {
88+
return "", nil, fmt.Errorf("creating store: %w", err)
89+
}
90+
91+
ctx := context.Background()
92+
93+
srcRef, err := alltransports.ParseImageName(seedImage)
94+
if err != nil {
95+
return "", nil, fmt.Errorf("parsing seed image %q: %w", seedImage, err)
96+
}
97+
98+
destRef, err := istorage.Transport.ParseStoreReference(store, "testimage:latest")
99+
if err != nil {
100+
return "", nil, fmt.Errorf("creating store reference: %w", err)
101+
}
102+
103+
policy, err := signature.DefaultPolicy(nil)
104+
if err != nil {
105+
return "", nil, fmt.Errorf("getting default policy: %w", err)
106+
}
107+
pc, err := signature.NewPolicyContext(policy)
108+
if err != nil {
109+
return "", nil, fmt.Errorf("creating policy context: %w", err)
110+
}
111+
defer func() {
112+
if err := pc.Destroy(); err != nil {
113+
fmt.Fprintf(os.Stderr, "warning: destroying policy context: %v\n", err)
114+
}
115+
}()
116+
117+
_, err = imgcopy.Image(ctx, pc, destRef, srcRef, nil)
118+
if err != nil {
119+
return "", nil, fmt.Errorf("copying seed image: %w", err)
120+
}
121+
122+
return "containers-storage:" + destRef.StringWithinTransport(), store, nil
123+
}

common/pkg/json-proxy/handler.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ type handler struct {
2828
lock sync.Mutex
2929

3030
// Dependency injection functions.
31-
getSystemContext func() (*types.SystemContext, error)
32-
getPolicyContext func() (*signature.PolicyContext, error)
33-
logger logrus.FieldLogger
31+
getSystemContext func() (*types.SystemContext, error)
32+
getPolicyContext func() (*signature.PolicyContext, error)
33+
splitFDStreamStore splitFDStreamStore
34+
logger logrus.FieldLogger
3435

3536
// Internal state.
3637
sysctx *types.SystemContext
@@ -127,6 +128,12 @@ func (h *handler) openImageImpl(ctx context.Context, args []any, allowNotFound b
127128
return ret, err
128129
}
129130

131+
if h.splitFDStreamStore == nil {
132+
if sfds, ok := imgsrc.(splitFDStreamStore); ok {
133+
h.splitFDStreamStore = sfds
134+
}
135+
}
136+
130137
policyContext, err := h.getPolicyContext()
131138
if err != nil {
132139
return ret, err
@@ -707,6 +714,28 @@ func (h *handler) FinishPipe(ctx context.Context, args []any) (replyBuf, error)
707714
return ret, err
708715
}
709716

717+
// GetSplitFDStreamSocket returns a socket FD over which the client can
718+
// speak the jsonrpc-fdpass-go protocol for splitfdstream operations.
719+
// The json-proxy does not interpret the protocol; it just brokers the socket.
720+
func (h *handler) GetSplitFDStreamSocket(ctx context.Context, args []any) (replyBuf, error) {
721+
var ret replyBuf
722+
723+
if h.splitFDStreamStore == nil {
724+
return ret, errors.New("splitfdstream store not configured")
725+
}
726+
if len(args) != 0 {
727+
return ret, fmt.Errorf("found %d args, expecting none", len(args))
728+
}
729+
730+
sockFile, err := h.splitFDStreamStore.SplitFDStreamSocket()
731+
if err != nil {
732+
return ret, err
733+
}
734+
735+
ret.fd = sockFile
736+
return ret, nil
737+
}
738+
710739
// processRequest dispatches a remote request.
711740
// replyBuf is the result of the invocation.
712741
// terminate should be true if processing of requests should halt.
@@ -746,6 +775,8 @@ func (h *handler) processRequest(ctx context.Context, readBytes []byte) (rb repl
746775
rb, err = h.GetLayerInfoPiped(ctx, req.Args)
747776
case "FinishPipe":
748777
rb, err = h.FinishPipe(ctx, req.Args)
778+
case "GetSplitFDStreamSocket":
779+
rb, err = h.GetSplitFDStreamSocket(ctx, req.Args)
749780
case "Shutdown":
750781
terminate = true
751782
// NOTE: If you add a method here, you should very likely be bumping the

common/pkg/json-proxy/proxy.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,20 @@
44
package jsonproxy
55

66
import (
7+
"os"
8+
79
"github.com/sirupsen/logrus"
810
"go.podman.io/image/v5/signature"
911
"go.podman.io/image/v5/types"
1012
)
1113

14+
// splitFDStreamStore is the subset of storage.SplitFDStreamStore needed
15+
// by the json-proxy. Keeping a local interface avoids a hard dependency
16+
// on go.podman.io/storage for consumers that do not use splitfdstream.
17+
type splitFDStreamStore interface {
18+
SplitFDStreamSocket() (*os.File, error)
19+
}
20+
1221
// options holds the internal configuration for a Manager.
1322
type options struct {
1423
getSystemContext func() (*types.SystemContext, error)

common/pkg/json-proxy/proxy_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
package jsonproxy_test
44

55
import (
6+
"bufio"
67
"encoding/json"
78
"fmt"
89
"io"
910
"net"
1011
"os"
1112
"os/exec"
13+
"path/filepath"
1214
"strings"
1315
"sync"
1416
"syscall"
@@ -508,3 +510,128 @@ func TestProxyGetBlob(t *testing.T) {
508510
}
509511
assert.NoError(t, err)
510512
}
513+
514+
// newProxyWithStore spawns the test binary with a local containers-storage
515+
// store seeded with the given image. It returns the proxy and the
516+
// containers-storage:// reference string for the seeded image.
517+
func newProxyWithStore(t *testing.T, seedImage string) (*proxy, string) {
518+
t.Helper()
519+
520+
proxyBinary := os.Getenv("JSON_PROXY_TEST_BINARY")
521+
if proxyBinary == "" {
522+
t.Skip("JSON_PROXY_TEST_BINARY is not set; skipping integration test")
523+
}
524+
525+
wd := t.TempDir()
526+
graphRoot := filepath.Join(wd, "root")
527+
runRoot := filepath.Join(wd, "run")
528+
529+
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_SEQPACKET, 0)
530+
require.NoError(t, err)
531+
myfd := os.NewFile(uintptr(fds[0]), "myfd")
532+
defer myfd.Close()
533+
theirfd := os.NewFile(uintptr(fds[1]), "theirfd")
534+
defer theirfd.Close()
535+
536+
mysock, err := net.FileConn(myfd)
537+
require.NoError(t, err)
538+
unixConn, ok := mysock.(*net.UnixConn)
539+
require.True(t, ok, "expected *net.UnixConn, got %T", mysock)
540+
541+
proc := exec.Command(proxyBinary, //nolint:gosec
542+
"--sockfd", "3",
543+
"--graph-root", graphRoot,
544+
"--run-root", runRoot,
545+
"--seed-image", seedImage,
546+
)
547+
proc.Stderr = os.Stderr
548+
proc.ExtraFiles = append(proc.ExtraFiles, theirfd)
549+
550+
stdoutPipe, err := proc.StdoutPipe()
551+
require.NoError(t, err)
552+
553+
err = proc.Start()
554+
require.NoError(t, err)
555+
556+
// Read the containers-storage reference from stdout.
557+
scanner := bufio.NewScanner(stdoutPipe)
558+
require.True(t, scanner.Scan(), "expected storage reference on stdout")
559+
storageRef := strings.TrimSpace(scanner.Text())
560+
require.True(t, strings.HasPrefix(storageRef, "containers-storage:"), "unexpected ref: %s", storageRef)
561+
562+
p := &proxy{
563+
c: unixConn,
564+
proc: proc,
565+
}
566+
t.Cleanup(p.close)
567+
568+
v, err := p.callNoFd("Initialize", nil)
569+
require.NoError(t, err)
570+
semver, ok := v.(string)
571+
require.True(t, ok, "proxy Initialize: Unexpected value %T", v)
572+
require.True(t, strings.HasPrefix(semver, expectedProxySemverMajor), "Unexpected semver %s", semver)
573+
574+
return p, storageRef
575+
}
576+
577+
func TestGetSplitFDStreamSocket(t *testing.T) {
578+
p, storageRef := newProxyWithStore(t, knownListImage)
579+
580+
// Open the containers-storage image to trigger auto-discovery.
581+
imgidVal, err := p.callNoFd("OpenImage", []any{storageRef})
582+
require.NoError(t, err)
583+
imgid, ok := imgidVal.(float64)
584+
require.True(t, ok)
585+
require.NotZero(t, imgid)
586+
587+
// GetSplitFDStreamSocket should return a valid FD.
588+
_, fd, err := p.call("GetSplitFDStreamSocket", nil)
589+
require.NoError(t, err)
590+
require.NotNil(t, fd, "expected an FD from GetSplitFDStreamSocket")
591+
592+
// Verify the received FD is a unix socket.
593+
var stat syscall.Stat_t
594+
err = syscall.Fstat(int(fd.datafd.Fd()), &stat)
595+
require.NoError(t, err)
596+
require.True(t, stat.Mode&syscall.S_IFMT == syscall.S_IFSOCK, "expected socket, got mode %o", stat.Mode)
597+
598+
// Validate the socket speaks the splitfdstream jsonrpc-fdpass protocol.
599+
// Send a JSON-RPC request for a bogus method and expect a method-not-found error.
600+
conn, err := net.FileConn(fd.datafd)
601+
fd.datafd.Close()
602+
require.NoError(t, err)
603+
unixSock, ok := conn.(*net.UnixConn)
604+
require.True(t, ok)
605+
defer unixSock.Close()
606+
607+
rpcReq := []byte("{\"jsonrpc\":\"2.0\",\"method\":\"NoSuchMethod\",\"id\":1}\n")
608+
_, err = unixSock.Write(rpcReq)
609+
require.NoError(t, err)
610+
611+
respBuf := make([]byte, 4096)
612+
n, err := unixSock.Read(respBuf)
613+
require.NoError(t, err)
614+
var rpcResp map[string]any
615+
err = json.Unmarshal(respBuf[:n], &rpcResp)
616+
require.NoError(t, err)
617+
// A valid JSON-RPC server returns an error object for unknown methods.
618+
rpcErr, ok := rpcResp["error"].(map[string]any)
619+
require.True(t, ok, "expected JSON-RPC error object, got %v", rpcResp)
620+
require.Contains(t, rpcErr["message"], "not found")
621+
622+
_, err = p.callNoFd("CloseImage", []any{imgid})
623+
require.NoError(t, err)
624+
}
625+
626+
func TestGetSplitFDStreamSocketNotAvailable(t *testing.T) {
627+
p := newProxy(t)
628+
629+
// Open a docker:// image (no splitfdstream support).
630+
_, err := p.callNoFd("OpenImage", []any{knownNotManifestListedImageX8664})
631+
require.NoError(t, err)
632+
633+
// GetSplitFDStreamSocket should fail since no containers-storage source was opened.
634+
_, _, err = p.call("GetSplitFDStreamSocket", nil)
635+
require.Error(t, err)
636+
require.Contains(t, err.Error(), "splitfdstream store not configured")
637+
}

common/pkg/json-proxy/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
// departure from the original code which used HTTP.
1818
//
1919
// When bumping this, please also update the man page.
20-
const protocolVersion = "0.2.8"
20+
const protocolVersion = "0.2.9"
2121

2222
// maxMsgSize is the current limit on a packet size.
2323
// Note that all non-metadata (i.e. payload data) is sent over a pipe.

0 commit comments

Comments
 (0)