Skip to content

Commit 7ae15ed

Browse files
committed
all_worker_clients_unsubscribed
1 parent 215d5af commit 7ae15ed

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

include/ngx_http_push_stream_module.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ typedef struct {
129129
ngx_http_complex_value_t *allowed_origins;
130130
ngx_http_complex_value_t *channel_created_request_url;
131131
ngx_http_complex_value_t *channel_destroyed_request_url;
132+
ngx_http_complex_value_t *all_worker_clients_unsubscribed_request_url;
132133
ngx_http_complex_value_t *client_subscribed_request_url;
133134
ngx_http_complex_value_t *client_unsubscribed_request_url;
134135
ngx_http_complex_value_t *client_publish_request_url;

src/ngx_http_push_stream_module_setup.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,12 @@ static ngx_command_t ngx_http_push_stream_commands[] = {
261261
NGX_HTTP_LOC_CONF_OFFSET,
262262
offsetof(ngx_http_push_stream_loc_conf_t, channel_destroyed_request_url),
263263
NULL } ,
264+
{ ngx_string("push_stream_all_worker_clients_unsubscribed_request"),
265+
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
266+
ngx_http_set_complex_value_slot,
267+
NGX_HTTP_LOC_CONF_OFFSET,
268+
offsetof(ngx_http_push_stream_loc_conf_t, all_worker_clients_unsubscribed_request_url),
269+
NULL } ,
264270
{ ngx_string("push_stream_client_subscribed_request"),
265271
NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1,
266272
ngx_http_set_complex_value_slot,
@@ -620,6 +626,7 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf)
620626
lcf->allowed_origins = NULL;
621627
lcf->channel_created_request_url = NULL;
622628
lcf->channel_destroyed_request_url = NULL;
629+
lcf->all_worker_clients_unsubscribed_request_url = NULL;
623630
lcf->client_subscribed_request_url = NULL;
624631
lcf->client_unsubscribed_request_url = NULL;
625632
lcf->client_publish_request_url = NULL;
@@ -681,6 +688,10 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
681688
conf->channel_destroyed_request_url = prev->channel_destroyed_request_url ;
682689
}
683690

691+
if (conf->all_worker_clients_unsubscribed_request_url == NULL) {
692+
conf->all_worker_clients_unsubscribed_request_url = prev->all_worker_clients_unsubscribed_request_url ;
693+
}
694+
684695
if (conf->client_subscribed_request_url == NULL) {
685696
conf->client_subscribed_request_url = prev->client_subscribed_request_url ;
686697
}

src/ngx_http_push_stream_module_utils.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data)
116116

117117
ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module);
118118
ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, subscriber->request->pool, subscriber->request, cf->client_unsubscribed_request_url);
119+
if (!channel->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, subscriber->request->pool, subscriber->request, cf->channel_destroyed_request_url);
120+
if (!worker->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, subscriber->request->pool, subscriber->request, cf->all_worker_clients_unsubscribed_request_url);
119121

120122
if (subscriber->longpolling) {
121123
ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool);
@@ -472,7 +474,7 @@ ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t
472474
ngx_http_push_stream_shm_data_t *data = mcf->shm_data;
473475
ngx_pool_t *temp_pool = received_temp_pool;
474476

475-
if ((mcf->events_channel_id.len > 0) && !channel->for_events) {
477+
if ((mcf->events_channel_id.len > 0) && !channel->for_events && event_type) {
476478
if ((temp_pool == NULL) && ((temp_pool = ngx_create_pool(4096, log)) == NULL)) {
477479
return NGX_ERROR;
478480
}
@@ -489,9 +491,11 @@ ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t
489491
}
490492
}
491493

492-
if ((r != NULL) && (uri != NULL)) {
494+
if (r && uri) {
495+
ngx_str_t vv_uri;
496+
ngx_http_push_stream_complex_value(r, uri, &vv_uri);
493497
ngx_http_request_t *sr;
494-
if (ngx_http_subrequest(r, &uri->value, &r->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK) {
498+
if (ngx_http_subrequest(r, &vv_uri, &r->args, &sr, NULL, NGX_HTTP_SUBREQUEST_BACKGROUND) != NGX_OK) {
495499
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_subrequest != NGX_OK");
496500
}
497501
}
@@ -1589,6 +1593,8 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t
15891593
ngx_shmtx_unlock(subscription->channel->mutex);
15901594

15911595
ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, worker_subscriber->request->pool, worker_subscriber->request, cf->client_unsubscribed_request_url);
1596+
if (!subscription->channel->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, worker_subscriber->request->pool, worker_subscriber->request, cf->channel_destroyed_request_url);
1597+
if (!subscription->channel_worker_sentinel->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, worker_subscriber->request->pool, worker_subscriber->request, cf->all_worker_clients_unsubscribed_request_url);
15921598
}
15931599

15941600
ngx_shmtx_lock(&shpool->mutex);

0 commit comments

Comments
 (0)