Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/server/repo_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ func (s *Server) loadRepo(c echo.Context, logger *slog.Logger, dirs []string, fi
}
s.repoSchedules.Set(repo.Name, schedule)

envUpstream := getEnvUpstream(repo.Envs)

logDir := filepath.Join(s.config.RepoLogsDir, repo.Name)
err = os.MkdirAll(logDir, 0o755)
if err != nil {
Expand All @@ -155,18 +157,17 @@ func (s *Server) loadRepo(c echo.Context, logger *slog.Logger, dirs []string, fi
return nil, newHTTPError(http.StatusInternalServerError, msg)
}

upstream := getUpstream(repo.Image, repo.Envs)
nextRun := schedule.Next(time.Now()).Unix()
err = db.
Clauses(clause.OnConflict{
DoUpdates: clause.Assignments(map[string]any{
"upstream": upstream,
"upstream": envUpstream,
"next_run": nextRun,
}),
}).
Create(&model.RepoMeta{
Name: repo.Name,
Upstream: upstream,
Upstream: envUpstream,
Size: s.getSize(repo.StorageDir),
NextRun: nextRun,
}).Error
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/repo_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ storageDir: /tmp
testutils.WriteFile(t, filepath.Join(cfgDir2, "repo0.yaml"), `
image: ubuntu
envs:
UPSTREAM: http://bar.com
$UPSTREAM: http://bar.com
`)

cli := te.RESTClient()
Expand Down
140 changes: 43 additions & 97 deletions pkg/server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log/slog"
"net/http"
"os"
"os/exec"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -65,7 +66,7 @@ func newHTTPError(code int, msg string) error {
}
}

