Skip to content

Commit c419373

Browse files
committed
notify only current worker
1 parent 7ae15ed commit c419373

6 files changed

+14
-13
lines changed

include/ngx_http_push_stream_module_ipc.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ ngx_socket_t ngx_http_push_stream_socketpairs[NGX_MAX_PROCESSES][2];
5050

5151
static ngx_int_t ngx_http_push_stream_register_worker_message_handler(ngx_cycle_t *cycle);
5252

53-
static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf);
53+
static void ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_pid_t *pid);
5454

5555
static ngx_int_t ngx_http_push_stream_alert_worker(ngx_pid_t pid, ngx_int_t slot, ngx_log_t *log, ngx_channel_t command);
5656
#define ngx_http_push_stream_alert_worker_check_messages(pid, slot, log) ngx_http_push_stream_alert_worker(pid, slot, log, NGX_CMD_HTTP_PUSH_STREAM_CHECK_MESSAGES)

include/ngx_http_push_stream_module_utils.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,8 @@ ngx_chain_t * ngx_http_push_stream_get_buf(ngx_http_request_t *r);
262262
static void ngx_http_push_stream_unescape_uri(ngx_str_t *value);
263263
static void ngx_http_push_stream_complex_value(ngx_http_request_t *r, ngx_http_complex_value_t *val, ngx_str_t *value);
264264

265-
266-
ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
267-
void ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool);
265+
ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool, ngx_pid_t *pid);
266+
void ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool, ngx_pid_t *pid);
268267
ngx_int_t ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool, ngx_http_request_t *r, ngx_http_complex_value_t *uri);
269268

270269
static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev);

src/ngx_http_push_stream_module_ipc.c

+3-1
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ ngx_http_push_stream_send_worker_message(ngx_http_push_stream_channel_t *channel
463463

464464

465465
static void
466-
ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf)
466+
ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_pid_t *pid)
467467
{
468468
// subscribers are queued up in a local pool. Queue heads, however, are located
469469
// in shared memory, identified by pid.
@@ -474,12 +474,14 @@ ngx_http_push_stream_broadcast(ngx_http_push_stream_channel_t *channel, ngx_http
474474
ngx_shmtx_lock(channel->mutex);
475475
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
476476
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
477+
if (pid && *pid != worker->pid) continue;
477478
ngx_http_push_stream_send_worker_message(channel, &worker->subscriptions, worker->pid, worker->slot, msg, &queue_was_empty[worker->slot], log, mcf);
478479
}
479480
ngx_shmtx_unlock(channel->mutex);
480481

481482
for (q = ngx_queue_head(&channel->workers_with_subscribers); q != ngx_queue_sentinel(&channel->workers_with_subscribers); q = ngx_queue_next(q)) {
482483
worker = ngx_queue_data(q, ngx_http_push_stream_pid_queue_t, queue);
484+
if (pid && *pid != worker->pid) continue;
483485
// interprocess communication breakdown
484486
if (queue_was_empty[worker->slot] && (ngx_http_push_stream_alert_worker_check_messages(worker->pid, worker->slot, log) != NGX_OK)) {
485487
ngx_log_error(NGX_LOG_ERR, log, 0, "push stream module: error communicating with worker process, pid: %P, slot: %d", worker->pid, worker->slot);

src/ngx_http_push_stream_module_publisher.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ ngx_http_push_stream_publisher_body_handler(ngx_http_request_t *r)
277277
for (q = ngx_queue_head(&ctx->requested_channels->queue); q != ngx_queue_sentinel(&ctx->requested_channels->queue); q = ngx_queue_next(q)) {
278278
requested_channel = ngx_queue_data(q, ngx_http_push_stream_requested_channel_t, queue);
279279

280-
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, requested_channel->channel, buf->pos, ngx_buf_size(buf), event_id, event_type, cf->store_messages, r->pool) != NGX_OK) {
280+
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, requested_channel->channel, buf->pos, ngx_buf_size(buf), event_id, event_type, cf->store_messages, r->pool, NULL) != NGX_OK) {
281281
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
282282
return;
283283
}

src/ngx_http_push_stream_module_utils.c

+5-5
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con
375375
}
376376

