diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 58b79d577..cad1aa4b2 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -39,7 +39,7 @@ env: jobs: ci: runs-on: ${{ matrix.os }} - #if: ${{ github.repository == 'v6d-io/v6d' }} + if: ${{ github.repository == 'v6d-io/v6d' }} strategy: matrix: os: [ubuntu-20.04] diff --git a/benchmark/alloc_test/bench_allocator.cpp b/benchmark/alloc_test/bench_allocator.cpp index 416f04848..02b5aceea 100644 --- a/benchmark/alloc_test/bench_allocator.cpp +++ b/benchmark/alloc_test/bench_allocator.cpp @@ -105,7 +105,7 @@ void bench() { PRNG rng; for (size_t k = 0; k < 32; ++k) { - for (size_t j = 0; j> 5; ++j) { + for (size_t j = 0; j < iterCount >> 5; ++j) { uint32_t rnum1 = rng.rng32(); uint32_t rnum2 = rng.rng32(); size_t idx = Pareto_80_20_6_Rand(paretoData, rnum1, rnum2); diff --git a/modules/graph/fragment/arrow_fragment.vineyard-mod b/modules/graph/fragment/arrow_fragment.vineyard-mod index 6fdaaeb6e..bf0cbea38 100644 --- a/modules/graph/fragment/arrow_fragment.vineyard-mod +++ b/modules/graph/fragment/arrow_fragment.vineyard-mod @@ -250,8 +250,8 @@ class [[vineyard]] ArrowFragment vid_parser_.GenerateId(0, label_id, tvnums_[label_id])); } - vertex_range_t InnerVerticesSlice(label_id_t label_id, vid_t start, vid_t end) - const { + vertex_range_t InnerVerticesSlice(label_id_t label_id, vid_t start, + vid_t end) const { CHECK(start <= end && start <= ivnums_[label_id]); if (end <= ivnums_[label_id]) { return vertex_range_t(vid_parser_.GenerateId(0, label_id, start), @@ -537,13 +537,13 @@ class [[vineyard]] ArrowFragment return oe_offsets_ptr_lists_[v_label][e_label]; } - inline int64_t GetIncomingOffsetLength(label_id_t v_label, label_id_t e_label) - const { + inline int64_t GetIncomingOffsetLength(label_id_t v_label, + label_id_t e_label) const { return ie_offsets_lists_[v_label][e_label]->length(); } - inline int64_t GetOutgoingOffsetLength(label_id_t v_label, label_id_t e_label) - const { + inline int64_t GetOutgoingOffsetLength(label_id_t v_label, + label_id_t e_label) const { return oe_offsets_lists_[v_label][e_label]->length(); } @@ -586,23 +586,23 @@ class [[vineyard]] ArrowFragment grape::PrepareConf conf); boost::leaf::result AddVerticesAndEdges( - Client & client, - std::map> && vertex_tables_map, - std::map> && edge_tables_map, + Client& client, + std::map>&& vertex_tables_map, + std::map>&& edge_tables_map, ObjectID vm_id, const std::vector>>& edge_relations, const int concurrency = std::thread::hardware_concurrency()) override; boost::leaf::result AddVertices( - Client & client, - std::map> && vertex_tables_map, + Client& client, + std::map>&& vertex_tables_map, ObjectID vm_id, const int concurrency = std::thread::hardware_concurrency()) override; boost::leaf::result AddEdges( - Client & client, - std::map> && edge_tables_map, + Client& client, + std::map>&& edge_tables_map, const std::vector>>& edge_relations, const int concurrency = std::thread::hardware_concurrency()) override; @@ -611,9 +611,9 @@ class [[vineyard]] ArrowFragment /// Vertex label id started from vertex_label_num_, and edge label id /// started from edge_label_num_. boost::leaf::result AddNewVertexEdgeLabels( - Client & client, - std::vector> && vertex_tables, - std::vector> && edge_tables, ObjectID vm_id, + Client& client, + std::vector>&& vertex_tables, + std::vector>&& edge_tables, ObjectID vm_id, const std::vector>>& edge_relations, const int concurrency = std::thread::hardware_concurrency()) override; @@ -621,33 +621,32 @@ class [[vineyard]] ArrowFragment /// Add a set of new vertex labels to graph. Vertex label id started from /// vertex_label_num_. boost::leaf::result AddNewVertexLabels( - Client & client, - std::vector> && vertex_tables, + Client& client, + std::vector>&& vertex_tables, ObjectID vm_id, const int concurrency = std::thread::hardware_concurrency()) override; boost::leaf::result AddVerticesToExistedLabel( - Client & client, label_id_t label_id, - std::shared_ptr && vertex_table, ObjectID vm_id, + Client& client, label_id_t label_id, + std::shared_ptr&& vertex_table, ObjectID vm_id, const int concurrency = std::thread::hardware_concurrency()) override; boost::leaf::result AddEdgesToExistedLabel( - Client & client, label_id_t label_id, - std::shared_ptr && edge_table, + Client& client, label_id_t label_id, + std::shared_ptr&& edge_table, const std::set>& edge_relations, const int concurrency = std::thread::hardware_concurrency()) override; /// Add a set of new edge labels to graph. Edge label id started from /// edge_label_num_. boost::leaf::result AddNewEdgeLabels( - Client & client, - std::vector> && edge_tables, + Client& client, std::vector>&& edge_tables, const std::vector>>& edge_relations, const int concurrency = std::thread::hardware_concurrency()) override; boost::leaf::result AddVertexColumns( - vineyard::Client & client, + vineyard::Client& client, const std::map< label_id_t, std::vector>>> @@ -655,7 +654,7 @@ class [[vineyard]] ArrowFragment bool replace = false) override; boost::leaf::result AddVertexColumns( - vineyard::Client & client, + vineyard::Client& client, const std::map>>> @@ -664,7 +663,7 @@ class [[vineyard]] ArrowFragment template boost::leaf::result AddVertexColumnsImpl( - vineyard::Client & client, + vineyard::Client& client, const std::map< label_id_t, std::vector>>> @@ -672,7 +671,7 @@ class [[vineyard]] ArrowFragment bool replace = false); boost::leaf::result AddEdgeColumns( - vineyard::Client & client, + vineyard::Client& client, const std::map< label_id_t, std::vector>>> @@ -680,7 +679,7 @@ class [[vineyard]] ArrowFragment bool replace = false) override; boost::leaf::result AddEdgeColumns( - vineyard::Client & client, + vineyard::Client& client, const std::map>>> @@ -689,7 +688,7 @@ class [[vineyard]] ArrowFragment template boost::leaf::result AddEdgeColumnsImpl( - vineyard::Client & client, + vineyard::Client& client, const std::map< label_id_t, std::vector>>> @@ -697,29 +696,29 @@ class [[vineyard]] ArrowFragment bool replace = false); boost::leaf::result Project( - vineyard::Client & client, + vineyard::Client& client, std::map> vertices, std::map> edges); boost::leaf::result TransformDirection( - vineyard::Client & client, int concurrency); + vineyard::Client& client, int concurrency); boost::leaf::result ConsolidateVertexColumns( - vineyard::Client & client, const label_id_t vlabel, + vineyard::Client& client, const label_id_t vlabel, std::vector const& prop_names, std::string const& consolidate_name); boost::leaf::result ConsolidateVertexColumns( - vineyard::Client & client, const label_id_t vlabel, + vineyard::Client& client, const label_id_t vlabel, std::vector const& props, std::string const& consolidate_name); boost::leaf::result ConsolidateEdgeColumns( - vineyard::Client & client, const label_id_t elabel, + vineyard::Client& client, const label_id_t elabel, std::vector const& prop_names, std::string const& consolidate_name); boost::leaf::result ConsolidateEdgeColumns( - vineyard::Client & client, const label_id_t elabel, + vineyard::Client& client, const label_id_t elabel, std::vector const& props, std::string const& consolidate_name); private: @@ -731,10 +730,10 @@ class [[vineyard]] ArrowFragment std::vector>>& fid_lists_offset); void directedCSR2Undirected( - vineyard::Client & client, - std::vector>>> & + vineyard::Client& client, + std::vector>>>& oe_lists, - std::vector>> & + std::vector>>& oe_offsets_lists, const int concurrency, bool& is_multigraph); diff --git a/python/client.cc b/python/client.cc index 5c4d73f8d..165559ebe 100644 --- a/python/client.cc +++ b/python/client.cc @@ -566,11 +566,7 @@ void bind_client(py::module& mod) { .def_property_readonly("rpc_endpoint", &ClientBase::RPCEndpoint, doc::ClientBase_rpc_endpoint) .def_property_readonly("version", &ClientBase::Version, - doc::ClientBase_version) - .def_property_readonly("is_ipc", &ClientBase::IsIPC, - doc::ClientBase_is_ipc) - .def_property_readonly("is_rpc", &ClientBase::IsRPC, - doc::ClientBase_is_rpc); + doc::ClientBase_version); // Client py::class_, ClientBase>(mod, "IPCClient", diff --git a/python/core.cc b/python/core.cc index 34e1af109..70ef4af5f 100644 --- a/python/core.cc +++ b/python/core.cc @@ -605,7 +605,7 @@ void bind_blobs(py::module& mod) { }); // RemoteBlob - py::class_, Object>( + py::class_>( mod, "RemoteBlob", py::buffer_protocol(), doc::RemoteBlob) .def_property_readonly( "id", [](RemoteBlob* self) -> ObjectIDWrapper { return self->id(); }, diff --git a/python/pybind11_docs.cc b/python/pybind11_docs.cc index 1e23bbaaf..386e6c2a2 100644 --- a/python/pybind11_docs.cc +++ b/python/pybind11_docs.cc @@ -830,14 +830,6 @@ The version number string of connected vineyard server, in the format of semver: :code:`MAJOR.MINOR.PATCH`. )doc"; -const char* ClientBase_is_ipc = R"doc( -Whether the client is connected to vineyard server via UNIX domain socket. -)doc"; - -const char* ClientBase_is_rpc = R"doc( -Whether the client is connected to vineyard server via RPC endpoint. -)doc"; - const char* IPCClient = R"doc( IPC client that connects to vineyard instance's UNIX domain socket. )doc"; diff --git a/python/pybind11_docs.h b/python/pybind11_docs.h index 38436beca..ae0ab2ea2 100644 --- a/python/pybind11_docs.h +++ b/python/pybind11_docs.h @@ -113,8 +113,6 @@ extern const char* ClientBase_status; extern const char* ClientBase_ipc_socket; extern const char* ClientBase_rpc_endpoint; extern const char* ClientBase_version; -extern const char* ClientBase_is_ipc; -extern const char* ClientBase_is_rpc; extern const char* IPCClient; extern const char* IPCClient_create_blob; diff --git a/python/vineyard/contrib/ml/tests/test_dali.py b/python/vineyard/contrib/ml/tests/test_dali.py index a29e5d147..0a0bbe19c 100644 --- a/python/vineyard/contrib/ml/tests/test_dali.py +++ b/python/vineyard/contrib/ml/tests/test_dali.py @@ -32,11 +32,6 @@ def vineyard_for_dali(): yield -def test_dali_tensor_with_rpc_client(vineyard_rpc_client): - test_dali_tensor(vineyard_rpc_client) - - -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dali_tensor(vineyard_client): @dali.pipeline_def() def pipe(): diff --git a/python/vineyard/contrib/ml/tests/test_mxnet.py b/python/vineyard/contrib/ml/tests/test_mxnet.py index 4062e392c..a28cebccb 100644 --- a/python/vineyard/contrib/ml/tests/test_mxnet.py +++ b/python/vineyard/contrib/ml/tests/test_mxnet.py @@ -34,10 +34,6 @@ def vineyard_for_mxnet(): yield -def test_mxnet_tensor_with_rpc_client(vineyard_rpc_client): - test_mxnet_tensor(vineyard_rpc_client) - - def test_mxnet_tensor(vineyard_client): data = [np.random.rand(2, 3) for i in range(10)] label = [np.random.rand(2, 3) for i in range(10)] @@ -49,10 +45,6 @@ def test_mxnet_tensor(vineyard_client): assert dataset[1][0].shape == dtrain[1][0].shape -def test_mxnet_dataframe_with_rpc_client(vineyard_rpc_client): - test_mxnet_dataframe(vineyard_rpc_client) - - def test_mxnet_dataframe(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]}) label = df['c'].values.astype(np.float32) @@ -67,10 +59,6 @@ def test_mxnet_dataframe(vineyard_client): assert dataset[1].shape == dtrain[1].shape -def test_mxnet_record_batch_with_rpc_client(vineyard_rpc_client): - test_mxnet_record_batch(vineyard_rpc_client) - - def test_mxnet_record_batch(vineyard_client): arrays = [ pa.array([1, 2, 3, 4]), @@ -84,10 +72,6 @@ def test_mxnet_record_batch(vineyard_client): assert len(dtrain[0][0]) == 2 -def test_mxnet_table_with_rpc_client(vineyard_rpc_client): - test_mxnet_table(vineyard_rpc_client) - - def test_mxnet_table(vineyard_client): arrays = [pa.array([1, 2]), pa.array([0, 1]), pa.array([0.1, 0.2])] batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'target']) diff --git a/python/vineyard/contrib/ml/tests/test_tensorflow.py b/python/vineyard/contrib/ml/tests/test_tensorflow.py index 97e33ad6a..101149a1e 100644 --- a/python/vineyard/contrib/ml/tests/test_tensorflow.py +++ b/python/vineyard/contrib/ml/tests/test_tensorflow.py @@ -36,10 +36,6 @@ def vineyard_for_tensorflow(): yield -def test_tensorflow_tensor_with_rpc_client(vineyard_rpc_client): - test_tensorflow_tensor(vineyard_rpc_client) - - def test_tensorflow_tensor(vineyard_client): data = [np.random.rand(2, 3) for i in range(10)] label = [np.random.rand(2, 3) for i in range(10)] @@ -57,10 +53,6 @@ def test_tensorflow_tensor(vineyard_client): assert len(dataset) == len(dtrain) -def test_tensorflow_dataframe_with_rpc_client(vineyard_rpc_client): - test_tensorflow_dataframe(vineyard_rpc_client) - - def test_tensorflow_dataframe(vineyard_client): df = pd.DataFrame( {'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'target': [1.0, 2.0, 3.0, 4.0]} @@ -77,10 +69,6 @@ def test_tensorflow_dataframe(vineyard_client): assert data_ncols == dtrain_ncols -def test_tensorflow_record_batch_with_rpc_client(vineyard_rpc_client): - test_tensorflow_record_batch(vineyard_rpc_client) - - def test_tensorflow_record_batch(vineyard_client): arrays = [ pa.array([1, 2, 3, 4]), @@ -96,10 +84,6 @@ def test_tensorflow_record_batch(vineyard_client): assert len(dtrain) == 4 -def test_tensorflow_table_with_rpc_client(vineyard_rpc_client): - test_tensorflow_table(vineyard_rpc_client) - - def test_tensorflow_table(vineyard_client): arrays = [pa.array([1, 2]), pa.array([0, 1]), pa.array([0.1, 0.2])] batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'label']) diff --git a/python/vineyard/contrib/ml/tests/test_torch.py b/python/vineyard/contrib/ml/tests/test_torch.py index 8364f0f8b..6fbc2bf51 100644 --- a/python/vineyard/contrib/ml/tests/test_torch.py +++ b/python/vineyard/contrib/ml/tests/test_torch.py @@ -36,10 +36,6 @@ def vineyard_for_torch(): yield -def test_torch_tensor_with_rpc_client(vineyard_rpc_client): - test_torch_tensor(vineyard_rpc_client) - - def test_torch_tensor(vineyard_client): tensor = torch.ones(5, 2) object_id = vineyard_client.put(tensor) @@ -51,10 +47,6 @@ def test_torch_tensor(vineyard_client): assert torch.equal(value, tensor) -def test_torch_dataset_with_rpc_client(vineyard_rpc_client): - test_torch_dataset(vineyard_rpc_client) - - def test_torch_dataset(vineyard_client): dataset = torch.utils.data.TensorDataset( *[torch.tensor(np.random.rand(2, 3)), torch.tensor(np.random.rand(2, 3))], @@ -70,10 +62,6 @@ def test_torch_dataset(vineyard_client): assert torch.isclose(t1, t2).all() -def test_torch_dataset_dataframe_with_rpc_client(vineyard_rpc_client): - test_torch_dataset_dataframe(vineyard_rpc_client) - - def test_torch_dataset_dataframe(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]}) object_id = vineyard_client.put(df) @@ -89,10 +77,6 @@ def test_torch_dataset_dataframe(vineyard_client): ).all() -def test_torch_dataset_dataframe_multidimensional_with_rpc_client(vineyard_rpc_client): - test_torch_dataset_dataframe_multidimensional(vineyard_rpc_client) - - def test_torch_dataset_dataframe_multidimensional(vineyard_client): df = pd.DataFrame( { @@ -107,10 +91,6 @@ def test_torch_dataset_dataframe_multidimensional(vineyard_client): assert len(df.columns) == len(value.tensors) -def test_torch_dataset_recordbatch_with_rpc_client(vineyard_rpc_client): - test_torch_dataset_recordbatch(vineyard_rpc_client) - - def test_torch_dataset_recordbatch(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]}) batch = pa.RecordBatch.from_pandas(df) @@ -127,10 +107,6 @@ def test_torch_dataset_recordbatch(vineyard_client): ).all() -def test_torch_dataset_table_with_rpc_client(vineyard_rpc_client): - test_torch_dataset_table(vineyard_rpc_client) - - def test_torch_dataset_table(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]}) table = pa.Table.from_pandas(df) diff --git a/python/vineyard/contrib/ml/tests/test_torcharrow.py b/python/vineyard/contrib/ml/tests/test_torcharrow.py index c17487dff..a8ddf9c5d 100644 --- a/python/vineyard/contrib/ml/tests/test_torcharrow.py +++ b/python/vineyard/contrib/ml/tests/test_torcharrow.py @@ -30,10 +30,6 @@ def vineyard_for_torcharrow(): yield -def test_torch_arrow_column_with_rpc_client(vineyard_rpc_client): - test_torch_arrow_column(vineyard_rpc_client) - - def test_torch_arrow_column(vineyard_client): s = ta.column([1, 2, None, 4]) assert s.sum() == 7 @@ -43,10 +39,6 @@ def test_torch_arrow_column(vineyard_client): assert s.sum() == 7 -def test_torch_arrow_dataframe_with_rpc_client(vineyard_rpc_client): - test_torch_arrow_dataframe(vineyard_rpc_client) - - def test_torch_arrow_dataframe(vineyard_client): s = ta.dataframe({"a": [1, 2, None, 4], "b": [5, 6, None, 8]}) assert s.sum()['a'][0] == 7 diff --git a/python/vineyard/contrib/ml/tests/test_xgboost.py b/python/vineyard/contrib/ml/tests/test_xgboost.py index d2358b573..73d737409 100644 --- a/python/vineyard/contrib/ml/tests/test_xgboost.py +++ b/python/vineyard/contrib/ml/tests/test_xgboost.py @@ -31,10 +31,6 @@ def vineyard_for_xgboost(): yield -def test_numpy_ndarray_with_rpc_client(vineyard_rpc_client): - test_numpy_ndarray(vineyard_rpc_client) - - def test_numpy_ndarray(vineyard_client): arr = np.random.rand(4, 5) object_id = vineyard_client.put(arr) @@ -43,10 +39,6 @@ def test_numpy_ndarray(vineyard_client): assert dtrain.num_row() == 4 -def test_pandas_dataframe_specify_label_with_rpc_client(vineyard_rpc_client): - test_pandas_dataframe_specify_label(vineyard_rpc_client) - - def test_pandas_dataframe_specify_label(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 'c': [1.0, 2.0, 3.0, 4.0]}) object_id = vineyard_client.put(df) @@ -58,10 +50,6 @@ def test_pandas_dataframe_specify_label(vineyard_client): assert dtrain.feature_names == ['b', 'c'] -def test_pandas_dataframe_specify_data_with_rpc_client(vineyard_rpc_client): - test_pandas_dataframe_specify_data(vineyard_rpc_client) - - def test_pandas_dataframe_specify_data(vineyard_client): df = pd.DataFrame( {'a': [1, 2, 3, 4], 'b': [[5, 1.0], [6, 2.0], [7, 3.0], [8, 9.0]]} @@ -74,10 +62,6 @@ def test_pandas_dataframe_specify_data(vineyard_client): assert np.allclose(arr, dtrain.get_label()) -def test_record_batch_xgb_resolver_with_rpc_client(vineyard_rpc_client): - test_record_batch_xgb_resolver(vineyard_rpc_client) - - def test_record_batch_xgb_resolver(vineyard_client): arrays = [ pa.array([1, 2, 3, 4]), @@ -94,10 +78,6 @@ def test_record_batch_xgb_resolver(vineyard_client): assert dtrain.feature_names == ['f0', 'f1'] -def test_table_xgb_resolver_with_rpc_client(vineyard_rpc_client): - test_table_xgb_resolver(vineyard_rpc_client) - - def test_table_xgb_resolver(vineyard_client): arrays = [pa.array([1, 2]), pa.array([0, 1]), pa.array([0.1, 0.2])] batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'label', 'f2']) diff --git a/python/vineyard/core/resolver.py b/python/vineyard/core/resolver.py index a01d6eda5..0fa77b121 100644 --- a/python/vineyard/core/resolver.py +++ b/python/vineyard/core/resolver.py @@ -230,13 +230,12 @@ def get( object_id = client.get_name(name) # run resolver - if client.is_rpc(): - obj = client.get_object(object_id) - elif client.is_ipc(): - obj = client.get_object(object_id, fetch=fetch) - else: - raise RuntimeError('Unknown vineyard client type: %s' % type(client)) - + obj = client.get_object(object_id, fetch=fetch) + meta = obj.meta + if not meta.islocal and not meta.isglobal: + raise ValueError( + "Not a local object: for remote object, you can only get its metadata" + ) if resolver is None: resolver = get_current_resolvers() return resolver(obj, __vineyard_client=client, **kw) diff --git a/python/vineyard/core/tests/test_rpc_client.py b/python/vineyard/core/tests/test_rpc_client.py index dd4d2e4a4..dd0996dc4 100644 --- a/python/vineyard/core/tests/test_rpc_client.py +++ b/python/vineyard/core/tests/test_rpc_client.py @@ -115,6 +115,15 @@ def test_remote_blob_create_and_get_large_object(vineyard_endpoint): assert memoryview(remote_blob) == memoryview(large_payload) +def test_remote_blob_error(vineyard_endpoint): + vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':')) + + with pytest.raises( + ValueError, match="Vineyard RPC client cannot be used to create local blobs" + ): + vineyard_rpc_client.put(np.ones((2, 3, 4))) + + def test_multiple_remote_blobs(vineyard_endpoint): vineyard_rpc_client = vineyard.connect(*vineyard_endpoint.split(':')) diff --git a/python/vineyard/data/arrow.py b/python/vineyard/data/arrow.py index 4f17dd2f9..b0ad65c8f 100644 --- a/python/vineyard/data/arrow.py +++ b/python/vineyard/data/arrow.py @@ -31,7 +31,6 @@ from vineyard._C import Blob from vineyard._C import IPCClient from vineyard._C import Object -from vineyard._C import ObjectID from vineyard._C import ObjectMeta from vineyard.core.builder import BuilderContext from vineyard.core.resolver import ResolverContext @@ -39,7 +38,9 @@ from vineyard.data.utils import normalize_dtype -def buffer_builder(client, buffer: Union[bytes, memoryview], builder: BuilderContext): +def buffer_builder( + client: IPCClient, buffer: Union[bytes, memoryview], builder: BuilderContext +): if buffer is None: address = None size = 0 @@ -50,13 +51,7 @@ def buffer_builder(client, buffer: Union[bytes, memoryview], builder: BuilderCon def as_arrow_buffer(blob: Blob): - if isinstance(blob, Blob): - buffer = blob.buffer - else: - if not blob.is_empty(): - buffer = memoryview(blob) - else: - buffer = memoryview(b'') + buffer = blob.buffer if buffer is None: return pa.py_buffer(bytearray()) return pa.py_buffer(buffer) diff --git a/python/vineyard/data/base.py b/python/vineyard/data/base.py index 3b3f5993d..d4e5ea353 100644 --- a/python/vineyard/data/base.py +++ b/python/vineyard/data/base.py @@ -130,7 +130,6 @@ def register_base_types( if resolver_ctx is not None: resolver_ctx.register('vineyard::Blob', bytes_resolver) - resolver_ctx.register('vineyard::RemoteBlob', bytes_resolver) resolver_ctx.register('vineyard::Scalar', scalar_resolver) resolver_ctx.register('vineyard::Array', array_resolver) resolver_ctx.register('vineyard::Sequence', sequence_resolver) diff --git a/python/vineyard/data/tests/test_arrow.py b/python/vineyard/data/tests/test_arrow.py index d697c13db..004698463 100644 --- a/python/vineyard/data/tests/test_arrow.py +++ b/python/vineyard/data/tests/test_arrow.py @@ -32,7 +32,6 @@ register_builtin_types(default_builder_context, default_resolver_context) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_arrow_array(vineyard_client): arr = pa.array([1, 2, None, 3]) object_id = vineyard_client.put(arr) @@ -67,19 +66,13 @@ def test_arrow_array(vineyard_client): assert vineyard_client.get(object_id).values.equals(nested_arr.values) -def build_record_batch(): +def test_record_batch(vineyard_client): arrays = [ pa.array([1, 2, 3, 4]), pa.array(['foo', 'bar', 'baz', None]), pa.array([True, None, False, True]), ] batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'f2']) - return batch - - -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) -def test_record_batch(vineyard_client): - batch = build_record_batch() _object_id = vineyard_client.put(batch) # noqa: F841 # processing tables that contains string is not roundtrip, as StringArray # will be transformed to LargeStringArray @@ -87,8 +80,7 @@ def test_record_batch(vineyard_client): # assert batch.equals(vineyard_client.get(object_id)) -@pytest.mark.skip -def build_table(): +def test_table(vineyard_client): arrays = [ pa.array([1, 2, 3, 4]), pa.array(['foo', 'bar', 'baz', None]), @@ -97,11 +89,6 @@ def build_table(): batch = pa.RecordBatch.from_arrays(arrays, ['f0', 'f1', 'f2']) batches = [batch] * 5 table = pa.Table.from_batches(batches) - return table - - -def test_table(vineyard_client): - table = build_table() _object_id = vineyard_client.put(table) # noqa: F841 # processing tables that contains string is not roundtrip, as StringArray # will be transformed to LargeStringArray @@ -110,7 +97,6 @@ def test_table(vineyard_client): @pytest.mark.skipif(polars is None, reason='polars is not installed') -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_polars_dataframe(vineyard_client): arrays = [ pa.array([1, 2, 3, 4]), @@ -126,22 +112,3 @@ def test_polars_dataframe(vineyard_client): # will be transformed to LargeStringArray # # assert table.equals(vineyard_client.get(object_id)) - - -@pytest.mark.parametrize( - "value", - [ - pa.array([1, 2, None, 3]), - pa.array(["a", None, None, None]), - pa.array([True, False, True, False]), - build_record_batch(), - build_table(), - ], -) -def test_data_consistency_between_ipc_and_rpc( - value, vineyard_client, vineyard_rpc_client -): - object_id = vineyard_client.put(value) - assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) - object_id = vineyard_rpc_client.put(value) - assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) diff --git a/python/vineyard/data/tests/test_base.py b/python/vineyard/data/tests/test_base.py index 548ebb80b..ea52b13b5 100644 --- a/python/vineyard/data/tests/test_base.py +++ b/python/vineyard/data/tests/test_base.py @@ -25,45 +25,38 @@ register_builtin_types(default_builder_context, default_resolver_context) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_int(vineyard_client): object_id = vineyard_client.put(1) assert vineyard_client.get(object_id) == 1 -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_double(vineyard_client): object_id = vineyard_client.put(1.234) assert vineyard_client.get(object_id) == pytest.approx(1.234) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_string(vineyard_client): object_id = vineyard_client.put('abcde') assert vineyard_client.get(object_id) == 'abcde' -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes(vineyard_client): bs = b'abcde' object_id = vineyard_client.put(bs) assert vineyard_client.get(object_id) == memoryview(bs) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_memoryview(vineyard_client): bs = memoryview(b'abcde') object_id = vineyard_client.put(bs) assert vineyard_client.get(object_id) == bs -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_pair(vineyard_client): object_id = vineyard_client.put((1, "2")) assert vineyard_client.get(object_id) == (1, "2") -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_tuple(vineyard_client): object_id = vineyard_client.put(()) assert vineyard_client.get(object_id) == () @@ -88,15 +81,3 @@ def test_tuple(vineyard_client): 4444, "5.5.5.5.5.5.5", ) - - -@pytest.mark.parametrize( - "value", [1, 1.234, 'abcd', b'abcde', memoryview(b'abcde'), (1, "2")] -) -def test_data_consistency_between_ipc_and_rpc( - value, vineyard_client, vineyard_rpc_client -): - object_id = vineyard_client.put(value) - assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) - object_id = vineyard_rpc_client.put(value) - assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) diff --git a/python/vineyard/data/tests/test_dataframe.py b/python/vineyard/data/tests/test_dataframe.py index 55e685ae8..722012fa0 100644 --- a/python/vineyard/data/tests/test_dataframe.py +++ b/python/vineyard/data/tests/test_dataframe.py @@ -19,8 +19,6 @@ import numpy as np import pandas as pd -import pytest - from vineyard.core import default_builder_context from vineyard.core import default_resolver_context from vineyard.data import register_builtin_types @@ -29,14 +27,12 @@ register_builtin_types(default_builder_context, default_resolver_context) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_pandas_dataframe(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}) object_id = vineyard_client.put(df) pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_pandas_dataframe_string(vineyard_client): # see gh#533 df = pd.DataFrame({'a': ['1', '2', '3', '4'], 'b': ['5', '6', '7', '8']}) @@ -44,7 +40,14 @@ def test_pandas_dataframe_string(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) +def test_pandas_dataframe_empty(vineyard_client): + # see gh#533 + df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': ['5', '6', '7', '8']}) + df = df.iloc[0:0] + object_id = vineyard_client.put(df) + pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) + + def test_pandas_dataframe_complex_columns(vineyard_client): # see gh#533 df = pd.DataFrame([1, 2, 3, 4], columns=[['x']]) @@ -52,14 +55,12 @@ def test_pandas_dataframe_complex_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_pandas_dataframe_int_columns(vineyard_client): df = pd.DataFrame({1: [1, 2, 3, 4], 2: [5, 6, 7, 8]}) object_id = vineyard_client.put(df) pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_pandas_dataframe_mixed_columns(vineyard_client): df = pd.DataFrame( {'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 1: [9, 10, 11, 12], 2: [13, 14, 15, 16]} @@ -68,7 +69,6 @@ def test_pandas_dataframe_mixed_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dataframe_reindex(vineyard_client): df = pd.DataFrame(np.random.rand(10, 5), columns=['c1', 'c2', 'c3', 'c4', 'c5']) expected = df.reindex(index=np.arange(10, 1, -1)) @@ -76,7 +76,6 @@ def test_dataframe_reindex(vineyard_client): pd.testing.assert_frame_equal(expected, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dataframe_set_index(vineyard_client): df1 = pd.DataFrame( [[1, 3, 3], [4, 2, 6], [7, 8, 9]], @@ -88,7 +87,6 @@ def test_dataframe_set_index(vineyard_client): pd.testing.assert_frame_equal(expected, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_sparse_array(vineyard_client): arr = np.random.randn(10) arr[2:5] = np.nan @@ -98,7 +96,6 @@ def test_sparse_array(vineyard_client): pd.testing.assert_extension_array_equal(sparr, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dataframe_with_sparse_array(vineyard_client): df = pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 'a']) df.iloc[:98] = np.nan @@ -107,7 +104,6 @@ def test_dataframe_with_sparse_array(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dataframe_with_sparse_array_int_columns(vineyard_client): df = pd.DataFrame(np.random.randn(100, 4), columns=[1, 2, 3, 4]) df.iloc[:98] = np.nan @@ -116,7 +112,6 @@ def test_dataframe_with_sparse_array_int_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dataframe_with_sparse_array_mixed_columns(vineyard_client): df = pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 0]) df.iloc[:98] = np.nan @@ -125,7 +120,6 @@ def test_dataframe_with_sparse_array_mixed_columns(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dataframe_with_datetime(vineyard_client): # GH-575 dates = [ @@ -139,7 +133,6 @@ def test_dataframe_with_datetime(vineyard_client): pd.testing.assert_frame_equal(df, vineyard_client.get(object_id)) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dataframe_with_multidimensional(vineyard_client): df = pd.DataFrame( { @@ -172,58 +165,3 @@ def test_dataframe_reusing(vineyard_client): meta1['__values_-value-0']['buffer_'].id == meta2['__values_-value-0']['buffer_'].id ) - - -@pytest.mark.parametrize( - "value", - [ - pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}), - pd.DataFrame({'a': ['1', '2', '3', '4'], 'b': ['5', '6', '7', '8']}), - pd.DataFrame([1, 2, 3, 4], columns=[['x']]), - pd.DataFrame({1: [1, 2, 3, 4], 2: [5, 6, 7, 8]}), - pd.DataFrame( - { - 'a': [1, 2, 3, 4], - 'b': [5, 6, 7, 8], - 1: [9, 10, 11, 12], - 2: [13, 14, 15, 16], - } - ), - pd.DataFrame(np.random.rand(10, 5), columns=['c1', 'c2', 'c3', 'c4', 'c5']), - pd.DataFrame( - [[1, 3, 3], [4, 2, 6], [7, 8, 9]], - index=['a1', 'a2', 'a3'], - columns=['x', 'y', 'z'], - ), - pd.arrays.SparseArray(np.random.randn(10)), - pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 'a']).astype( - pd.SparseDtype("float", np.nan) - ), - pd.DataFrame(np.random.randn(100, 4), columns=[1, 2, 3, 4]).astype( - pd.SparseDtype("float", np.nan) - ), - pd.DataFrame(np.random.randn(100, 4), columns=['x', 'y', 'z', 0]).astype( - pd.SparseDtype("float", np.nan) - ), - pd.DataFrame( - pd.Series( - [ - pd.Timestamp("2012-05-01"), - pd.Timestamp("2012-05-02"), - pd.Timestamp("2012-05-03"), - ] - ) - ), - ], -) -def test_data_consistency_between_ipc_and_rpc( - value, vineyard_client, vineyard_rpc_client -): - object_id = vineyard_client.put(value) - df1 = vineyard_client.get(object_id) - df2 = vineyard_rpc_client.get(object_id) - assert df1.equals(df2) - object_id = vineyard_rpc_client.put(value) - df1 = vineyard_client.get(object_id) - df2 = vineyard_rpc_client.get(object_id) - assert df1.equals(df2) diff --git a/python/vineyard/data/tests/test_default.py b/python/vineyard/data/tests/test_default.py index 54a1f3d17..c579c50a9 100644 --- a/python/vineyard/data/tests/test_default.py +++ b/python/vineyard/data/tests/test_default.py @@ -18,8 +18,6 @@ import numpy as np -import pytest - from vineyard.core import default_builder_context from vineyard.core import default_resolver_context from vineyard.data import register_builtin_types @@ -27,7 +25,6 @@ register_builtin_types(default_builder_context, default_resolver_context) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bool(vineyard_client): value = True object_id = vineyard_client.put(value) @@ -38,7 +35,6 @@ def test_bool(vineyard_client): assert vineyard_client.get(object_id) == value -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_np_bool(vineyard_client): value = np.bool_(True) object_id = vineyard_client.put(value) @@ -49,36 +45,13 @@ def test_np_bool(vineyard_client): assert vineyard_client.get(object_id) == value -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_list(vineyard_client): value = [1, 2, 3, 4, 5, 6, None, None, 9] object_id = vineyard_client.put(value) assert vineyard_client.get(object_id) == tuple(value) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_dict(vineyard_client): value = {1: 2, 3: 4, 5: None, None: 6} object_id = vineyard_client.put(value) assert vineyard_client.get(object_id) == value - - -@pytest.mark.parametrize( - "value", - [ - True, - False, - np.bool_(True), - np.bool_(False), - [1, 2, 3, 4, 5, 6, None, None, 9], - {1: 2, 3: 4, 5: None, None: 6}, - ], -) -def test_data_consistency_between_ipc_and_rpc( - value, vineyard_client, vineyard_rpc_client -): - object_id = vineyard_client.put(value) - assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) - - object_id = vineyard_rpc_client.put(value) - assert vineyard_client.get(object_id) == vineyard_rpc_client.get(object_id) diff --git a/python/vineyard/data/tests/test_pickle.py b/python/vineyard/data/tests/test_pickle.py index 2e4adef64..c07eff218 100644 --- a/python/vineyard/data/tests/test_pickle.py +++ b/python/vineyard/data/tests/test_pickle.py @@ -131,7 +131,6 @@ def test_bytes_io_roundtrip(block_size, value): assert target == value -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_numpy_ndarray(vineyard_client): arr = np.random.rand(4, 5, 6) object_id = vineyard_client.put(arr) @@ -139,7 +138,6 @@ def test_bytes_io_numpy_ndarray(vineyard_client): np.testing.assert_allclose(arr, target) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_empty_ndarray(vineyard_client): arr = np.ones(()) object_id = vineyard_client.put(arr) @@ -182,7 +180,6 @@ def test_bytes_io_empty_ndarray(vineyard_client): np.testing.assert_allclose(arr, target) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_str_ndarray(vineyard_client): arr = np.array(['', 'x', 'yz', 'uvw']) object_id = vineyard_client.put(arr) @@ -190,7 +187,6 @@ def test_bytes_io_str_ndarray(vineyard_client): np.testing.assert_equal(arr, target) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_object_ndarray(vineyard_client): arr = np.array([1, 'x', 3.14, (1, 4)], dtype=object) object_id = vineyard_client.put(arr) @@ -203,7 +199,6 @@ def test_object_ndarray(vineyard_client): np.testing.assert_equal(arr, target) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_tensor_order(vineyard_client): arr = np.asfortranarray(np.random.rand(10, 7)) object_id = vineyard_client.put(arr) @@ -212,7 +207,6 @@ def test_bytes_io_tensor_order(vineyard_client): assert res.flags['F_CONTIGUOUS'] == arr.flags['F_CONTIGUOUS'] -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_pandas_dataframe(vineyard_client): df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8]}) object_id = vineyard_client.put(df) @@ -220,7 +214,6 @@ def test_bytes_io_pandas_dataframe(vineyard_client): pd.testing.assert_frame_equal(df, target) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_pandas_dataframe_int_columns(vineyard_client): df = pd.DataFrame({1: [1, 2, 3, 4], 2: [5, 6, 7, 8]}) object_id = vineyard_client.put(df) @@ -228,7 +221,6 @@ def test_bytes_io_pandas_dataframe_int_columns(vineyard_client): pd.testing.assert_frame_equal(df, target) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_pandas_dataframe_mixed_columns(vineyard_client): df = pd.DataFrame( {'a': [1, 2, 3, 4], 'b': [5, 6, 7, 8], 1: [9, 10, 11, 12], 2: [13, 14, 15, 16]} @@ -238,34 +230,8 @@ def test_bytes_io_pandas_dataframe_mixed_columns(vineyard_client): pd.testing.assert_frame_equal(df, target) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_bytes_io_pandas_series(vineyard_client): s = pd.Series([1, 3, 5, np.nan, 6, 8], name='foo') object_id = vineyard_client.put(s) target = read_and_build(b1m, vineyard_client.get(object_id)) pd.testing.assert_series_equal(s, target) - - -@pytest.mark.parametrize( - "value", - [ - np.ones((0, 1, 2, 3)), - np.zeros((0, 1, 2, 3), dtype='int'), - np.array(['', 'x', 'ht', 'yyds']), - np.array([1, 'x', 3.14, (1, 4)], dtype=object), - np.ones((), dtype='object'), - np.asfortranarray(np.random.rand(10, 7)), - ], -) -def test_data_consistency_between_ipc_and_rpc( - value, vineyard_client, vineyard_rpc_client -): - object_id = vineyard_client.put(value) - v1 = vineyard_client.get(object_id) - v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1, v2) - - object_id = vineyard_rpc_client.put(value) - v1 = vineyard_client.get(object_id) - v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1, v2) diff --git a/python/vineyard/data/tests/test_series.py b/python/vineyard/data/tests/test_series.py index 9a6c05906..ef983475f 100644 --- a/python/vineyard/data/tests/test_series.py +++ b/python/vineyard/data/tests/test_series.py @@ -19,8 +19,6 @@ import numpy as np import pandas as pd -import pytest - from vineyard.core import default_builder_context from vineyard.core import default_resolver_context from vineyard.data import register_builtin_types @@ -28,7 +26,6 @@ register_builtin_types(default_builder_context, default_resolver_context) -@pytest.mark.parametrize("vineyard_client", ["vineyard_client", "vineyard_rpc_client"]) def test_pandas_series(vineyard_client): s = pd.Series([1, 3, 5, np.nan, 6, 8], name='foo') object_id = vineyard_client.put(s) diff --git a/python/vineyard/data/tests/test_tensor.py b/python/vineyard/data/tests/test_tensor.py index 003e4d96b..7d48bd1ab 100644 --- a/python/vineyard/data/tests/test_tensor.py +++ b/python/vineyard/data/tests/test_tensor.py @@ -33,20 +33,12 @@ register_builtin_types(default_builder_context, default_resolver_context) -def test_numpy_ndarray_with_rpc_client(vineyard_rpc_client): - test_numpy_ndarray(vineyard_rpc_client) - - def test_numpy_ndarray(vineyard_client): arr = np.random.rand(4, 5, 6) object_id = vineyard_client.put(arr) np.testing.assert_allclose(arr, vineyard_client.get(object_id)) -def test_empty_ndarray_with_rpc_client(vineyard_rpc_client): - test_empty_ndarray(vineyard_rpc_client) - - def test_empty_ndarray(vineyard_client): arr = np.ones(()) object_id = vineyard_client.put(arr) @@ -81,20 +73,12 @@ def test_empty_ndarray(vineyard_client): np.testing.assert_allclose(arr, vineyard_client.get(object_id)) -def test_str_ndarray_with_rpc_client(vineyard_rpc_client): - test_str_ndarray(vineyard_rpc_client) - - def test_str_ndarray(vineyard_client): arr = np.array(['', 'x', 'yz', 'uvw']) object_id = vineyard_client.put(arr) np.testing.assert_equal(arr, vineyard_client.get(object_id)) -def test_object_ndarray_with_rpc_client(vineyard_rpc_client): - test_object_ndarray(vineyard_rpc_client) - - def test_object_ndarray(vineyard_client): arr = np.array([1, 'x', 3.14, (1, 4)], dtype=object) object_id = vineyard_client.put(arr) @@ -105,10 +89,6 @@ def test_object_ndarray(vineyard_client): np.testing.assert_equal(arr, vineyard_client.get(object_id)) -def test_tensor_order_with_rpc_client(vineyard_rpc_client): - test_tensor_order(vineyard_rpc_client) - - def test_tensor_order(vineyard_client): arr = np.asfortranarray(np.random.rand(10, 7)) object_id = vineyard_client.put(arr) @@ -117,11 +97,6 @@ def test_tensor_order(vineyard_client): assert res.flags['F_CONTIGUOUS'] == arr.flags['F_CONTIGUOUS'] -@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") -def test_bsr_matrix_with_rpc_client(vineyard_rpc_client): - test_bsr_matrix(vineyard_rpc_client) - - @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_bsr_matrix(vineyard_client): arr = sp.sparse.bsr_matrix((3, 4), dtype=np.int8) @@ -129,11 +104,6 @@ def test_bsr_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) -@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") -def test_coo_matrix_with_rpc_client(vineyard_rpc_client): - test_coo_matrix(vineyard_rpc_client) - - @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_coo_matrix(vineyard_client): arr = sp.sparse.coo_matrix((3, 4), dtype=np.int8) @@ -141,11 +111,6 @@ def test_coo_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) -@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") -def test_csc_matrix_with_rpc_client(vineyard_rpc_client): - test_csc_matrix(vineyard_rpc_client) - - @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_csc_matrix(vineyard_client): arr = sp.sparse.csc_matrix((3, 4), dtype=np.int8) @@ -153,11 +118,6 @@ def test_csc_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) -@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") -def test_csr_matrix_with_rpc_client(vineyard_rpc_client): - test_csr_matrix(vineyard_rpc_client) - - @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_csr_matrix(vineyard_client): arr = sp.sparse.csr_matrix((3, 4), dtype=np.int8) @@ -165,27 +125,8 @@ def test_csr_matrix(vineyard_client): np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) -@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") -def test_dia_matrix_with_rpc_client(vineyard_rpc_client): - test_dia_matrix(vineyard_rpc_client) - - @pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") def test_dia_matrix(vineyard_client): arr = sp.sparse.dia_matrix((3, 4), dtype=np.int8) object_id = vineyard_client.put(arr) np.testing.assert_allclose(arr.A, vineyard_client.get(object_id).A) - - -@pytest.mark.skipif(sp is None, reason="scipy.sparse is not available") -def test_data_consistency_between_ipc_and_rpc(vineyard_client, vineyard_rpc_client): - value = sp.sparse.bsr_matrix((3, 4), dtype=np.int8) - object_id = vineyard_client.put(value) - v1 = vineyard_client.get(object_id) - v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1.todense(), v2.todense()) - - object_id = vineyard_rpc_client.put(value) - v1 = vineyard_client.get(object_id) - v2 = vineyard_rpc_client.get(object_id) - assert np.array_equal(v1.todense(), v2.todense()) diff --git a/python/vineyard/data/utils.py b/python/vineyard/data/utils.py index 3eda064d3..8619ad0bd 100644 --- a/python/vineyard/data/utils.py +++ b/python/vineyard/data/utils.py @@ -16,7 +16,6 @@ # limitations under the License. # -import ctypes import json import pickle from typing import Union @@ -27,8 +26,6 @@ from vineyard._C import Object from vineyard._C import ObjectID from vineyard._C import ObjectMeta -from vineyard._C import RemoteBlobBuilder -from vineyard._C import RPCClient if pickle.HIGHEST_PROTOCOL < 5: import pickle5 as pickle # pylint: disable=import-error @@ -131,7 +128,9 @@ def ensure_ipc_client(client, error_message=None): '''Check if the given client is an instance of IPCClient, raise ValueError if not. ''' - if not client.is_ipc(): + from vineyard._C import IPCClient + + if not isinstance(client, IPCClient): if error_message is None: error_message = "Vineyard IPC client is required, got %s" % type(client) else: @@ -145,29 +144,15 @@ def ensure_ipc_client(client, error_message=None): def build_buffer( client, address, size, *args, **kwargs ) -> Union["Object", "ObjectMeta", "ObjectID"]: - '''Build a blob or a remote blob in vineyard server - for the given bytes or memoryview. + '''Build a blob in vineyard server for the given bytes or memoryview. If address is None or size is 0, an empty blob will be returned. ''' - if client.is_rpc(): - # copy the address with size to a local payloads - if size == 0 or address is None: - meta = ObjectMeta() - meta.nbytes = 0 - meta.typename = "vineyard::RemoteBlob" - return client.create_metadata(meta) - if isinstance(address, bytes): - payload = address - else: - payload = bytearray(size) - address_bytes = (ctypes.c_byte * size).from_address(address) - payload[:size] = memoryview(address_bytes)[:size] - buffer = RemoteBlobBuilder(size) - buffer.copy(0, payload) - id = client.create_remote_blob(buffer) - meta = client.get_meta(id) - return meta + ensure_ipc_client( + client, + "Vineyard RPC client cannot be used to create local blobs, " + "try using an IPC client or `rpc_client.create_remote_blob()`", + ) if size == 0: return client.create_empty_blob() diff --git a/src/client/client.h b/src/client/client.h index ddd77e305..9d5ccf40b 100644 --- a/src/client/client.h +++ b/src/client/client.h @@ -904,13 +904,6 @@ class Client final : public BasicIPCClient, */ Status Seal(ObjectID const& object_id); - /** - * @brief Check if the client is an IPC client. - * - * @return True means the client is an IPC client. - */ - bool IsIPC() const override { return true; } - private: Status GetBuffers(const std::set& ids, const bool unsafe, std::map>& buffers); diff --git a/src/client/client_base.cc b/src/client/client_base.cc index 5bcedbaf2..8b72cca67 100644 --- a/src/client/client_base.cc +++ b/src/client/client_base.cc @@ -72,9 +72,6 @@ Status ClientBase::CreateData(const json& tree, ObjectID& id, } Status ClientBase::CreateMetaData(ObjectMeta& meta_data, ObjectID& id) { - if (this->IsRPC()) { - this->instance_id_ = this->remote_instance_id(); - } return this->CreateMetaData(meta_data, this->instance_id_, std::ref(id)); } diff --git a/src/client/client_base.h b/src/client/client_base.h index 4f0a2fa9b..93d0e25b7 100644 --- a/src/client/client_base.h +++ b/src/client/client_base.h @@ -503,20 +503,6 @@ class ClientBase { */ std::string const& RPCEndpoint() { return this->rpc_endpoint_; } - /** - * @brief Check if the client is an IPC client. - * - * @return True if the client is an IPC client, otherwise false. - */ - virtual bool IsIPC() const { return false; } - - /** - * @brief Check if the client is a RPC client. - * - * @return True if the client is a RPC client, otherwise false. - */ - virtual bool IsRPC() const { return false; } - /** * @brief Get the instance id of the connected vineyard server. * diff --git a/src/client/ds/blob.cc b/src/client/ds/blob.cc index 057d76f7a..e243bf35a 100644 --- a/src/client/ds/blob.cc +++ b/src/client/ds/blob.cc @@ -132,7 +132,7 @@ void Blob::Construct(ObjectMeta const& meta) { if (meta.GetBuffer(meta.GetId(), this->buffer_).ok()) { if (this->buffer_ == nullptr) { throw std::runtime_error( - "Blob::Construct(): Invalid internal state: local blob found but it " + "Blob::Construct(): Invalid internal state: local blob found bit it " "is nullptr: " + ObjectIDToString(meta.GetId())); } diff --git a/src/client/ds/object_meta.cc b/src/client/ds/object_meta.cc index 0c2fcef5d..0e913c3a5 100644 --- a/src/client/ds/object_meta.cc +++ b/src/client/ds/object_meta.cc @@ -18,7 +18,6 @@ limitations under the License. #include #include "client/client.h" -#include "client/client_base.h" #include "client/ds/blob.h" #include "common/util/env.h" @@ -348,15 +347,9 @@ void ObjectMeta::SetMetaData(ClientBase* client, const json& meta) { ObjectID member_id = ObjectIDFromString(tree["id"].get_ref()); if (IsBlob(member_id)) { - if (client_ == nullptr) { + if (client_ == nullptr /* traverse to account blobs */ || + tree["instance_id"].get() == client_->instance_id()) { VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(member_id)); - } else { - if ((client_->IsIPC() && - tree["instance_id"].get() == client_->instance_id()) || - (client_->IsRPC() && tree["instance_id"].get() == - client_->remote_instance_id())) { - VINEYARD_CHECK_OK(buffer_set_->EmplaceBuffer(member_id)); - } } } else { for (auto& item : tree) { diff --git a/src/client/ds/object_meta.h b/src/client/ds/object_meta.h index 8de5cbbaa..947d77f60 100644 --- a/src/client/ds/object_meta.h +++ b/src/client/ds/object_meta.h @@ -796,7 +796,6 @@ class ObjectMeta { friend class RPCClient; friend class Blob; - friend class RemoteBlob; friend class BlobWriter; }; diff --git a/src/client/ds/remote_blob.cc b/src/client/ds/remote_blob.cc index 850eac608..72e3b61b5 100644 --- a/src/client/ds/remote_blob.cc +++ b/src/client/ds/remote_blob.cc @@ -14,12 +14,10 @@ limitations under the License. */ #include "client/ds/remote_blob.h" -#include "client/client_base.h" #include #include #include -#include #include #include "client/ds/blob.h" @@ -150,45 +148,6 @@ const std::shared_ptr& RemoteBlobWriter::Buffer() const { Status RemoteBlobWriter::Abort() { return Status::OK(); } -void RemoteBlob::Construct(ObjectMeta const& meta) { - std::string __type_name = type_name(); - VINEYARD_ASSERT(meta.GetTypeName() == __type_name, - "Expect typename '" + __type_name + "', but got '" + - meta.GetTypeName() + "'"); - this->meta_ = meta; - this->id_ = meta.GetId(); - - if (this->buffer_ != nullptr) { - return; - } - if (this->id_ == EmptyBlobID() || meta.GetNBytes() == 0) { - this->size_ = 0; - return; - } - - if (meta.GetClient()->IsRPC() && - meta.GetClient()->remote_instance_id() != meta.GetInstanceId()) { - throw std::runtime_error( - "RemoteBlob::Construct(): Invalid internal state: remote blob found " - "but it is not located with the instance connected by rpc client"); - } - - if (meta.GetBuffer(meta.GetId(), this->buffer_).ok()) { - if (this->buffer_ == nullptr) { - throw std::runtime_error( - "RemoteBlob::Construct(): Invalid internal state: remote blob found " - "but it is nullptr: " + - ObjectIDToString(meta.GetId())); - } - this->size_ = this->buffer_->size(); - } else { - throw std::runtime_error( - "RemoteBlob::Construct(): Invalid internal state: failed to construct " - "remote blob since payload is missing: " + - ObjectIDToString(meta.GetId())); - } -} - void RemoteBlobWriter::Dump() const { #ifndef NDEBUG std::stringstream ss; diff --git a/src/client/ds/remote_blob.h b/src/client/ds/remote_blob.h index 288236c6c..f90561fe7 100644 --- a/src/client/ds/remote_blob.h +++ b/src/client/ds/remote_blob.h @@ -17,7 +17,6 @@ limitations under the License. #define SRC_CLIENT_DS_REMOTE_BLOB_H_ #include -#include #include #include "client/ds/i_object.h" @@ -42,7 +41,7 @@ class RPCClient; * a chunk of memory from its memory space to the client space in a * zero-copy fashion. */ -class RemoteBlob : public Registered { +class RemoteBlob { public: ObjectID id() const; @@ -84,13 +83,6 @@ class RemoteBlob : public Registered { */ const std::shared_ptr& Buffer() const; - /** - * @brief Construct the blob locally for the given object meta. - * - * @param meta The given object meta. - */ - void Construct(ObjectMeta const& meta) override; - /** * @brief Get the arrow buffer of the blob. * @@ -117,26 +109,12 @@ class RemoteBlob : public Registered { */ const std::shared_ptr ArrowBufferOrEmpty() const; - static std::unique_ptr Create() __attribute__((used)) { - return std::static_pointer_cast( - std::unique_ptr{new RemoteBlob()}); - } - /** * @brief Dump the buffer for debugging. */ void Dump() const; private: - /** - * @brief Construct an empty RemoteBlob - */ - RemoteBlob() { - this->id_ = InvalidObjectID(); - this->size_ = std::numeric_limits::max(); - this->buffer_ = nullptr; - } - RemoteBlob(const ObjectID id, const InstanceID instance_id, const size_t size); @@ -149,7 +127,6 @@ class RemoteBlob : public Registered { friend class RPCClient; friend class RemoteBlobWriter; - friend class ObjectMeta; }; /** diff --git a/src/client/rpc_client.cc b/src/client/rpc_client.cc index 7d73f24c5..75515762c 100644 --- a/src/client/rpc_client.cc +++ b/src/client/rpc_client.cc @@ -16,7 +16,6 @@ limitations under the License. #include "client/rpc_client.h" #include -#include #include #include #include @@ -24,7 +23,6 @@ limitations under the License. #include #include -#include "client/ds/blob.h" #include "client/ds/object_factory.h" #include "client/ds/remote_blob.h" #include "client/io.h" @@ -182,49 +180,11 @@ Status RPCClient::GetObject(const ObjectID id, ObjectMeta meta; RETURN_ON_ERROR(this->GetMetaData(id, meta, true)); RETURN_ON_ASSERT(!meta.MetaData().empty()); - std::map> buffers; - std::function traverse = [&](json& meta_tree) -> json& { - if (meta_tree.is_object()) { - auto sub_id = - ObjectIDFromString(meta_tree["id"].get_ref()); - if (IsBlob(sub_id)) { - std::shared_ptr remote_blob; - VINEYARD_CHECK_OK(GetRemoteBlob(sub_id, remote_blob)); - ObjectMeta sub_meta; - sub_meta.Reset(); - VINEYARD_CHECK_OK(GetMetaData(sub_id, sub_meta)); - sub_meta.SetTypeName(type_name()); - VINEYARD_CHECK_OK(sub_meta.buffer_set_->EmplaceBuffer(sub_id)); - VINEYARD_CHECK_OK( - sub_meta.buffer_set_->EmplaceBuffer(sub_id, remote_blob->Buffer())); - buffers.emplace(sub_id, remote_blob->Buffer()); - meta_tree = sub_meta.MetaData(); - return meta_tree; - } else { - for (auto& item : meta_tree.items()) { - if (item.value().is_object() && !item.value().empty()) { - meta_tree[item.key()] = traverse(item.value()); - } - } - } - } - return meta_tree; - }; - auto meta_tree = meta.MetaData(); - auto new_meta_tree = traverse(meta_tree); - ObjectMeta new_meta; - new_meta.Reset(); - new_meta.SetMetaData(this, new_meta_tree); - for (auto& item : buffers) { - VINEYARD_CHECK_OK(new_meta.buffer_set_->EmplaceBuffer(item.first)); - VINEYARD_CHECK_OK( - new_meta.buffer_set_->EmplaceBuffer(item.first, item.second)); - } - object = ObjectFactory::Create(new_meta.GetTypeName()); + object = ObjectFactory::Create(meta.GetTypeName()); if (object == nullptr) { object = std::unique_ptr(new Object()); } - object->Construct(new_meta); + object->Construct(meta); return Status::OK(); } @@ -350,15 +310,6 @@ Status recv_and_decompress(std::shared_ptr const& decompressor, } // namespace detail -bool RPCClient::IsFetchable(const ObjectMeta& meta) { - auto instance_id = meta.meta_["instance_id"]; - if (instance_id.is_null()) { - // it is a newly created metadata - return true; - } - return remote_instance_id_ == instance_id.get(); -} - Status RPCClient::CreateRemoteBlob( std::shared_ptr const& buffer, ObjectID& id) { ENSURE_CONNECTED(this); diff --git a/src/client/rpc_client.h b/src/client/rpc_client.h index f620ed985..5363ffcd0 100644 --- a/src/client/rpc_client.h +++ b/src/client/rpc_client.h @@ -287,21 +287,6 @@ class RPCClient final : public ClientBase { return remote_instance_id_; } - /** - * @brief Check if the client is a RPC client. - * - * @return True means the client is a RPC client. - */ - bool IsRPC() const override { return true; } - - /** - * @brief Whether the instance connected by rpc client is the same as object - * metadata's instance. - * - * @return True means the instance is the same as object metadata's instance. - */ - bool IsFetchable(const ObjectMeta& meta); - Status CreateRemoteBlob(std::shared_ptr const& buffer, ObjectID& id); diff --git a/src/common/util/arrow.h b/src/common/util/arrow.h index 215607a64..9fe2329d8 100644 --- a/src/common/util/arrow.h +++ b/src/common/util/arrow.h @@ -56,8 +56,8 @@ struct hash namespace city { template <> class hash { - inline uint64_t operator()(const vineyard::arrow_string_view& data) const - noexcept { + inline uint64_t operator()( + const vineyard::arrow_string_view& data) const noexcept { return detail::CityHash64(reinterpret_cast(data.data()), data.size()); } diff --git a/src/common/util/blocking_queue.h b/src/common/util/blocking_queue.h index 56a085c1c..e22a3b1cc 100644 --- a/src/common/util/blocking_queue.h +++ b/src/common/util/blocking_queue.h @@ -171,8 +171,7 @@ class SpinLock { public: void lock() { while (locked.test_and_set(std::memory_order_acquire)) { - { - } + {} } } diff --git a/test/runner.py b/test/runner.py index d5ff82293..73aca9a4b 100755 --- a/test/runner.py +++ b/test/runner.py @@ -625,7 +625,6 @@ def run_python_contrib_distributed_tests( ['%s.%d' % (VINEYARD_CI_IPC_SOCKET, i) for i in range(instance_size)] ) start_time = time.time() - rpc_socket_port = instances[0][1] subprocess.check_call( [ 'pytest', @@ -638,7 +637,6 @@ def run_python_contrib_distributed_tests( 'python/vineyard/contrib/%s' % contrib, *test_args, '--vineyard-ipc-sockets=%s' % vineyard_ipc_sockets, - '--vineyard-endpoint=localhost:%s' % rpc_socket_port, ], cwd=os.path.join(os.path.dirname(os.path.abspath(__file__)), '..'), )