From da1f3f0d54f893e440ea90e089d2d23a470d2e13 Mon Sep 17 00:00:00 2001 From: Chayan Das <01chayandas@gmail.com> Date: Sun, 15 Mar 2026 11:39:10 +0530 Subject: [PATCH 1/2] feat: ipfs get progress via dag stat and dag stat file count - get: show progress (Items: N files, Blocks, Saving to) using dag stat API output only; - dag stat: add NumFiles (UnixFS file count, exclude chunk nodes); single traversal using file vs directory type; show Files column and Total Files in summary - dag: add NumFiles to DagStat and DagStatSummary for JSON and text output Signed-off-by: Chayan Das <01chayandas@gmail.com> --- core/commands/dag/dag.go | 9 ++- core/commands/dag/stat.go | 50 +++++++++++--- core/commands/get.go | 137 ++++++++++++++++++++++++++++++++++++-- 3 files changed, 178 insertions(+), 18 deletions(-) diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index a256213ecd0..c1c75ad490a 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -297,6 +297,7 @@ type DagStat struct { Cid cid.Cid Size uint64 `json:",omitempty"` NumBlocks int64 `json:",omitempty"` + NumFiles int64 `json:",omitempty"` // UnixFS file nodes (actual files, including nested) } func (s *DagStat) String() string { @@ -349,13 +350,15 @@ type DagStatSummary struct { TotalSize uint64 `json:",omitempty"` SharedSize uint64 `json:",omitempty"` Ratio float32 `json:",omitempty"` + NumFiles int64 `json:",omitempty"` // total UnixFS file count across all DAGs DagStatsArray []*DagStat `json:"DagStats,omitempty"` } func (s *DagStatSummary) String() string { - return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nShared Size: %d (%s)\nRatio: %f", + return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nTotal Files: %d\nShared Size: %d (%s)\nRatio: %f", s.TotalSize, humanize.Bytes(s.TotalSize), s.UniqueBlocks, + s.NumFiles, s.SharedSize, humanize.Bytes(s.SharedSize), s.Ratio) } @@ -405,15 +408,17 @@ Note: This command skips duplicate blocks in reporting both size and the number csvWriter := csv.NewWriter(w) csvWriter.Comma = '\t' cidSpacing := len(event.DagStatsArray[0].Cid.String()) - header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), "Size"} + header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), fmt.Sprintf("%-10s", "Files"), "Size"} if err := csvWriter.Write(header); err != nil { return err } for _, dagStat := range event.DagStatsArray { numBlocksStr := fmt.Sprint(dagStat.NumBlocks) + numFilesStr := fmt.Sprint(dagStat.NumFiles) err := csvWriter.Write([]string{ dagStat.Cid.String(), fmt.Sprintf("%-15s", numBlocksStr), + fmt.Sprintf("%-10s", numFilesStr), fmt.Sprint(dagStat.Size), }) if err != nil { diff --git a/core/commands/dag/stat.go b/core/commands/dag/stat.go index 916aae71a6b..6a686739902 100644 --- a/core/commands/dag/stat.go +++ b/core/commands/dag/stat.go @@ -8,6 +8,7 @@ import ( "github.com/dustin/go-humanize" mdag "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/boxo/ipld/merkledag/traverse" + "github.com/ipfs/boxo/ipld/unixfs" cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/kubo/core/commands/cmdenv" @@ -25,6 +26,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) if val, specified := req.Options[progressOptionName].(bool); specified { progressive = val } + api, err := cmdenv.GetApi(env, req) if err != nil { return err @@ -33,6 +35,7 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) cidSet := cid.NewSet() dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}} + for _, a := range req.Arguments { p, err := cmdutils.PathOrCidPath(a) if err != nil { @@ -50,24 +53,45 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) if err != nil { return err } + dagstats := &DagStat{Cid: rp.RootCid()} dagStatSummary.appendStats(dagstats) + + chunkCids := cid.NewSet() err = traverse.Traverse(obj, traverse.Options{ DAG: nodeGetter, Order: traverse.DFSPre, Func: func(current traverse.State) error { - currentNodeSize := uint64(len(current.Node.RawData())) + nd := current.Node + currentNodeSize := uint64(len(nd.RawData())) dagstats.Size += currentNodeSize dagstats.NumBlocks++ - if !cidSet.Has(current.Node.Cid()) { + + if pn, ok := nd.(*mdag.ProtoNode); ok { + if fsn, err := unixfs.FSNodeFromBytes(pn.Data()); err == nil { + if fsn.Type() == unixfs.TFile && len(pn.Links()) > 0 { + for _, l := range pn.Links() { + chunkCids.Add(l.Cid) + } + } + if !chunkCids.Has(nd.Cid()) { + // Count as a file only if not a chunk block + switch fsn.Type() { + case unixfs.TFile, unixfs.TRaw, unixfs.TSymlink, unixfs.TMetadata: + dagstats.NumFiles++ + } + } + } + } + + if !cidSet.Has(nd.Cid()) { dagStatSummary.incrementTotalSize(currentNodeSize) } dagStatSummary.incrementRedundantSize(currentNodeSize) - cidSet.Add(current.Node.Cid()) + cidSet.Add(nd.Cid()) + if progressive { - if err := res.Emit(dagStatSummary); err != nil { - return err - } + return res.Emit(dagStatSummary) } return nil }, @@ -82,10 +106,13 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) dagStatSummary.UniqueBlocks = cidSet.Len() dagStatSummary.calculateSummary() - if err := res.Emit(dagStatSummary); err != nil { - return err + var totalFiles int64 + for _, s := range dagStatSummary.DagStatsArray { + totalFiles += s.NumFiles } - return nil + dagStatSummary.NumFiles = totalFiles + + return res.Emit(dagStatSummary) } func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error { @@ -122,7 +149,10 @@ func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error { totalBlocks += stat.NumBlocks totalSize += stat.Size } - fmt.Fprintf(os.Stderr, "Fetched/Processed %d blocks, %d bytes (%s)\r", totalBlocks, totalSize, humanize.Bytes(totalSize)) + fmt.Fprintf(os.Stderr, + "Fetched/Processed %d blocks, %d bytes (%s)\r", + totalBlocks, totalSize, humanize.Bytes(totalSize), + ) } default: return e.TypeErr(out, v) diff --git a/core/commands/get.go b/core/commands/get.go index 804836b9afd..3b8468c6241 100644 --- a/core/commands/get.go +++ b/core/commands/get.go @@ -4,9 +4,12 @@ import ( gotar "archive/tar" "bufio" "compress/gzip" + "encoding/json" "errors" "fmt" "io" + "net/http" + "net/url" "os" gopath "path" "path/filepath" @@ -15,11 +18,13 @@ import ( "github.com/ipfs/kubo/core/commands/cmdenv" "github.com/ipfs/kubo/core/commands/cmdutils" "github.com/ipfs/kubo/core/commands/e" + fsrepo "github.com/ipfs/kubo/repo/fsrepo" "github.com/cheggaaa/pb" "github.com/ipfs/boxo/files" "github.com/ipfs/boxo/tar" cmds "github.com/ipfs/go-ipfs-cmds" + manet "github.com/multiformats/go-multiaddr/net" ) var ErrInvalidCompressionLevel = errors.New("compression level must be between 1 and 9") @@ -29,6 +34,7 @@ const ( archiveOptionName = "archive" compressOptionName = "compress" compressionLevelOptionName = "compression-level" + getTotalBlocksKey = "_getTotalBlocks" ) var GetCmd = &cmds.Command{ @@ -61,8 +67,29 @@ may also specify the level of compression by specifying '-l=<1-9>'. cmds.BoolOption(progressOptionName, "p", "Stream progress data.").WithDefault(true), }, PreRun: func(req *cmds.Request, env cmds.Environment) error { - _, err := getCompressOptions(req) - return err + if _, err := getCompressOptions(req); err != nil { + return err + } + + progress, _ := req.Options[progressOptionName].(bool) + if !progress { + return nil + } + + baseURL, err := getAPIBaseURL(env) + if err != nil { + return nil + } + + rootCID, err := resolveRootCID(baseURL, req.Arguments[0]) + if err != nil || rootCID == "" { + return nil + } + + totalBlocks, files := fetchStatInfo(baseURL, rootCID) + req.Options[getTotalBlocksKey] = totalBlocks + printGetProgress(os.Stderr, rootCID, files, totalBlocks) + return nil }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { ctx := req.Context @@ -141,6 +168,10 @@ may also specify the level of compression by specifying '-l=<1-9>'. archive, _ := req.Options[archiveOptionName].(bool) progress, _ := req.Options[progressOptionName].(bool) + totalBlocks, _ := req.Options[getTotalBlocksKey].(int) + if totalBlocks <= 0 { + totalBlocks = 1 + } gw := getWriter{ Out: os.Stdout, @@ -149,6 +180,7 @@ may also specify the level of compression by specifying '-l=<1-9>'. Compression: cmplvl, Size: int64(res.Length()), Progress: progress, + TotalBlocks: totalBlocks, } return gw.Write(outReader, outPath) @@ -156,6 +188,85 @@ may also specify the level of compression by specifying '-l=<1-9>'. }, } +func getAPIBaseURL(env cmds.Environment) (string, error) { + configRoot, err := cmdenv.GetConfigRoot(env) + if err != nil { + return "", err + } + apiAddr, err := fsrepo.APIAddr(configRoot) + if err != nil { + return "", err + } + network, host, err := manet.DialArgs(apiAddr) + if err != nil || (network != "tcp" && network != "tcp4" && network != "tcp6") { + return "", errors.New("unsupported network") + } + return "http://" + host, nil +} + +func resolveRootCID(baseURL, pathStr string) (string, error) { + resp, err := http.Post(baseURL+"/api/v0/dag/resolve?arg="+url.QueryEscape(pathStr), "", nil) + if err != nil { + return "", err + } + defer resp.Body.Close() + var out struct { + Cid interface{} `json:"Cid"` + } + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { + return "", err + } + switch v := out.Cid.(type) { + case string: + return v, nil + case map[string]interface{}: + cid, _ := v["/"].(string) + return cid, nil + } + return "", nil +} + +// fetchStatInfo calls dag/stat and returns totalBlocks and file count directly from NumFiles. +func fetchStatInfo(baseURL, rootCID string) (totalBlocks, numFiles int) { + resp, err := http.Post( + baseURL+"/api/v0/dag/stat?arg="+url.QueryEscape("/ipfs/"+rootCID)+"&progress=false", + "", nil, + ) + if err != nil || resp.StatusCode != http.StatusOK { + return 1, 1 + } + defer resp.Body.Close() + + var out struct { + UniqueBlocks int `json:"UniqueBlocks"` + DagStats []struct { + NumBlocks int64 `json:"NumBlocks"` + NumFiles int64 `json:"NumFiles"` + } `json:"DagStats"` + } + if err := json.NewDecoder(resp.Body).Decode(&out); err == nil && len(out.DagStats) > 0 { + ds := out.DagStats[0] + totalBlocks = int(ds.NumBlocks) + numFiles = int(ds.NumFiles) + } + if totalBlocks <= 0 { + totalBlocks = max(out.UniqueBlocks, 1) + } + if numFiles <= 0 { + numFiles = 1 + } + return +} + +func printGetProgress(w io.Writer, cidStr string, numFiles, totalBlocks int) { + fmt.Fprintf(w, "\nFetching CID: %s\n", cidStr) + if numFiles == 1 { + fmt.Fprintf(w, "Items: 1 file | Blocks: %d\n\n", totalBlocks) + } else { + fmt.Fprintf(w, "Items: %d files | Blocks: %d\n\n", numFiles, totalBlocks) + } +} + type clearlineReader struct { io.Reader out io.Writer @@ -164,8 +275,7 @@ type clearlineReader struct { func (r *clearlineReader) Read(p []byte) (n int, err error) { n, err = r.Reader.Read(p) if err == io.EOF { - // callback - fmt.Fprintf(r.out, "\033[2K\r") // clear progress bar line on EOF + fmt.Fprintf(r.out, "\033[2K\r") } return } @@ -210,6 +320,7 @@ type getWriter struct { Compression int Size int64 Progress bool + TotalBlocks int } func (gw *getWriter) Write(r io.Reader, fpath string) error { @@ -258,10 +369,24 @@ func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error { var progressCb func(int64) int64 if gw.Progress { bar := makeProgressBar(gw.Err, gw.Size) + totalBlocks := gw.TotalBlocks + fmt.Fprintf(gw.Err, "Blocks: 0 / %d\n", totalBlocks) bar.Start() defer bar.Finish() - defer bar.Set64(gw.Size) - progressCb = bar.Add64 + defer func() { + bar.Set64(gw.Size) + fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", totalBlocks, totalBlocks) + }() + size := gw.Size + progressCb = func(delta int64) int64 { + total := bar.Add64(delta) + blocksEst := totalBlocks + if total < size && size > 0 { + blocksEst = int(int64(totalBlocks) * total / size) + } + fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", blocksEst, totalBlocks) + return total + } } extractor := &tar.Extractor{Path: fpath, Progress: progressCb} From 40ba833dc020aae1f7ea2327448fdec1d7c6013a Mon Sep 17 00:00:00 2001 From: Chayan Das <01chayandas@gmail.com> Date: Tue, 17 Mar 2026 20:30:53 +0530 Subject: [PATCH 2/2] feat(get): progress UI with block count and consistent size display Signed-off-by: Chayan Das <01chayandas@gmail.com> --- core/commands/dag/dag.go | 9 +- core/commands/dag/stat.go | 50 ++------- core/commands/get.go | 220 ++++++++++++++++---------------------- core/coreapi/dag.go | 38 ++++++- core/coreiface/dag.go | 12 +++ 5 files changed, 152 insertions(+), 177 deletions(-) diff --git a/core/commands/dag/dag.go b/core/commands/dag/dag.go index c1c75ad490a..a256213ecd0 100644 --- a/core/commands/dag/dag.go +++ b/core/commands/dag/dag.go @@ -297,7 +297,6 @@ type DagStat struct { Cid cid.Cid Size uint64 `json:",omitempty"` NumBlocks int64 `json:",omitempty"` - NumFiles int64 `json:",omitempty"` // UnixFS file nodes (actual files, including nested) } func (s *DagStat) String() string { @@ -350,15 +349,13 @@ type DagStatSummary struct { TotalSize uint64 `json:",omitempty"` SharedSize uint64 `json:",omitempty"` Ratio float32 `json:",omitempty"` - NumFiles int64 `json:",omitempty"` // total UnixFS file count across all DAGs DagStatsArray []*DagStat `json:"DagStats,omitempty"` } func (s *DagStatSummary) String() string { - return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nTotal Files: %d\nShared Size: %d (%s)\nRatio: %f", + return fmt.Sprintf("Total Size: %d (%s)\nUnique Blocks: %d\nShared Size: %d (%s)\nRatio: %f", s.TotalSize, humanize.Bytes(s.TotalSize), s.UniqueBlocks, - s.NumFiles, s.SharedSize, humanize.Bytes(s.SharedSize), s.Ratio) } @@ -408,17 +405,15 @@ Note: This command skips duplicate blocks in reporting both size and the number csvWriter := csv.NewWriter(w) csvWriter.Comma = '\t' cidSpacing := len(event.DagStatsArray[0].Cid.String()) - header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), fmt.Sprintf("%-10s", "Files"), "Size"} + header := []string{fmt.Sprintf("%-*s", cidSpacing, "CID"), fmt.Sprintf("%-15s", "Blocks"), "Size"} if err := csvWriter.Write(header); err != nil { return err } for _, dagStat := range event.DagStatsArray { numBlocksStr := fmt.Sprint(dagStat.NumBlocks) - numFilesStr := fmt.Sprint(dagStat.NumFiles) err := csvWriter.Write([]string{ dagStat.Cid.String(), fmt.Sprintf("%-15s", numBlocksStr), - fmt.Sprintf("%-10s", numFilesStr), fmt.Sprint(dagStat.Size), }) if err != nil { diff --git a/core/commands/dag/stat.go b/core/commands/dag/stat.go index 6a686739902..916aae71a6b 100644 --- a/core/commands/dag/stat.go +++ b/core/commands/dag/stat.go @@ -8,7 +8,6 @@ import ( "github.com/dustin/go-humanize" mdag "github.com/ipfs/boxo/ipld/merkledag" "github.com/ipfs/boxo/ipld/merkledag/traverse" - "github.com/ipfs/boxo/ipld/unixfs" cid "github.com/ipfs/go-cid" cmds "github.com/ipfs/go-ipfs-cmds" "github.com/ipfs/kubo/core/commands/cmdenv" @@ -26,7 +25,6 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) if val, specified := req.Options[progressOptionName].(bool); specified { progressive = val } - api, err := cmdenv.GetApi(env, req) if err != nil { return err @@ -35,7 +33,6 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) cidSet := cid.NewSet() dagStatSummary := &DagStatSummary{DagStatsArray: []*DagStat{}} - for _, a := range req.Arguments { p, err := cmdutils.PathOrCidPath(a) if err != nil { @@ -53,45 +50,24 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) if err != nil { return err } - dagstats := &DagStat{Cid: rp.RootCid()} dagStatSummary.appendStats(dagstats) - - chunkCids := cid.NewSet() err = traverse.Traverse(obj, traverse.Options{ DAG: nodeGetter, Order: traverse.DFSPre, Func: func(current traverse.State) error { - nd := current.Node - currentNodeSize := uint64(len(nd.RawData())) + currentNodeSize := uint64(len(current.Node.RawData())) dagstats.Size += currentNodeSize dagstats.NumBlocks++ - - if pn, ok := nd.(*mdag.ProtoNode); ok { - if fsn, err := unixfs.FSNodeFromBytes(pn.Data()); err == nil { - if fsn.Type() == unixfs.TFile && len(pn.Links()) > 0 { - for _, l := range pn.Links() { - chunkCids.Add(l.Cid) - } - } - if !chunkCids.Has(nd.Cid()) { - // Count as a file only if not a chunk block - switch fsn.Type() { - case unixfs.TFile, unixfs.TRaw, unixfs.TSymlink, unixfs.TMetadata: - dagstats.NumFiles++ - } - } - } - } - - if !cidSet.Has(nd.Cid()) { + if !cidSet.Has(current.Node.Cid()) { dagStatSummary.incrementTotalSize(currentNodeSize) } dagStatSummary.incrementRedundantSize(currentNodeSize) - cidSet.Add(nd.Cid()) - + cidSet.Add(current.Node.Cid()) if progressive { - return res.Emit(dagStatSummary) + if err := res.Emit(dagStatSummary); err != nil { + return err + } } return nil }, @@ -106,13 +82,10 @@ func dagStat(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) dagStatSummary.UniqueBlocks = cidSet.Len() dagStatSummary.calculateSummary() - var totalFiles int64 - for _, s := range dagStatSummary.DagStatsArray { - totalFiles += s.NumFiles + if err := res.Emit(dagStatSummary); err != nil { + return err } - dagStatSummary.NumFiles = totalFiles - - return res.Emit(dagStatSummary) + return nil } func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error { @@ -149,10 +122,7 @@ func finishCLIStat(res cmds.Response, re cmds.ResponseEmitter) error { totalBlocks += stat.NumBlocks totalSize += stat.Size } - fmt.Fprintf(os.Stderr, - "Fetched/Processed %d blocks, %d bytes (%s)\r", - totalBlocks, totalSize, humanize.Bytes(totalSize), - ) + fmt.Fprintf(os.Stderr, "Fetched/Processed %d blocks, %d bytes (%s)\r", totalBlocks, totalSize, humanize.Bytes(totalSize)) } default: return e.TypeErr(out, v) diff --git a/core/commands/get.go b/core/commands/get.go index 3b8468c6241..f3377089c13 100644 --- a/core/commands/get.go +++ b/core/commands/get.go @@ -4,27 +4,24 @@ import ( gotar "archive/tar" "bufio" "compress/gzip" - "encoding/json" "errors" "fmt" "io" - "net/http" - "net/url" "os" gopath "path" "path/filepath" "strings" + "sync/atomic" "github.com/ipfs/kubo/core/commands/cmdenv" "github.com/ipfs/kubo/core/commands/cmdutils" "github.com/ipfs/kubo/core/commands/e" - fsrepo "github.com/ipfs/kubo/repo/fsrepo" "github.com/cheggaaa/pb" + "github.com/dustin/go-humanize" "github.com/ipfs/boxo/files" "github.com/ipfs/boxo/tar" cmds "github.com/ipfs/go-ipfs-cmds" - manet "github.com/multiformats/go-multiaddr/net" ) var ErrInvalidCompressionLevel = errors.New("compression level must be between 1 and 9") @@ -34,7 +31,6 @@ const ( archiveOptionName = "archive" compressOptionName = "compress" compressionLevelOptionName = "compression-level" - getTotalBlocksKey = "_getTotalBlocks" ) var GetCmd = &cmds.Command{ @@ -67,29 +63,8 @@ may also specify the level of compression by specifying '-l=<1-9>'. cmds.BoolOption(progressOptionName, "p", "Stream progress data.").WithDefault(true), }, PreRun: func(req *cmds.Request, env cmds.Environment) error { - if _, err := getCompressOptions(req); err != nil { - return err - } - - progress, _ := req.Options[progressOptionName].(bool) - if !progress { - return nil - } - - baseURL, err := getAPIBaseURL(env) - if err != nil { - return nil - } - - rootCID, err := resolveRootCID(baseURL, req.Arguments[0]) - if err != nil || rootCID == "" { - return nil - } - - totalBlocks, files := fetchStatInfo(baseURL, rootCID) - req.Options[getTotalBlocksKey] = totalBlocks - printGetProgress(os.Stderr, rootCID, files, totalBlocks) - return nil + _, err := getCompressOptions(req) + return err }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { ctx := req.Context @@ -118,6 +93,11 @@ may also specify the level of compression by specifying '-l=<1-9>'. return err } + var numBlocks int64 + if st, err := api.Dag().Stat(ctx, p); err == nil { + numBlocks = st.NumBlocks + } + res.SetLength(uint64(size)) archive, _ := req.Options[archiveOptionName].(bool) @@ -143,7 +123,11 @@ may also specify the level of compression by specifying '-l=<1-9>'. res.SetContentType("application/x-tar") } - return res.Emit(reader) + return res.Emit(&getResponse{ + Reader: reader, + RootCID: p.String(), + NumBlocks: numBlocks, + }) }, PostRun: cmds.PostRunMap{ cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error { @@ -154,8 +138,16 @@ may also specify the level of compression by specifying '-l=<1-9>'. return err } - outReader, ok := v.(io.Reader) - if !ok { + var outReader io.Reader + var rootCID string + var numBlocks int64 + if gr, ok := v.(*getResponse); ok { + outReader = gr.Reader + rootCID = gr.RootCID + numBlocks = gr.NumBlocks + } else if r, ok := v.(io.Reader); ok { + outReader = r + } else { return e.New(e.TypeErr(outReader, v)) } @@ -168,9 +160,20 @@ may also specify the level of compression by specifying '-l=<1-9>'. archive, _ := req.Options[archiveOptionName].(bool) progress, _ := req.Options[progressOptionName].(bool) - totalBlocks, _ := req.Options[getTotalBlocksKey].(int) - if totalBlocks <= 0 { - totalBlocks = 1 + + if progress && rootCID != "" { + cidOnly := rootCID + if i := strings.LastIndex(rootCID, "/"); i >= 0 { + cidOnly = rootCID[i+1:] + } + fmt.Fprintf(os.Stderr, "Fetching %s\n", cidOnly) + payloadSize := int64(res.Length()) + if numBlocks > 0 { + fmt.Fprintf(os.Stderr, " Blocks: %d | Size: %s\n", numBlocks, humanize.IBytes(uint64(payloadSize))) + } else if payloadSize > 0 { + fmt.Fprintf(os.Stderr, " Size: %s\n", humanize.IBytes(uint64(payloadSize))) + } + fmt.Fprint(os.Stderr, "\n") } gw := getWriter{ @@ -180,7 +183,7 @@ may also specify the level of compression by specifying '-l=<1-9>'. Compression: cmplvl, Size: int64(res.Length()), Progress: progress, - TotalBlocks: totalBlocks, + NumBlocks: numBlocks, } return gw.Write(outReader, outPath) @@ -188,94 +191,65 @@ may also specify the level of compression by specifying '-l=<1-9>'. }, } -func getAPIBaseURL(env cmds.Environment) (string, error) { - configRoot, err := cmdenv.GetConfigRoot(env) - if err != nil { - return "", err - } - apiAddr, err := fsrepo.APIAddr(configRoot) - if err != nil { - return "", err - } - network, host, err := manet.DialArgs(apiAddr) - if err != nil || (network != "tcp" && network != "tcp4" && network != "tcp6") { - return "", errors.New("unsupported network") - } - return "http://" + host, nil +type getResponse struct { + io.Reader + RootCID string + NumBlocks int64 } -func resolveRootCID(baseURL, pathStr string) (string, error) { - resp, err := http.Post(baseURL+"/api/v0/dag/resolve?arg="+url.QueryEscape(pathStr), "", nil) - if err != nil { - return "", err - } - defer resp.Body.Close() - var out struct { - Cid interface{} `json:"Cid"` - } - if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { - return "", err - } - switch v := out.Cid.(type) { - case string: - return v, nil - case map[string]interface{}: - cid, _ := v["/"].(string) - return cid, nil - } - return "", nil +type clearlineReader struct { + io.Reader + out io.Writer } -// fetchStatInfo calls dag/stat and returns totalBlocks and file count directly from NumFiles. -func fetchStatInfo(baseURL, rootCID string) (totalBlocks, numFiles int) { - resp, err := http.Post( - baseURL+"/api/v0/dag/stat?arg="+url.QueryEscape("/ipfs/"+rootCID)+"&progress=false", - "", nil, - ) - if err != nil || resp.StatusCode != http.StatusOK { - return 1, 1 - } - defer resp.Body.Close() - - var out struct { - UniqueBlocks int `json:"UniqueBlocks"` - DagStats []struct { - NumBlocks int64 `json:"NumBlocks"` - NumFiles int64 `json:"NumFiles"` - } `json:"DagStats"` - } - if err := json.NewDecoder(resp.Body).Decode(&out); err == nil && len(out.DagStats) > 0 { - ds := out.DagStats[0] - totalBlocks = int(ds.NumBlocks) - numFiles = int(ds.NumFiles) - } - if totalBlocks <= 0 { - totalBlocks = max(out.UniqueBlocks, 1) - } - if numFiles <= 0 { - numFiles = 1 +func (r *clearlineReader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + if err == io.EOF { + fmt.Fprint(r.out, "\033[2K\r") } return } -func printGetProgress(w io.Writer, cidStr string, numFiles, totalBlocks int) { - fmt.Fprintf(w, "\nFetching CID: %s\n", cidStr) - if numFiles == 1 { - fmt.Fprintf(w, "Items: 1 file | Blocks: %d\n\n", totalBlocks) - } else { - fmt.Fprintf(w, "Items: %d files | Blocks: %d\n\n", numFiles, totalBlocks) - } +type blockTrackingReader struct { + io.Reader + bar *pb.ProgressBar + totalBlocks int64 + blocksRead int64 // atomic + bytesRead int64 // atomic + blockSize int64 // = totalSize / totalBlocks } -type clearlineReader struct { - io.Reader - out io.Writer +func newBlockTrackingReader(r io.Reader, totalSize, totalBlocks int64, bar *pb.ProgressBar) *blockTrackingReader { + blockSize := int64(1) + if totalBlocks > 0 && totalSize > 0 { + blockSize = totalSize / totalBlocks + if blockSize < 1 { + blockSize = 1 + } + } + return &blockTrackingReader{ + Reader: r, + bar: bar, + totalBlocks: totalBlocks, + blockSize: blockSize, + } } -func (r *clearlineReader) Read(p []byte) (n int, err error) { +func (r *blockTrackingReader) Read(p []byte) (n int, err error) { n, err = r.Reader.Read(p) - if err == io.EOF { - fmt.Fprintf(r.out, "\033[2K\r") + if n > 0 && r.totalBlocks > 0 { + newBytes := atomic.AddInt64(&r.bytesRead, int64(n)) + newBlocks := newBytes / r.blockSize + if newBlocks > r.totalBlocks { + newBlocks = r.totalBlocks + } + old := atomic.SwapInt64(&r.blocksRead, newBlocks) + if old != newBlocks { + r.bar.Prefix(fmt.Sprintf(" block %d / %d ", newBlocks, r.totalBlocks)) + } + } + if err == io.EOF && r.totalBlocks > 0 { + r.bar.Prefix(fmt.Sprintf(" block %d / %d ", r.totalBlocks, r.totalBlocks)) } return } @@ -320,7 +294,7 @@ type getWriter struct { Compression int Size int64 Progress bool - TotalBlocks int + NumBlocks int64 } func (gw *getWriter) Write(r io.Reader, fpath string) error { @@ -369,24 +343,14 @@ func (gw *getWriter) writeExtracted(r io.Reader, fpath string) error { var progressCb func(int64) int64 if gw.Progress { bar := makeProgressBar(gw.Err, gw.Size) - totalBlocks := gw.TotalBlocks - fmt.Fprintf(gw.Err, "Blocks: 0 / %d\n", totalBlocks) + if gw.NumBlocks > 0 { + bar.Prefix(fmt.Sprintf(" block 0 / %d ", gw.NumBlocks)) + r = newBlockTrackingReader(r, gw.Size, gw.NumBlocks, bar) + } bar.Start() defer bar.Finish() - defer func() { - bar.Set64(gw.Size) - fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", totalBlocks, totalBlocks) - }() - size := gw.Size - progressCb = func(delta int64) int64 { - total := bar.Add64(delta) - blocksEst := totalBlocks - if total < size && size > 0 { - blocksEst = int(int64(totalBlocks) * total / size) - } - fmt.Fprintf(gw.Err, "\033[A\033[2K\rBlocks: %d / %d\033[B", blocksEst, totalBlocks) - return total - } + defer bar.Set64(gw.Size) + progressCb = bar.Add64 } extractor := &tar.Extractor{Path: fpath, Progress: progressCb} diff --git a/core/coreapi/dag.go b/core/coreapi/dag.go index 70686f62e03..c98ddfa6194 100644 --- a/core/coreapi/dag.go +++ b/core/coreapi/dag.go @@ -2,15 +2,19 @@ package coreapi import ( "context" + "fmt" dag "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/merkledag/traverse" + "github.com/ipfs/boxo/path" pin "github.com/ipfs/boxo/pinning/pinner" cid "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" + coreiface "github.com/ipfs/kubo/core/coreiface" + "github.com/ipfs/kubo/tracing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "github.com/ipfs/kubo/tracing" + ipld "github.com/ipfs/go-ipld-format" ) type dagAPI struct { @@ -68,6 +72,36 @@ func (api *dagAPI) Session(ctx context.Context) ipld.NodeGetter { return dag.NewSession(ctx, api.DAGService) } +func (api *dagAPI) Stat(ctx context.Context, p path.Path) (*coreiface.DagStatResult, error) { + rp, remainder, err := api.core.ResolvePath(ctx, p) + if err != nil { + return nil, err + } + if len(remainder) > 0 { + return nil, fmt.Errorf("cannot return size for anything other than a DAG with a root CID") + } + nodeGetter := dag.NewSession(ctx, api.DAGService) + obj, err := nodeGetter.Get(ctx, rp.RootCid()) + if err != nil { + return nil, err + } + result := &coreiface.DagStatResult{} + err = traverse.Traverse(obj, traverse.Options{ + DAG: nodeGetter, + Order: traverse.DFSPre, + Func: func(current traverse.State) error { + result.Size += uint64(len(current.Node.RawData())) + result.NumBlocks++ + return nil + }, + SkipDuplicates: true, + }) + if err != nil { + return nil, fmt.Errorf("error traversing DAG: %w", err) + } + return result, nil +} + var ( _ ipld.DAGService = (*dagAPI)(nil) _ dag.SessionMaker = (*dagAPI)(nil) diff --git a/core/coreiface/dag.go b/core/coreiface/dag.go index 3cc3aeb4de2..6596839cc87 100644 --- a/core/coreiface/dag.go +++ b/core/coreiface/dag.go @@ -1,13 +1,25 @@ package iface import ( + "context" + + "github.com/ipfs/boxo/path" ipld "github.com/ipfs/go-ipld-format" ) +// DagStatResult is the result of DAG Stat: size and block count for a single root. +type DagStatResult struct { + NumBlocks int64 + Size uint64 +} + // APIDagService extends ipld.DAGService type APIDagService interface { ipld.DAGService // Pinning returns special NodeAdder which recursively pins added nodes Pinning() ipld.NodeAdder + + // Stat walks the DAG from the given path and returns total size and block count. + Stat(ctx context.Context, p path.Path) (*DagStatResult, error) }