diff --git a/pkg/server/repo_handlers.go b/pkg/server/repo_handlers.go index ce269ac..a7f9d8e 100644 --- a/pkg/server/repo_handlers.go +++ b/pkg/server/repo_handlers.go @@ -139,6 +139,8 @@ func (s *Server) loadRepo(c echo.Context, logger *slog.Logger, dirs []string, fi } s.repoSchedules.Set(repo.Name, schedule) + envUpstream := getEnvUpstream(repo.Envs) + logDir := filepath.Join(s.config.RepoLogsDir, repo.Name) err = os.MkdirAll(logDir, 0o755) if err != nil { @@ -155,18 +157,17 @@ func (s *Server) loadRepo(c echo.Context, logger *slog.Logger, dirs []string, fi return nil, newHTTPError(http.StatusInternalServerError, msg) } - upstream := getUpstream(repo.Image, repo.Envs) nextRun := schedule.Next(time.Now()).Unix() err = db. Clauses(clause.OnConflict{ DoUpdates: clause.Assignments(map[string]any{ - "upstream": upstream, + "upstream": envUpstream, "next_run": nextRun, }), }). Create(&model.RepoMeta{ Name: repo.Name, - Upstream: upstream, + Upstream: envUpstream, Size: s.getSize(repo.StorageDir), NextRun: nextRun, }).Error diff --git a/pkg/server/repo_handlers_test.go b/pkg/server/repo_handlers_test.go index 4730d81..afaf7cc 100644 --- a/pkg/server/repo_handlers_test.go +++ b/pkg/server/repo_handlers_test.go @@ -90,7 +90,7 @@ storageDir: /tmp testutils.WriteFile(t, filepath.Join(cfgDir2, "repo0.yaml"), ` image: ubuntu envs: - UPSTREAM: http://bar.com + $UPSTREAM: http://bar.com `) cli := te.RESTClient() diff --git a/pkg/server/utils.go b/pkg/server/utils.go index fb8513e..fd35a5e 100644 --- a/pkg/server/utils.go +++ b/pkg/server/utils.go @@ -7,6 +7,7 @@ import ( "io" "log/slog" "net/http" + "os" "os/exec" "path/filepath" "strconv" @@ -65,7 +66,7 @@ func newHTTPError(code int, msg string) error { } } -func (s *Server) waitForSync(name, ctID, storageDir string) { +func (s *Server) waitForSync(name, ctID, storageDir, envUpstream string) { l := s.logger.With(slog.String("repo", name)) code, err := s.dockerCli.WaitContainerWithTimeout(ctID, s.config.SyncTimeout) if err != nil { @@ -87,6 +88,17 @@ func (s *Server) waitForSync(name, ctID, storageDir string) { "exit_code": code, "syncing": false, } + upstream := envUpstream + if upstream == "" { + if upstreamFromLog, err := s.readUpstreamFromLog(name); err == nil { + upstream = upstreamFromLog + } else if !errors.Is(err, os.ErrNotExist) { + l.Warn("Fail to read upstream info", slogErrAttr(err)) + } + } + if upstream != "" { + updates["upstream"] = upstream + } if code == 0 { updates["last_success"] = time.Now().Unix() } @@ -121,102 +133,24 @@ func (s *Server) waitForSync(name, ctID, storageDir string) { }() } -func getUpstream(image string, envs model.StringMap) (upstream string) { - image = strings.SplitN(image, ":", 2)[0] - parts := strings.Split(image, "/") - t := parts[len(parts)-1] - - var ok bool - if upstream, ok = envs["$UPSTREAM"]; ok { - return upstream - } - if upstream, ok = envs["UPSTREAM"]; ok { - return upstream - } - switch t { - case "archvsync", "rsync": - return fmt.Sprintf("rsync://%s/%s", envs["RSYNC_HOST"], envs["RSYNC_PATH"]) - case "aptsync", "apt-sync": - return envs["APTSYNC_URL"] - case "crates-io-index": - return "https://github.com/rust-lang/crates.io-index" - case "debian-cd": - return fmt.Sprintf("rsync://%s/%s", envs["RSYNC_HOST"], envs["RSYNC_MODULE"]) - case "docker-ce": - return "https://download.docker.com/" - case "fedora": - remote, ok := envs["REMOTE"] - if !ok { - remote = "rsync://dl.fedoraproject.org" - } - return fmt.Sprintf("%s/%s", remote, envs["MODULE"]) - case "freebsd-pkg": - if upstream, ok = envs["FBSD_PKG_UPSTREAM"]; !ok { - return "http://pkg.freebsd.org/" - } - case "freebsd-ports": - if upstream, ok = envs["FBSD_PORTS_DISTFILES_UPSTREAM"]; !ok { - return "http://distcache.freebsd.org/ports-distfiles/" - } - case "ghcup": - return "https://www.haskell.org/ghcup/" - case "github-release": - return "https://github.com" - case "gitsync": - return envs["GITSYNC_URL"] - case "google-repo": - return "https://android.googlesource.com/mirror/manifest" - case "gsutil-rsync": - return envs["GS_URL"] - case "hackage": - if upstream, ok = envs["HACKAGE_BASE_URL"]; !ok { - return "https://hackage.haskell.org/" - } - case "homebrew-bottles": - if upstream, ok = envs["HOMEBREW_BOTTLE_DOMAIN"]; !ok { - return "https://ghcr.io/v2/homebrew/" - } - case "julia-storage": - return "https://us-east.storage.juliahub.com, https://kr.storage.juliahub.com" - case "nix-channels": - if upstream, ok = envs["NIX_MIRROR_UPSTREAM"]; !ok { - return "https://nixos.org/channels" - } - case "lftpsync": - return fmt.Sprintf("%s/%s", envs["LFTPSYNC_HOST"], envs["LFTPSYNC_PATH"]) - case "nodesource": - return "https://nodesource.com/" - case "pypi": - return "https://pypi.python.org/" - case "rclone": - remoteType := envs["RCLONE_CONFIG_REMOTE_TYPE"] - path := envs["RCLONE_PATH"] - domain := "" - switch remoteType { - case "swift": - domain = envs["RCLONE_SWIFT_STORAGE_URL"] + "/" - case "http": - domain = envs["RCLONE_CONFIG_REMOTE_URL"] - case "s3": - domain = envs["RCLONE_CONFIG_REMOTE_ENDPOINT"] - case "webdav": - domain = envs["RCLONE_CONFIG_REMOTE_URL"] - } - return fmt.Sprintf("%s%s", domain, path) - case "rubygems": - return "http://rubygems.org/" - case "stackage": - upstream = "https://github.com/commercialhaskell/" - case "tsumugu": - return envs["UPSTREAM"] - case "winget-source": - if upstream, ok = envs["WINGET_REPO_URL"]; !ok { - return "https://cdn.winget.microsoft.com/cache" +func (s *Server) readUpstreamFromLog(name string) (string, error) { + content, err := os.ReadFile(filepath.Join(s.config.RepoLogsDir, name, "yuki_upstream.txt")) + if err != nil { + return "", err + } + return strings.TrimSpace(string(content)), nil +} + +func getEnvUpstream(envs model.StringMap) string { + if len(envs) == 0 { + return "" + } + if val, ok := envs["$UPSTREAM"]; ok { + if trimmed := strings.TrimSpace(val); trimmed != "" { + return trimmed } - case "yum-sync": - return envs["YUMSYNC_URL"] } - return + return "" } // cleanDeadContainers removes containers which status are `created`, `exited` or `dead`. @@ -246,6 +180,17 @@ func (s *Server) waitRunningContainers() error { name := ct.Labels[api.LabelRepoName] dir := ct.Labels[api.LabelStorageDir] ctID := ct.ID + + envUpstream := "" + if len(name) > 0 { + var repo model.Repo + if err := s.db.Where(model.Repo{Name: name}).Limit(1).Take(&repo).Error; err == nil { + envUpstream = getEnvUpstream(repo.Envs) + } else if !errors.Is(err, gorm.ErrRecordNotFound) { + s.logger.Warn("Fail to load repo for upstream lookup", slogErrAttr(err), slog.String("repo", name)) + } + } + err := s.db. Where(model.RepoMeta{Name: name}). Updates(&model.RepoMeta{Syncing: true}). @@ -253,7 +198,7 @@ func (s *Server) waitRunningContainers() error { if err != nil { s.logger.Error("Fail to set syncing to true", slogErrAttr(err), slog.String("repo", name)) } - go s.waitForSync(name, ctID, dir) + go s.waitForSync(name, ctID, dir, envUpstream) } return nil } @@ -392,6 +337,7 @@ func (s *Server) syncRepo(ctx context.Context, name string, debug bool) error { if len(envMap) == 0 { envMap = make(map[string]string) } + envUpstream := getEnvUpstream(envMap) envMap["REPO"] = repo.Name envMap["OWNER"] = repo.User if repo.BindIP != "" { @@ -444,7 +390,7 @@ func (s *Server) syncRepo(ctx context.Context, name string, debug bool) error { if err != nil { logger.Error("Fail to update RepoMeta", slogErrAttr(err)) } - go s.waitForSync(name, ctID, repo.StorageDir) + go s.waitForSync(name, ctID, repo.StorageDir, envUpstream) return nil } diff --git a/pkg/server/utils_test.go b/pkg/server/utils_test.go index 00fc1f4..c69f25a 100644 --- a/pkg/server/utils_test.go +++ b/pkg/server/utils_test.go @@ -2,6 +2,8 @@ package server import ( "context" + "os" + "path/filepath" "sync" "testing" "time" @@ -146,7 +148,7 @@ func TestWaitForSync(t *testing.T) { Name: name, }) require.NoError(t, err) - te.server.waitForSync(name, id, "") + te.server.waitForSync(name, id, "", "") meta := model.RepoMeta{Name: name} require.NoError(t, te.server.db.Take(&meta).Error) @@ -174,7 +176,7 @@ func TestWaitForSync(t *testing.T) { Name: name, }) require.NoError(t, err) - te.server.waitForSync(name, id, "") + te.server.waitForSync(name, id, "", "") meta := model.RepoMeta{Name: name} require.NoError(t, te.server.db.Take(&meta).Error) @@ -184,4 +186,53 @@ func TestWaitForSync(t *testing.T) { require.Equal(t, -2, meta.ExitCode) require.Equal(t, lastSuccess, meta.LastSuccess) }) + + t.Run("upstream should be updated from log file", func(t *testing.T) { + te := NewTestEnv(t) + te.server.config.RepoLogsDir = t.TempDir() + + logDir := filepath.Join(te.server.config.RepoLogsDir, name) + require.NoError(t, os.MkdirAll(logDir, 0o755)) + content := "http://mirror.example.com/foo \n" + require.NoError(t, os.WriteFile(filepath.Join(logDir, "yuki_upstream.txt"), []byte(content), 0o644)) + + require.NoError(t, te.server.db.Create([]model.RepoMeta{ + { + Name: name, + }, + }).Error) + + id, err := te.server.dockerCli.RunContainer(context.TODO(), docker.RunContainerConfig{ + Name: name, + }) + require.NoError(t, err) + te.server.waitForSync(name, id, "", "") + + meta := model.RepoMeta{Name: name} + require.NoError(t, te.server.db.Take(&meta).Error) + require.Equal(t, "http://mirror.example.com/foo", meta.Upstream) + }) + + t.Run("$UPSTREAM env should override log file", func(t *testing.T) { + te := NewTestEnv(t) + te.server.config.RepoLogsDir = t.TempDir() + + logDir := filepath.Join(te.server.config.RepoLogsDir, name) + require.NoError(t, os.MkdirAll(logDir, 0o755)) + require.NoError(t, os.WriteFile(filepath.Join(logDir, "yuki_upstream.txt"), []byte("http://log.example.com"), 0o644)) + + require.NoError(t, te.server.db.Create(&model.RepoMeta{ + Name: name, + }).Error) + + id, err := te.server.dockerCli.RunContainer(context.TODO(), docker.RunContainerConfig{ + Name: name, + }) + require.NoError(t, err) + te.server.waitForSync(name, id, "", "https://env.example.com") + + meta := model.RepoMeta{Name: name} + require.NoError(t, te.server.db.Take(&meta).Error) + require.Equal(t, "https://env.example.com", meta.Upstream) + }) }