Skip to content

Commit

Permalink
Merge pull request #259 from basho/perf/ttb_newmsg_ts1.0_release_cand…
Browse files Browse the repository at this point in the history
…idate_v2

Perf/ttb newmsg ts1.0 release candidate v2

Reviewed-by: javajolt
  • Loading branch information
borshop committed Jan 12, 2016
2 parents 8d574c7 + 0db46d5 commit b570ef0
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 5 deletions.
25 changes: 25 additions & 0 deletions src/riakc_pb_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1290,6 +1290,7 @@ replace_coverage(Pid, Bucket, Cover, Other) ->
Timeout}).

use_native_encoding(Pid, Raw) when is_boolean(Raw) ->
erlang:put(pb_use_native_encoding, Raw),
call_infinity(Pid, {use_native_encoding, Raw}).

%% ====================================================================
Expand Down Expand Up @@ -1337,6 +1338,12 @@ handle_call({req, Msg, Timeout, Ctx}, From, State) when State#state.active =/= u
{noreply, queue_request(new_request(Msg, From, Timeout, Ctx), State)};
handle_call({req, Msg, Timeout}, From, State) ->
{noreply, send_request(new_request(Msg, From, Timeout), State)};
handle_call({req, true, Msg, Timeout}, From, State) ->
{noreply, send_request(true, new_request(Msg, From, Timeout), State)};
handle_call({req, false, Msg, Timeout}, From, State) ->
{noreply, send_request(false, new_request(Msg, From, Timeout), State)};
handle_call({req, undefined, Msg, Timeout}, From, State) ->
{noreply, send_request(undefined, new_request(Msg, From, Timeout), State)};
handle_call({req, Msg, Timeout, Ctx}, From, State) ->
{noreply, send_request(new_request(Msg, From, Timeout, Ctx), State)};
handle_call(is_connected, _From, State) ->
Expand Down Expand Up @@ -1956,6 +1963,10 @@ process_response(#request{msg = #tsputreq{}},
tsputresp, State) ->
{reply, ok, State};

process_response(#request{msg = #tsttbputreq{}},
tsputresp, State) ->
{reply, ok, State};

process_response(#request{msg = #tsdelreq{}},
tsdelresp, State) ->
{reply, ok, State};
Expand Down Expand Up @@ -2272,13 +2283,27 @@ send_request(Request0, State) when State#state.active =:= undefined ->
maybe_enqueue_and_reconnect(Request, State#state{sock=undefined})
end.

send_request(UseNativeEncoding, Request0, State) when State#state.active =:= undefined ->
{Request, Pkt} = encode_request_message(UseNativeEncoding, Request0),
Transport = State#state.transport,
case Transport:send(State#state.sock, Pkt) of
ok ->
maybe_reply(after_send(Request, State#state{active = Request}));
{error, Reason} ->
error_logger:warning_msg("Socket error while sending riakc request: ~p.", [Reason]),
Transport:close(State#state.sock),
maybe_enqueue_and_reconnect(Request, State#state{sock=undefined})
end.

%% Already encoded (for tunneled messages), but must provide Message Id
%% for responding to the second form of send_request.
encode_request_message(#request{msg={tunneled,MsgId,Pkt}}=Req) ->
{Req#request{msg={tunneled,MsgId}},[MsgId|Pkt]};
%% Unencoded Request (the normal PB client path)
encode_request_message(#request{msg=Msg}=Req) ->
{Req, riak_pb_codec:encode(Msg)}.
encode_request_message(UseNativeEncoding, #request{msg=Msg}=Req) ->
{Req, riak_pb_codec:encode(UseNativeEncoding, Msg)}.

%% If the socket was closed, see if we can enqueue the request and
%% trigger a reconnect. Otherwise, return an error to the requestor.
Expand Down
11 changes: 8 additions & 3 deletions src/riakc_ts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ put(Pid, TableName, Measurements) ->
%% As of 2015-11-05, ColumnNames parameter is ignored, the function
%% expects the full set of fields in each element of Data.
put(Pid, TableName, ColumnNames, Measurements) ->
Message = riakc_ts_put_operator:serialize(TableName, ColumnNames, Measurements),
Response = server_call(Pid, Message),
UseNativeEncoding = get(pb_use_native_encoding),
Message = riakc_ts_put_operator:serialize(UseNativeEncoding, TableName, ColumnNames, Measurements),
Response = server_call(UseNativeEncoding, Pid, Message),
riakc_ts_put_operator:deserialize(Response).


-spec delete(Pid::pid(), Table::table_name(), Key::[ts_value()],
Options::proplists:proplist()) ->
ok | {error, Reason::term()}.
Expand Down Expand Up @@ -183,3 +183,8 @@ server_call(Pid, Message) ->
gen_server:call(Pid,
{req, Message, riakc_pb_socket:default_timeout(timeseries)},
infinity).

server_call(UseNativeEncoding, Pid, Message) ->
gen_server:call(Pid,
{req, UseNativeEncoding, Message, riakc_pb_socket:default_timeout(timeseries)},
infinity).
15 changes: 13 additions & 2 deletions src/riakc_ts_put_operator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,21 @@
-include_lib("riak_pb/include/riak_pb.hrl").
-include_lib("riak_pb/include/riak_ts_pb.hrl").

-export([serialize/3,
-export([serialize/4,
deserialize/1]).

serialize(TableName, ColumnNames, Measurements) ->
%% serialize uses the process dictionary to check if native encoding
%% should be used. If true (ttb encoding) call encode_rows_for_ttb.
%% If false, call default pb encoding function.

serialize(true, TableName, ColumnNames, Measurements) ->
ColumnDescs = riak_pb_ts_codec:encode_columnnames(ColumnNames),
SerializedRows = riak_pb_ts_codec:encode_rows_for_ttb(Measurements),
#tsttbputreq{table = TableName,
columns = ColumnDescs,
rows = SerializedRows};

serialize(_, TableName, ColumnNames, Measurements) ->
ColumnDescs = riak_pb_ts_codec:encode_columnnames(ColumnNames),
SerializedRows = riak_pb_ts_codec:encode_rows_non_strict(Measurements),
#tsputreq{table = TableName,
Expand Down

0 comments on commit b570ef0

Please sign in to comment.