Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 11 additions & 26 deletions backend/pkg/server/services/flow_files.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package services

import (
"bytes"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -547,43 +546,29 @@ func (s *FlowFileService) DownloadFlowFile(c *gin.Context) {
}

// Single directory → ZIP with paths relative to that directory (backward-compat).
// The archive is buffered so an explicit Content-Length can be set; this allows
// clients (including Swagger UI) to recognise and download the file correctly.
// The archive is streamed directly to the response writer so the whole ZIP is
// never held in memory; the response uses chunked transfer encoding.
if len(entries) == 1 && entries[0].info.IsDir() {
name := filepath.Base(entries[0].localPath)
var buf bytes.Buffer
if err := flowfiles.ZipDirectory(&buf, entries[0].localPath); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, name+".zip", func(w io.Writer) error {
return flowfiles.ZipDirectory(w, entries[0].localPath)
}); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive")
}
Comment thread
mason5052 marked this conversation as resolved.
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": name + ".zip",
}),
})
return
}

// Multiple paths (any mix of files and directories) → ZIP with cache-relative paths.
// Buffered for the same reason as above.
// Streamed directly to the response writer for the same reason as above.
relPaths := make([]string, 0, len(entries))
for _, e := range entries {
relPaths = append(relPaths, filepath.ToSlash(e.reqPath))
}
var buf bytes.Buffer
if err := flowfiles.ZipRelativePaths(&buf, s.flowDataDir(flowID), relPaths); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error creating ZIP archive")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, "download.zip", func(w io.Writer) error {
return flowfiles.ZipRelativePaths(w, s.flowDataDir(flowID), relPaths)
}); err != nil {
logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("error streaming ZIP archive")
}
Comment thread
mason5052 marked this conversation as resolved.
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": "download.zip",
}),
})
}

// PullFlowFiles is a function to sync one or more paths from the container into the local cache
Expand Down
3 changes: 3 additions & 0 deletions backend/pkg/server/services/flow_files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2501,6 +2501,9 @@ func TestFlowFileService_DownloadFlowFileScenarios(t *testing.T) {
assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains)
}
if tt.wantZipEntries != nil {
// A streamed ZIP download must not buffer the whole archive, so it
// must not carry a Content-Length computed from a full buffer.
assert.Empty(t, w.Header().Get("Content-Length"))
zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len()))
require.NoError(t, err)
got := map[string]string{}
Expand Down
56 changes: 33 additions & 23 deletions backend/pkg/server/services/resources.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package services

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"mime"
"net/http"
"os"
Expand Down Expand Up @@ -1930,18 +1930,11 @@ func (s *ResourceService) DownloadResource(c *gin.Context) {
}

dirName := path.Base(e.rec.Path)
var buf bytes.Buffer
if err := resources.ZipResources(&buf, zipEntries); err != nil {
logger.FromContext(c).WithError(err).Error("error creating zip archive for download")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, dirName+".zip", func(w io.Writer) error {
return resources.ZipResources(w, zipEntries)
}); err != nil {
logger.FromContext(c).WithError(err).Error("error streaming zip archive for download")
}
Comment thread
mason5052 marked this conversation as resolved.
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": dirName + ".zip",
}),
})
return
}

Expand Down Expand Up @@ -1983,18 +1976,35 @@ func (s *ResourceService) DownloadResource(c *gin.Context) {
}
}

var buf bytes.Buffer
if err := resources.ZipResources(&buf, zipEntries); err != nil {
logger.FromContext(c).WithError(err).Error("error creating zip archive for download")
response.Error(c, response.ErrInternal, err)
return
if err := streamZipArchive(c, "download.zip", func(w io.Writer) error {
return resources.ZipResources(w, zipEntries)
}); err != nil {
logger.FromContext(c).WithError(err).Error("error streaming zip archive for download")
}
c.DataFromReader(http.StatusOK, int64(buf.Len()), "application/zip", &buf,
map[string]string{
"Content-Disposition": mime.FormatMediaType("attachment", map[string]string{
"filename": "download.zip",
}),
})
}