func (s *Server) waitForSync(name, ctID, storageDir string) {
func (s *Server) waitForSync(name, ctID, storageDir, envUpstream string) {
l := s.logger.With(slog.String("repo", name))
code, err := s.dockerCli.WaitContainerWithTimeout(ctID, s.config.SyncTimeout)
if err != nil {
Expand All @@ -87,6 +88,17 @@ func (s *Server) waitForSync(name, ctID, storageDir string) {
"exit_code": code,
"syncing": false,
}
upstream := envUpstream
if upstream == "" {
if upstreamFromLog, err := s.readUpstreamFromLog(name); err == nil {
upstream = upstreamFromLog
} else if !errors.Is(err, os.ErrNotExist) {
l.Warn("Fail to read upstream info", slogErrAttr(err))
}
}
if upstream != "" {
updates["upstream"] = upstream
}
if code == 0 {
updates["last_success"] = time.Now().Unix()
}
Expand Down Expand Up @@ -121,102 +133,24 @@ func (s *Server) waitForSync(name, ctID, storageDir string) {
}()
}

func getUpstream(image string, envs model.StringMap) (upstream string) {
image = strings.SplitN(image, ":", 2)[0]
parts := strings.Split(image, "/")
t := parts[len(parts)-1]

var ok bool
if upstream, ok = envs["$UPSTREAM"]; ok {
return upstream
}
if upstream, ok = envs["UPSTREAM"]; ok {
return upstream
}
switch t {
case "archvsync", "rsync":
return fmt.Sprintf("rsync://%s/%s", envs["RSYNC_HOST"], envs["RSYNC_PATH"])
case "aptsync", "apt-sync":
return envs["APTSYNC_URL"]
case "crates-io-index":
return "https://github.com/rust-lang/crates.io-index"
case "debian-cd":
return fmt.Sprintf("rsync://%s/%s", envs["RSYNC_HOST"], envs["RSYNC_MODULE"])
case "docker-ce":
return "https://download.docker.com/"
case "fedora":
remote, ok := envs["REMOTE"]
if !ok {
remote = "rsync://dl.fedoraproject.org"
}
return fmt.Sprintf("%s/%s", remote, envs["MODULE"])
case "freebsd-pkg":
if upstream, ok = envs["FBSD_PKG_UPSTREAM"]; !ok {
return "http://pkg.freebsd.org/"
}
case "freebsd-ports":
if upstream, ok = envs["FBSD_PORTS_DISTFILES_UPSTREAM"]; !ok {
return "http://distcache.freebsd.org/ports-distfiles/"
}
case "ghcup":
return "https://www.haskell.org/ghcup/"
case "github-release":
return "https://github.com"
case "gitsync":
return envs["GITSYNC_URL"]
case "google-repo":
return "https://android.googlesource.com/mirror/manifest"
case "gsutil-rsync":
return envs["GS_URL"]
case "hackage":
if upstream, ok = envs["HACKAGE_BASE_URL"]; !ok {
return "https://hackage.haskell.org/"
}
case "homebrew-bottles":
if upstream, ok = envs["HOMEBREW_BOTTLE_DOMAIN"]; !ok {
return "https://ghcr.io/v2/homebrew/"
}
case "julia-storage":
return "https://us-east.storage.juliahub.com, https://kr.storage.juliahub.com"
case "nix-channels":
if upstream, ok = envs["NIX_MIRROR_UPSTREAM"]; !ok {
return "https://nixos.org/channels"
}
case "lftpsync":
return fmt.Sprintf("%s/%s", envs["LFTPSYNC_HOST"], envs["LFTPSYNC_PATH"])
case "nodesource":
return "https://nodesource.com/"
case "pypi":
return "https://pypi.python.org/"
case "rclone":
remoteType := envs["RCLONE_CONFIG_REMOTE_TYPE"]
path := envs["RCLONE_PATH"]
domain := ""
switch remoteType {
case "swift":
domain = envs["RCLONE_SWIFT_STORAGE_URL"] + "/"
case "http":
domain = envs["RCLONE_CONFIG_REMOTE_URL"]
case "s3":
domain = envs["RCLONE_CONFIG_REMOTE_ENDPOINT"]
case "webdav":
domain = envs["RCLONE_CONFIG_REMOTE_URL"]
}
return fmt.Sprintf("%s%s", domain, path)
case "rubygems":
return "http://rubygems.org/"
case "stackage":
upstream = "https://github.com/commercialhaskell/"
case "tsumugu":
return envs["UPSTREAM"]
case "winget-source":
if upstream, ok = envs["WINGET_REPO_URL"]; !ok {
return "https://cdn.winget.microsoft.com/cache"
func (s *Server) readUpstreamFromLog(name string) (string, error) {
content, err := os.ReadFile(filepath.Join(s.config.RepoLogsDir, name, "yuki_upstream.txt"))
if err != nil {
return "", err
}
return strings.TrimSpace(string(content)), nil
}

func getEnvUpstream(envs model.StringMap) string {
if len(envs) == 0 {
return ""
}
if val, ok := envs["$UPSTREAM"]; ok {
if trimmed := strings.TrimSpace(val); trimmed != "" {
return trimmed
}
case "yum-sync":
return envs["YUMSYNC_URL"]
}
return
return ""
}

// cleanDeadContainers removes containers which status are `created`, `exited` or `dead`.
Expand Down Expand Up @@ -246,14 +180,25 @@ func (s *Server) waitRunningContainers() error {
name := ct.Labels[api.LabelRepoName]
dir := ct.Labels[api.LabelStorageDir]
ctID := ct.ID

envUpstream := ""
if len(name) > 0 {
var repo model.Repo
if err := s.db.Where(model.Repo{Name: name}).Limit(1).Take(&repo).Error; err == nil {
envUpstream = getEnvUpstream(repo.Envs)
} else if !errors.Is(err, gorm.ErrRecordNotFound) {
s.logger.Warn("Fail to load repo for upstream lookup", slogErrAttr(err), slog.String("repo", name))
}
}

err := s.db.
Where(model.RepoMeta{Name: name}).
Updates(&model.RepoMeta{Syncing: true}).
Error
if err != nil {
s.logger.Error("Fail to set syncing to true", slogErrAttr(err), slog.String("repo", name))
}
go s.waitForSync(name, ctID, dir)
go s.waitForSync(name, ctID, dir, envUpstream)
}
return nil
}
Expand Down Expand Up @@ -392,6 +337,7 @@ func (s *Server) syncRepo(ctx context.Context, name string, debug bool) error {
if len(envMap) == 0 {
envMap = make(map[string]string)
}
envUpstream := getEnvUpstream(envMap)
envMap["REPO"] = repo.Name
envMap["OWNER"] = repo.User
if repo.BindIP != "" {
Expand Down Expand Up @@ -444,7 +390,7 @@ func (s *Server) syncRepo(ctx context.Context, name string, debug bool) error {
if err != nil {
logger.Error("Fail to update RepoMeta", slogErrAttr(err))
}
go s.waitForSync(name, ctID, repo.StorageDir)
go s.waitForSync(name, ctID, repo.StorageDir, envUpstream)

return nil
}
Expand Down
55 changes: 53 additions & 2 deletions pkg/server/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package server

import (
"context"
"os"
"path/filepath"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -146,7 +148,7 @@ func TestWaitForSync(t *testing.T) {
Name: name,
})
require.NoError(t, err)
te.server.waitForSync(name, id, "")
te.server.waitForSync(name, id, "", "")

meta := model.RepoMeta{Name: name}
require.NoError(t, te.server.db.Take(&meta).Error)
Expand Down Expand Up @@ -174,7 +176,7 @@ func TestWaitForSync(t *testing.T) {
Name: name,
})
require.NoError(t, err)
te.server.waitForSync(name, id, "")
te.server.waitForSync(name, id, "", "")

meta := model.RepoMeta{Name: name}
require.NoError(t, te.server.db.Take(&meta).Error)
Expand All @@ -184,4 +186,53 @@ func TestWaitForSync(t *testing.T) {
require.Equal(t, -2, meta.ExitCode)
require.Equal(t, lastSuccess, meta.LastSuccess)
})

t.Run("upstream should be updated from log file", func(t *testing.T) {
te := NewTestEnv(t)
te.server.config.RepoLogsDir = t.TempDir()

logDir := filepath.Join(te.server.config.RepoLogsDir, name)
require.NoError(t, os.MkdirAll(logDir, 0o755))
content := "http://mirror.example.com/foo \n"
require.NoError(t, os.WriteFile(filepath.Join(logDir, "yuki_upstream.txt"), []byte(content), 0o644))

require.NoError(t, te.server.db.Create([]model.RepoMeta{
{
Name: name,
},
}).Error)

id, err := te.server.dockerCli.RunContainer(context.TODO(), docker.RunContainerConfig{
Name: name,
})
require.NoError(t, err)
te.server.waitForSync(name, id, "", "")

meta := model.RepoMeta{Name: name}
require.NoError(t, te.server.db.Take(&meta).Error)
require.Equal(t, "http://mirror.example.com/foo", meta.Upstream)
})

t.Run("$UPSTREAM env should override log file", func(t *testing.T) {
te := NewTestEnv(t)
te.server.config.RepoLogsDir = t.TempDir()

logDir := filepath.Join(te.server.config.RepoLogsDir, name)
require.NoError(t, os.MkdirAll(logDir, 0o755))
require.NoError(t, os.WriteFile(filepath.Join(logDir, "yuki_upstream.txt"), []byte("http://log.example.com"), 0o644))

require.NoError(t, te.server.db.Create(&model.RepoMeta{
Name: name,
}).Error)

id, err := te.server.dockerCli.RunContainer(context.TODO(), docker.RunContainerConfig{
Name: name,
})
require.NoError(t, err)
te.server.waitForSync(name, id, "", "https://env.example.com")

meta := model.RepoMeta{Name: name}
require.NoError(t, te.server.db.Take(&meta).Error)
require.Equal(t, "https://env.example.com", meta.Upstream)
})
}