diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..12301490 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,6 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" diff --git a/.github/workflows/docker-publish.yml b/.github/workflows/docker-publish.yml new file mode 100644 index 00000000..e294c68f --- /dev/null +++ b/.github/workflows/docker-publish.yml @@ -0,0 +1,22 @@ +name: Docker +on: + push: + branches: + - master + workflow_dispatch: +jobs: + dispatch: + runs-on: ubuntu-latest + steps: + - env: + GITHUB_TOKEN: ${{ secrets.PUBLIC_REPO_ACCESS_TOKEN }} + INPUTS_CLIENT_PAYLOAD: '{"repository":${{ toJson(github.event.repository.name) }}}' + INPUTS_EVENT_TYPE: latest + INPUTS_REPOSITORY: ${{ github.repository_owner }}/${{ matrix.repo }} + uses: rekgrpth/github-repository-dispatch-shell-action@v1 + strategy: + matrix: + repo: + - angie.docker + - freenginx.docker + - nginx.docker diff --git a/.github/workflows/merge-upstream.yml b/.github/workflows/merge-upstream.yml new file mode 100644 index 00000000..ab96a17a --- /dev/null +++ b/.github/workflows/merge-upstream.yml @@ -0,0 +1,15 @@ +name: Merge +on: + schedule: + - cron: '0 19 * * *' + workflow_dispatch: +jobs: + merge: + env: + GITHUB_TOKEN: ${{ secrets.PUBLIC_REPO_ACCESS_TOKEN }} + runs-on: ubuntu-latest + steps: + - uses: rekgrpth/git-clone-shell-action@v1 + - env: + INPUTS_REPOSITORY: wandenberg/nginx-push-stream-module + uses: rekgrpth/git-fetch-upstream-merge-push-shell-action@v1 diff --git a/.keepalive b/.keepalive new file mode 100644 index 00000000..700630c1 --- /dev/null +++ b/.keepalive @@ -0,0 +1 @@ +1741634331 diff --git a/config b/config index e82a0828..c01c30b6 100644 --- a/config +++ b/config @@ -1,5 +1,5 @@ ngx_addon_name=ngx_http_push_stream_module -CORE_INCS="$CORE_INCS ${ngx_addon_dir}/src ${ngx_addon_dir}/include" +#CORE_INCS="$CORE_INCS ${ngx_addon_dir}/src ${ngx_addon_dir}/include" if test -n "$ngx_module_link"; then ngx_module_type=HTTP diff --git a/include/ngx_http_push_stream_module.h b/include/ngx_http_push_stream_module.h index 3f6303ce..8163d4f2 100644 --- a/include/ngx_http_push_stream_module.h +++ b/include/ngx_http_push_stream_module.h @@ -127,6 +127,12 @@ typedef struct { ngx_str_t padding_by_user_agent; ngx_queue_t *paddings; ngx_http_complex_value_t *allowed_origins; + ngx_http_complex_value_t *channel_created_request_url; + ngx_http_complex_value_t *channel_destroyed_request_url; + ngx_http_complex_value_t *all_worker_clients_unsubscribed_request_url; + ngx_http_complex_value_t *client_subscribed_request_url; + ngx_http_complex_value_t *client_unsubscribed_request_url; + ngx_http_complex_value_t *client_publish_request_url; } ngx_http_push_stream_loc_conf_t; // shared memory segment name diff --git a/include/ngx_http_push_stream_module_ipc.h b/include/ngx_http_push_stream_module_ipc.h index 25a0d1aa..c9a2b5c9 100644 --- a/include/ngx_http_push_stream_module_ipc.h +++ b/include/ngx_http_push_stream_module_ipc.h @@ -34,8 +34,8 @@ #ifndef NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_IPC_H_ -#include -#include +#include "ngx_http_push_stream_module.h" +#include "ngx_http_push_stream_module_subscriber.h" #include diff --git a/include/ngx_http_push_stream_module_publisher.h b/include/ngx_http_push_stream_module_publisher.h index 1268158f..efe9c5d5 100644 --- a/include/ngx_http_push_stream_module_publisher.h +++ b/include/ngx_http_push_stream_module_publisher.h @@ -26,7 +26,7 @@ #ifndef NGX_HTTP_PUSH_STREAM_MODULE_PUBLISHER_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_PUBLISHER_H_ -#include +#include "ngx_http_push_stream_module.h" static ngx_int_t ngx_http_push_stream_channels_statistics_handler(ngx_http_request_t *r); static ngx_int_t ngx_http_push_stream_publisher_handler(ngx_http_request_t *r); diff --git a/include/ngx_http_push_stream_module_setup.h b/include/ngx_http_push_stream_module_setup.h index c6706132..5a1a5aec 100644 --- a/include/ngx_http_push_stream_module_setup.h +++ b/include/ngx_http_push_stream_module_setup.h @@ -26,13 +26,13 @@ #ifndef NGX_HTTP_PUSH_STREAM_MODULE_SETUP_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_SETUP_H_ -#include -#include -#include -#include -#include -#include -#include +#include "ngx_http_push_stream_module.h" +#include "ngx_http_push_stream_rbtree_util.h" +#include "ngx_http_push_stream_module_utils.h" +#include "ngx_http_push_stream_module_ipc.h" +#include "ngx_http_push_stream_module_publisher.h" +#include "ngx_http_push_stream_module_subscriber.h" +#include "ngx_http_push_stream_module_websocket.h" #define NGX_HTTP_PUSH_STREAM_MESSAGE_BUFFER_CLEANUP_INTERVAL 5000 // 5 seconds static time_t NGX_HTTP_PUSH_STREAM_DEFAULT_SHM_MEMORY_CLEANUP_OBJECTS_TTL = 10; // 10 seconds diff --git a/include/ngx_http_push_stream_module_utils.h b/include/ngx_http_push_stream_module_utils.h index 904c6974..7680837b 100644 --- a/include/ngx_http_push_stream_module_utils.h +++ b/include/ngx_http_push_stream_module_utils.h @@ -26,8 +26,8 @@ #ifndef NGX_HTTP_PUSH_STREAM_MODULE_UTILS_H_ #define NGX_HTTP_PUSH_STREAM_MODULE_UTILS_H_ -#include -#include +#include "ngx_http_push_stream_module.h" +#include "ngx_http_push_stream_module_ipc.h" typedef struct { ngx_queue_t queue; @@ -229,6 +229,7 @@ static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED = ngx_string(" static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED = ngx_string("channel_destroyed"); static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED = ngx_string("client_subscribed"); static ngx_str_t NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED = ngx_string("client_unsubscribed"); +static ngx_str_t NGX_HTTP_PUSH_STREAM_POST = ngx_string("POST"); ngx_event_t ngx_http_push_stream_memory_cleanup_event; @@ -252,6 +253,7 @@ static ngx_int_t ngx_http_push_stream_send_response_content_header(ng static ngx_int_t ngx_http_push_stream_send_response(ngx_http_request_t *r, ngx_str_t *text, const ngx_str_t *content_type, ngx_int_t status_code); static ngx_int_t ngx_http_push_stream_send_response_message(ngx_http_request_t *r, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_msg_t *msg, ngx_flag_t send_callback, ngx_flag_t send_separator); static ngx_int_t ngx_http_push_stream_send_response_text(ngx_http_request_t *r, const u_char *text, uint len, ngx_flag_t last_buffer); +static ngx_int_t ngx_http_push_stream_output_filter(ngx_http_request_t *r, ngx_chain_t *in); static void ngx_http_push_stream_send_response_finalize(ngx_http_request_t *r); static void ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_request_t *r); static ngx_int_t ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_t http_status, const ngx_str_t *reason); @@ -263,7 +265,8 @@ static void ngx_http_push_stream_complex_value(ngx_http_request_ ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool); -ngx_int_t ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool); +ngx_int_t ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool); +ngx_int_t ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_id, ngx_pool_t *temp_pool, ngx_http_request_t *r, ngx_http_complex_value_t *uri); static void ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev); static void ngx_http_push_stream_disconnect_timer_wake_handler(ngx_event_t *ev); @@ -281,6 +284,7 @@ static ngx_str_t * ngx_http_push_stream_create_str(ngx_pool_t *pool, ui static void ngx_http_push_stream_throw_the_message_away(ngx_http_push_stream_msg_t *msg, ngx_http_push_stream_shm_data_t *data); static ngx_int_t ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_pool_t *temp_pool); +ngx_int_t ngx_http_push_stream_delete_channel_my(ngx_log_t *log, ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool); static void ngx_http_push_stream_collect_expired_messages_data(ngx_http_push_stream_shm_data_t *data, ngx_flag_t force); static void ngx_http_push_stream_collect_expired_messages_and_empty_channels(ngx_flag_t force); static void ngx_http_push_stream_free_message_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_msg_t *msg); diff --git a/include/ngx_http_push_stream_module_websocket.h b/include/ngx_http_push_stream_module_websocket.h index ab96dfdb..66194c05 100644 --- a/include/ngx_http_push_stream_module_websocket.h +++ b/include/ngx_http_push_stream_module_websocket.h @@ -30,8 +30,8 @@ #include #endif -#include -#include +#include "ngx_http_push_stream_module_utils.h" +#include "ngx_http_push_stream_module_subscriber.h" static ngx_int_t ngx_http_push_stream_websocket_handler(ngx_http_request_t *r); diff --git a/include/ngx_http_push_stream_rbtree_util.h b/include/ngx_http_push_stream_rbtree_util.h index 661cbde1..2a941dee 100644 --- a/include/ngx_http_push_stream_rbtree_util.h +++ b/include/ngx_http_push_stream_rbtree_util.h @@ -34,7 +34,7 @@ #ifndef NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_ #define NGX_HTTP_PUSH_STREAM_RBTREE_UTIL_H_ -static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf); +static ngx_http_push_stream_channel_t * ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_http_request_t *r); static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf); static void ngx_rbtree_generic_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare) (const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right)); diff --git a/misc/tools/publisher.c b/misc/tools/publisher.c index 4541bfce..67a269c6 100644 --- a/misc/tools/publisher.c +++ b/misc/tools/publisher.c @@ -156,7 +156,7 @@ void write_message(Connection *connection, Statistics *stats) { char buffer[BUFFER_SIZE]; - int len = 0, bytes_written = 0; + int len = 0; if ((connection->channel_id <= connection->channel_start) || (connection->channel_id > connection->channel_end)) { connection->channel_id = connection->channel_start; diff --git a/misc/tools/subscriber.c b/misc/tools/subscriber.c index fb595c38..fc71fc0d 100644 --- a/misc/tools/subscriber.c +++ b/misc/tools/subscriber.c @@ -14,7 +14,7 @@ int main_program(int num_channels, int num_connections, const char *server_hostname, int server_port, int timeout) { struct sockaddr_in server_address; - int main_sd = -1, num_events = 0, i, j, event_mask, channels_per_connection, num, start_time = 0, iters_to_next_summary = 0; + int main_sd = -1, num_events = 0, i, j, event_mask, num, start_time = 0, iters_to_next_summary = 0; Connection *connections = NULL, *connection; Statistics stats = {0,0,0,0,0}; int exitcode = EXIT_SUCCESS; @@ -138,7 +138,7 @@ void subscribe_channels(Connection *connection, Statistics *stats) { char buffer[BUFFER_SIZE]; - int len = 0, bytes_written = 0; + int len = 0; long i = 0; len = sprintf(buffer, "GET /sub"); diff --git a/src/ngx_http_push_stream_module.c b/src/ngx_http_push_stream_module.c index ce74dcb9..fda7bfc1 100644 --- a/src/ngx_http_push_stream_module.c +++ b/src/ngx_http_push_stream_module.c @@ -23,14 +23,14 @@ * Authors: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include -#include -#include -#include -#include -#include -#include -#include +#include "ngx_http_push_stream_module.h" +#include "ngx_http_push_stream_module_setup.c" +#include "ngx_http_push_stream_rbtree_util.c" +#include "ngx_http_push_stream_module_utils.c" +#include "ngx_http_push_stream_module_ipc.c" +#include "ngx_http_push_stream_module_publisher.c" +#include "ngx_http_push_stream_module_subscriber.c" +#include "ngx_http_push_stream_module_websocket.c" static ngx_str_t * ngx_http_push_stream_channel_info_formatted(ngx_pool_t *pool, const ngx_str_t *format, ngx_str_t *id, ngx_uint_t published_messages, ngx_uint_t stored_messages, ngx_uint_t subscribers) diff --git a/src/ngx_http_push_stream_module.h b/src/ngx_http_push_stream_module.h new file mode 120000 index 00000000..c312d244 --- /dev/null +++ b/src/ngx_http_push_stream_module.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_module_ipc.c b/src/ngx_http_push_stream_module_ipc.c index caa8a2fb..5db219a4 100644 --- a/src/ngx_http_push_stream_module_ipc.c +++ b/src/ngx_http_push_stream_module_ipc.c @@ -31,7 +31,7 @@ * Modifications by: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include +#include "ngx_http_push_stream_module_ipc.h" void ngx_http_push_stream_ipc_init_worker_data(ngx_http_push_stream_shm_data_t *data); static ngx_inline void ngx_http_push_stream_census_worker_subscribers_data(ngx_http_push_stream_shm_data_t *data); diff --git a/src/ngx_http_push_stream_module_ipc.h b/src/ngx_http_push_stream_module_ipc.h new file mode 120000 index 00000000..27d92384 --- /dev/null +++ b/src/ngx_http_push_stream_module_ipc.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module_ipc.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_module_publisher.c b/src/ngx_http_push_stream_module_publisher.c index d0788592..ff57b94a 100644 --- a/src/ngx_http_push_stream_module_publisher.c +++ b/src/ngx_http_push_stream_module_publisher.c @@ -23,8 +23,8 @@ * Authors: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include -#include +#include "ngx_http_push_stream_module_publisher.h" +#include "ngx_http_push_stream_module_version.h" static ngx_int_t ngx_http_push_stream_publisher_handle_after_read_body(ngx_http_request_t *r, ngx_http_client_body_handler_pt post_handler); @@ -97,7 +97,7 @@ ngx_http_push_stream_publisher_handler(ngx_http_request_t *r) if (r->method & (NGX_HTTP_POST|NGX_HTTP_PUT)) { // create the channel if doesn't exist - requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf); + requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf, r); if (requested_channel->channel == NULL) { return ngx_http_push_stream_send_only_header_response(r, NGX_HTTP_INTERNAL_SERVER_ERROR, NULL); } diff --git a/src/ngx_http_push_stream_module_publisher.h b/src/ngx_http_push_stream_module_publisher.h new file mode 120000 index 00000000..46f7f3c5 --- /dev/null +++ b/src/ngx_http_push_stream_module_publisher.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module_publisher.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_module_setup.c b/src/ngx_http_push_stream_module_setup.c index c9ed8f18..9e9ce534 100644 --- a/src/ngx_http_push_stream_module_setup.c +++ b/src/ngx_http_push_stream_module_setup.c @@ -23,7 +23,7 @@ * Authors: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include +#include "ngx_http_push_stream_module_setup.h" ngx_uint_t ngx_http_push_stream_padding_max_len = 0; ngx_flag_t ngx_http_push_stream_enabled = 0; @@ -249,6 +249,42 @@ static ngx_command_t ngx_http_push_stream_commands[] = { NGX_HTTP_LOC_CONF_OFFSET, offsetof(ngx_http_push_stream_loc_conf_t, allow_connections_to_events_channel), NULL }, + { ngx_string("push_stream_channel_created_request"), + NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1, + ngx_http_set_complex_value_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_push_stream_loc_conf_t, channel_created_request_url), + NULL } , + { ngx_string("push_stream_channel_destroyed_request"), + NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1, + ngx_http_set_complex_value_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_push_stream_loc_conf_t, channel_destroyed_request_url), + NULL } , + { ngx_string("push_stream_all_worker_clients_unsubscribed_request"), + NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1, + ngx_http_set_complex_value_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_push_stream_loc_conf_t, all_worker_clients_unsubscribed_request_url), + NULL } , + { ngx_string("push_stream_client_subscribed_request"), + NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1, + ngx_http_set_complex_value_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_push_stream_loc_conf_t, client_subscribed_request_url), + NULL } , + { ngx_string("push_stream_client_unsubscribed_request"), + NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1, + ngx_http_set_complex_value_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_push_stream_loc_conf_t, client_unsubscribed_request_url), + NULL } , + { ngx_string("push_stream_client_publish_request"), + NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF|NGX_CONF_TAKE1, + ngx_http_set_complex_value_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_push_stream_loc_conf_t, client_publish_request_url), + NULL } , ngx_null_command }; @@ -588,6 +624,12 @@ ngx_http_push_stream_create_loc_conf(ngx_conf_t *cf) ngx_str_null(&lcf->padding_by_user_agent); lcf->paddings = NULL; lcf->allowed_origins = NULL; + lcf->channel_created_request_url = NULL; + lcf->channel_destroyed_request_url = NULL; + lcf->all_worker_clients_unsubscribed_request_url = NULL; + lcf->client_subscribed_request_url = NULL; + lcf->client_unsubscribed_request_url = NULL; + lcf->client_publish_request_url = NULL; return lcf; } @@ -638,6 +680,30 @@ ngx_http_push_stream_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) conf->allowed_origins = prev->allowed_origins ; } + if (conf->channel_created_request_url == NULL) { + conf->channel_created_request_url = prev->channel_created_request_url ; + } + + if (conf->channel_destroyed_request_url == NULL) { + conf->channel_destroyed_request_url = prev->channel_destroyed_request_url ; + } + + if (conf->all_worker_clients_unsubscribed_request_url == NULL) { + conf->all_worker_clients_unsubscribed_request_url = prev->all_worker_clients_unsubscribed_request_url ; + } + + if (conf->client_subscribed_request_url == NULL) { + conf->client_subscribed_request_url = prev->client_subscribed_request_url ; + } + + if (conf->client_unsubscribed_request_url == NULL) { + conf->client_unsubscribed_request_url = prev->client_unsubscribed_request_url ; + } + + if (conf->client_publish_request_url == NULL) { + conf->client_publish_request_url = prev->client_publish_request_url ; + } + if (conf->location_type == NGX_CONF_UNSET_UINT) { return NGX_CONF_OK; } @@ -872,14 +938,14 @@ ngx_http_push_stream_subscriber(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) } if (*field == NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET) { - char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_websocket_handler); #if (NGX_HAVE_SHA1) + char *rc = ngx_http_push_stream_setup_handler(cf, conf, &ngx_http_push_stream_websocket_handler); if (rc == NGX_CONF_OK) { ngx_http_push_stream_loc_conf_t *pslcf = conf; pslcf->location_type = NGX_HTTP_PUSH_STREAM_SUBSCRIBER_MODE_WEBSOCKET; } #else - rc = NGX_CONF_ERROR; + char *rc = NGX_CONF_ERROR; ngx_conf_log_error(NGX_LOG_ERR, cf, 0, "push stream module: push stream module: sha1 support is needed to use WebSocket"); #endif return rc; @@ -1131,7 +1197,7 @@ ngx_http_push_stream_init_shm_zone(ngx_shm_zone_t *shm_zone, void *data) d->mutex_round_robin = 0; if (mcf->events_channel_id.len > 0) { - if ((d->events_channel = ngx_http_push_stream_get_channel(&mcf->events_channel_id, ngx_cycle->log, mcf)) == NULL) { + if ((d->events_channel = ngx_http_push_stream_get_channel(&mcf->events_channel_id, ngx_cycle->log, mcf, NULL)) == NULL) { ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0, "push stream module: unable to create events channel"); return NGX_ERROR; } diff --git a/src/ngx_http_push_stream_module_setup.h b/src/ngx_http_push_stream_module_setup.h new file mode 120000 index 00000000..bb5c2648 --- /dev/null +++ b/src/ngx_http_push_stream_module_setup.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module_setup.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_module_subscriber.c b/src/ngx_http_push_stream_module_subscriber.c index d683360e..f8cf48b6 100644 --- a/src/ngx_http_push_stream_module_subscriber.c +++ b/src/ngx_http_push_stream_module_subscriber.c @@ -23,7 +23,7 @@ * Authors: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include +#include "ngx_http_push_stream_module_subscriber.h" static ngx_int_t ngx_http_push_stream_subscriber_assign_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_loc_conf_t *cf, ngx_http_request_t *r, ngx_http_push_stream_requested_channel_t *requested_channel, time_t if_modified_since, ngx_int_t tag, ngx_str_t *last_event_id, ngx_http_push_stream_subscriber_t *subscriber, ngx_pool_t *temp_pool); static ngx_http_push_stream_subscriber_t *ngx_http_push_stream_subscriber_prepare_request_to_keep_connected(ngx_http_request_t *r); @@ -355,7 +355,7 @@ ngx_http_push_stream_validate_channels(ngx_http_request_t *r, ngx_http_push_stre continue; } - requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf); + requested_channel->channel = ngx_http_push_stream_get_channel(requested_channel->id, r->connection->log, mcf, r); if (requested_channel->channel == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate memory for new channel"); *status_code = NGX_HTTP_INTERNAL_SERVER_ERROR; @@ -621,6 +621,7 @@ static ngx_int_t ngx_http_push_stream_assing_subscription_to_channel(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel, ngx_http_push_stream_subscription_t *subscription, ngx_queue_t *subscriptions, ngx_log_t *log) { ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(subscription->subscriber->request, ngx_http_push_stream_module); + ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(subscription->subscriber->request, ngx_http_push_stream_module); ngx_http_push_stream_pid_queue_t *worker_subscribers_sentinel; ngx_shmtx_lock(channel->mutex); @@ -637,7 +638,7 @@ ngx_http_push_stream_assing_subscription_to_channel(ngx_slab_pool_t *shpool, ngx subscription->channel_worker_sentinel = worker_subscribers_sentinel; ngx_shmtx_unlock(channel->mutex); - ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED, NULL); + ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_SUBSCRIBED, subscription->subscriber->request->pool, subscription->subscriber->request, cf->client_subscribed_request_url); return NGX_OK; } diff --git a/src/ngx_http_push_stream_module_subscriber.h b/src/ngx_http_push_stream_module_subscriber.h new file mode 120000 index 00000000..391abcd6 --- /dev/null +++ b/src/ngx_http_push_stream_module_subscriber.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module_subscriber.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_module_utils.c b/src/ngx_http_push_stream_module_utils.c index fdc29618..59926539 100644 --- a/src/ngx_http_push_stream_module_utils.c +++ b/src/ngx_http_push_stream_module_utils.c @@ -23,7 +23,7 @@ * Authors: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include +#include "ngx_http_push_stream_module_utils.h" static void nxg_http_push_stream_free_channel_memory(ngx_slab_pool_t *shpool, ngx_http_push_stream_channel_t *channel); static void ngx_http_push_stream_run_cleanup_pool_handler(ngx_pool_t *p, ngx_pool_cleanup_pt handler); @@ -123,7 +123,10 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data) ngx_queue_remove(&subscription->channel_worker_queue); ngx_shmtx_unlock(channel->mutex); - ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, subscriber->request->pool); + ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(subscriber->request, ngx_http_push_stream_module); + ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, subscriber->request->pool, subscriber->request, cf->client_unsubscribed_request_url); + if (!channel->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, subscriber->request->pool, subscriber->request, cf->channel_destroyed_request_url); + if (!worker->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, subscriber->request->pool, subscriber->request, cf->all_worker_clients_unsubscribed_request_url); if (subscriber->longpolling) { ngx_http_push_stream_add_polling_headers(subscriber->request, ngx_time(), 0, subscriber->request->pool); @@ -145,7 +148,7 @@ ngx_http_push_stream_delete_channels_data(ngx_http_push_stream_shm_data_t *data) ngx_shmtx_unlock(&data->channels_to_delete_mutex); } -void +static void ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data_t *data) { ngx_http_push_stream_main_conf_t *mcf = data->mcf; @@ -183,7 +186,7 @@ ngx_http_push_stream_collect_deleted_channels_data(ngx_http_push_stream_shm_data data->channels_in_trash++; ngx_shmtx_unlock(&data->channels_trash_mutex); - ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool); + ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool, NULL, NULL); } } ngx_shmtx_unlock(&data->channels_to_delete_mutex); @@ -242,7 +245,7 @@ ngx_http_push_stream_cleanup_shutting_down_worker_data(ngx_http_push_stream_shm_ ngx_http_push_stream_clean_worker_data(data); } -ngx_uint_t +static ngx_uint_t ngx_http_push_stream_apply_text_template(ngx_str_t **dst_value, ngx_str_t **dst_message, ngx_str_t *text, const ngx_str_t *template, const ngx_str_t *token, ngx_slab_pool_t *shpool, ngx_pool_t *temp_pool) { if (text != NULL) { @@ -377,6 +380,21 @@ ngx_http_push_stream_convert_char_to_msg_on_shared(ngx_http_push_stream_main_con return msg; } +ngx_int_t ngx_http_push_stream_add_msg_to_channel_my(ngx_log_t *log, ngx_str_t *id, ngx_str_t *text, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool) { + ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data; + for (ngx_queue_t *q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) { + ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue); + ngx_http_push_stream_main_conf_t *mcf = data->mcf; + ngx_http_push_stream_channel_t *channel = ngx_http_push_stream_find_channel(id, log, mcf); + if (!channel) continue; + if (store_messages) for (ngx_queue_t *qq = ngx_queue_head(&channel->message_queue); qq != ngx_queue_sentinel(&channel->message_queue); qq = ngx_queue_next(qq)) { + ngx_http_push_stream_msg_t *message = ngx_queue_data(qq, ngx_http_push_stream_msg_t, queue); + if (message->raw.len == text->len && !ngx_strncmp(message->raw.data, text->data, text->len)) return NGX_DONE; + } + return ngx_http_push_stream_add_msg_to_channel(mcf, log, channel, text->data, text->len, event_id, event_type, store_messages, temp_pool); + } + return NGX_DECLINED; +} ngx_int_t ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_str_t *event_id, ngx_str_t *event_type, ngx_flag_t store_messages, ngx_pool_t *temp_pool) @@ -451,12 +469,12 @@ ngx_http_push_stream_add_msg_to_channel(ngx_http_push_stream_main_conf_t *mcf, n ngx_int_t -ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_type, ngx_pool_t *received_temp_pool) +ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t *log, ngx_http_push_stream_channel_t *channel, ngx_str_t *event_type, ngx_pool_t *received_temp_pool, ngx_http_request_t *r, ngx_http_complex_value_t *uri) { ngx_http_push_stream_shm_data_t *data = mcf->shm_data; ngx_pool_t *temp_pool = received_temp_pool; - if ((mcf->events_channel_id.len > 0) && !channel->for_events) { + if ((mcf->events_channel_id.len > 0) && !channel->for_events && event_type) { if ((temp_pool == NULL) && ((temp_pool = ngx_create_pool(4096, log)) == NULL)) { return NGX_ERROR; } @@ -473,6 +491,16 @@ ngx_http_push_stream_send_event(ngx_http_push_stream_main_conf_t *mcf, ngx_log_t } } + if (r && uri) { + ngx_str_t vv_uri; + ngx_http_push_stream_complex_value(r, uri, &vv_uri); + ngx_str_t args = r->args; + ngx_uint_t flags = NGX_HTTP_SUBREQUEST_BACKGROUND; + if (ngx_http_parse_unsafe_uri(r, &vv_uri, &args, &flags) != NGX_OK) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_complex_value != NGX_OK"); return NGX_ERROR; } + ngx_http_request_t *sr; + if (ngx_http_subrequest(r, &vv_uri, &args, &sr, NULL, flags) == NGX_ERROR) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_subrequest == NGX_ERROR"); return NGX_ERROR; } + } + return NGX_OK; } @@ -656,7 +684,7 @@ ngx_http_push_stream_get_buf(ngx_http_request_t *r) } -ngx_int_t +static ngx_int_t ngx_http_push_stream_output_filter(ngx_http_request_t *r, ngx_chain_t *in) { ngx_http_core_loc_conf_t *clcf; @@ -909,7 +937,7 @@ ngx_http_push_stream_send_response_finalize_for_longpolling_by_timeout(ngx_http_ if (mcf->timeout_with_body && (mcf->longpooling_timeout_msg == NULL)) { // create longpooling timeout message - if ((mcf->longpooling_timeout_msg == NULL) && (mcf->longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, (u_char *) NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, 0, 0, r->pool)) == NULL) { + if ((mcf->longpooling_timeout_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, (u_char *) NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT, ngx_strlen(NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_TEXT), NULL, NGX_HTTP_PUSH_STREAM_LONGPOOLING_TIMEOUT_MESSAGE_ID, NULL, NULL, 0, 0, r->pool)) == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate long pooling timeout message in shared memory"); } } @@ -941,6 +969,20 @@ ngx_http_push_stream_send_websocket_close_frame(ngx_http_request_t *r, ngx_uint_ return (rc == NGX_ERROR) ? NGX_DONE : NGX_OK; } +ngx_int_t ngx_http_push_stream_delete_channel_my(ngx_log_t *log, ngx_str_t *id, u_char *text, size_t len, ngx_pool_t *temp_pool) { + ngx_http_push_stream_global_shm_data_t *global_data = (ngx_http_push_stream_global_shm_data_t *) ngx_http_push_stream_global_shm_zone->data; + for (ngx_queue_t *q = ngx_queue_head(&global_data->shm_datas_queue); q != ngx_queue_sentinel(&global_data->shm_datas_queue); q = ngx_queue_next(q)) { + ngx_http_push_stream_shm_data_t *data = ngx_queue_data(q, ngx_http_push_stream_shm_data_t, shm_data_queue); + ngx_http_push_stream_main_conf_t *mcf = data->mcf; + ngx_http_push_stream_channel_t *channel = ngx_http_push_stream_find_channel(id, log, mcf); + if (!channel) continue; + if (!text) text = mcf->channel_deleted_message_text.data; + if (!len) len = mcf->channel_deleted_message_text.len; + return ngx_http_push_stream_delete_channel(mcf, channel, text, len, temp_pool); + } + return NGX_DECLINED; +} + static ngx_int_t ngx_http_push_stream_delete_channel(ngx_http_push_stream_main_conf_t *mcf, ngx_http_push_stream_channel_t *channel, u_char *text, size_t len, ngx_pool_t *temp_pool) { @@ -1039,7 +1081,7 @@ ngx_http_push_stream_collect_expired_messages_and_empty_channels_data(ngx_http_p data->channels_in_trash++; ngx_shmtx_unlock(&data->channels_trash_mutex); - ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool); + ngx_http_push_stream_send_event(mcf, ngx_cycle->log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_DESTROYED, temp_pool, NULL, NULL); } } ngx_shmtx_unlock(&data->channels_queue_mutex); @@ -1299,7 +1341,7 @@ ngx_http_push_stream_ping_timer_wake_handler(ngx_event_t *ev) } else { if (mcf->ping_msg == NULL) { // create ping message - if ((mcf->ping_msg == NULL) && (mcf->ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, mcf->ping_message_text.data, mcf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, 0, 0, r->pool)) == NULL) { + if ((mcf->ping_msg = ngx_http_push_stream_convert_char_to_msg_on_shared(mcf, mcf->ping_message_text.data, mcf->ping_message_text.len, NULL, NGX_HTTP_PUSH_STREAM_PING_MESSAGE_ID, NULL, NULL, 0, 0, r->pool)) == NULL) { ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "push stream module: unable to allocate ping message in shared memory"); } } @@ -1550,6 +1592,7 @@ static void ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t *worker_subscriber) { ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(worker_subscriber->request, ngx_http_push_stream_module); + ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(worker_subscriber->request, ngx_http_push_stream_module); ngx_http_push_stream_shm_data_t *data = mcf->shm_data; ngx_slab_pool_t *shpool = mcf->shpool; ngx_queue_t *cur; @@ -1564,7 +1607,9 @@ ngx_http_push_stream_worker_subscriber_cleanup(ngx_http_push_stream_subscriber_t ngx_queue_remove(&subscription->queue); ngx_shmtx_unlock(subscription->channel->mutex); - ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, worker_subscriber->request->pool); + ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CLIENT_UNSUBSCRIBED, worker_subscriber->request->pool, worker_subscriber->request, cf->client_unsubscribed_request_url); + if (!subscription->channel->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, worker_subscriber->request->pool, worker_subscriber->request, cf->channel_destroyed_request_url); + if (!subscription->channel_worker_sentinel->subscribers) ngx_http_push_stream_send_event(mcf, ngx_cycle->log, subscription->channel, NULL, worker_subscriber->request->pool, worker_subscriber->request, cf->all_worker_clients_unsubscribed_request_url); } ngx_shmtx_lock(&shpool->mutex); @@ -1583,7 +1628,7 @@ ngx_http_push_stream_match_channel_info_format_and_content_type(ngx_http_request if (r->headers_in.accept) { u_char *cur = r->headers_in.accept->value.data; - size_t rem = 0; + size_t rem; while ((cur != NULL) && (cur = ngx_strnstr(cur, "/", r->headers_in.accept->value.len)) != NULL) { cur = cur + 1; @@ -1726,7 +1771,7 @@ ngx_http_push_stream_split_by_crlf(ngx_str_t *msg, ngx_pool_t *temp_pool) { ngx_queue_t *lines = NULL; u_char *pos = NULL, *start = NULL, *crlf_pos, *cr_pos, *lf_pos; - u_int step = 0, len = 0; + u_int step, len = 0; if ((lines = ngx_pcalloc(temp_pool, sizeof(ngx_queue_t))) == NULL) { return NULL; diff --git a/src/ngx_http_push_stream_module_utils.h b/src/ngx_http_push_stream_module_utils.h new file mode 120000 index 00000000..e880cd05 --- /dev/null +++ b/src/ngx_http_push_stream_module_utils.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module_utils.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_module_version.h b/src/ngx_http_push_stream_module_version.h new file mode 120000 index 00000000..575ea7dd --- /dev/null +++ b/src/ngx_http_push_stream_module_version.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module_version.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_module_websocket.c b/src/ngx_http_push_stream_module_websocket.c index 10efbfd5..eccd8089 100644 --- a/src/ngx_http_push_stream_module_websocket.c +++ b/src/ngx_http_push_stream_module_websocket.c @@ -23,8 +23,9 @@ * Authors: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include +#include "ngx_http_push_stream_module_websocket.h" +static ngx_int_t ngx_http_push_stream_post_subrequest_handler(ngx_http_request_t *r, void *data, ngx_int_t rc); ngx_str_t *ngx_http_push_stream_generate_websocket_accept_value(ngx_http_request_t *r, ngx_str_t *sec_key, ngx_pool_t *temp_pool); ngx_int_t ngx_http_push_stream_recv(ngx_connection_t *c, ngx_event_t *rev, ngx_buf_t *buf, ssize_t len); void ngx_http_push_stream_set_buffer(ngx_buf_t *buf, u_char *start, u_char *last, ssize_t len); @@ -194,6 +195,8 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) u_char *aux, *last; unsigned char opcode; + ngx_http_push_stream_content_subtype_t *subtype = ngx_http_push_stream_match_channel_info_format_and_content_type(r, 1); + ngx_http_push_stream_set_buffer(&ctx->frame->buf, ctx->frame->buf.start, ctx->frame->buf.last, 0); c = r->connection; @@ -371,9 +374,55 @@ ngx_http_push_stream_websocket_reading(ngx_http_request_t *r) // skip events channel on publish by websocket connections continue; } - - if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, cf->store_messages, ctx->temp_pool) != NGX_OK) { - goto finalize; + if (cf->client_publish_request_url != NULL) { + ngx_http_request_t *sr; + ngx_http_post_subrequest_t *psr = ngx_palloc(r->pool, sizeof(ngx_http_post_subrequest_t)); + if (psr == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "psr == NULL"); + } else { + psr->handler = ngx_http_push_stream_post_subrequest_handler; + psr->data = subscription; + ngx_str_t args = r->args; + ngx_uint_t flags = NGX_HTTP_SUBREQUEST_WAITED|NGX_HTTP_SUBREQUEST_BACKGROUND; + if (ngx_http_parse_unsafe_uri(r, &cf->client_publish_request_url->value, &args, &flags) != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_complex_value != NGX_OK"); + } else if (ngx_http_subrequest(r, &cf->client_publish_request_url->value, &args, &sr, psr, flags) == NGX_ERROR) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_subrequest == NGX_ERROR"); + } else { + sr->method = NGX_HTTP_POST; + sr->method_name = NGX_HTTP_PUSH_STREAM_POST; + sr->request_body = ngx_pcalloc(sr->pool, sizeof(ngx_http_request_body_t)); + if (sr->request_body == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "sr->request_body == NULL"); + } else { + sr->request_body->buf = ngx_calloc_buf(sr->pool); + if (sr->request_body->buf == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "sr->request_body->buf == NULL"); + } else { + sr->request_body->bufs = ngx_alloc_chain_link(sr->pool); + if (sr->request_body->bufs == NULL) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "sr->request_body->bufs == NULL"); + } else { + sr->headers_in.content_type = ngx_palloc(sr->pool, sizeof(ngx_table_elt_t)); + sr->headers_in.content_type->value = *subtype->content_type; + sr->headers_in.content_length_n = ctx->frame->payload_len; + sr->request_body->buf->pos = ctx->frame->payload; + sr->request_body->buf->last = ctx->frame->payload + ctx->frame->payload_len; + sr->request_body->buf->memory = 1; + sr->request_body->bufs->buf = sr->request_body->buf; + sr->request_body->bufs->next = NULL; + if (r->headers_in.headers.last == &r->headers_in.headers.part) { + sr->headers_in.headers.last = &sr->headers_in.headers.part; + } + } + } + } + } + } + } else { + if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, ctx->frame->payload, ctx->frame->payload_len, NULL, NULL, cf->store_messages, ctx->temp_pool) != NGX_OK) { + goto finalize; + } } } } @@ -477,3 +526,17 @@ ngx_http_push_stream_set_buffer(ngx_buf_t *buf, u_char *start, u_char *last, ssi buf->temporary = 0; buf->memory = 1; } + +static ngx_int_t ngx_http_push_stream_post_subrequest_handler(ngx_http_request_t *r, void *data, ngx_int_t rc) { + ngx_http_push_stream_main_conf_t *mcf = ngx_http_get_module_main_conf(r, ngx_http_push_stream_module); + ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); + ngx_http_push_stream_subscription_t *subscription = data; + if (r->out != NULL) { + if (r->out->buf != NULL) { + if (ngx_http_push_stream_add_msg_to_channel(mcf, r->connection->log, subscription->channel, r->out->buf->pos, ngx_buf_size(r->out->buf), NULL, NULL, cf->store_messages, r->pool) != NGX_OK) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, "ngx_http_push_stream_add_msg_to_channel != NGX_OK"); + } + } + } + return rc; +} diff --git a/src/ngx_http_push_stream_module_websocket.h b/src/ngx_http_push_stream_module_websocket.h new file mode 120000 index 00000000..18feec18 --- /dev/null +++ b/src/ngx_http_push_stream_module_websocket.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_module_websocket.h \ No newline at end of file diff --git a/src/ngx_http_push_stream_rbtree_util.c b/src/ngx_http_push_stream_rbtree_util.c index 08d11ba2..e5f776bf 100644 --- a/src/ngx_http_push_stream_rbtree_util.c +++ b/src/ngx_http_push_stream_rbtree_util.c @@ -31,7 +31,7 @@ * Modifications by: Wandenberg Peixoto , Rogério Carvalho Schneider */ -#include +#include "ngx_http_push_stream_rbtree_util.h" static ngx_http_push_stream_channel_t * ngx_http_push_stream_find_channel_on_tree(ngx_str_t *id, ngx_log_t *log, ngx_rbtree_t *tree) @@ -94,7 +94,7 @@ ngx_http_push_stream_find_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_s // find a channel by id. if channel not found, make one, insert it, and return that. static ngx_http_push_stream_channel_t * -ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf) +ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_stream_main_conf_t *mcf, ngx_http_request_t *r) { ngx_http_push_stream_shm_data_t *data = mcf->shm_data; ngx_http_push_stream_channel_t *channel; @@ -166,7 +166,8 @@ ngx_http_push_stream_get_channel(ngx_str_t *id, ngx_log_t *log, ngx_http_push_st ngx_shmtx_unlock(&data->channels_queue_mutex); - ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED, NULL); + ngx_http_push_stream_loc_conf_t *cf = ngx_http_get_module_loc_conf(r, ngx_http_push_stream_module); + ngx_http_push_stream_send_event(mcf, log, channel, &NGX_HTTP_PUSH_STREAM_EVENT_TYPE_CHANNEL_CREATED, r->pool, r, cf->channel_created_request_url); return channel; } diff --git a/src/ngx_http_push_stream_rbtree_util.h b/src/ngx_http_push_stream_rbtree_util.h new file mode 120000 index 00000000..e91d7623 --- /dev/null +++ b/src/ngx_http_push_stream_rbtree_util.h @@ -0,0 +1 @@ +../include/ngx_http_push_stream_rbtree_util.h \ No newline at end of file