Skip to content

Commit

Permalink
[onert/ggml] Add ggml worker to reduce thread create calls
Browse files Browse the repository at this point in the history
This commit is a simple implementation to reduce the number of thread create calls in ggml.
It creates multiple threads at ggml_init and when ggml_graph_compute is submitted, the operation is executed in multiple threads

ONE-DCO-1.0-Signed-off-by: youngsik kim <[email protected]>
  • Loading branch information
ys44kim committed Feb 26, 2025
1 parent d6bd10f commit 06beb28
Show file tree
Hide file tree
Showing 4 changed files with 290 additions and 0 deletions.
1 change: 1 addition & 0 deletions runtime/3rdparty/ggml/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,7 @@ add_library(ggml
${GGML_SOURCES_LLAMAFILE} ${GGML_HEADERS_LLAMAFILE}
${GGML_SOURCES_CANN} ${GGML_HEADERS_CANN}
ggml-aarch64.c ggml-aarch64.h
ggml-worker.c ggml-worker.h
)

if (EMSCRIPTEN)
Expand Down
212 changes: 212 additions & 0 deletions runtime/3rdparty/ggml/src/ggml-worker.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <stdbool.h>
#include <errno.h>
#include <sched.h>
#include <string.h>
#include <stdint.h>

#include "ggml.h"
#include "ggml-worker.h"

static TaskQueue *g_queue[MAX_NUM_OF_QUEUES];
static pthread_t g_threads[MAX_NUM_OF_THREADS];
static int g_worker_initialized = false;
static int g_num_of_threads;

static void setSchedule(void)
{
struct sched_param param;

memset(&param, 0, sizeof(param));
if (sched_getparam(0, &param) < 0) {
GGML_ABORT("Failed to sched_getparam");
return;
}

param.sched_priority = 1;
if (sched_setscheduler(0, SCHED_RR, &param) == -1) {
GGML_ABORT("error sched_setscheduler(SCHED_RR) errno:%d", errno);
return;
}
}

static TaskQueue *create_task_queue(int capacity)
{
TaskQueue *queue = (TaskQueue *)malloc(sizeof(TaskQueue));
if (!queue) {
GGML_ABORT("Failed to allocate memory for queue");
return NULL;
}

queue->tasks = (Task *)malloc(sizeof(Task) * capacity);
if (!queue->tasks) {
GGML_ABORT("Failed to allocate memory for tasks");
free(queue);
return NULL;
}

queue->head = 0;
queue->tail = 0;
queue->capacity = capacity;
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->cond, NULL);
queue->stop = false;

return queue;
}

static void destroy_task_queue(TaskQueue *queue)
{
pthread_mutex_destroy(&queue->mutex);
pthread_cond_destroy(&queue->cond);
free(queue->tasks);
free(queue);
}

static void stop(TaskQueue *queue)
{
pthread_mutex_lock(&queue->mutex);
queue->stop = true;
pthread_cond_broadcast(&queue->cond);
pthread_mutex_unlock(&queue->mutex);
}

static void *worker(void *arg)
{
WorkerArg *worker_arg = (WorkerArg *)arg;
TaskQueue *queue = worker_arg->queue;
int id = worker_arg->id;

free(worker_arg);

setSchedule();

while (true)
{
pthread_mutex_lock(&queue->mutex);

while (queue->head == queue->tail && !queue->stop)
{
pthread_cond_wait(&queue->cond, &queue->mutex);
}

if (queue->stop && queue->head == queue->tail)
{
pthread_mutex_unlock(&queue->mutex);
break;
}

Task task = queue->tasks[queue->tail];
queue->tail = (queue->tail + 1) % queue->capacity;

pthread_mutex_unlock(&queue->mutex);

task.func(task.arg);
}

return NULL;
}

void ggml_worker_init(void)
{
setSchedule();

g_num_of_threads = MAX_NUM_OF_THREADS;

for (int i = 0; i < g_num_of_threads; i++)
{
g_queue[i] = create_task_queue(MAX_NUM_OF_QUEUES);
if (!g_queue[i]) {
GGML_ABORT("Failed to create task queue");
return;
}

WorkerArg *worker_arg = (WorkerArg *)malloc(sizeof(WorkerArg));
worker_arg->queue = g_queue[i];
worker_arg->id = i + 1;
if (pthread_create(&g_threads[i], NULL, worker, worker_arg) != 0) {
GGML_ABORT("Failed to create worker thread");
free(worker_arg);
return;
}
}

g_worker_initialized = true;
}

void ggml_worker_finalize(void)
{
if(g_worker_initialized == false)
{
return;
}

for (int i = 0; i < g_num_of_threads; i++)
{
stop(g_queue[i]);
}

for (int i = 0; i < g_num_of_threads; i++)
{
pthread_join(g_threads[i], NULL);
destroy_task_queue(g_queue[i]);
}

g_worker_initialized = false;
}

