From 5cd0e5f64d851831355114c705aa4645048b8b67 Mon Sep 17 00:00:00 2001 From: Juan Facorro Date: Tue, 17 Nov 2015 18:43:25 -0300 Subject: [PATCH 1/3] [#118] Enable chunked body for PUT requests --- src/shotgun.erl | 245 +++++++++++++++++++++++++++--------------------- 1 file changed, 140 insertions(+), 105 deletions(-) diff --git a/src/shotgun.erl b/src/shotgun.erl index 459ad7f..5695073 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -9,99 +9,101 @@ -behaviour(gen_fsm). --export([ - start/0, - stop/0, - start_link/4, - open/2, - open/3, - open/4, - close/1, - %% get - get/2, - get/3, - get/4, - %% post - post/5, - %% delete - delete/4, - %% head - head/4, - %% options - options/4, - %% patch - patch/5, - %% put - put/5, - %% generic request - request/6, - %% events - events/1 +-export([ start/0 + , stop/0 + , start_link/4 ]). --export([ - init/1, - handle_event/3, - handle_sync_event/4, - handle_info/3, - terminate/3, - code_change/4 +-export([ open/2 + , open/3 + , open/4 + , close/1 ]). --export([ - at_rest/2, - wait_response/2, - receive_data/2, - receive_chunk/2, - parse_event/1 +-export([ %% get + get/2 + , get/3 + , get/4 + %% post + , post/5 + %% delete + , delete/4 + %% head + , head/4 + %% options + , options/4 + %% patch + , patch/5 + %% put + , put/4 + , put/5 + %% generic request + , request/6 + %% data + , data/2 + , data/3 + %% events + , events/1 ]). -%Work request handlers --export([ - at_rest/3, - wait_response/3, - receive_data/3, - receive_chunk/3 +-export([ init/1 + , handle_event/3 + , handle_sync_event/4 + , handle_info/3 + , terminate/3 + , code_change/4 ]). --type response() :: - #{ - status_code => integer(), - header => map(), - body => binary() - }. --type result() :: {ok, reference() | response()} | {error, term()}. --type headers() :: #{}. --type options() :: - #{ - async => boolean(), - async_mode => binary | sse, - handle_event => fun((fin | nofin, reference(), binary()) -> any()), - basic_auth => {string(), string()}, - timeout => pos_integer() | infinity %% Default 5000 ms - }. --type http_verb() :: get | post | head | delete | patch | put | options. +-export([ at_rest/2 + , wait_response/2 + , receive_data/2 + , receive_chunk/2 + , parse_event/1 + ]). --type connection_type() :: http | https. +%Work request handlers +-export([ at_rest/3 + , wait_response/3 + , receive_data/3 + , receive_chunk/3 + ]). --type open_opts() :: - #{ - %% transport_opts are passed to Ranch's TCP transport, which is +-type connection_type() :: http | https. +-type open_opts() :: + #{ %% transport_opts are passed to Ranch's TCP transport, which is %% -itself- a thin layer over gen_tcp. - transport_opts => [], + transport_opts => [] %% timeout is passed to gun:await_up. Default if not specified %% is 5000 ms. - timeout => pos_integer() | infinity + , timeout => pos_integer() | infinity }. --type event() :: - #{ - id => binary(), - event => binary(), - data => binary() +-type connection() :: pid(). +-type http_verb() :: get | post | head | delete | patch | put | options. +-type uri() :: iodata(). +-type headers() :: #{}. +-type body() :: iodata(). +-type options() :: + #{ async => boolean() + , async_mode => binary | sse + , handle_event => fun((fin | nofin, reference(), binary()) -> any()) + , basic_auth => {string(), string()} + , timeout => pos_integer() | infinity %% Default 5000 ms }. --export_type([event/0]). +-type response() :: #{ status_code => integer() + , header => map() + , body => binary() + }. + +-type result() :: {ok, reference() | response()} | {error, term()}. + +-type event() :: #{ id => binary() + , event => binary() + , data => binary() + }. + +-export_type([response/0, event/0]). %% @doc Starts the application and all the ones it depends on. -spec start() -> {ok, [atom()]}. @@ -225,9 +227,15 @@ options(Pid, Uri, Headers, Options) -> patch(Pid, Uri, Headers, Body, Options) -> request(Pid, patch, Uri, Headers, Body, Options). +%% @doc Performs a chunked PUT request to Uri +%% using %% Headers as the content data. +-spec put(connection(), uri(), headers(), options()) -> result(). +put(Pid, Uri, Headers0, Options) -> + request(Pid, put, Uri, Headers0, body_chunked, Options). + %% @doc Performs a PUT request to Uri using %% Headers and Body as the content data. --spec put(pid(), iodata(), headers(), iodata(), options()) -> result(). +-spec put(connection(), uri(), headers(), body(), options()) -> result(). put(Pid, Uri, Headers0, Body, Options) -> request(Pid, put, Uri, Headers0, Body, Options). @@ -268,6 +276,15 @@ request(Pid, Method, Uri, Headers0, Body, Options) -> _:Reason -> {error, Reason} end. +%% @doc Send data when the +-spec data(connection(), body()) -> ok. +data(Conn, Data) -> + data(Conn, Data, nofin). + +-spec data(connection(), body(), fin | nofin) -> ok. +data(Conn, Data, FinNoFin) -> + gen_fsm:send_event(Conn, {data, Data, FinNoFin}). + %% @doc Returns a list of all received events up to now. -spec events(Pid :: pid()) -> [{nofin | fin, reference(), binary()}]. events(Pid) -> @@ -386,6 +403,7 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) -> %% @private -spec at_rest(any(), state()) -> {next_state, atom(), state()}. at_rest(timeout, State) -> + io:format("at_rest timeout~n"), case get_work(State) of no_work -> {next_state, at_rest, State}; @@ -407,13 +425,18 @@ at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, }, {next_state, wait_response, NewState}; at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) -> + {_, _, Body} = Args, StreamRef = do_http_verb(HttpVerb, Pid, Args), CleanState = clean_state(State), - NewState = CleanState#{ - pid => Pid, - stream => StreamRef, - from => From - }, + NewState = CleanState#{ pid => Pid + , stream => StreamRef + , from => From + }, + case Body of + body_chunked -> + gen_fsm:send_event(self(), body_chunked); + _ -> ok + end, {next_state, wait_response, NewState}. -spec at_rest(term(), pid(), term()) -> term(). @@ -447,22 +470,29 @@ wait_response({gun_response, _Pid, _StreamRef, nofin, StatusCode, Headers}, #{from := From, stream := StreamRef, async := Async} = State) -> StateName = case lists:keyfind(<<"transfer-encoding">>, 1, Headers) of - {<<"transfer-encoding">>, <<"chunked">>} when Async == true-> - Result = {ok, StreamRef}, - gen_fsm:reply(From, Result), - receive_chunk; - _ -> - receive_data + {<<"transfer-encoding">>, <<"chunked">>} when Async == true-> + Result = {ok, StreamRef}, + gen_fsm:reply(From, Result), + receive_chunk; + _ -> + receive_data end, - { - next_state, - StateName, - State#{status_code := StatusCode, headers := Headers} + { next_state + , StateName + , State#{status_code := StatusCode, headers := Headers} }; wait_response({gun_error, _Pid, _StreamRef, Error}, #{from := From} = State) -> gen_fsm:reply(From, {error, Error}), {next_state, at_rest, State, 0}; +wait_response(body_chunked, + #{stream := StreamRef, from := From} = State) -> + gen_fsm:reply(From, {ok, StreamRef}), + {next_state, wait_response, State}; +wait_response({data, Data, FinNoFin}, + #{stream := StreamRef, pid := Pid} = State) -> + ok = gun:data(Pid, StreamRef, FinNoFin, Data), + {next_state, wait_response, State}; wait_response(Event, State) -> {stop, {unexpected, Event}, State}. @@ -528,26 +558,31 @@ clean_state(State) when is_map(State) -> clean_state(get_pending_reqs(State)); clean_state(Reqs) -> #{ - pid => undefined, - stream => undefined, - handle_event => undefined, - from => undefined, - responses => queue:new(), - data => <<"">>, - status_code => undefined, - headers => undefined, - async => false, - async_mode => binary, - buffer => <<"">>, + pid => undefined, + stream => undefined, + handle_event => undefined, + from => undefined, + responses => queue:new(), + data => <<"">>, + status_code => undefined, + headers => undefined, + async => false, + async_mode => binary, + buffer => <<"">>, pending_requests => Reqs }. %% @private -spec do_http_verb(http_verb(), pid(), tuple()) -> reference(). +do_http_verb(Method, Pid, {Uri, Headers, body_chunked}) -> + gun:request(Pid, http_verb_bin(Method), Uri, Headers); do_http_verb(Method, Pid, {Uri, Headers, Body}) -> + gun:request(Pid, http_verb_bin(Method), Uri, Headers, Body). + +-spec http_verb_bin(atom()) -> binary(). +http_verb_bin(Method) -> MethodStr = string:to_upper(atom_to_list(Method)), - MethodBin = list_to_binary(MethodStr), - gun:request(Pid, MethodBin, Uri, Headers, Body). + list_to_binary(MethodStr). %% @private manage_chunk(IsFin, StreamRef, Data, From c3c08ad5b8065f712111d9dcee08b721e6aa93a8 Mon Sep 17 00:00:00 2001 From: Juan Facorro Date: Wed, 18 Nov 2015 18:32:07 -0300 Subject: [PATCH 2/3] [#118] Handle unexpected data messages --- src/shotgun.erl | 90 +++++++++++++++++++++++++++++-------------------- 1 file changed, 53 insertions(+), 37 deletions(-) diff --git a/src/shotgun.erl b/src/shotgun.erl index 5695073..884131e 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -82,7 +82,7 @@ -type http_verb() :: get | post | head | delete | patch | put | options. -type uri() :: iodata(). -type headers() :: #{}. --type body() :: iodata(). +-type body() :: iodata() | body_chunked. -type options() :: #{ async => boolean() , async_mode => binary | sse @@ -193,37 +193,37 @@ get(Pid, Uri, Headers) -> %% %% %% @end --spec get(pid(), iodata(), headers(), options()) -> result(). +-spec get(connection(), uri(), headers(), options()) -> result(). get(Pid, Uri, Headers, Options) -> request(Pid, get, Uri, Headers, [], Options). %% @doc Performs a POST request to Uri using %% Headers and Body as the content data. --spec post(pid(), iodata(), headers(), iodata(), options()) -> result(). +-spec post(connection(), uri(), headers(), body(), options()) -> result(). post(Pid, Uri, Headers, Body, Options) -> request(Pid, post, Uri, Headers, Body, Options). %% @doc Performs a DELETE request to Uri using %% Headers. --spec delete(pid(), iodata(), headers(), options()) -> result(). +-spec delete(connection(), uri(), headers(), options()) -> result(). delete(Pid, Uri, Headers, Options) -> request(Pid, delete, Uri, Headers, [], Options). %% @doc Performs a HEAD request to Uri using %% Headers. --spec head(pid(), iodata(), headers(), options()) -> result(). +-spec head(connection(), uri(), headers(), options()) -> result(). head(Pid, Uri, Headers, Options) -> request(Pid, head, Uri, Headers, [], Options). %% @doc Performs a OPTIONS request to Uri using %% Headers. --spec options(pid(), iodata(), headers(), options()) -> result(). +-spec options(connection(), uri(), headers(), options()) -> result(). options(Pid, Uri, Headers, Options) -> request(Pid, options, Uri, Headers, [], Options). %% @doc Performs a PATCH request to Uri using %% Headers and Body as the content data. --spec patch(pid(), iodata(), headers(), iodata(), options()) -> result(). +-spec patch(connection(), uri(), headers(), body(), options()) -> result(). patch(Pid, Uri, Headers, Body, Options) -> request(Pid, patch, Uri, Headers, Body, Options). @@ -242,7 +242,7 @@ put(Pid, Uri, Headers0, Body, Options) -> %% @doc Performs a request to Uri using the HTTP method %% specified by Method, Body as the content data and %% Headers as the request's headers. --spec request(pid(), http_verb(), iodata(), headers(), iodata(), options()) -> +-spec request(connection(), http_verb(), uri(), headers(), body(), options()) -> result(). request(Pid, get, Uri, Headers0, Body, Options) -> try @@ -277,13 +277,13 @@ request(Pid, Method, Uri, Headers0, Body, Options) -> end. %% @doc Send data when the --spec data(connection(), body()) -> ok. +-spec data(connection(), body()) -> ok | {error, any()}. data(Conn, Data) -> data(Conn, Data, nofin). --spec data(connection(), body(), fin | nofin) -> ok. +-spec data(connection(), body(), fin | nofin) -> ok | {error, any()}. data(Conn, Data, FinNoFin) -> - gen_fsm:send_event(Conn, {data, Data, FinNoFin}). + gen_fsm:sync_send_event(Conn, {data, Data, FinNoFin}). %% @doc Returns a list of all received events up to now. -spec events(Pid :: pid()) -> [{nofin | fin, reference(), binary()}]. @@ -398,12 +398,45 @@ terminate(_Reason, _StateName, #{pid := Pid} = _State) -> %% gen_fsm states %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% @private +-spec at_rest(term(), pid(), term()) -> term(). +at_rest({data, _, _} = Event, _From, State) -> + unexpected_event_warning(at_rest, Event), + {reply, {error, unexpected}, at_rest, State}; +at_rest(Event, From, State) -> + enqueue_work_or_stop(at_rest, Event, From, State). + +%% @private +-spec wait_response(term(), pid(), term()) -> term(). +wait_response( {data, Data, FinNoFin} + , _From + , #{stream := StreamRef, pid := Pid} = State) -> + ok = gun:data(Pid, StreamRef, FinNoFin, Data), + {reply, ok, wait_response, State}; +wait_response(Event, From, State) -> + enqueue_work_or_stop(wait_response, Event, From, State). + +%% @private +-spec receive_data(term(), pid(), term()) -> term(). +receive_data({data, _, _} = Event, _From, State) -> + unexpected_event_warning(at_rest, Event), + {reply, {error, unexpected}, receive_data, State}; +receive_data(Event, From, State) -> + enqueue_work_or_stop(receive_data, Event, From, State). + +%% @private +-spec receive_chunk(term(), pid(), term()) -> term(). +receive_chunk({data, _, _} = Event, _From, State) -> + unexpected_event_warning(at_rest, Event), + {reply, {error, unexpected}, receive_chunk, State}; +receive_chunk(Event, From, State) -> + enqueue_work_or_stop(receive_chunk, Event, From, State). + %See if we have work. If we do, dispatch. %If we don't, stay in at_rest. %% @private -spec at_rest(any(), state()) -> {next_state, atom(), state()}. at_rest(timeout, State) -> - io:format("at_rest timeout~n"), case get_work(State) of no_work -> {next_state, at_rest, State}; @@ -424,8 +457,7 @@ at_rest({get_async, {HandleEvent, AsyncMode}, Args, From}, async_mode => AsyncMode }, {next_state, wait_response, NewState}; -at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) -> - {_, _, Body} = Args, +at_rest({HttpVerb, {_, _, Body} = Args, From}, State = #{pid := Pid}) -> StreamRef = do_http_verb(HttpVerb, Pid, Args), CleanState = clean_state(State), NewState = CleanState#{ pid => Pid @@ -439,15 +471,6 @@ at_rest({HttpVerb, Args, From}, State = #{pid := Pid}) -> end, {next_state, wait_response, NewState}. --spec at_rest(term(), pid(), term()) -> term(). -at_rest(Event, From, State) -> - enqueue_work_or_stop(at_rest, Event, From, State). - -%% @private --spec wait_response(term(), pid(), term()) -> term(). -wait_response(Event, From, State) -> - enqueue_work_or_stop(wait_response, Event, From, State). - %% @private -spec wait_response(term(), term()) -> term(). wait_response({'DOWN', _, _, _, Reason}, _State) -> @@ -489,18 +512,9 @@ wait_response(body_chunked, #{stream := StreamRef, from := From} = State) -> gen_fsm:reply(From, {ok, StreamRef}), {next_state, wait_response, State}; -wait_response({data, Data, FinNoFin}, - #{stream := StreamRef, pid := Pid} = State) -> - ok = gun:data(Pid, StreamRef, FinNoFin, Data), - {next_state, wait_response, State}; wait_response(Event, State) -> {stop, {unexpected, Event}, State}. -%% @private --spec receive_data(term(), pid(), term()) -> term(). -receive_data(Event, From, State) -> - enqueue_work_or_stop(receive_data, Event, From, State). - %% @private %% @doc Regular response -spec receive_data(term(), term()) -> term(). @@ -524,11 +538,6 @@ receive_data({gun_error, _Pid, StreamRef, _Reason}, #{stream := StreamRef} = State) -> {next_state, at_rest, State, 0}. -%% @private --spec receive_chunk(term(), pid(), term()) -> term(). -receive_chunk(Event, From, State) -> - enqueue_work_or_stop(receive_chunk, Event, From, State). - %% @private %% @doc Chunked data response -spec receive_chunk(term(), term()) -> term(). @@ -725,3 +734,10 @@ append_work(Work, State) -> %% @private get_pending_reqs(State) -> maps:get(pending_requests, State). + +%% @private +-spec unexpected_event_warning(atom(), any()) -> ok. +unexpected_event_warning(StateName, Event) -> + error_logger:warning_msg( "Unexpected event in state '~p': ~p~n" + , [StateName, Event] + ). From e603d9925a4b38fc5e5c4e390edba7da47352e14 Mon Sep 17 00:00:00 2001 From: Juan Facorro Date: Wed, 18 Nov 2015 18:36:27 -0300 Subject: [PATCH 3/3] [#118] Add chunked request functions for post and patch --- src/shotgun.erl | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/shotgun.erl b/src/shotgun.erl index 884131e..be2c877 100644 --- a/src/shotgun.erl +++ b/src/shotgun.erl @@ -25,6 +25,7 @@ , get/3 , get/4 %% post + , post/4 , post/5 %% delete , delete/4 @@ -33,6 +34,7 @@ %% options , options/4 %% patch + , patch/4 , patch/5 %% put , put/4 @@ -197,6 +199,12 @@ get(Pid, Uri, Headers) -> get(Pid, Uri, Headers, Options) -> request(Pid, get, Uri, Headers, [], Options). +%% @doc Performs a chunked POST request to Uri +%% using %% Headers as the content data. +-spec post(connection(), uri(), headers(), options()) -> result(). +post(Pid, Uri, Headers, Options) -> + request(Pid, post, Uri, Headers, body_chunked, Options). + %% @doc Performs a POST request to Uri using %% Headers and Body as the content data. -spec post(connection(), uri(), headers(), body(), options()) -> result(). @@ -221,6 +229,12 @@ head(Pid, Uri, Headers, Options) -> options(Pid, Uri, Headers, Options) -> request(Pid, options, Uri, Headers, [], Options). +%% @doc Performs a chunked PATCH request to Uri +%% using %% Headers as the content data. +-spec patch(connection(), uri(), headers(), options()) -> result(). +patch(Pid, Uri, Headers, Options) -> + request(Pid, post, Uri, Headers, body_chunked, Options). + %% @doc Performs a PATCH request to Uri using %% Headers and Body as the content data. -spec patch(connection(), uri(), headers(), body(), options()) -> result(). @@ -230,14 +244,14 @@ patch(Pid, Uri, Headers, Body, Options) -> %% @doc Performs a chunked PUT request to Uri %% using %% Headers as the content data. -spec put(connection(), uri(), headers(), options()) -> result(). -put(Pid, Uri, Headers0, Options) -> - request(Pid, put, Uri, Headers0, body_chunked, Options). +put(Pid, Uri, Headers, Options) -> + request(Pid, put, Uri, Headers, body_chunked, Options). %% @doc Performs a PUT request to Uri using %% Headers and Body as the content data. -spec put(connection(), uri(), headers(), body(), options()) -> result(). -put(Pid, Uri, Headers0, Body, Options) -> - request(Pid, put, Uri, Headers0, Body, Options). +put(Pid, Uri, Headers, Body, Options) -> + request(Pid, put, Uri, Headers, Body, Options). %% @doc Performs a request to Uri using the HTTP method %% specified by Method, Body as the content data and