diff --git a/Makefile b/Makefile index 6c0a9086..8042b193 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,35 @@ erl_clean: erl_test: erl_compile @./rebar eunit skip_deps=true +REPO = riak_pb +APPS = kernel stdlib sasl erts ssl tools os_mon runtime_tools crypto inets \ + xmerl webtool snmp public_key mnesia eunit syntax_tools compiler +COMBO_PLT = $(HOME)/.$(REPO)_combo_dialyzer_plt + +check_plt: erl_deps erl_compile + dialyzer --check_plt --plt $(COMBO_PLT) --apps $(APPS) \ + deps/*/ebin + +build_plt: erl_deps erl_compile + dialyzer --build_plt --output_plt $(COMBO_PLT) --apps $(APPS) \ + deps/*/ebin + +dialyzer: erl_deps erl_compile + @echo + @echo Use "'make check_plt'" to check PLT prior to using this target. + @echo Use "'make build_plt'" to build PLT prior to using this target. + @echo + @sleep 1 + dialyzer -Wno_return --plt $(COMBO_PLT) deps/*/ebin + +cleanplt: + @echo + @echo "Are you sure? It could take a long time to rebuild." + @echo Deleting $(COMBO_PLT) in 5 seconds. + @echo + sleep 5 + rm $(COMBO_PLT) + # Python specific build steps python_compile: @echo "==> Python (compile)" diff --git a/riak_pb/__init__.py b/riak_pb/__init__.py index 1b031248..0d63e808 100644 --- a/riak_pb/__init__.py +++ b/riak_pb/__init__.py @@ -1,4 +1,5 @@ from riak_pb2 import * from riak_kv_pb2 import * from riak_search_pb2 import * +from riak_dt_pb2 import * from riak_yokozuna_pb2 import * diff --git a/src/riak_dt.proto b/src/riak_dt.proto new file mode 100644 index 00000000..307c0f62 --- /dev/null +++ b/src/riak_dt.proto @@ -0,0 +1,255 @@ +/* ------------------------------------------------------------------- +** +** riak_dt.proto: Protocol buffers for Riak data structures/types +** +** Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +** +** This file is provided to you under the Apache License, +** Version 2.0 (the "License"); you may not use this file +** except in compliance with the License. You may obtain +** a copy of the License at +** +** http://www.apache.org/licenses/LICENSE-2.0 +** +** Unless required by applicable law or agreed to in writing, +** software distributed under the License is distributed on an +** "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +** KIND, either express or implied. See the License for the +** specific language governing permissions and limitations +** under the License. +** +** ------------------------------------------------------------------- +*/ + +/* +** Revision: 2.0 +*/ + +// Java package specifiers +option java_package = "com.basho.riak.protobuf"; +option java_outer_classname = "RiakDtPB"; + +/* + * =============== DATA STRUCTURES ================= + */ + +/* + * Field names in maps are composed of a binary identifier and a type. + * This is so that two clients can create fields with the same name + * but different types, and they converge independently. + */ +message MapField { + /* + * The types that can be stored in a map are limited to counters, + * sets, registers, flags, and maps. + */ + enum MapFieldType { + COUNTER = 1; + SET = 2; + REGISTER = 3; + FLAG = 4; + MAP = 5; + } + + required bytes name = 1; + required MapFieldType type = 2; +} + + +/* + * An entry in a map is a pair of a field-name and value. The type + * defined in the field determines which value type is expected. + */ +message MapEntry { + required MapField field = 1; + optional sint64 counter_value = 2; + repeated bytes set_value = 3; + optional bytes register_value = 4; + optional bool flag_value = 5; + repeated MapEntry map_value = 6; +} + +/* + * =============== FETCH ================= + */ + +/* + * The equivalent of KV's "RpbGetReq", results in a DtFetchResp. The + * request-time options are limited to ones that are relevant to + * structured data-types. + */ +message DtFetchReq { + // The identifier: bucket, key and bucket-type + required bytes bucket = 1; + required bytes key = 2; + required bytes type = 3; + + // Request options + optional uint32 r = 4; + optional uint32 pr = 5; + optional bool basic_quorum = 6; + optional bool notfound_ok = 7; + optional uint32 timeout = 8; + optional bool sloppy_quorum = 9; // Experimental, may change/disappear + optional uint32 n_val = 10; // Experimental, may change/disappear + + // For read-only requests or context-free operations, you can set + // this to false to reduce the size of the response payload. + optional bool include_context = 11 [default=true]; +} + + +/* + * The value of the fetched data type. If present in the response, + * then empty values (sets, maps) should be treated as such. + */ +message DtValue { + optional sint64 counter_value = 1; + repeated bytes set_value = 2; + repeated MapEntry map_value = 3; +} + + +/* + * The response to a "Fetch" request. If the `include_context` option + * is specified, an opaque "context" value will be returned along with + * the user-friendly data. When sending an "Update" request, the + * client should send this context as well, similar to how one would + * send a vclock for KV updates. The `type` field indicates which + * value type to expect. When the `value` field is missing from the + * message, the client should interpret it as a "not found". + */ +message DtFetchResp { + enum DataType { + COUNTER = 1; + SET = 2; + MAP = 3; + } + + optional bytes context = 1; + required DataType type = 2; + optional DtValue value = 3; +} + +/* + * =============== UPDATE ================= + */ + +/* + * An operation to update a Counter, either on its own or inside a + * Map. The `increment` field can be positive or negative. When absent, + * the meaning is an increment by 1. + */ +message CounterOp { + optional sint64 increment = 1; +} + +/* + * An operation to update a Set, either on its own or inside a Map. + * Set members are opaque binary values, you can only add or remove + * them from a Set. + */ +message SetOp { + repeated bytes adds = 1; + repeated bytes removes = 2; +} + +/* + * An operation to be applied to a value stored in a Map -- the + * contents of an UPDATE operation. The operation field that is + * present depends on the type of the field to which it is applied. + */ +message MapUpdate { + /* + * Flags only exist inside Maps and can only be enabled or + * disabled, and there are no arguments to the operations. + */ + enum FlagOp { + ENABLE = 1; + DISABLE = 2; + } + + required MapField field = 1; + + optional CounterOp counter_op = 2; + optional SetOp set_op = 3; + + /* + * There is only one operation on a register, which is to set its + * value, therefore the "operation" is the new value. + */ + optional bytes register_op = 4; + optional FlagOp flag_op = 5; + optional MapOp map_op = 6; +} + +/* + * An operation to update a Map. All operations apply to individual + * fields in the Map. + */ +message MapOp { + /* + * ADD creates a new, "empty" value under a field. REMOVE removes + * a field and value from the Map. UPDATE applies type-specific + * operations to the values stored in the Map. + */ + repeated MapField adds = 1; + repeated MapField removes = 2; + repeated MapUpdate updates = 3; +} + +/* + * A "union" type for update operations. The included operation + * depends on the datatype being updated. + */ +message DtOp { + optional CounterOp counter_op = 1; + optional SetOp set_op = 2; + optional MapOp map_op = 3; +} + +/* + * The equivalent of KV's "RpbPutReq", results in an empty response or + * "DtUpdateResp" if `return_body` is specified, or the key is + * assigned by the server. The request-time options are limited to + * ones that are relevant to structured data-types. + */ +message DtUpdateReq { + // The identifier + required bytes bucket = 1; + optional bytes key = 2; // missing key results in server-assigned key, like KV + required bytes type = 3; // bucket type, not data-type (but the data-type is constrained per bucket-type) + + // Opaque update-context + optional bytes context = 4; + + // The operations + required DtOp op = 5; + + // Request options + optional uint32 w = 6; + optional uint32 dw = 7; + optional uint32 pw = 8; + optional bool return_body = 9 [default=false]; + optional uint32 timeout = 10; + optional bool sloppy_quorum = 11; // Experimental, may change/disappear + optional uint32 n_val = 12; // Experimental, may change/disappear + optional bool include_context = 13 [default=true]; // When return_body is true, should the context be returned too? +} + + +/* + * The equivalent of KV's "RpbPutResp", contains the assigned key if + * it was assigned by the server, and the resulting value and context + * if return_body was set. + */ +message DtUpdateResp { + // The key, if assigned by the server + optional bytes key = 1; + + // The opaque update context and value, if return_body was set. + optional bytes context = 2; + optional sint64 counter_value = 3; + repeated bytes set_value = 4; + repeated MapEntry map_value = 5; +} \ No newline at end of file diff --git a/src/riak_pb_codec.erl b/src/riak_pb_codec.erl index caaece40..b053e1d6 100644 --- a/src/riak_pb_codec.erl +++ b/src/riak_pb_codec.erl @@ -135,6 +135,10 @@ msg_type(57) -> rpbyokozunaindexdeletereq; msg_type(58) -> rpbyokozunaschemagetreq; msg_type(59) -> rpbyokozunaschemagetresp; msg_type(60) -> rpbyokozunaschemaputreq; +msg_type(80) -> dtfetchreq; +msg_type(81) -> dtfetchresp; +msg_type(82) -> dtupdatereq; +msg_type(83) -> dtupdateresp; msg_type(_) -> undefined. %% @doc Converts a symbolic message name into a message code. Replaces @@ -186,7 +190,11 @@ msg_code(rpbyokozunaindexputreq) -> 56; msg_code(rpbyokozunaindexdeletereq) -> 57; msg_code(rpbyokozunaschemagetreq) -> 58; msg_code(rpbyokozunaschemagetresp) -> 59; -msg_code(rpbyokozunaschemaputreq) -> 60. +msg_code(rpbyokozunaschemaputreq) -> 60; +msg_code(dtfetchreq) -> 80; +msg_code(dtfetchresp) -> 81; +msg_code(dtupdatereq) -> 82; +msg_code(dtupdateresp) -> 83. %% @doc Selects the appropriate PB decoder for a message code. -spec decoder_for(pos_integer()) -> module(). @@ -206,7 +214,9 @@ decoder_for(N) when N >= 3, N < 7; decoder_for(N) when N >= 27, N =< 28 -> riak_search_pb; decoder_for(N) when N >= 54, N =< 60 -> - riak_yokozuna_pb. + riak_yokozuna_pb; +decoder_for(N) when N >= 80, N =< 83 -> + riak_dt_pb. %% @doc Selects the appropriate PB encoder for a given message name. -spec encoder_for(atom()) -> module(). diff --git a/src/riak_pb_dt_codec.erl b/src/riak_pb_dt_codec.erl new file mode 100644 index 00000000..4ba3ad85 --- /dev/null +++ b/src/riak_pb_dt_codec.erl @@ -0,0 +1,519 @@ +%% ------------------------------------------------------------------- +%% +%% riak_pb_dt_codec: Protocol Buffers utility functions for Riak DT types +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(riak_pb_dt_codec). + +-include("riak_dt_pb.hrl"). + +-export([ + encode_fetch_request/2, + encode_fetch_request/3, + decode_fetch_response/1, + encode_fetch_response/3, + encode_fetch_response/4, + encode_update_request/3, + encode_update_request/4, + decode_operation/1, + decode_operation/2, + operation_type/1, + decode_update_response/3, + encode_update_response/4, + encode_update_response/5, + encode_operation/2 + ]). + +-import(riak_pb_kv_codec, [encode_quorum/1]). + +-export_type([context/0]). + +%% Value types +-opaque context() :: binary(). +-type counter_value() :: integer(). +-type set_value() :: [ binary() ]. +-type register_value() :: binary(). +-type flag_value() :: boolean(). +-type map_entry() :: {map_field(), embedded_value()}. +-type map_field() :: {binary(), embedded_type()}. +-type map_value() :: [ map_entry() ]. +-type embedded_value() :: counter_value() | set_value() | register_value() | flag_value() | map_value(). +-type toplevel_value() :: counter_value() | set_value() | map_value(). +-type fetch_response() :: {toplevel_type(), toplevel_value(), context()}. + +%% Type names as atoms +-type embedded_type() :: counter | set | register | flag | map. +-type toplevel_type() :: counter | set | map. + +%% Operations +-type counter_op() :: increment | decrement | {increment | decrement, integer()}. +-type simple_set_op() :: {add, binary()} | {remove, binary()} | {add_all, [binary()]} | {remove_all, [binary()]}. +-type set_op() :: simple_set_op() | {update, [simple_set_op()]}. +-type flag_op() :: enable | disable. +-type register_op() :: {assign, binary()}. +-type simple_map_op() :: {add, map_field()} | {remove, map_field()} | {update, map_field(), embedded_type_op()}. +-type map_op() :: simple_map_op() | {update, [simple_map_op()]}. +-type embedded_type_op() :: counter_op() | set_op() | register_op() | flag_op() | map_op(). +-type toplevel_op() :: counter_op() | set_op() | map_op(). +-type update() :: {toplevel_type(), toplevel_op(), context()}. + +%% Request options +-type quorum() :: riak_pb_kv_codec:quorum(). +-type update_opt() :: {w, quorum()} | {dw, quorum()} | {pw, quorum()} | + return_body | {return_body, boolean()} | + {timeout, pos_integer()} | + sloppy_quorum | {sloppy_quorum, boolean()} | + {n_val, pos_integer()}. +-type fetch_opt() :: {r, quorum()} | {pr, quorum()} | + basic_quorum | {basic_quorum, boolean()} | + notfound_ok | {notfound_ok, boolean()} | + {timeout, pos_integer()} | + sloppy_quorum | {sloppy_quorum, boolean()} | + {n_val, pos_integer()} | + include_context | {include_context, boolean()}. + +%% Server-side type<->module mappings +-type type_mappings() :: [{embedded_type(), module()}]. + + +%% ========================= +%% DATA STRUCTURES AND TYPES +%% ========================= + +%% @doc Decodes a MapField message into a tuple of name and type. +-spec decode_map_field(#mapfield{}) -> map_field(). +decode_map_field(MapField) -> + decode_map_field(MapField, []). + +%% @doc Decodes a MapField message into a tuple of name and type. +-spec decode_map_field(#mapfield{}, type_mappings()) -> map_field(). +decode_map_field(#mapfield{name=Name,type=Type}, Mods) -> + {Name, decode_type(Type, Mods)}. + +%% @doc Encodes a tuple of name and type into a MapField message. +-spec encode_map_field(map_field()) -> #mapfield{}. +encode_map_field(Field) -> + encode_map_field(Field, []). + +%% @doc Encodes a tuple of name and type into a MapField message, +%% using the given type mappings. +-spec encode_map_field(map_field(), type_mappings()) -> #mapfield{}. +encode_map_field({Name, Type}, Mods) -> + #mapfield{name=Name, type=encode_type(Type, Mods)}. + +%% @doc Decodes an MapEntry message into a tuple of field and value. +-spec decode_map_entry(#mapentry{}) -> map_entry(). +decode_map_entry(Entry) -> + decode_map_entry(Entry, []). + +%% @doc Decodes an MapEntry message into a tuple of field and value, +%% using the given type mappings. +-spec decode_map_entry(#mapentry{}, type_mappings()) -> map_entry(). +decode_map_entry(#mapentry{field=#mapfield{type='COUNTER'}=Field, counter_value=Val}, Mods) -> + {decode_map_field(Field, Mods), Val}; +decode_map_entry(#mapentry{field=#mapfield{type='SET'}=Field, set_value=Val}, Mods) -> + {decode_map_field(Field, Mods), Val}; +decode_map_entry(#mapentry{field=#mapfield{type='REGISTER'}=Field, register_value=Val}, Mods) -> + {decode_map_field(Field, Mods), Val}; +decode_map_entry(#mapentry{field=#mapfield{type='FLAG'}=Field, flag_value=Val}, Mods) -> + {decode_map_field(Field, Mods), Val}; +decode_map_entry(#mapentry{field=#mapfield{type='MAP'}=Field, map_value=Val}, Mods) -> + {decode_map_field(Field, Mods), [ decode_map_entry(Entry, Mods) || Entry <- Val ]}. + + +%% @doc Encodes a tuple of field and value into a MapEntry message. +-spec encode_map_entry(map_entry(), type_mappings()) -> #mapentry{}. +encode_map_entry({{Name, counter=Type}, Value}, _Mods) when is_integer(Value) -> + #mapentry{field=encode_map_field({Name, Type}), counter_value=Value}; +encode_map_entry({{Name, set=Type}, Value}, _Mods) when is_list(Value) -> + #mapentry{field=encode_map_field({Name, Type}), set_value=Value}; +encode_map_entry({{Name, register=Type}, Value}, _Mods) when is_binary(Value) -> + #mapentry{field=encode_map_field({Name, Type}), register_value=Value}; +encode_map_entry({{Name, flag=Type}, Value}, _Mods) when is_atom(Value) -> + #mapentry{field=encode_map_field({Name, Type}), flag_value=encode_flag_value(Value)}; +encode_map_entry({{Name, map=Type}, Value}, Mods) when is_list(Value) -> + #mapentry{field=encode_map_field({Name, Type}), + map_value=[ encode_map_entry(Entry, Mods) || Entry <- Value ]}; +encode_map_entry({{Name, Type}, Value}, Mods) -> + %% We reach this clause if the type is not in the shortname yet, + %% but is a module name. + case lists:keyfind(Type, 2, Mods) of + false -> + %% If you don't have a mapping, we can't encode it. + erlang:error(badarg, [{{Name,Type},Value}, Mods]); + {AtomType, Type} -> + encode_map_entry({{Name,AtomType}, Value}, Mods) + end. + +%% @doc Decodes a PB message type name into a module name according to +%% the passed mappings. +-spec decode_type(atom(), type_mappings()) -> atom(). +decode_type(PBType, Mods) -> + AtomType = decode_type(PBType), + proplists:get_value(AtomType, Mods, AtomType). + +%% @doc Decodes a PB message type name into an atom. +-spec decode_type(atom()) -> atom(). +decode_type('COUNTER') -> counter; +decode_type('SET') -> set; +decode_type('REGISTER') -> register; +decode_type('FLAG') -> flag; +decode_type('MAP') -> map. + +%% @doc Encodes an atom type into the PB message equivalent, using the +%% passed mappings to convert module names into shortnames. +-spec encode_type(atom(), type_mappings()) -> atom(). +encode_type(TypeOrMod, Mods) -> + case lists:keyfind(TypeOrMod, 2, Mods) of + {AtomType, TypeOrMod} -> + encode_type(AtomType); + false -> + encode_type(TypeOrMod) + end. + +%% @doc Encodes an atom type name into the PB message equivalent. +-spec encode_type(atom()) -> atom(). +encode_type(counter) -> 'COUNTER'; +encode_type(set) -> 'SET'; +encode_type(register) -> 'REGISTER'; +encode_type(flag) -> 'FLAG'; +encode_type(map) -> 'MAP'. + +%% @doc Encodes a flag value into its PB message equivalent. +encode_flag_value(on) -> true; +encode_flag_value(off) -> false; +encode_flag_value(Other) -> Other. + +%% ======================== +%% FETCH REQUEST / RESPONSE +%% ======================== + +%% @doc Encodes a fetch request into a DtFetch message. +-spec encode_fetch_request({binary(), binary()}, binary()) -> #dtfetchreq{}. +encode_fetch_request(BucketAndType, Key) -> + encode_fetch_request(BucketAndType, Key, []). + +-spec encode_fetch_request({binary(), binary()}, binary(), [fetch_opt()]) -> #dtfetchreq{}. +encode_fetch_request({BType,Bucket}, Key, Options) -> + encode_fetch_options(#dtfetchreq{bucket=Bucket,key=Key,type=BType}, Options). + +%% @doc Encodes request-time fetch options onto the DtFetch message. +%% @private +-spec encode_fetch_options(#dtfetchreq{}, [fetch_opt()]) -> #dtfetchreq{}. +encode_fetch_options(Fetch, []) -> + Fetch; +encode_fetch_options(Fetch, [{r,R}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{r=encode_quorum(R)},Tail); +encode_fetch_options(Fetch, [{pr,PR}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{pr=encode_quorum(PR)},Tail); +encode_fetch_options(Fetch, [basic_quorum|Tail]) -> + encode_fetch_options(Fetch, [{basic_quorum, true}|Tail]); +encode_fetch_options(Fetch, [{basic_quorum, BQ}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{basic_quorum=BQ},Tail); +encode_fetch_options(Fetch, [notfound_ok|Tail]) -> + encode_fetch_options(Fetch, [{notfound_ok, true}|Tail]); +encode_fetch_options(Fetch, [{notfound_ok, NOK}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{notfound_ok=NOK},Tail); +encode_fetch_options(Fetch, [{timeout, TO}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{timeout=TO},Tail); +encode_fetch_options(Fetch, [sloppy_quorum|Tail]) -> + encode_fetch_options(Fetch, [{sloppy_quorum, true}|Tail]); +encode_fetch_options(Fetch, [{sloppy_quorum, RB}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{sloppy_quorum=RB},Tail); +encode_fetch_options(Fetch, [{n_val, N}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{n_val=N}, Tail); +encode_fetch_options(Fetch, [include_context|Tail]) -> + encode_fetch_options(Fetch, [{include_context, true}|Tail]); +encode_fetch_options(Fetch, [{include_context, IC}|Tail]) -> + encode_fetch_options(Fetch#dtfetchreq{include_context=IC},Tail); +encode_fetch_options(Fetch, [_|Tail]) -> + encode_fetch_options(Fetch, Tail). + +%% @doc Decodes a FetchResponse into tuple of type, value and context. +-spec decode_fetch_response(#dtfetchresp{}) -> fetch_response() | {notfound, toplevel_type()}. +decode_fetch_response(#dtfetchresp{type=T, value=undefined}) -> + {notfound, decode_type(T)}; +decode_fetch_response(#dtfetchresp{context=Context, type='COUNTER', + value=#dtvalue{counter_value=Val}}) -> + {counter, Val, Context}; +decode_fetch_response(#dtfetchresp{context=Context, type='SET', + value=#dtvalue{set_value=Val}}) -> + {set, Val, Context}; +decode_fetch_response(#dtfetchresp{context=Context, type='MAP', + value=#dtvalue{map_value=Val}}) -> + {map, [ decode_map_entry(Entry) || Entry <- Val ], Context}. + +%% @doc Encodes the result of a fetch request into a FetchResponse message. +-spec encode_fetch_response(toplevel_type(), toplevel_value(), context()) -> #dtfetchresp{}. +encode_fetch_response(Type, Value, Context) -> + encode_fetch_response(Type, Value, Context, []). + +%% @doc Encodes the result of a fetch request into a FetchResponse message. +-spec encode_fetch_response(toplevel_type(), toplevel_value(), context(), type_mappings()) -> #dtfetchresp{}. +encode_fetch_response(Type, undefined, _Context, _Mods) -> + #dtfetchresp{type=encode_type(Type)}; +encode_fetch_response(Type, Value, Context, Mods) -> + Response = #dtfetchresp{context=Context, type=encode_type(Type)}, + case Type of + counter -> + Response#dtfetchresp{value=#dtvalue{counter_value=Value}}; + set -> + Response#dtfetchresp{value=#dtvalue{set_value=Value}}; + map -> + Response#dtfetchresp{value=#dtvalue{map_value=[encode_map_entry(Entry, Mods) || Entry <- Value]}} + end. + +%% ========================= +%% UPDATE REQUEST / RESPONSE +%% ========================= + +%% @doc Decodes a CounterOp message into a counter operation. +-spec decode_counter_op(#counterop{}) -> counter_op(). +decode_counter_op(#counterop{increment=Int}) when is_integer(Int) -> + {increment, Int}; +decode_counter_op(#counterop{increment=undefined}) -> + increment. + +%% @doc Encodes a counter operation into a CounterOp message. +-spec encode_counter_op(counter_op()) -> #counterop{}. +encode_counter_op({increment, Int}) when is_integer(Int) -> + #counterop{increment=Int}; +encode_counter_op(increment) -> + #counterop{}; +encode_counter_op(decrement) -> + #counterop{increment=-1}; +encode_counter_op({decrement, Int}) when is_integer(Int) -> + #counterop{increment=(-Int)}. + +%% @doc Decodes a SetOp message into a set operation. +-spec decode_set_op(#setop{}) -> set_op(). +decode_set_op(#setop{adds=A, removes=[]}) -> + {add_all, A}; +decode_set_op(#setop{adds=[], removes=R}) -> + {remove_all, R}; +decode_set_op(#setop{adds=A, removes=R}) -> + {update, [{add_all, A}, {remove_all, R}]}. + +%% @doc Encodes a set operation into a SetOp message. +-spec encode_set_op(set_op()) -> #setop{}. +encode_set_op({update, Ops}) when is_list(Ops) -> + lists:foldr(fun encode_set_update/2, #setop{}, Ops); +encode_set_op({C, _}=Op) when add == C; add_all == C; + remove == C; remove_all == C-> + encode_set_op({update, [Op]}). + +%% @doc Folds a set update into the SetOp message. +-spec encode_set_update(simple_set_op(), #setop{}) -> #setop{}. +encode_set_update({add, Member}, #setop{adds=A}=S) when is_binary(Member) -> + S#setop{adds=[Member|A]}; +encode_set_update({add_all, Members}, #setop{adds=A}=S) when is_list(Members) -> + S#setop{adds=Members++A}; +encode_set_update({remove, Member}, #setop{removes=R}=S) when is_binary(Member) -> + S#setop{removes=[Member|R]}; +encode_set_update({remove_all, Members}, #setop{removes=R}=S) when is_list(Members) -> + S#setop{removes=Members++R}. + + +%% @doc Decodes a operation name from a PB message into an atom. +-spec decode_flag_op(atom()) -> atom(). + +decode_flag_op('ENABLE') -> enable; +decode_flag_op('DISABLE') -> disable. + +%% @doc Encodes an atom operation name into the PB message equivalent. +-spec encode_flag_op(atom()) -> atom(). +encode_flag_op(enable) -> 'ENABLE'; +encode_flag_op(disable) -> 'DISABLE'. + +%% @doc Decodes a MapUpdate message into a map field operation. +-spec decode_map_update(#mapupdate{}, type_mappings()) -> {map_field(), embedded_type_op()}. +decode_map_update(#mapupdate{field=#mapfield{name=N, type='COUNTER'=Type}, counter_op=#counterop{}=Op}, Mods) -> + COp = decode_counter_op(Op), + FType = decode_type(Type, Mods), + {{N, FType}, COp}; +decode_map_update(#mapupdate{field=#mapfield{name=N, type='SET'=Type}, set_op=#setop{}=Op}, Mods) -> + SOp = decode_set_op(Op), + FType = decode_type(Type, Mods), + {{N, FType}, SOp}; +decode_map_update(#mapupdate{field=#mapfield{name=N, type='REGISTER'=Type}, register_op=Op}, Mods) -> + FType = decode_type(Type, Mods), + {{N, FType}, {assign, Op}}; +decode_map_update(#mapupdate{field=#mapfield{name=N, type='FLAG'=Type}, flag_op=Op}, Mods) -> + FOp = decode_flag_op(Op), + FType = decode_type(Type, Mods), + {{N, FType}, FOp}; +decode_map_update(#mapupdate{field=#mapfield{name=N, type='MAP'=Type}, map_op=Op}, Mods) -> + MOp = decode_map_op(Op, Mods), + FType = decode_type(Type, Mods), + {{N, FType}, MOp}. + +%% @doc Encodes a map field operation into a MapUpdate message. +-spec encode_map_update(map_field(), embedded_type_op()) -> #mapupdate{}. +encode_map_update({_Name, counter}=Key, Op) -> + #mapupdate{field=encode_map_field(Key), counter_op=encode_counter_op(Op)}; +encode_map_update({_Name, set}=Key, Op) -> + #mapupdate{field=encode_map_field(Key), set_op=encode_set_op(Op)}; +encode_map_update({_Name, register}=Key, {assign, Value}) -> + #mapupdate{field=encode_map_field(Key), register_op=Value}; +encode_map_update({_Name, flag}=Key, Op) -> + #mapupdate{field=encode_map_field(Key), flag_op=encode_flag_op(Op)}; +encode_map_update({_Name, map}=Key, Op) -> + #mapupdate{field=encode_map_field(Key), map_op=encode_map_op(Op)}. + +%% @doc Encodes a map operation into a MapOp message. +-spec encode_map_op(map_op()) -> #mapop{}. +encode_map_op({update, Ops}) -> + lists:foldr(fun encode_map_op_update/2, #mapop{}, Ops); +encode_map_op({Op, _}=C) when add == Op; remove == Op -> + encode_map_op({update, [C]}); +encode_map_op({update, _Field, _Ops}=C) -> + encode_map_op({update, [C]}). + +%% @doc Folds a map update into the MapOp message. +-spec encode_map_op_update(simple_map_op(), #mapop{}) -> #mapop{}. +encode_map_op_update({add, F}, #mapop{adds=A}=M) -> + M#mapop{adds=[encode_map_field(F)|A]}; +encode_map_op_update({remove, F}, #mapop{removes=R}=M) -> + M#mapop{removes=[encode_map_field(F)|R]}; +encode_map_op_update({update, F, Ops}, #mapop{updates=U}=M) when is_list(Ops) -> + Updates = [ encode_map_update(F, Op) || Op <- Ops ], + M#mapop{updates=Updates ++ U}; +encode_map_op_update({update, F, Op}, #mapop{updates=U}=M) -> + M#mapop{updates=[encode_map_update(F, Op) | U]}. + + +-spec decode_map_op(#mapop{}, type_mappings()) -> map_op(). +decode_map_op(#mapop{adds=Adds, removes=Removes, updates=Updates}, Mods) -> + {update, + [ {add, decode_map_field(A, Mods)} || A <- Adds ] ++ + [ {remove, decode_map_field(R, Mods)} || R <- Removes ] ++ + [ begin + {Field, Op} = decode_map_update(U, Mods), + {update, Field, Op} + end || U <- Updates ]}. + +%% @doc Decodes a DtOperation message into a datatype-specific operation. +-spec decode_operation(#dtop{}) -> toplevel_op(). +decode_operation(Op) -> + decode_operation(Op, []). + +-spec decode_operation(#dtop{}, type_mappings()) -> toplevel_op(). +decode_operation(#dtop{counter_op=#counterop{}=Op}, _) -> + decode_counter_op(Op); +decode_operation(#dtop{set_op=#setop{}=Op}, _) -> + decode_set_op(Op); +decode_operation(#dtop{map_op=#mapop{}=Op}, Mods) -> + decode_map_op(Op, Mods). + +%% @doc Encodes a datatype-specific operation into a DtOperation message. +-spec encode_operation(toplevel_op(), toplevel_type()) -> #dtop{}. +encode_operation(Op, counter) -> + #dtop{counter_op=encode_counter_op(Op)}; +encode_operation(Op, set) -> + #dtop{set_op=encode_set_op(Op)}; +encode_operation(Op, map) -> + #dtop{map_op=encode_map_op(Op)}. + +%% @doc Returns the type that the DtOp message expects to be performed +%% on. +-spec operation_type(#dtop{}) -> toplevel_type(). +operation_type(#dtop{counter_op=#counterop{}}) -> + counter; +operation_type(#dtop{set_op=#setop{}}) -> + set; +operation_type(#dtop{map_op=#mapop{}}) -> + map. + +%% @doc Encodes an update request into a DtUpdate message. +-spec encode_update_request({binary(), binary()}, binary() | undefined, update()) -> #dtupdatereq{}. +encode_update_request({_,_}=BucketAndType, Key, {_,_,_}=Update) -> + encode_update_request(BucketAndType, Key, Update, []). + +-spec encode_update_request({binary(), binary()}, binary() | undefined, update(), [update_opt()]) -> #dtupdatereq{}. +encode_update_request({BType, Bucket}, Key, {DType, Op, Context}, Options) -> + Update = #dtupdatereq{bucket=Bucket, + key=Key, + type=BType, + context=Context, + op=encode_operation(Op, DType)}, + encode_update_options(Update, Options). + +%% @doc Encodes request-time update options onto the DtUpdate message. +%% @private +-spec encode_update_options(#dtupdatereq{}, [proplists:property()]) -> #dtupdatereq{}. +encode_update_options(Update, []) -> + Update; +encode_update_options(Update, [{w,W}|Tail]) -> + encode_update_options(Update#dtupdatereq{w=encode_quorum(W)},Tail); +encode_update_options(Update, [{dw,DW}|Tail]) -> + encode_update_options(Update#dtupdatereq{dw=encode_quorum(DW)},Tail); +encode_update_options(Update, [{pw,PW}|Tail]) -> + encode_update_options(Update#dtupdatereq{pw=encode_quorum(PW)},Tail); +encode_update_options(Update, [return_body|Tail]) -> + encode_update_options(Update, [{return_body, true}|Tail]); +encode_update_options(Update, [{return_body, RB}|Tail]) -> + encode_update_options(Update#dtupdatereq{return_body=RB},Tail); +encode_update_options(Update, [{timeout, TO}|Tail]) -> + encode_update_options(Update#dtupdatereq{timeout=TO},Tail); +encode_update_options(Update, [sloppy_quorum|Tail]) -> + encode_update_options(Update, [{sloppy_quorum, true}|Tail]); +encode_update_options(Update, [{sloppy_quorum, RB}|Tail]) -> + encode_update_options(Update#dtupdatereq{sloppy_quorum=RB},Tail); +encode_update_options(Update, [{n_val, N}|Tail]) -> + encode_update_options(Update#dtupdatereq{n_val=N}, Tail); +encode_update_options(Update, [include_context|Tail]) -> + encode_update_options(Update, [{include_context, true}|Tail]); +encode_update_options(Update, [{include_context, IC}|Tail]) -> + encode_update_options(Update#dtupdatereq{include_context=IC},Tail); +encode_update_options(Update, [_|Tail]) -> + encode_update_options(Update, Tail). + +%% @doc Decodes a DtUpdateResp message into erlang values. +-spec decode_update_response(#dtupdateresp{}, Type::toplevel_type(), ReturnBodyExpected::boolean()) -> + ok | {ok, Key::binary()} | {Key::binary(), fetch_response()} | fetch_response(). +decode_update_response(#dtupdateresp{key=K}, _, false) -> + case K of + undefined -> ok; + _ -> {ok, K} + end; +decode_update_response(#dtupdateresp{counter_value=C, context=Ctx}=Resp, counter, true) -> + maybe_wrap_key({counter, C, Ctx}, Resp); +decode_update_response(#dtupdateresp{set_value=S, context=Ctx}=Resp, set, true) -> + maybe_wrap_key({set, S, Ctx}, Resp); +decode_update_response(#dtupdateresp{map_value=M, context=Ctx}=Resp, map, true) -> + maybe_wrap_key({map, [ decode_map_field(F) || F <- M ], Ctx}, Resp). + +maybe_wrap_key(Term, #dtupdateresp{key=undefined}) -> Term; +maybe_wrap_key(Term, #dtupdateresp{key=K}) -> {K, Term}. + +%% @doc Encodes an update response into a DtUpdateResp message. +-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context()) -> #dtupdateresp{}. +encode_update_response(Type, Value, Key, Context) -> + encode_update_response(Type, Value, Key, Context, []). + +%% @doc Encodes an update response into a DtUpdateResp message. +-spec encode_update_response(toplevel_type(), toplevel_value(), binary(), context(), type_mappings()) -> #dtupdateresp{}. +encode_update_response(counter, Value, Key, Context, _Mods) -> + #dtupdateresp{key=Key, context=Context, counter_value=Value}; +encode_update_response(set, Value, Key, Context, _Mods) -> + #dtupdateresp{key=Key, context=Context, set_value=Value}; +encode_update_response(map, Value, Key, Context, Mods) when is_list(Value) -> + #dtupdateresp{key=Key, context=Context, + map_value=[ encode_map_entry(Entry, Mods) || Value /= undefined, + Entry <- Value ]}. diff --git a/src/riak_pb_kv_codec.erl b/src/riak_pb_kv_codec.erl index c95fde62..c693656e 100644 --- a/src/riak_pb_kv_codec.erl +++ b/src/riak_pb_kv_codec.erl @@ -30,8 +30,6 @@ -include("riak_kv_pb.hrl"). -include("riak_pb_kv_codec.hrl"). --import(riak_pb_codec, [to_list/1, decode_bool/1, encode_bool/1]). - -export([encode_contents/1, %% riakc_pb:pbify_rpbcontents decode_contents/1, %% riakc_pb:erlify_rpbcontents encode_content/1, %% riakc_pb:pbify_rpbcontent @@ -47,6 +45,8 @@ decode_quorum/1 %% riak_kv_pb_socket:normalize_rw_value ]). +-export_type([quorum/0]). +-type quorum() :: symbolic_quorum() | non_neg_integer(). -type symbolic_quorum() :: one | quorum | all | default. -type value() :: binary(). -type metadata() :: dict(). @@ -207,7 +207,7 @@ decode_link(#rpblink{bucket = B, key = K, tag = T}) -> %% @doc Encode a symbolic or numeric quorum value into a Protocol %% Buffers value --spec encode_quorum(symbolic_quorum() | non_neg_integer()) -> non_neg_integer(). +-spec encode_quorum(quorum()) -> non_neg_integer(). encode_quorum(Bin) when is_binary(Bin) -> encode_quorum(binary_to_existing_atom(Bin, latin1)); encode_quorum(one) -> ?RIAKPB_RW_ONE; encode_quorum(quorum) -> ?RIAKPB_RW_QUORUM; @@ -218,7 +218,7 @@ encode_quorum(I) when is_integer(I), I >= 0 -> I. %% @doc Decodes a Protocol Buffers value into a symbolic or numeric %% quorum. --spec decode_quorum(non_neg_integer()) -> symbolic_quorum() | non_neg_integer(). +-spec decode_quorum(non_neg_integer()) -> quorum(). decode_quorum(?RIAKPB_RW_ONE) -> one; decode_quorum(?RIAKPB_RW_QUORUM) -> quorum; decode_quorum(?RIAKPB_RW_ALL) -> all;