Skip to content

Commit 8802724

Browse files
committed
feat: add experimental on-demand pinning
Add a background checker that automatically pins content when DHT provider counts fall below a configurable replication target and unpins once enough providers exist again after a grace period. Gated behind Experimental.OnDemandPinningEnabled. New CLI commands: ipfs pin ondemand {add,rm,ls} Safety measures: - storage budget check (respects StorageMax/GCWatermark) - idle timeout on recursive DAG fetches (2 min without progress) - pin partitioning via pin name to distinguish on-demand pins from persitent pins.
1 parent 6f209df commit 8802724

File tree

15 files changed

+1247
-0
lines changed

15 files changed

+1247
-0
lines changed

config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type Config struct {
4545
Import Import
4646
Version Version
4747

48+
OnDemandPinning OnDemandPinning
49+
4850
Internal Internal // experimental/unstable options
4951

5052
Bitswap Bitswap

config/experiments.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type Experiments struct {
1010
OptimisticProvide bool
1111
OptimisticProvideJobsPoolSize int
1212
GatewayOverLibp2p bool `json:",omitempty"`
13+
OnDemandPinningEnabled bool
1314

1415
GraphsyncEnabled graphsyncEnabled `json:",omitempty"`
1516
AcceleratedDHTClient experimentalAcceleratedDHTClient `json:",omitempty"`

config/ondemandpin.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package config
2+
3+
import "time"
4+
5+
const (
6+
DefaultOnDemandPinReplicationTarget = 5
7+
DefaultOnDemandPinCheckInterval = 10 * time.Minute
8+
DefaultOnDemandPinUnpinGracePeriod = 24 * time.Hour
9+
)
10+
11+
type OnDemandPinning struct {
12+
// Minimum providers desired in the DHT (excluding self).
13+
ReplicationTarget OptionalInteger
14+
15+
// How often the checker evaluates all registered CIDs.
16+
CheckInterval OptionalDuration
17+
18+
// How long replication must stay above target before unpinning.
19+
UnpinGracePeriod OptionalDuration
20+
}

core/commands/commands_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,10 @@ func TestCommands(t *testing.T) {
167167
"/pin/remote/service/add",
168168
"/pin/remote/service/ls",
169169
"/pin/remote/service/rm",
170+
"/pin/ondemand",
171+
"/pin/ondemand/add",
172+
"/pin/ondemand/rm",
173+
"/pin/ondemand/ls",
170174
"/pin/rm",
171175
"/pin/update",
172176
"/pin/verify",

core/commands/pin/ondemandpin.go

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
package pin
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"time"
7+
8+
cmds "github.com/ipfs/go-ipfs-cmds"
9+
"github.com/ipfs/kubo/config"
10+
cmdenv "github.com/ipfs/kubo/core/commands/cmdenv"
11+
"github.com/ipfs/kubo/core/commands/cmdutils"
12+
"github.com/ipfs/kubo/ondemandpin"
13+
)
14+
15+
const onDemandLiveOptionName = "live"
16+
17+
var onDemandPinCmd = &cmds.Command{
18+
Helptext: cmds.HelpText{
19+
Tagline: "Manage on-demand pins.",
20+
ShortDescription: `
21+
On-demand pinning automatically pins content when few target providers exist in the DHT,
22+
and unpins when replication has been above target for a grace period.
23+
Requires Experimental.OnDemandPinningEnabled = true.
24+
`,
25+
},
26+
Subcommands: map[string]*cmds.Command{
27+
"add": addOnDemandPinCmd,
28+
"rm": rmOnDemandPinCmd,
29+
"ls": listOnDemandPinCmd,
30+
},
31+
}
32+
33+
type OnDemandPinOutput struct {
34+
Cid string `json:"Cid"`
35+
}
36+
37+
var addOnDemandPinCmd = &cmds.Command{
38+
Helptext: cmds.HelpText{
39+
Tagline: "Register CIDs for on-demand pinning.",
40+
ShortDescription: `Adds the given CID(s) to the on-demand pin registry. The background checker will evaluate replication and pin if needed.`,
41+
},
42+
Arguments: []cmds.Argument{
43+
cmds.StringArg("cid", true, true, "CID(s) to register."),
44+
},
45+
Type: OnDemandPinOutput{},
46+
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
47+
n, err := cmdenv.GetNode(env)
48+
if err != nil {
49+
return err
50+
}
51+
52+
cfg, err := n.Repo.Config()
53+
if err != nil {
54+
return err
55+
}
56+
if !cfg.Experimental.OnDemandPinningEnabled {
57+
return fmt.Errorf("on-demand pinning is not enabled; set Experimental.OnDemandPinningEnabled = true in config")
58+
}
59+
60+
store := n.OnDemandPinStore
61+
62+
api, err := cmdenv.GetApi(env, req)
63+
if err != nil {
64+
return err
65+
}
66+
67+
for _, arg := range req.Arguments {
68+
p, err := cmdutils.PathOrCidPath(arg)
69+
if err != nil {
70+
return fmt.Errorf("invalid CID or path %q: %w", arg, err)
71+
}
72+
73+
rp, _, err := api.ResolvePath(req.Context, p)
74+
if err != nil {
75+
return fmt.Errorf("resolving %q: %w", arg, err)
76+
}
77+
c := rp.RootCid()
78+
79+
if err := store.Add(req.Context, c); err != nil {
80+
return err
81+
}
82+
83+
if checker := n.OnDemandPinChecker; checker != nil {
84+
checker.Enqueue(c)
85+
}
86+
87+
if err := res.Emit(&OnDemandPinOutput{
88+
Cid: c.String(),
89+
}); err != nil {
90+
return err
91+
}
92+
}
93+
return nil
94+
},
95+
Encoders: cmds.EncoderMap{
96+
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandPinOutput) error {
97+
fmt.Fprintf(w, "registered %s for on-demand pinning\n", out.Cid)
98+
return nil
99+
}),
100+
},
101+
}
102+
103+
var rmOnDemandPinCmd = &cmds.Command{
104+
Helptext: cmds.HelpText{
105+
Tagline: "Remove CIDs from on-demand pinning.",
106+
ShortDescription: `
107+
Removes the given CID(s) from the on-demand pin registry.
108+
If the content was pinned by the checker, it is also unpinned.
109+
Works even when on-demand pinning is disabled, to clean up previously registered CIDs.
110+
`,
111+
},
112+
Arguments: []cmds.Argument{
113+
cmds.StringArg("cid", true, true, "CID(s) to remove."),
114+
},
115+
Type: OnDemandPinOutput{},
116+
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
117+
n, err := cmdenv.GetNode(env)
118+
if err != nil {
119+
return err
120+
}
121+
122+
store := n.OnDemandPinStore
123+
124+
api, err := cmdenv.GetApi(env, req)
125+
if err != nil {
126+
return err
127+
}
128+
129+
for _, arg := range req.Arguments {
130+
p, err := cmdutils.PathOrCidPath(arg)
131+
if err != nil {
132+
return fmt.Errorf("invalid CID or path %q: %w", arg, err)
133+
}
134+
135+
rp, _, err := api.ResolvePath(req.Context, p)
136+
if err != nil {
137+
return fmt.Errorf("resolving %q: %w", arg, err)
138+
}
139+
c := rp.RootCid()
140+
141+
isOurs, err := ondemandpin.PinHasName(req.Context, n.Pinning, c, ondemandpin.OnDemandPinName)
142+
if err != nil {
143+
return fmt.Errorf("checking pin state for %s: %w", c, err)
144+
}
145+
if isOurs {
146+
if err := api.Pin().Rm(req.Context, rp); err != nil {
147+
return fmt.Errorf("unpinning %s: %w", c, err)
148+
}
149+
}
150+
151+
if err := store.Remove(req.Context, c); err != nil {
152+
return err
153+
}
154+
155+
if err := res.Emit(&OnDemandPinOutput{
156+
Cid: c.String(),
157+
}); err != nil {
158+
return err
159+
}
160+
}
161+
return nil
162+
},
163+
Encoders: cmds.EncoderMap{
164+
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandPinOutput) error {
165+
fmt.Fprintf(w, "removed %s from on-demand pinning\n", out.Cid)
166+
return nil
167+
}),
168+
},
169+
}
170+
171+
type OnDemandLsOutput struct {
172+
Cid string `json:"Cid"`
173+
PinnedByUs bool `json:"PinnedByUs"`
174+
Providers *int `json:"Providers,omitempty"`
175+
LastAboveTarget string `json:"LastAboveTarget,omitempty"`
176+
CreatedAt string `json:"CreatedAt"`
177+
}
178+
179+
var listOnDemandPinCmd = &cmds.Command{
180+
Helptext: cmds.HelpText{
181+
Tagline: "List on-demand pins.",
182+
ShortDescription: `
183+
Lists CIDs registered for on-demand pinning with their current state.
184+
Use --live to include real-time provider counts from the DHT.
185+
`,
186+
},
187+
Arguments: []cmds.Argument{
188+
cmds.StringArg("cid", false, true, "Optional CID(s) to filter."),
189+
},
190+
Options: []cmds.Option{
191+
cmds.BoolOption(onDemandLiveOptionName, "l", "Perform live provider lookup."),
192+
},
193+
Type: OnDemandLsOutput{},
194+
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
195+
n, err := cmdenv.GetNode(env)
196+
if err != nil {
197+
return err
198+
}
199+
200+
store := n.OnDemandPinStore
201+
202+
live, _ := req.Options[onDemandLiveOptionName].(bool)
203+
204+
var globalTarget int
205+
if live {
206+
cfg, err := n.Repo.Config()
207+
if err != nil {
208+
return err
209+
}
210+
globalTarget = int(cfg.OnDemandPinning.ReplicationTarget.WithDefault(config.DefaultOnDemandPinReplicationTarget))
211+
}
212+
213+
var records []ondemandpin.Record
214+
if len(req.Arguments) > 0 {
215+
api, err := cmdenv.GetApi(env, req)
216+
if err != nil {
217+
return err
218+
}
219+
for _, arg := range req.Arguments {
220+
p, err := cmdutils.PathOrCidPath(arg)
221+
if err != nil {
222+
return fmt.Errorf("invalid CID or path %q: %w", arg, err)
223+
}
224+
rp, _, err := api.ResolvePath(req.Context, p)
225+
if err != nil {
226+
return fmt.Errorf("resolving %q: %w", arg, err)
227+
}
228+
rec, err := store.Get(req.Context, rp.RootCid())
229+
if err != nil {
230+
return err
231+
}
232+
records = append(records, *rec)
233+
}
234+
} else {
235+
records, err = store.List(req.Context)
236+
if err != nil {
237+
return err
238+
}
239+
}
240+
241+
for _, rec := range records {
242+
out := OnDemandLsOutput{
243+
Cid: rec.Cid.String(),
244+
PinnedByUs: rec.PinnedByUs,
245+
CreatedAt: rec.CreatedAt.Format(time.RFC3339),
246+
}
247+
if !rec.LastAboveTarget.IsZero() {
248+
out.LastAboveTarget = rec.LastAboveTarget.Format(time.RFC3339)
249+
}
250+
251+
if live && n.Routing != nil {
252+
count := ondemandpin.CountProviders(req.Context, n.Routing, n.Identity, rec.Cid, globalTarget)
253+
out.Providers = &count
254+
}
255+
256+
if err := res.Emit(&out); err != nil {
257+
return err
258+
}
259+
}
260+
return nil
261+
},
262+
Encoders: cmds.EncoderMap{
263+
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *OnDemandLsOutput) error {
264+
pinState := "not-pinned"
265+
if out.PinnedByUs {
266+
pinState = "pinned"
267+
}
268+
fmt.Fprintf(w, "%s", out.Cid)
269+
if out.Providers != nil {
270+
fmt.Fprintf(w, " providers=%d", *out.Providers)
271+
}
272+
fmt.Fprintf(w, " %s created=%s", pinState, out.CreatedAt)
273+
if out.LastAboveTarget != "" {
274+
fmt.Fprintf(w, " above-target-since=%s", out.LastAboveTarget)
275+
}
276+
fmt.Fprintln(w)
277+
return nil
278+
}),
279+
},
280+
}