// streamZipArchive streams a ZIP archive to the client as build writes it,
// instead of buffering the entire archive in memory before sending it. Memory
// stays proportional to a single file copy buffer rather than the full archive.
//
// The status line and headers commit as soon as build writes the first byte, so
// an error returned by build after streaming has started can no longer change the
// HTTP status; the request is aborted and the error returned for the caller to
// log. The client then receives a truncated (invalid) archive, which is preferable
// to risking an out-of-memory crash. Errors detected before the archive is built
// (auth, path validation, resource lookup) are handled by the caller and still
// produce normal error responses.
func streamZipArchive(c *gin.Context, filename string, build func(w io.Writer) error) error {
c.Header("Content-Type", "application/zip")
c.Header("Content-Disposition", mime.FormatMediaType("attachment", map[string]string{
"filename": filename,
}))
c.Status(http.StatusOK)
if err := build(c.Writer); err != nil {
c.Abort()
return err
}
return nil
}
Comment thread
mason5052 marked this conversation as resolved.

// ---- helper methods --------------------------------------------------------
Expand Down
70 changes: 70 additions & 0 deletions backend/pkg/server/services/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"io"
"mime/multipart"
"net/http"
Expand Down Expand Up @@ -1351,6 +1352,9 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) {
assert.Contains(t, w.Header().Get("Content-Disposition"), tt.wantDispContains)
}
if tt.wantZipEntries != nil {
// A streamed ZIP download must not buffer the whole archive, so it
// must not carry a Content-Length computed from a full buffer.
assert.Empty(t, w.Header().Get("Content-Length"))
zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len()))
require.NoError(t, err)
got := map[string]string{}
Expand All @@ -1370,6 +1374,72 @@ func TestResourceService_DownloadResourceScenarios(t *testing.T) {
}
}

// TestStreamZipArchive verifies the shared streaming helper writes a valid ZIP
// straight to the response writer without buffering the whole archive (no
// Content-Length is emitted) and that a build error mid-stream propagates to the
// caller and aborts the request.
func TestStreamZipArchive(t *testing.T) {
gin.SetMode(gin.TestMode)

t.Run("streams archive without content-length", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodGet, "/download", nil)

err := streamZipArchive(c, "out.zip", func(zw io.Writer) error {
z := zip.NewWriter(zw)
f, createErr := z.Create("hello.txt")
require.NoError(t, createErr)
if _, writeErr := f.Write([]byte("hello world")); writeErr != nil {
return writeErr
}
return z.Close()
})

require.NoError(t, err)
assert.False(t, c.IsAborted())
assert.Equal(t, "application/zip", w.Header().Get("Content-Type"))
assert.Contains(t, w.Header().Get("Content-Disposition"), "out.zip")
// The whole archive was never buffered, so no buffer-derived length exists.
assert.Empty(t, w.Header().Get("Content-Length"))

zr, err := zip.NewReader(bytes.NewReader(w.Body.Bytes()), int64(w.Body.Len()))
require.NoError(t, err)
require.Len(t, zr.File, 1)
assert.Equal(t, "hello.txt", zr.File[0].Name)
rc, err := zr.File[0].Open()
require.NoError(t, err)
data, err := io.ReadAll(rc)
rc.Close()
require.NoError(t, err)
Comment thread
mason5052 marked this conversation as resolved.
assert.Equal(t, "hello world", string(data))
})

t.Run("propagates build error after partial stream and aborts", func(t *testing.T) {
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
c.Request = httptest.NewRequest(http.MethodGet, "/download", nil)

sentinel := errors.New("source reader failed mid-stream")
err := streamZipArchive(c, "out.zip", func(zw io.Writer) error {
z := zip.NewWriter(zw)
f, createErr := z.Create("partial.txt")
require.NoError(t, createErr)
if _, writeErr := f.Write([]byte("partial data")); writeErr != nil {
return writeErr
}
// Flush bytes to the response, then fail as a slow/erroring source would.
if flushErr := z.Flush(); flushErr != nil {
return flushErr
}
return sentinel
})

require.ErrorIs(t, err, sentinel)
assert.True(t, c.IsAborted())
})
}

func TestResourceService_DeleteResourceScenarios(t *testing.T) {
type seed struct {
userID uint64
Expand Down