Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,12 @@ std::vector<llama_adapter_lora_ptr> & common_init_result::lora() {
}

common_init_result_ptr common_init_from_params(common_params & params, bool model_only) {
// report the load phase up front (router progress UI); fit + metadata read emit nothing,
// so without this the UI sticks on "download 100%" until the per-tensor callback starts
if (params.load_stage_callback) {
params.load_stage_callback(COMMON_LOAD_STAGE_LOAD, -1.0f, params.load_stage_callback_user_data);
}

common_init_result_ptr res(new common_init_result(params, model_only));

llama_model * model = res->model();
Expand Down Expand Up @@ -1387,6 +1393,10 @@ common_init_result_ptr common_init_from_params(common_params & params, bool mode
}

if (params.warmup) {
// report the warmup phase (router progress UI)
if (params.load_stage_callback) {
params.load_stage_callback(COMMON_LOAD_STAGE_WARMUP, -1.0f, params.load_stage_callback_user_data);
}
LOG_INF("%s: warming up the model with an empty run - please wait ... (--no-warmup to disable)\n", __func__);

std::vector<llama_token> tmp;
Expand Down
12 changes: 12 additions & 0 deletions common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,15 @@ struct lr_opt {

struct ggml_opt_optimizer_params common_opt_lr_pars(void * userdata);

// load-stage names for common_load_stage_callback; keep the webui switch in sync (getModelLoadPhase in models/utils.ts)
#define COMMON_LOAD_STAGE_DOWNLOAD "download" // file download (router child only)
#define COMMON_LOAD_STAGE_LOAD "load" // tensor loading
#define COMMON_LOAD_STAGE_WARMUP "warmup" // empty-run warmup
#define COMMON_LOAD_STAGE_FINALIZE "finalize" // post-warmup setup: chat templates / seq-rm tests (context alloc already happened during load)

// coarse load-stage reporting for router mode; progress in [0,1], or <0 if indeterminate
typedef void (*common_load_stage_callback)(const char * stage, float progress, void * user_data);

struct common_params {
int32_t n_predict = -1; // max. number of new tokens to predict, -1 == no limit
int32_t n_ctx = 0; // context size, 0 == context the model was trained with
Expand Down Expand Up @@ -701,6 +710,9 @@ struct common_params {
// return false from callback to abort model loading or true to continue
llama_progress_callback load_progress_callback = NULL;
void * load_progress_callback_user_data = NULL;
// optional callback for coarse load-stage reporting (used by router mode to drive a progress UI)
common_load_stage_callback load_stage_callback = NULL;
void * load_stage_callback_user_data = NULL;
bool no_alloc = false; // Don't allocate model buffers
};

Expand Down
44 changes: 36 additions & 8 deletions common/download.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -475,17 +475,38 @@ std::pair<long, std::vector<char>> common_remote_get_content(const std::string
return { res->status, std::move(buf) };
}

static common_download_callback * g_default_download_callback = nullptr;

void common_download_set_default_callback(common_download_callback * callback) {
g_default_download_callback = callback;
}

int common_download_file_single(const std::string & url,
const std::string & path,
const common_download_opts & opts,
bool skip_etag) {
if (!opts.offline) {
// resolve the effective callback: per-call > process-wide default
common_download_opts eff = opts;
if (!eff.callback) {
eff.callback = g_default_download_callback;
}

if (!eff.offline) {
ProgressBar tty_cb;
common_download_opts online_opts = opts;
if (!online_opts.callback) {
online_opts.callback = &tty_cb;
if (!eff.callback) {
eff.callback = &tty_cb;
}
return common_download_file_single_online(url, path, online_opts, skip_etag);
const int status = common_download_file_single_online(url, path, eff, skip_etag);
// the online path returns 304 (cached, not modified) before emitting any callback;
// surface a cached start/done pair so aggregators still see every file exactly once
if (status == 304 && eff.callback) {
common_download_progress p;
p.url = url;
p.cached = true;
eff.callback->on_start(p);
eff.callback->on_done(p, true);
}
return status;
}

if (!std::filesystem::exists(path)) {
Expand All @@ -496,12 +517,12 @@ int common_download_file_single(const std::string & url,
LOG_DBG("%s: using cached file (offline mode): %s\n", __func__, path.c_str());

// notify the callback that the file was cached
if (opts.callback) {
if (eff.callback) {
common_download_progress p;
p.url = url;
p.cached = true;
opts.callback->on_start(p);
opts.callback->on_done(p, true);
eff.callback->on_start(p);
eff.callback->on_done(p, true);
}

return 304; // Not Modified - fake cached response
Expand Down Expand Up @@ -814,6 +835,13 @@ common_download_model_result common_download_model(const common_params_model &
return result;
}

// announce the full file set up front so a progress aggregator can form a stable
// denominator (multi-part GGUFs download in parallel below). use the effective callback,
// mirroring common_download_file_single's per-call > process-wide resolution.
if (common_download_callback * cb = opts.callback ? opts.callback : g_default_download_callback) {
cb->on_plan(tasks.size());
}

std::vector<std::future<int>> futures;
for (const auto & task : tasks) {
futures.push_back(std::async(std::launch::async,
Expand Down
8 changes: 8 additions & 0 deletions common/download.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ struct common_download_progress {
class common_download_callback {
public:
virtual ~common_download_callback() = default;
// called once before any file starts, with the number of files about to be downloaded;
// lets aggregators know the full set up front (e.g. multi-part GGUFs) instead of discovering
// files lazily as their callbacks fire. optional: default no-op.
virtual void on_plan(size_t total_files) { (void) total_files; }
virtual void on_start(const common_download_progress & p) = 0;
virtual void on_update(const common_download_progress & p) = 0;
virtual void on_done(const common_download_progress & p, bool ok) = 0;
virtual bool is_cancelled() const { return false; }
};

// process-wide default download callback, used when common_download_opts::callback is unset (nullptr to clear).
// borrowed, not owned: must outlive any download that uses it.
void common_download_set_default_callback(common_download_callback * callback);

struct common_remote_params {
common_header_list headers;
long timeout = 0; // in seconds, 0 means no timeout
Expand Down
6 changes: 6 additions & 0 deletions tools/server/server-context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1004,6 +1004,12 @@ struct server_context_impl {
}
}

// init done (weights + warmup); mark finalize so the UI doesn't stick on the warmup phase
// while the post-warmup setup below (chat templates / seq-rm tests) runs
if (params_base.load_stage_callback) {
params_base.load_stage_callback(COMMON_LOAD_STAGE_FINALIZE, -1.0f, params_base.load_stage_callback_user_data);
}

if (!llama_memory_can_shift(llama_get_memory(ctx_tgt))) {
if (params_base.ctx_shift) {
params_base.ctx_shift = false;
Expand Down
147 changes: 144 additions & 3 deletions tools/server/server-models.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <atomic>
#include <chrono>
#include <queue>
#include <unordered_map>
#include <unordered_set>
#include <filesystem>
#include <random>
#include <sstream>
Expand Down Expand Up @@ -46,6 +48,9 @@ extern char **environ;
#define CMD_CHILD_TO_ROUTER_READY "cmd_child_to_router:ready" // also sent when waking up from sleep
#define CMD_CHILD_TO_ROUTER_SLEEP "cmd_child_to_router:sleep"
#define CMD_CHILD_TO_ROUTER_INFO "cmd_child_to_router:info:" // followed by json string
// load stage report: "<stage>" or "<stage>:<fraction 0..1>" (no fraction => indeterminate stage)
// stages are the COMMON_LOAD_STAGE_* names (download / load / warmup / finalize)
#define CMD_CHILD_TO_ROUTER_STAGE "cmd_child_to_router:stage:"

// address for child process, this is needed because router may run on 0.0.0.0
// ref: https://github.com/ggml-org/llama.cpp/issues/17862
Expand Down Expand Up @@ -762,9 +767,11 @@ void server_models::load(const std::string & name) {
instance_t inst;
inst.meta = meta;
inst.meta.port = get_free_port();
inst.meta.status = SERVER_MODEL_STATUS_LOADING;
inst.meta.loaded_info = json{};
inst.meta.last_used = ggml_time_ms();
inst.meta.status = SERVER_MODEL_STATUS_LOADING;
inst.meta.loaded_info = json{};
inst.meta.load_stage = ""; // reset stale stage/progress from a previous load
inst.meta.load_progress = -1.0f;
inst.meta.last_used = ggml_time_ms();

if (inst.meta.port <= 0) {
throw std::runtime_error("failed to get a port number");
Expand Down Expand Up @@ -821,6 +828,16 @@ void server_models::load(const std::string & name) {
this->update_loaded_info(name, str);
} else if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_SLEEP)) {
this->update_status(name, SERVER_MODEL_STATUS_SLEEPING, 0);
} else if (string_starts_with(buffer, CMD_CHILD_TO_ROUTER_STAGE)) {
std::string payload = string_strip(str.substr(strlen(CMD_CHILD_TO_ROUTER_STAGE)));
std::string stage = payload;
float progress = -1.0f;
auto colon = payload.find(':');
if (colon != std::string::npos) {
stage = payload.substr(0, colon);
progress = strtof(payload.c_str() + colon + 1, nullptr);
}
this->update_stage(name, stage, progress);
}
}
} else {
Expand Down Expand Up @@ -985,6 +1002,16 @@ void server_models::update_loaded_info(const std::string & name, std::string & r
cv.notify_all();
}

void server_models::update_stage(const std::string & name, const std::string & stage, float progress) {
std::unique_lock<std::mutex> lk(mutex);
auto it = mapping.find(name);
if (it != mapping.end()) {
it->second.meta.load_stage = stage;
it->second.meta.load_progress = progress;
}
cv.notify_all();
}

void server_models::wait_until_loading_finished(const std::string & name) {
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk, [this, &name]() {
Expand Down Expand Up @@ -1106,6 +1133,112 @@ void server_models::notify_router_sleeping_state(bool is_sleeping) {
common_log_resume(common_log_main());
}

void server_models::notify_router_stage(const char * stage, float progress) {
// write in a single fputs to avoid interleaving with loader logging on the shared stdout
char line[96];
if (progress >= 0.0f) {
snprintf(line, sizeof(line), "%s%s:%.4f\n", CMD_CHILD_TO_ROUTER_STAGE, stage, progress);
} else {
snprintf(line, sizeof(line), "%s%s\n", CMD_CHILD_TO_ROUTER_STAGE, stage);
}
common_log_pause(common_log_main());
fflush(stdout);
fputs(line, stdout);
fflush(stdout);
common_log_resume(common_log_main());
}

// funnels all stage emissions to the router. progress callbacks fire per tensor/chunk (hundreds of
// times), so only forward on phase change, integer-percent advance, or completion.
struct stage_emitter {
std::string last_stage;
int last_pct = -1;

void emit(const char * stage, float progress) {
const int pct = (int) (progress * 100.0f);
if (last_stage != stage || pct != last_pct || progress >= 1.0f) {
last_stage = stage;
last_pct = pct;
server_models::notify_router_stage(stage, progress);
}
}
};

// single model per child; emission sources don't overlap in time (download at arg-parse, then
// single-threaded load/warmup/finalize), and the download callback serializes its part threads.
static stage_emitter g_stage_emitter;

bool server_models::child_load_progress_callback(float progress, void * /*user_data*/) {
g_stage_emitter.emit(COMMON_LOAD_STAGE_LOAD, progress);
return true; // never abort loading
}

void server_models::child_load_stage_callback(const char * stage, float progress, void * /*user_data*/) {
g_stage_emitter.emit(stage, progress); // coarse phase markers (e.g. warmup / finalize)
}

// forwards download progress to the router. multi-part GGUFs download parts in parallel, each with
// its own byte counts, so aggregate across parts for a whole-model 0->1 instead of one racing part.
//
// the per-file total arrives lazily (from each file's HEAD), so a naive sum over only-seen files has
// a growing denominator and the percent regresses (e.g. one part hits 100% before the rest register).
// to avoid that, stay indeterminate until every expected file is resolved (size known, or finished
// for a cached/unknown-size file); only then is the denominator stable and the percent monotonic.
struct child_download_progress_callback : common_download_callback {
void on_plan(size_t total_files) override {
std::lock_guard<std::mutex> lock(mutex);
// fresh accounting per download pass (defensive: child loads one model, but a validation
// pass could call this twice) so a stale denominator can't leak across passes
files.clear();
resolved.clear();
expected = total_files;
}
void on_start(const common_download_progress & p) override { record(p, /*done=*/false); }
void on_update(const common_download_progress & p) override { record(p, /*done=*/false); }
void on_done(const common_download_progress & p, bool /*ok*/) override { record(p, /*done=*/true); }

private:
std::mutex mutex;
std::unordered_map<std::string, std::pair<size_t, size_t>> files; // url -> {downloaded, total}
std::unordered_set<std::string> resolved; // urls with a known size or finished
size_t expected = 0; // total files, from on_plan

void record(const common_download_progress & p, bool done) {
// serializing here also protects g_stage_emitter's state from the parallel part threads
std::lock_guard<std::mutex> lock(mutex);

if (p.total > 0) {
files[p.url] = { p.downloaded, p.total };
resolved.insert(p.url);
} else if (done) {
// cached / unknown-size file finished: count it resolved so we don't wait forever
files.emplace(p.url, std::make_pair<size_t, size_t>(0, 0));
resolved.insert(p.url);
}

// until every file's size is known the denominator keeps growing; report indeterminate so
// the UI shows an animated download phase instead of a percentage that jumps backwards
if (expected == 0 || resolved.size() < expected) {
g_stage_emitter.emit(COMMON_LOAD_STAGE_DOWNLOAD, -1.0f);
return;
}

size_t downloaded = 0;
size_t total = 0;
for (const auto & f : files) {
downloaded += f.second.first;
total += f.second.second;
}
// total == 0 means every file was cached/unknown-size: nothing to transfer, report complete
g_stage_emitter.emit(COMMON_LOAD_STAGE_DOWNLOAD, total > 0 ? (float) downloaded / (float) total : 1.0f);
}
};

void server_models::register_child_download_progress() {
static child_download_progress_callback cb;
common_download_set_default_callback(&cb);
}


//
// server_models_routes
Expand Down Expand Up @@ -1238,6 +1371,14 @@ void server_models_routes::init_routes() {
{"value", server_model_status_to_string(meta.status)},
{"args", meta.args},
};
if (meta.status == SERVER_MODEL_STATUS_LOADING) {
if (!meta.load_stage.empty()) {
status["stage"] = meta.load_stage;
}
if (meta.load_progress >= 0.0f) {
status["progress"] = meta.load_progress;
}
}
if (!meta.preset.name.empty()) {
common_preset preset_copy = meta.preset;
unset_reserved_args(preset_copy, false);
Expand Down
16 changes: 16 additions & 0 deletions tools/server/server-models.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ struct server_model_meta {
int stop_timeout = 0; // seconds to wait before force-killing the model instance during shutdown
mtmd_caps multimodal; // multimodal capabilities
bool need_download = false; // whether the model needs to be downloaded before loading
std::string load_stage = ""; // current load stage ("download"/"load"/"warmup"/"finalize"), valid while status == LOADING
float load_progress = -1.0f; // progress 0..1 within the current stage, or <0 if indeterminate

bool is_ready() const {
return status == SERVER_MODEL_STATUS_LOADED;
Expand Down Expand Up @@ -150,6 +152,7 @@ struct server_models {
// update the status of a model instance (thread-safe)
void update_status(const std::string & name, server_model_status status, int exit_code);
void update_loaded_info(const std::string & name, std::string & raw_info);
void update_stage(const std::string & name, const std::string & stage, float progress);

// wait until the model instance is fully loaded (thread-safe)
// return when the model no longer in "loading" state
Expand All @@ -172,6 +175,19 @@ struct server_models {

// notify the router server that the sleeping state has changed
static void notify_router_sleeping_state(bool sleeping);

// notify the router server of the current load stage (progress < 0 => indeterminate phase)
static void notify_router_stage(const char * stage, float progress);

// llama_progress_callback for tensor loading: forwards throttled "load" progress to the router
static bool child_load_progress_callback(float progress, void * user_data);

// common_params load_stage_callback: forwards a coarse phase marker (e.g. "finalize") to the router
static void child_load_stage_callback(const char * stage, float progress, void * user_data);

// register a process-wide download progress callback that forwards "download" progress to the router
// (must be called before model download happens, i.e. before common_params_parse)
static void register_child_download_progress();
};

struct server_models_routes {
Expand Down
Loading