void ggml_worker_submit(void (*func)(void *), void *arg)
{
TaskQueue *queue;
static int current_worker_id = 0;

while(true)
{
current_worker_id = (current_worker_id + 1) % g_num_of_threads;

queue = g_queue[current_worker_id];

if(!queue->stop && ((queue->head + 1) % queue->capacity != queue->tail))
{
break;
}
}

queue->tasks[queue->head].func = func;
queue->tasks[queue->head].arg = arg;
queue->head = (queue->head + 1) % queue->capacity;

pthread_mutex_lock(&queue->mutex);
pthread_cond_signal(&queue->cond);
pthread_mutex_unlock(&queue->mutex);
}

void ggml_worker_set_num_threads(unsigned int num_of_threads)
{
if(num_of_threads >= MAX_NUM_OF_THREADS)
{
GGML_ABORT("num_of_threads[%d] exceeds maximum number of threads", num_of_threads);
return;
}

g_num_of_threads = num_of_threads;
}
44 changes: 44 additions & 0 deletions runtime/3rdparty/ggml/src/ggml-worker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#define MAX_NUM_OF_THREADS 4
#define MAX_NUM_OF_QUEUES 8

typedef struct {
void (*func)(void *);
void *arg;
} Task;

typedef struct {
Task *tasks;
int head;
int tail;
int capacity;
pthread_mutex_t mutex;
pthread_cond_t cond;
bool stop;
} TaskQueue;

typedef struct {
TaskQueue *queue;
int id;
} WorkerArg;


void ggml_worker_init(void);
void ggml_worker_finalize(void);
void ggml_worker_submit(void (*func)(void *), void *arg);
void ggml_worker_set_num_threads(unsigned int num_of_threads);
33 changes: 33 additions & 0 deletions runtime/3rdparty/ggml/src/ggml.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,15 @@

#define _CRT_SECURE_NO_DEPRECATE // Disables ridiculous "unsafe" warnings on Windows
#define _USE_MATH_DEFINES // For M_PI on MSVC
#define GGML_WORKER

#include "ggml-impl.h"
#include "ggml-quants.h"
#include "ggml.h"
#include "ggml-aarch64.h"
#ifdef GGML_WORKER
#include "ggml-worker.h"
#endif

#if defined(_MSC_VER) || defined(__MINGW32__)
#include <malloc.h> // using malloc.h with MSC/MINGW
Expand Down Expand Up @@ -3593,6 +3597,10 @@ struct ggml_context * ggml_init(struct ggml_init_params params) {
GGML_PRINT_DEBUG("%s: g_state initialized in %f ms\n", __func__, (t_end - t_start)/1000.0f);
}

#ifdef GGML_WORKER
ggml_worker_init();
#endif

is_first_call = false;
}

Expand Down Expand Up @@ -3684,6 +3692,10 @@ void ggml_free(struct ggml_context * ctx) {
GGML_PRINT_DEBUG("%s: context not found\n", __func__);
}

#ifdef GGML_WORKER
ggml_worker_finalize();
#endif

ggml_critical_section_end();
}

Expand Down Expand Up @@ -18788,6 +18800,9 @@ static int ggml_get_n_tasks(struct ggml_tensor * node, int n_threads) {
return n_tasks;
}

#ifdef GGML_WORKER
static atomic_int completed_threads = 0;
#endif
struct ggml_cplan ggml_graph_plan(const struct ggml_cgraph * cgraph, int n_threads) {
if (n_threads <= 0) {
n_threads = GGML_DEFAULT_N_THREADS;
Expand Down Expand Up @@ -18981,13 +18996,19 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
state->shared->ec = GGML_STATUS_ABORTED;
}

#ifndef GGML_WORKER
ggml_barrier(state->shared);
#endif

if (state->shared->ec != GGML_STATUS_SUCCESS) {
break;
}
}

#ifdef GGML_WORKER
atomic_fetch_add(&completed_threads, 1);
#endif

return 0;
}

Expand All @@ -19010,6 +19031,10 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
/*.ec =*/ GGML_STATUS_SUCCESS,
};

#ifdef GGML_WORKER
completed_threads = 0;
#endif

#ifdef GGML_USE_OPENMP
if (n_threads > 1) {
#pragma omp parallel num_threads(n_threads)
Expand Down Expand Up @@ -19049,14 +19074,21 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl

// create thread pool
for (int j = 1; j < n_threads; ++j) {
#ifdef GGML_WORKER
ggml_worker_submit((void (*)(void *)) ggml_graph_compute_thread, &workers[j]);
#else
const int rc = ggml_thread_create(&workers[j].thrd, NULL, ggml_graph_compute_thread, &workers[j]);
GGML_ASSERT(rc == 0);
UNUSED(rc);
#endif
}

// this is a work thread too
ggml_graph_compute_thread(&workers[0]);

#ifdef GGML_WORKER
while(completed_threads < n_threads);
#else
// join or kill thread pool
if (n_threads > 1) {
for (int j = 1; j < n_threads; j++) {
Expand All @@ -19065,6 +19097,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
UNUSED(rc);
}
}
#endif
#endif

// don't leave affinity set on the main thread
Expand Down

0 comments on commit 06beb28

Please sign in to comment.