Skip to content

Commit 1f128c4

Browse files
authored
Merge pull request #213 from Gaurav-Gangalwar/push
work_pool: Fix for efficient scheduling.
2 parents 51ff851 + c8310d0 commit 1f128c4

File tree

3 files changed

+55
-39
lines changed

3 files changed

+55
-39
lines changed

ntirpc/rpc/work_pool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ struct work_pool_thread {
7272
char worker_name[16];
7373
pthread_t pt;
7474
uint32_t worker_index;
75+
bool wakeup;
7576
};
7677

7778
typedef void (*work_pool_fun_t) (struct work_pool_entry *);

src/svc.c

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,15 @@ svc_init(svc_init_params *params)
184184
if (__svc_params->ioq.thrd_max < params->ioq_thrd_max)
185185
__svc_params->ioq.thrd_max = params->ioq_thrd_max;
186186

187-
work_pool_params.thrd_min = __svc_params->ioq.thrd_min + channels;
187+
work_pool_params.thrd_min = __svc_params->ioq.thrd_min;
188188
work_pool_params.thrd_max = __svc_params->ioq.thrd_max;
189-
if (work_pool_params.thrd_max < work_pool_params.thrd_min)
190-
work_pool_params.thrd_max = work_pool_params.thrd_min;
189+
/*
190+
* thrd_max should > channels.
191+
*/
192+
if (work_pool_params.thrd_max < (work_pool_params.thrd_min +
193+
channels))
194+
work_pool_params.thrd_max = work_pool_params.thrd_min +
195+
channels;
191196

192197
if (work_pool_init(&svc_work_pool, "svc_", &work_pool_params)) {
193198
mutex_unlock(&__svc_params->mtx);

src/work_pool.c

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ work_pool_thread(void *arg)
154154

155155
pthread_cond_init(&wpt->pqcond, NULL);
156156
pthread_mutex_lock(&pool->pqh.qmutex);
157-
TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq);
158157

159158
wpt->worker_index = atomic_inc_uint32_t(&pool->worker_index);
160159
snprintf(wpt->worker_name, sizeof(wpt->worker_name), "%.5s%" PRIu32,
@@ -185,21 +184,21 @@ work_pool_thread(void *arg)
185184
wpt->work = NULL;
186185
pthread_mutex_lock(&pool->pqh.qmutex);
187186
}
188-
189-
if (0 > pool->pqh.qcount++) {
190-
/* negative for task(s) */
191-
have = TAILQ_FIRST(&pool->pqh.qh);
187+
/*
188+
* Check for any queued work to avoid scheduling.
189+
*/
190+
have = TAILQ_FIRST(&pool->pqh.qh);
191+
if (have) {
192192
TAILQ_REMOVE(&pool->pqh.qh, have, q);
193-
194193
wpt->work = (struct work_pool_entry *)have;
195194
continue;
196195
}
197196

198-
/* positive for waiting worker(s):
199-
* use the otherwise empty pool to hold them,
200-
* simplifying mutex and pointer setup.
197+
/*
198+
* Add myself to waiting queue.
201199
*/
202-
TAILQ_INSERT_TAIL(&pool->pqh.qh, &wpt->pqe, q);
200+
pool->pqh.qcount++;
201+
TAILQ_INSERT_TAIL(&pool->wptqh, wpt, wptq);
203202

204203
__warnx(TIRPC_DEBUG_FLAG_WORKER,
205204
"%s() %s waiting",
@@ -208,32 +207,46 @@ work_pool_thread(void *arg)
208207
clock_gettime(CLOCK_REALTIME_FAST, &ts);
209208
timespec_addms(&ts, pool->timeout_ms);
210209

210+
wpt->wakeup = false;
211+
211212
/* Note: the mutex is the pool _head,
212213
* but the condition is per worker,
213214
* making the signal efficient!
214215
*/
215216
rc = pthread_cond_timedwait(&wpt->pqcond, &pool->pqh.qmutex,
216217
&ts);
217-
if (!wpt->work) {
218-
/* Allow for possible timing race:
219-
* work entry can be submitted by another
220-
* thread during the thread task switch
221-
* after shutdown or timeout?
222-
* Then, has already been removed there.
223-
*/
218+
219+
/*
220+
* Wokeup after work submit.
221+
* It could be shutdown also.
222+
*/
223+
if (!rc) {
224+
if (wpt->wakeup)
225+
continue;
226+
}
227+
228+
/*
229+
* It could be timeout.
230+
* There could be race if submit got lock and
231+
* it will try to wakeup me.
232+
*/
233+
if (!wpt->wakeup) {
224234
pool->pqh.qcount--;
225-
TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
235+
TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
236+
} else {
237+
continue;
226238
}
239+
227240
if (rc && rc != ETIMEDOUT) {
228241
__warnx(TIRPC_DEBUG_FLAG_ERROR,
229242
"%s() cond_timedwait failed (%d)\n",
230243
__func__, rc);
231244
break;
232245
}
233-
} while (wpt->work || pool->pqh.qcount < pool->params.thrd_min);
246+
} while (wpt->work || wpt->wakeup ||
247+
pool->pqh.qcount < pool->params.thrd_min);
234248

235249
pool->n_threads--;
236-
TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
237250
pthread_mutex_unlock(&pool->pqh.qmutex);
238251

239252
__warnx(TIRPC_DEBUG_FLAG_WORKER,
@@ -274,26 +287,23 @@ work_pool_submit(struct work_pool *pool, struct work_pool_entry *work)
274287
/* queue is draining */
275288
return (0);
276289
}
277-
pthread_mutex_lock(&pool->pqh.qmutex);
278-
279-
if (0 < pool->pqh.qcount--) {
280-
struct work_pool_thread *wpt = (struct work_pool_thread *)
281-
TAILQ_FIRST(&pool->pqh.qh);
282290

283-
/* positive for waiting worker(s) */
284-
TAILQ_REMOVE(&pool->pqh.qh, &wpt->pqe, q);
285-
wpt->work = work;
286-
287-
/* Note: the mutex is the pool _head,
288-
* but the condition is per worker,
289-
* making the signal efficient!
290-
*/
291+
pthread_mutex_lock(&pool->pqh.qmutex);
292+
/*
293+
* Insert in work queue so that running thread can
294+
* pickup without scheduling.
295+
*/
296+
TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
297+
struct work_pool_thread *wpt = TAILQ_LAST(&pool->wptqh, work_pool_s);
298+
if (wpt) {
299+
pool->pqh.qcount--;
300+
TAILQ_REMOVE(&pool->wptqh, wpt, wptq);
301+
assert(!wpt->wakeup);
302+
wpt->wakeup = true;
291303
pthread_cond_signal(&wpt->pqcond);
292304
} else {
293-
/* negative for task(s) */
294-
TAILQ_INSERT_TAIL(&pool->pqh.qh, &work->pqe, q);
305+
assert(pool->pqh.qcount == 0);
295306
}
296-
297307
pthread_mutex_unlock(&pool->pqh.qmutex);
298308
return rc;
299309
}

0 commit comments

Comments
 (0)