
server : use std::move whenever possible #12936

Merged (8 commits, Apr 18, 2025)

Changes from 2 commits
72 changes: 36 additions & 36 deletions examples/server/server.cpp
@@ -1552,11 +1552,11 @@ struct server_queue {
std::condition_variable condition_tasks;

// callback functions
- std::function<void(server_task)> callback_new_task;
- std::function<void(void)> callback_update_slots;
+ std::function<void(server_task&&)> callback_new_task;
+ std::function<void(void)> callback_update_slots;

// Add a new task to the end of the queue
- int post(server_task task, bool front = false) {
+ int post(server_task && task, bool front = false) {
std::unique_lock<std::mutex> lock(mutex_tasks);
GGML_ASSERT(task.id != -1);
// if this is cancel task make sure to clean up pending tasks
@@ -1565,16 +1565,16 @@ struct server_queue {
}
QUE_DBG("new task, id = %d, front = %d\n", task.id, front);
if (front) {
- queue_tasks.push_front(std::move(task));
+ queue_tasks.push_front(task);
} else {
- queue_tasks.push_back(std::move(task));
+ queue_tasks.push_back(task);
Member:

Without these moves, the task will be copied here. Better to keep them.

Collaborator (Author):

Thanks, it seems I mistakenly removed this move; added them back in 240ea24.

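Note: inside post(), the parameter task is a named rvalue reference, and a named rvalue reference is itself an lvalue, so push_back(task) copies while push_back(std::move(task)) moves. A minimal sketch of the difference, using a hypothetical Task type rather than the server's server_task:

    #include <cstdio>
    #include <deque>
    #include <utility>

    // Hypothetical payload that reports whether it was copied or moved.
    struct Task {
        Task() = default;
        Task(const Task &)     { std::puts("copy"); }
        Task(Task &&) noexcept { std::puts("move"); }
    };

    std::deque<Task> queue_tasks;

    // 'task' is declared as an rvalue reference, but its name is an lvalue
    // inside the body, so an explicit std::move is needed to avoid a copy.
    void post(Task && task) {
        queue_tasks.push_back(task);            // prints "copy"
        queue_tasks.push_back(std::move(task)); // prints "move"
    }

    int main() {
        post(Task{});
    }
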
}
condition_tasks.notify_one();
return task.id;
}

// multi-task version of post()
- int post(std::vector<server_task> & tasks, bool front = false) {
+ int post(std::vector<server_task> && tasks, bool front = false) {
std::unique_lock<std::mutex> lock(mutex_tasks);
for (auto & task : tasks) {
if (task.id == -1) {
@@ -1596,10 +1596,10 @@ struct server_queue {
}

// Add a new task, but defer until one slot is available
- void defer(server_task task) {
+ void defer(server_task && task) {
std::unique_lock<std::mutex> lock(mutex_tasks);
QUE_DBG("defer task, id = %d\n", task.id);
- queue_tasks_deferred.push_back(std::move(task));
+ queue_tasks_deferred.push_back(task);
condition_tasks.notify_one();
}

@@ -1611,7 +1611,7 @@ struct server_queue {
}

// Register function to process a new task
- void on_new_task(std::function<void(server_task)> callback) {
+ void on_new_task(std::function<void(server_task&&)> callback) {
callback_new_task = std::move(callback);
}

@@ -1660,7 +1660,7 @@ struct server_queue {
lock.unlock();
break;
}
- server_task task = queue_tasks.front();
+ server_task task = std::move(queue_tasks.front());
queue_tasks.pop_front();
lock.unlock();

@@ -2004,7 +2004,7 @@ struct server_context {

slot.reset();

- slots.push_back(slot);
+ slots.push_back(std::move(slot));
}

default_generation_settings_for_props = slots[0].to_json();
@@ -2105,7 +2105,7 @@ struct server_context {
return true;
}

- bool launch_slot_with_task(server_slot & slot, const server_task & task) {
+ bool launch_slot_with_task(server_slot & slot, const server_task && task) {
Member:

Suggested change:

- bool launch_slot_with_task(server_slot & slot, const server_task && task) {
+ bool launch_slot_with_task(server_slot & slot, server_task && task) {

Collaborator (Author):

We also move the params and prompt_tokens from task to slot, so I think this is currently not possible.

Member:

What is not possible? With the const present, the moves of params and prompt_tokens are actually copies. Removing the const from the argument will make them proper moves.

Collaborator (Author):

The problem is that prompt_tokens will soon contain image tokens as unique_ptr, so it will require a proper move.

And even without images (the current case, where we only have text tokens), I think a proper move still makes sense here. This function is the last place the task needs to go (i.e. we expect the task to be destroyed here).

Collaborator (Author):

Ah, looking back at this, I realize I was thinking of it reversed (I thought you wanted to add the const); sorry for the confusion 🥲

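Note on the const point above: with a const server_task &&, std::move on its members yields const rvalues, which can only bind to the copy constructor/assignment, so the "moves" silently degrade to copies. A minimal sketch with hypothetical stand-in types (not the PR's actual definitions):

    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical stand-ins for the real params / prompt_tokens members.
    struct Params       { std::string grammar; };
    struct PromptTokens { std::vector<int> ids; };

    struct Task { Params params; PromptTokens prompt_tokens; };
    struct Slot { Params params; PromptTokens prompt_tokens; };

    // With 'const Task &&', std::move produces 'const Params &&' /
    // 'const PromptTokens &&', which cannot bind to move assignment,
    // so both lines silently fall back to copies.
    void launch_const(Slot & slot, const Task && task) {
        slot.params        = std::move(task.params);
        slot.prompt_tokens = std::move(task.prompt_tokens);
    }

    // Without const, the members are genuinely moved. This is also the only
    // form that keeps compiling once the task gains move-only members
    // (e.g. a std::unique_ptr for image tokens), since those cannot be copied.
    void launch(Slot & slot, Task && task) {
        slot.params        = std::move(task.params);
        slot.prompt_tokens = std::move(task.prompt_tokens);
    }
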
slot.reset();
slot.id_task = task.id;
slot.index = task.index;
@@ -2547,10 +2547,10 @@ struct server_context {
server_task task(SERVER_TASK_TYPE_CANCEL);
task.id_target = id_task;
queue_results.remove_waiting_task_id(id_task);
- cancel_tasks.push_back(task);
+ cancel_tasks.push_back(std::move(task));
}
// push to beginning of the queue, so it has highest priority
- queue_tasks.post(cancel_tasks, true);
+ queue_tasks.post(std::move(cancel_tasks), true);
}

// receive the results from task(s)
@@ -2637,7 +2637,7 @@ struct server_context {
// Functions to process the task
//

- void process_single_task(server_task task) {
+ void process_single_task(server_task && task) {
switch (task.type) {
case SERVER_TASK_TYPE_COMPLETION:
case SERVER_TASK_TYPE_INFILL:
@@ -2651,17 +2651,17 @@ struct server_context {
if (slot == nullptr) {
// if no slot is available, we defer this task for processing later
SRV_DBG("no slot is available, defer task, id_task = %d\n", task.id);
- queue_tasks.defer(task);
+ queue_tasks.defer(std::move(task));
break;
}
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id);
- queue_tasks.defer(task);
+ queue_tasks.defer(std::move(task));
break;
}

- if (!launch_slot_with_task(*slot, task)) {
+ if (!launch_slot_with_task(*slot, std::move(task))) {
SRV_ERR("failed to launch slot with task, id_task = %d\n", task.id);
break;
}
@@ -2740,7 +2740,7 @@ struct server_context {
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id);
- queue_tasks.defer(task);
+ queue_tasks.defer(std::move(task));
break;
}

@@ -2776,7 +2776,7 @@ struct server_context {
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id);
- queue_tasks.defer(task);
+ queue_tasks.defer(std::move(task));
break;
}

@@ -2819,7 +2819,7 @@ struct server_context {
if (slot->is_processing()) {
// if requested slot is unavailable, we defer this task for processing later
SRV_DBG("requested slot is unavailable, defer task, id_task = %d\n", task.id);
- queue_tasks.defer(task);
+ queue_tasks.defer(std::move(task));
break;
}

@@ -2871,7 +2871,7 @@ struct server_context {

server_task task(SERVER_TASK_TYPE_NEXT_RESPONSE);
task.id = queue_tasks.get_new_id();
- queue_tasks.post(task);
+ queue_tasks.post(std::move(task));
}

// apply context-shift if needed
@@ -3636,7 +3636,7 @@ int main(int argc, char ** argv) {
server_task task(SERVER_TASK_TYPE_METRICS);
task.id = ctx_server.queue_tasks.get_new_id();
ctx_server.queue_results.add_waiting_task_id(task.id);
- ctx_server.queue_tasks.post(task, true); // high-priority task
+ ctx_server.queue_tasks.post(std::move(task), true); // high-priority task

// get the result
server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
Member:

The task should not be referenced after being moved. We can make this safer like this:

        const auto id = ctx_server.queue_tasks.get_new_id();

        {
            server_task task(SERVER_TASK_TYPE_METRICS);
            task.id = id;
            ctx_server.queue_results.add_waiting_task_id(task.id);
            ctx_server.queue_tasks.post(std::move(task), true); // high-priority task
        }

Collaborator (Author):

Thanks, that's a good idea. I also spotted some places where task is used after the move. It should be fixed in my last commit: 9487165

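Note: a compact sketch of the moved-from hazard discussed above and the safer capture-the-id-first pattern, using hypothetical types rather than the server's API:

    #include <utility>
    #include <vector>

    // Hypothetical task type; not the server's server_task.
    struct Task {
        int id = -1;
        std::vector<int> tokens;
    };

    std::vector<Task> queue;
    void post(Task && t) { queue.push_back(std::move(t)); }

    int main() {
        // Risky: 'task' is read after being moved from. A moved-from object
        // is only guaranteed to be in a valid but unspecified state.
        Task task;
        task.id = 1;
        post(std::move(task));
        int id_risky = task.id; // happens to work for a plain int, but fragile

        // Safer: capture the id first, then let the task go out of scope
        // once it has been posted (mirrors the suggestion above).
        const int id = 2;
        {
            Task t;
            t.id = id;
            post(std::move(t));
        }
        (void)id_risky;
        (void)id;
    }
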
@@ -3674,7 +3674,7 @@ int main(int argc, char ** argv) {
task.metrics_reset_bucket = true;

ctx_server.queue_results.add_waiting_task_id(task.id);
- ctx_server.queue_tasks.post(task, true); // high-priority task
+ ctx_server.queue_tasks.post(std::move(task), true); // high-priority task

// get the result
server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
@@ -3782,7 +3782,7 @@ int main(int argc, char ** argv) {
task.slot_action.filepath = filepath;

ctx_server.queue_results.add_waiting_task_id(task.id);
- ctx_server.queue_tasks.post(task);
+ ctx_server.queue_tasks.post(std::move(task));

server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
ctx_server.queue_results.remove_waiting_task_id(task.id);
@@ -3811,7 +3811,7 @@ int main(int argc, char ** argv) {
task.slot_action.filepath = filepath;

ctx_server.queue_results.add_waiting_task_id(task.id);
- ctx_server.queue_tasks.post(task);
+ ctx_server.queue_tasks.post(std::move(task));

server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
ctx_server.queue_results.remove_waiting_task_id(task.id);
@@ -3831,7 +3831,7 @@ int main(int argc, char ** argv) {
task.slot_action.slot_id = id_slot;

ctx_server.queue_results.add_waiting_task_id(task.id);
- ctx_server.queue_tasks.post(task);
+ ctx_server.queue_tasks.post(std::move(task));

server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
ctx_server.queue_results.remove_waiting_task_id(task.id);
@@ -3965,15 +3965,15 @@ int main(int argc, char ** argv) {
task.params.oaicompat_cmpl_id = completion_id;
// oaicompat_model is already populated by params_from_json_cmpl

- tasks.push_back(task);
+ tasks.push_back(std::move(task));
}
} catch (const std::exception & e) {
res_error(res, format_error_response(e.what(), ERROR_TYPE_INVALID_REQUEST));
return;
}

ctx_server.queue_results.add_waiting_tasks(tasks);
- ctx_server.queue_tasks.post(tasks);
+ ctx_server.queue_tasks.post(std::move(tasks));

bool stream = json_value(data, "stream", false);
const auto task_ids = server_task::get_list_id(tasks);
@@ -4280,11 +4280,11 @@ int main(int argc, char ** argv) {
// OAI-compat
task.params.oaicompat = oaicompat;

- tasks.push_back(task);
+ tasks.push_back(std::move(task));
}

ctx_server.queue_results.add_waiting_tasks(tasks);
- ctx_server.queue_tasks.post(tasks);
+ ctx_server.queue_tasks.post(std::move(tasks));

// get the result
std::unordered_set<int> task_ids = server_task::get_list_id(tasks);
@@ -4376,11 +4376,11 @@ int main(int argc, char ** argv) {
task.id = ctx_server.queue_tasks.get_new_id();
task.index = i;
task.prompt_tokens = format_rerank(ctx_server.vocab, tokenized_query, tokenized_docs[i]);
- tasks.push_back(task);
+ tasks.push_back(std::move(task));
}

ctx_server.queue_results.add_waiting_tasks(tasks);
- ctx_server.queue_tasks.post(tasks);
+ ctx_server.queue_tasks.post(std::move(tasks));

// get the result
std::unordered_set<int> task_ids = server_task::get_list_id(tasks);
@@ -4435,7 +4435,7 @@ int main(int argc, char ** argv) {
task.id = ctx_server.queue_tasks.get_new_id();
task.set_lora = parse_lora_request(ctx_server.params_base.lora_adapters, body);
ctx_server.queue_results.add_waiting_task_id(task.id);
- ctx_server.queue_tasks.post(task);
+ ctx_server.queue_tasks.post(std::move(task));

server_task_result_ptr result = ctx_server.queue_results.recv(task.id);
ctx_server.queue_results.remove_waiting_task_id(task.id);
@@ -4582,8 +4582,8 @@ int main(int argc, char ** argv) {
common_chat_templates_source(ctx_server.chat_templates.get()),
common_chat_format_example(ctx_server.chat_templates.get(), ctx_server.params_base.use_jinja).c_str());

- ctx_server.queue_tasks.on_new_task([&ctx_server](const server_task & task) {
- ctx_server.process_single_task(task);
+ ctx_server.queue_tasks.on_new_task([&ctx_server](server_task && task) {
+ ctx_server.process_single_task(std::move(task));
});

ctx_server.queue_tasks.on_update_slots([&ctx_server]() {