Skip to content

Commit fc9090b

Browse files
committed
storage/cmd: new commands json-rpc-server and apply-splitfdstream
Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
1 parent 503d0e0 commit fc9090b

1 file changed

Lines changed: 206 additions & 0 deletions

File tree

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
//go:build linux
2+
3+
package main
4+
5+
import (
6+
"bytes"
7+
"fmt"
8+
"os"
9+
"os/signal"
10+
"path/filepath"
11+
"syscall"
12+
13+
"go.podman.io/storage"
14+
graphdriver "go.podman.io/storage/drivers"
15+
"go.podman.io/storage/pkg/archive"
16+
"go.podman.io/storage/pkg/mflag"
17+
"go.podman.io/storage/pkg/splitfdstream"
18+
)
19+
20+
const defaultJSONRPCSocket = "json-rpc.sock"
21+
22+
var (
23+
splitfdstreamSocket = ""
24+
applyFdstreamSocket = ""
25+
applyFdstreamParent = ""
26+
applyFdstreamMountLabel = ""
27+
)
28+
29+
// splitFDStreamDiffer implements graphdriver.Differ for splitfdstream data
30+
type splitFDStreamDiffer struct {
31+
streamData []byte
32+
fds []*os.File
33+
store storage.Store
34+
}
35+
36+
func (d *splitFDStreamDiffer) ApplyDiff(dest string, options *archive.TarOptions, differOpts *graphdriver.DifferOptions) (graphdriver.DriverWithDifferOutput, error) {
37+
driver, err := d.store.GraphDriver()
38+
if err != nil {
39+
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to get graph driver: %w", err)
40+
}
41+
42+
splitDriver, ok := driver.(splitfdstream.SplitFDStreamDriver)
43+
if !ok {
44+
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("driver %s does not support splitfdstream", driver.String())
45+
}
46+
47+
opts := &splitfdstream.ApplySplitFDStreamOpts{
48+
Stream: bytes.NewReader(d.streamData),
49+
FileDescriptors: d.fds,
50+
StagingDir: dest,
51+
}
52+
53+
size, err := splitDriver.ApplySplitFDStream(opts)
54+
if err != nil {
55+
return graphdriver.DriverWithDifferOutput{}, fmt.Errorf("failed to apply splitfdstream to staging dir %s: %w", dest, err)
56+
}
57+
58+
return graphdriver.DriverWithDifferOutput{
59+
Target: dest,
60+
Size: size,
61+
}, nil
62+
}
63+
64+
func (d *splitFDStreamDiffer) Close() error {
65+
return nil
66+
}
67+
68+
func splitfdstreamServer(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) {
69+
driver, err := m.GraphDriver()
70+
if err != nil {
71+
return 1, fmt.Errorf("failed to get graph driver: %w", err)
72+
}
73+
74+
splitDriver, ok := driver.(splitfdstream.SplitFDStreamDriver)
75+
if !ok {
76+
return 1, fmt.Errorf("driver %s does not support splitfdstream", driver.String())
77+
}
78+
server := splitfdstream.NewJSONRPCServer(splitDriver, m)
79+
80+
socketPath := splitfdstreamSocket
81+
if socketPath == "" {
82+
socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket)
83+
}
84+
85+
if err := server.Start(socketPath); err != nil {
86+
return 1, fmt.Errorf("failed to start server: %w", err)
87+
}
88+
defer func() { _ = server.Stop() }()
89+
90+
fmt.Printf("%s\n", socketPath)
91+
92+
// Wait for interrupt signal
93+
sigCh := make(chan os.Signal, 1)
94+
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
95+
<-sigCh
96+
97+
return 0, nil
98+
}
99+
100+
func applySplitfdstream(flags *mflag.FlagSet, action string, m storage.Store, args []string) (int, error) {
101+
layerID := args[0]
102+
103+
socketPath := applyFdstreamSocket
104+
if socketPath == "" {
105+
socketPath = filepath.Join(m.RunRoot(), defaultJSONRPCSocket)
106+
}
107+
108+
defer func() {
109+
if _, err := m.Shutdown(false); err != nil {
110+
fmt.Fprintf(os.Stderr, "warning: failed to shutdown storage: %v\n", err)
111+
}
112+
}()
113+
114+
client, err := splitfdstream.NewJSONRPCClient(socketPath)
115+
if err != nil {
116+
return 1, fmt.Errorf("failed to connect to server: %w", err)
117+
}
118+
defer client.Close()
119+
120+
// Get splitfdstream data from remote server
121+
streamData, fds, err := client.GetSplitFDStream(layerID, "")
122+
if err != nil {
123+
return 1, fmt.Errorf("failed to get splitfdstream from server: %w", err)
124+
}
125+
126+
// Close received FDs when done
127+
defer func() {
128+
for _, fd := range fds {
129+
fd.Close()
130+
}
131+
}()
132+
133+
// Create a custom differ for splitfdstream data
134+
differ := &splitFDStreamDiffer{
135+
streamData: streamData,
136+
fds: fds,
137+
store: m,
138+
}
139+
defer differ.Close()
140+
141+
// Prepare the staged layer
142+
diffOptions := &graphdriver.ApplyDiffWithDifferOpts{}
143+
diffOutput, err := m.PrepareStagedLayer(diffOptions, differ)
144+
if err != nil {
145+
return 1, fmt.Errorf("failed to prepare staged layer: %w", err)
146+
}
147+
148+
// Apply the staged layer to create the final layer
149+
applyArgs := storage.ApplyStagedLayerOptions{
150+
ID: layerID,
151+
ParentLayer: applyFdstreamParent,
152+
MountLabel: applyFdstreamMountLabel,
153+
Writeable: false,
154+
LayerOptions: &storage.LayerOptions{},
155+
DiffOutput: diffOutput,
156+
DiffOptions: diffOptions,
157+
}
158+
159+
layer, err := m.ApplyStagedLayer(applyArgs)
160+
if err != nil {
161+
// Clean up the staged layer on failure
162+
if cleanupErr := m.CleanupStagedLayer(diffOutput); cleanupErr != nil {
163+
fmt.Fprintf(os.Stderr, "warning: failed to cleanup staged layer: %v\n", cleanupErr)
164+
}
165+
return 1, fmt.Errorf("failed to apply staged layer: %w", err)
166+
}
167+
168+
// Output the result
169+
if jsonOutput {
170+
return outputJSON(map[string]interface{}{"id": layer.ID, "size": diffOutput.Size})
171+
}
172+
fmt.Printf("%s\n", layer.ID)
173+
return 0, nil
174+
}
175+
176+
func init() {
177+
commands = append(commands, command{
178+
names: []string{"json-rpc-server"},
179+
optionsHelp: "[options]",
180+
usage: "Start a JSON-RPC server",
181+
minArgs: 0,
182+
maxArgs: 0,
183+
action: splitfdstreamServer,
184+
addFlags: func(flags *mflag.FlagSet, cmd *command) {
185+
flags.StringVar(&splitfdstreamSocket, []string{"-socket"}, "",
186+
"Path to UNIX socket")
187+
},
188+
})
189+
commands = append(commands, command{
190+
names: []string{"apply-splitfdstream"},
191+
optionsHelp: "[options] layerID",
192+
usage: "Fetch a layer from remote server and apply it locally",
193+
minArgs: 1,
194+
maxArgs: 1,
195+
action: applySplitfdstream,
196+
addFlags: func(flags *mflag.FlagSet, cmd *command) {
197+
flags.StringVar(&applyFdstreamSocket, []string{"-socket"}, "",
198+
"Path to remote UNIX socket")
199+
flags.StringVar(&applyFdstreamParent, []string{"-parent"}, "",
200+
"Parent layer ID for the new layer")
201+
flags.StringVar(&applyFdstreamMountLabel, []string{"-mount-label"}, "",
202+
"SELinux mount label for the layer")
203+
flags.BoolVar(&jsonOutput, []string{"-json", "j"}, jsonOutput, "Prefer JSON output")
204+
},
205+
})
206+
}

0 commit comments

Comments
 (0)