377377
void
378-
ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool)
378+
ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool, ngx_pid_t *pid)
379379
{
380380
// ngx_log_error(NGX_LOG_ERR, log, 0, "id = %V", id);
381381
// ngx_log_error(NGX_LOG_ERR, log, 0, "ngx_http_push_stream_global_shm_zone = %p", ngx_http_push_stream_global_shm_zone);
@@ -389,15 +389,15 @@ ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_st
389389
ngx_http_push_stream_channel_t *channel = ngx_http_push_stream_find_channel(id, log, mcf);
390390
// ngx_log_error(NGX_LOG_ERR, log, 0, "channel = %p", channel);
391391
if (channel != NULL) {
392-
if (ngx_http_push_stream_add_msg_to_channel(mcf, log, channel, text->data, text->len, event_id, event_type, store_messages, temp_pool) != NGX_OK) {
392+
if (ngx_http_push_stream_add_msg_to_channel(mcf, log, channel, text->data, text->len, event_id, event_type, store_messages, temp_pool, pid) != NGX_OK) {
393393
ngx_log_error(NGX_LOG_ERR, log, 0, "ngx_http_push_stream_add_msg_to_channel != NGX_OK");
394394
}
395395
}
396396
}
397397
}
398398

399399
ngx_int_t
400-
ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool)
400+
ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool, ngx_pid_t *pid)
401401
{
402402
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
403403
ngx_http_push_stream_msg_t *msg;
@@ -459,7 +459,7 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, n
459459
}
460460

461461
// send an alert to workers
462-
ngx_http_push_stream_broadcast(channel, msg, log, mcf);
462+
ngx_http_push_stream_broadcast(channel, msg, log, mcf, pid);
463463

464464
// turn on timer to cleanup buffer of old messages
465465
ngx_http_push_stream_buffer_cleanup_timer_set();
@@ -483,7 +483,7 @@ ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t
483483
ngx_str_t *event = ngx_http_push_stream_create_str(temp_pool, len);
484484
if (event != NULL) {
485485
ngx_sprintf(event->data, NGX_HTTP_PUSH_STREAM_EVENT_TEMPLATE, event_type, &channel->id);
486-
ngx_http_push_stream_add_msg_to_channel(mcf, log, data->events_channel, event->data, ngx_strlen(event->data), NULL, event_type, 1, temp_pool);
486+
ngx_http_push_stream_add_msg_to_channel(mcf, log, data->events_channel, event->data, ngx_strlen(event->data), NULL, event_type, 1, temp_pool, NULL);
487487
}
488488

489489
if ((received_temp_pool == NULL) && (temp_pool != NULL)) {

src/ngx_http_push_stream_module_websocket.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r)
416416
}
417417
}
418418
} else {
419-
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, cf->store_messages, ctx->temp_pool) != NGX_OK) {
419+
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, cf->store_messages, ctx->temp_pool, NULL) != NGX_OK) {
420420
goto finalize;
421421
}
422422
}
@@ -529,7 +529,7 @@ static ngx_int_t ngx_http_push_stream_post_subrequest_handler(ngx_http_request_t
529529
ngx_http_push_stream_subscription_t *subscription = data;
530530
if (r->out != NULL) {
531531
if (r->out->buf != NULL) {
532-
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, r->out->buf->pos, ngx_buf_size(r->out->buf), NULL, NULL, cf->store_messages, r->pool) != NGX_OK) {
532+
if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, r->out->buf->pos, ngx_buf_size(r->out->buf), NULL, NULL, cf->store_messages, r->pool, NULL) != NGX_OK) {
533533
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_push_stream_add_msg_to_channel != NGX_OK");
534534
}
535535
}

0 commit comments

Comments
 (0)