Skip to content

Commit f1d9dd6

Browse files
giuseppeclaude
andcommitted
json-proxy: add OpenJSONRPCFdPass to broker splitfdstream sockets
Add a new json-proxy method that returns a Unix socket file descriptor to the client. The client then speaks the jsonrpc-fdpass protocol directly over that socket for splitfdstream operations, bypassing the json-proxy for bulk data transfer. Bump protocolVersion to "0.2.9". Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
1 parent fa53f33 commit f1d9dd6

5 files changed

Lines changed: 245 additions & 6 deletions

File tree

common/cmd/json-proxy-test-server/main.go

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,22 @@ import (
88
"fmt"
99
"os"
1010

11-
jsonproxy "go.podman.io/common/pkg/json-proxy"
11+
imgcopy "go.podman.io/image/v5/copy"
1212
"go.podman.io/image/v5/signature"
13+
istorage "go.podman.io/image/v5/storage"
14+
"go.podman.io/image/v5/transports/alltransports"
1315
"go.podman.io/image/v5/types"
16+
"go.podman.io/storage"
17+
"go.podman.io/storage/pkg/reexec"
18+
storagetypes "go.podman.io/storage/types"
19+
20+
jsonproxy "go.podman.io/common/pkg/json-proxy"
1421
)
1522