core/commands/pin/pin.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ var PinCmd = &cmds.Command{
3838
"verify": verifyPinCmd,
3939
"update": updatePinCmd,
4040
"remote": remotePinCmd,
41+
42+
"ondemand": onDemandPinCmd,
4143
},
4244
}
4345

core/core.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import (
5656
"github.com/ipfs/kubo/core/node"
5757
"github.com/ipfs/kubo/core/node/libp2p"
5858
"github.com/ipfs/kubo/fuse/mount"
59+
"github.com/ipfs/kubo/ondemandpin"
5960
"github.com/ipfs/kubo/p2p"
6061
"github.com/ipfs/kubo/repo"
6162
irouting "github.com/ipfs/kubo/routing"
@@ -76,6 +77,9 @@ type IpfsNode struct {
7677
PrivateKey ic.PrivKey `optional:"true"` // the local node's private Key
7778
PNetFingerprint libp2p.PNetFingerprint `optional:"true"` // fingerprint of private network
7879

80+
OnDemandPinStore *ondemandpin.Store
81+
OnDemandPinChecker *ondemandpin.Checker `optional:"true"`
82+
7983
// Services
8084
Peerstore pstore.Peerstore `optional:"true"` // storage for other Peer instances
8185
Blockstore bstore.GCBlockstore // the block store (lower level)

core/node/groups.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
350350
// Disabling is controlled by Provide.Enabled=false or setting Interval to 0.
351351
isProviderEnabled := cfg.Provide.Enabled.WithDefault(config.DefaultProvideEnabled) && cfg.Provide.DHT.Interval.WithDefault(config.DefaultProvideDHTInterval) != 0
352352

353+
isOnDemandPinEnabled := cfg.Experimental.OnDemandPinningEnabled
354+
353355
return fx.Options(
354356
fx.Provide(BitswapOptions(cfg)),
355357
fx.Provide(Bitswap(isBitswapServerEnabled, isBitswapLibp2pEnabled, isHTTPRetrievalEnabled)),
@@ -365,6 +367,8 @@ func Online(bcfg *BuildCfg, cfg *config.Config, userResourceOverrides rcmgr.Part
365367

366368
LibP2P(bcfg, cfg, userResourceOverrides),
367369
OnlineProviders(isProviderEnabled, cfg),
370+
371+
maybeProvide(OnDemandPinChecker(cfg.OnDemandPinning), isOnDemandPinEnabled),
368372
)
369373
}
370374

@@ -458,6 +462,7 @@ func IPFS(ctx context.Context, bcfg *BuildCfg) fx.Option {
458462
fx.Provide(BlockService(cfg)),
459463
fx.Provide(Pinning(providerStrategy)),
460464
fx.Provide(Files(providerStrategy)),
465+
fx.Provide(OnDemandPinStore),
461466
Core,
462467
)
463468
}

0 commit comments

Comments
 (0)