1623
func main() {
24+
if reexec.Init() {
25+
return
26+
}
1727
if err := run(); err != nil {
1828
fmt.Fprintf(os.Stderr, "error: %v\n", err)
1929
os.Exit(1)
@@ -24,10 +34,25 @@ func run() error {
2434
sockfd := flag.Int("sockfd", -1, "socket file descriptor")
2535
policyPath := flag.String("policy", "", "path to policy.json (default: system default)")
2636
overrideArch := flag.String("override-arch", "", "override architecture for manifest list resolution")
37+
graphRoot := flag.String("graph-root", "", "storage graph root")
38+
runRoot := flag.String("run-root", "", "storage run root")
39+
seedImage := flag.String("seed-image", "", "image to copy into local store")
2740
flag.Parse()
2841

2942
if *sockfd < 0 {
30-
return fmt.Errorf("usage: %s --sockfd <fd> [--policy <path>] [--override-arch <arch>]", os.Args[0])
43+
return fmt.Errorf("usage: %s --sockfd <fd> [--policy <path>] [--override-arch <arch>] [--graph-root <path> --run-root <path> --seed-image <ref>]", os.Args[0])
44+
}
45+
46+
if *graphRoot != "" {
47+
ref, store, err := setupStore(*graphRoot, *runRoot, *seedImage)
48+
if err != nil {
49+
return fmt.Errorf("setting up store: %w", err)
50+
}
51+
defer func() {
52+
_, _ = store.Shutdown(true)
53+
}()
54+
// Print the containers-storage:// reference for the test to read.
55+
fmt.Fprintln(os.Stdout, ref)
3156
}
3257

3358
manager, err := jsonproxy.NewManager(
@@ -58,3 +83,47 @@ func run() error {
5883
defer manager.Close()
5984
return manager.Serve(context.Background(), *sockfd)
6085
}
86+
87+
func setupStore(graphRoot, runRoot, seedImage string) (string, storage.Store, error) {
88+
store, err := storage.GetStore(storagetypes.StoreOptions{
89+
GraphRoot: graphRoot,
90+
RunRoot: runRoot,
91+
GraphDriverName: "overlay",
92+
})
93+
if err != nil {
94+
return "", nil, fmt.Errorf("creating store: %w", err)
95+
}
96+
97+
ctx := context.Background()
98+
99+
srcRef, err := alltransports.ParseImageName(seedImage)
100+
if err != nil {
101+
return "", nil, fmt.Errorf("parsing seed image %q: %w", seedImage, err)
102+
}
103+
104+
destRef, err := istorage.Transport.ParseStoreReference(store, "testimage:latest")
105+
if err != nil {
106+
return "", nil, fmt.Errorf("creating store reference: %w", err)
107+
}
108+
109+
policy, err := signature.DefaultPolicy(nil)
110+
if err != nil {
111+
return "", nil, fmt.Errorf("getting default policy: %w", err)
112+
}
113+
pc, err := signature.NewPolicyContext(policy)
114+
if err != nil {
115+
return "", nil, fmt.Errorf("creating policy context: %w", err)
116+
}
117+
defer func() {
118+
if err := pc.Destroy(); err != nil {
119+
fmt.Fprintf(os.Stderr, "warning: destroying policy context: %v\n", err)
120+
}
121+
}()
122+
123+
_, err = imgcopy.Image(ctx, pc, destRef, srcRef, nil)
124+
if err != nil {
125+
return "", nil, fmt.Errorf("copying seed image: %w", err)
126+
}
127+
128+
return "containers-storage:" + destRef.StringWithinTransport(), store, nil
129+
}

common/pkg/json-proxy/handler.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ type handler struct {
2828
lock sync.Mutex
2929

3030
// Dependency injection functions.
31-
getSystemContext func() (*types.SystemContext, error)
32-
getPolicyContext func() (*signature.PolicyContext, error)
33-
logger logrus.FieldLogger
31+
getSystemContext func() (*types.SystemContext, error)
32+
getPolicyContext func() (*signature.PolicyContext, error)
33+
splitFDStreamStore splitFDStreamStore
34+
logger logrus.FieldLogger
3435

3536
// Internal state.
3637
sysctx *types.SystemContext
@@ -141,6 +142,12 @@ func (h *handler) openImageImpl(ctx context.Context, args []any, allowNotFound b
141142
return ret, err
142143
}
143144

145+
if h.splitFDStreamStore == nil {
146+
if sfds, ok := imgsrc.(splitFDStreamStore); ok {
147+
h.splitFDStreamStore = sfds
148+
}
149+
}
150+
144151
unparsedTopLevel := image.UnparsedInstance(imgsrc, nil)
145152
// Check the signature on the toplevel (possibly multi-arch) manifest, but we don't
146153
// yet propagate the error here.
@@ -689,6 +696,31 @@ func (h *handler) FinishPipe(ctx context.Context, args []any) (replyBuf, error)
689696
return ret, err
690697
}
691698

699+
// OpenJSONRPCFdPass returns a socket FD over which the client can
700+
// speak the jsonrpc-fdpass protocol for splitfdstream operations.
701+
// The json-proxy does not interpret the protocol; it just brokers the socket.
702+
func (h *handler) OpenJSONRPCFdPass(ctx context.Context, args []any) (replyBuf, error) {
703+
h.lock.Lock()
704+
defer h.lock.Unlock()
705+
706+
var ret replyBuf
707+
708+
if h.splitFDStreamStore == nil {
709+
return ret, errors.New("splitfdstream store not configured")
710+
}
711+
if len(args) != 0 {
712+
return ret, fmt.Errorf("found %d args, expecting none", len(args))
713+
}
714+
715+
sockFile, err := h.splitFDStreamStore.SplitFDStreamSocket()
716+
if err != nil {
717+
return ret, err
718+
}
719+
720+
ret.fd = sockFile
721+
return ret, nil
722+
}
723+
692724
// processRequest dispatches a remote request.
693725
// replyBuf is the result of the invocation.
694726
// terminate should be true if processing of requests should halt.
@@ -728,6 +760,8 @@ func (h *handler) processRequest(ctx context.Context, readBytes []byte) (rb repl
728760
rb, err = h.GetLayerInfoPiped(ctx, req.Args)
729761
case "FinishPipe":
730762
rb, err = h.FinishPipe(ctx, req.Args)
763+
case "OpenJSONRPCFdPass":
764+
rb, err = h.OpenJSONRPCFdPass(ctx, req.Args)
731765
case "Shutdown":
732766
terminate = true
733767
// NOTE: If you add a method here, you should very likely be bumping the

common/pkg/json-proxy/proxy.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,20 @@
44
package jsonproxy
55

66
import (
7+
"os"
8+
79
"github.com/sirupsen/logrus"
810
"go.podman.io/image/v5/signature"
911
"go.podman.io/image/v5/types"
1012
)
1113

14+
// splitFDStreamStore is the subset of storage.SplitFDStreamStore needed
15+
// by the json-proxy. Keeping a local interface avoids a hard dependency
16+
// on go.podman.io/storage for consumers that do not use splitfdstream.
17+
type splitFDStreamStore interface {
18+
SplitFDStreamSocket() (*os.File, error)
19+
}
20+
1221
// options holds the internal configuration for a Manager.
1322
type options struct {
1423
getSystemContext func() (*types.SystemContext, error)

common/pkg/json-proxy/proxy_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
package jsonproxy_test
44

55
import (
6+
"bufio"
67
"encoding/json"
78
"fmt"
89
"io"
910
"net"
1011
"os"
1112
"os/exec"
13+
"path/filepath"
1214
"strings"
1315
"sync"
1416
"syscall"
@@ -560,3 +562,128 @@ func TestProxyPolicyVerification(t *testing.T) {
560562
})
561563
}
562564
}
565+
566+
// newProxyWithStore spawns the test binary with a local containers-storage
567+
// store seeded with the given image. It returns the proxy and the
568+
// containers-storage:// reference string for the seeded image.
569+
func newProxyWithStore(t *testing.T, seedImage string) (*proxy, string) {
570+
t.Helper()
571+
572+
proxyBinary := os.Getenv("JSON_PROXY_TEST_BINARY")
573+
if proxyBinary == "" {
574+
t.Skip("JSON_PROXY_TEST_BINARY is not set; skipping integration test")
575+
}
576+
577+
wd := t.TempDir()
578+
graphRoot := filepath.Join(wd, "root")
579+
runRoot := filepath.Join(wd, "run")
580+
581+
fds, err := syscall.Socketpair(syscall.AF_LOCAL, syscall.SOCK_SEQPACKET, 0)
582+
require.NoError(t, err)
583+
myfd := os.NewFile(uintptr(fds[0]), "myfd")
584+
defer myfd.Close()
585+
theirfd := os.NewFile(uintptr(fds[1]), "theirfd")
586+
defer theirfd.Close()
587+
588+
mysock, err := net.FileConn(myfd)
589+
require.NoError(t, err)
590+
unixConn, ok := mysock.(*net.UnixConn)
591+
require.True(t, ok, "expected *net.UnixConn, got %T", mysock)
592+
593+
proc := exec.Command(proxyBinary, //nolint:gosec
594+
"--sockfd", "3",
595+
"--graph-root", graphRoot,
596+
"--run-root", runRoot,
597+
"--seed-image", seedImage,
598+
)
599+
proc.Stderr = os.Stderr
600+
proc.ExtraFiles = append(proc.ExtraFiles, theirfd)
601+
602+
stdoutPipe, err := proc.StdoutPipe()
603+
require.NoError(t, err)
604+
605+
err = proc.Start()
606+
require.NoError(t, err)
607+
608+
// Read the containers-storage reference from stdout.
609+
scanner := bufio.NewScanner(stdoutPipe)
610+
require.True(t, scanner.Scan(), "expected storage reference on stdout")
611+
storageRef := strings.TrimSpace(scanner.Text())
612+
require.True(t, strings.HasPrefix(storageRef, "containers-storage:"), "unexpected ref: %s", storageRef)
613+
614+
p := &proxy{
615+
c: unixConn,
616+
proc: proc,
617+
}
618+
t.Cleanup(p.close)
619+
620+
v, err := p.callNoFd("Initialize", nil)
621+
require.NoError(t, err)
622+
semver, ok := v.(string)
623+
require.True(t, ok, "proxy Initialize: Unexpected value %T", v)
624+
require.True(t, strings.HasPrefix(semver, expectedProxySemverMajor), "Unexpected semver %s", semver)
625+
626+
return p, storageRef
627+
}
628+
629+
func TestOpenJSONRPCFdPass(t *testing.T) {
630+
p, storageRef := newProxyWithStore(t, knownListImage)
631+
632+
// Open the containers-storage image to trigger auto-discovery.
633+
imgidVal, err := p.callNoFd("OpenImage", []any{storageRef})
634+
require.NoError(t, err)
635+
imgid, ok := imgidVal.(float64)
636+
require.True(t, ok)
637+
require.NotZero(t, imgid)
638+
639+
// OpenJSONRPCFdPass should return a valid FD.
640+
_, fd, err := p.call("OpenJSONRPCFdPass", nil)
641+
require.NoError(t, err)
642+
require.NotNil(t, fd, "expected an FD from OpenJSONRPCFdPass")
643+
644+
// Verify the received FD is a unix socket.
645+
var stat syscall.Stat_t
646+
err = syscall.Fstat(int(fd.datafd.Fd()), &stat)
647+
require.NoError(t, err)
648+
require.True(t, stat.Mode&syscall.S_IFMT == syscall.S_IFSOCK, "expected socket, got mode %o", stat.Mode)
649+
650+
// Validate the socket speaks the splitfdstream jsonrpc-fdpass protocol.
651+
// Send a JSON-RPC request for a bogus method and expect a method-not-found error.
652+
conn, err := net.FileConn(fd.datafd)
653+
fd.datafd.Close()
654+
require.NoError(t, err)
655+
unixSock, ok := conn.(*net.UnixConn)
656+
require.True(t, ok)
657+
defer unixSock.Close()
658+
659+
rpcReq := []byte("{\"jsonrpc\":\"2.0\",\"method\":\"NoSuchMethod\",\"id\":1}\n")
660+
_, err = unixSock.Write(rpcReq)
661+
require.NoError(t, err)
662+
663+
respBuf := make([]byte, 4096)
664+
n, err := unixSock.Read(respBuf)
665+
require.NoError(t, err)
666+
var rpcResp map[string]any
667+
err = json.Unmarshal(respBuf[:n], &rpcResp)
668+
require.NoError(t, err)
669+
// A valid JSON-RPC server returns an error object for unknown methods.
670+
rpcErr, ok := rpcResp["error"].(map[string]any)
671+
require.True(t, ok, "expected JSON-RPC error object, got %v", rpcResp)
672+
require.Contains(t, rpcErr["message"], "not found")
673+
674+
_, err = p.callNoFd("CloseImage", []any{imgid})
675+
require.NoError(t, err)
676+
}
677+
678+
func TestOpenJSONRPCFdPassNotAvailable(t *testing.T) {
679+
p := newProxy(t)
680+
681+
// Open a docker:// image (no splitfdstream support).
682+
_, err := p.callNoFd("OpenImage", []any{knownNotManifestListedImageX8664})
683+
require.NoError(t, err)
684+
685+
// OpenJSONRPCFdPass should fail since no containers-storage source was opened.
686+
_, _, err = p.call("OpenJSONRPCFdPass", nil)
687+
require.Error(t, err)
688+
require.Contains(t, err.Error(), "splitfdstream store not configured")
689+
}

common/pkg/json-proxy/types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
// departure from the original code which used HTTP.
1818
//
1919
// When bumping this, please also update the man page.
20-
const protocolVersion = "0.2.8"
20+
const protocolVersion = "0.2.9"
2121

2222
// maxMsgSize is the current limit on a packet size.
2323
// Note that all non-metadata (i.e. payload data) is sent over a pipe.

0 commit comments

Comments
 (0)