Skip to content

Commit

Permalink
start with chunked and distributed array storage
Browse files Browse the repository at this point in the history
  • Loading branch information
philippwindischhofer committed Jan 9, 2024
1 parent 3b5054d commit d5eb152
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ add_subdirectory(src)
add_subdirectory(cpython)
add_subdirectory(examples)
add_subdirectory(extern)
# Test executables; tests/CMakeLists.txt in turn adds the tests/io subdirectory.
add_subdirectory(tests)
104 changes: 104 additions & 0 deletions include/Eisvogel/DistributedNDArray.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
#ifndef __DISTRIBUTED_NDARRAY_HH
#define __DISTRIBUTED_NDARRAY_HH

#include <filesystem>
#include <iostream>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

#include "Eisvogel/NDArray.hh"
#include "Eisvogel/Serialization.hh"

// Bookkeeping record for a single chunk of a distributed array: the name of the
// file associated with the chunk and the index range the chunk covers.
// NOTE(review): whether stop_ind is inclusive or exclusive is not fixed by this
// struct — confirm against the code that fills it.
struct ChunkMetadata {

  // filename:  name of the file associated with this chunk
  // start_ind: index at which the chunk starts
  // stop_ind:  index at which the chunk stops
  // The string is taken by const reference so that it is copied only once
  // (into the member) instead of once into the parameter and once more into
  // the member.
  ChunkMetadata(const std::string& filename, const IndexVector& start_ind, const IndexVector& stop_ind) :
    filename(filename), start_ind(start_ind), stop_ind(stop_ind) { }

  std::string filename;
  IndexVector start_ind;
  IndexVector stop_ind;
};

// ----

namespace stor {

  // Serialization rules for ChunkMetadata. The three members are streamed in
  // the fixed order filename -> start_ind -> stop_ind; deserialize() reads them
  // back in exactly the same order.
  template <>
  struct Traits<ChunkMetadata> {
    using type = ChunkMetadata;

    // Writes `val` to `stream` member by member, delegating to the Traits of
    // each member type.
    static void serialize(std::iostream& stream, const type& val) {
      Traits<std::string>::serialize(stream, val.filename);
      Traits<IndexVector>::serialize(stream, val.start_ind);
      Traits<IndexVector>::serialize(stream, val.stop_ind);
    }

    // Reads one ChunkMetadata from `stream`. Each member is read into its own
    // named local (rather than directly inside the constructor call) so that
    // the reads are guaranteed to happen in stream order.
    static type deserialize(std::iostream& stream) {
      std::string chunk_file = Traits<std::string>::deserialize(stream);
      IndexVector first_ind = Traits<IndexVector>::deserialize(stream);
      IndexVector last_ind = Traits<IndexVector>::deserialize(stream);
      return ChunkMetadata(chunk_file, first_ind, last_ind);
    }
  };
}

// Array with element type T and `dims` dimensions that is organised in chunks.
// Chunks are added through RegisterChunk(); elements are accessed through
// operator(). The member function definitions follow below the class.
template <class T, std::size_t dims>
class DistributedNDArray {

public:

// dirpath:        stored in m_dirpath
// max_cache_size: stored in m_max_cache_size
DistributedNDArray(std::string dirpath, std::size_t max_cache_size);

// Type in which chunks are passed to RegisterChunk()
using chunk_t = NDArray<T, dims>;

// For assembling a distributed array
// chunk:     the chunk to add
// start_ind: index associated with the start of `chunk`
void RegisterChunk(const chunk_t& chunk, const IndexVector start_ind);

// For accessing a distributed array
// NOTE(review): the index argument is typed DenseVector<T>, i.e. it uses the
// element type T, whereas chunk positions elsewhere in this header use
// IndexVector — confirm that this is intended.
T& operator()(DenseVector<T>& inds);

private:

// Directory path and cache size limit as passed to the constructor
std::string m_dirpath;
std::size_t m_max_cache_size;

// One metadata record per chunk
std::vector<ChunkMetadata> m_chunk_index;

// Type used for chunks held in the cache
using chunk_cache_t = DenseNDArray<T, dims>;
std::map<std::size_t, chunk_cache_t> m_chunk_cache; // key is index of chunk in m_chunk_index
};

// ---

// Constructs a distributed array that uses the directory `dirpath`.
//   dirpath:        directory path; created (together with any missing parent
//                   directories) if it does not exist yet. An empty path is
//                   stored as-is and no directory is created.
//   max_cache_size: stored in m_max_cache_size
// Throws std::filesystem::filesystem_error if the directory cannot be created,
// e.g. because a regular file of the same name already exists.
template <class T, std::size_t dims>
DistributedNDArray<T, dims>::DistributedNDArray(std::string dirpath, std::size_t max_cache_size) :
  m_dirpath(dirpath), m_max_cache_size(max_cache_size) {

  // create directory if it does not exist
  // (create_directories is a no-op for a directory that is already present)
  if(!m_dirpath.empty()) {
    std::filesystem::create_directories(m_dirpath);
  }
}

// Adds `chunk`, positioned at `start_ind`, to the distributed array.
// NOTE: not implemented yet — the body currently contains no statements, so
// calling this function has no effect. The comments inside list the intended steps.
template <class T, std::size_t dims>
void DistributedNDArray<T, dims>::RegisterChunk(const NDArray<T, dims>& chunk, const IndexVector start_ind) {

// make sure this chunk does not overlap with any that we already have

// add chunk metadata to index

// write chunk data

}

// Element access by index.
// NOTE: not implemented yet. The function has to return a reference, and
// flowing off the end of a non-void function is undefined behaviour, so until
// the lookup exists every call fails loudly.
// Throws std::logic_error on every call.
template <class T, std::size_t dims>
T& DistributedNDArray<T, dims>::operator()(DenseVector<T>& inds) {

  // Intended steps:

  // check to which chunk this index belongs

  // check if chunk is in cache

  // retrieve chunk from cache or load from file

  // index and return element

  throw std::logic_error("DistributedNDArray::operator() is not implemented yet");
}

#endif
8 changes: 8 additions & 0 deletions include/Eisvogel/NDArray.hh
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ public:
bool operator==(const DenseNDArray<T, dims>& rhs) {
return rhs.m_data == m_data;
}

// printing
void print() {
for(T& cur: m_data) {
std::cout << cur << " ";
}
std::cout << std::endl;
}

// Returns m_data.begin(), i.e. an iterator to the first stored element.
auto begin() {return m_data.begin();}
// Returns m_data.cbegin(), i.e. a const iterator to the first stored element.
auto cbegin() {return m_data.cbegin();}
Expand Down
22 changes: 22 additions & 0 deletions include/Eisvogel/Serialization.hh
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ namespace stor {

static void serialize(std::iostream& stream, const type& val) {
Traits<std::size_t>::serialize(stream, val.size());
// TODO: speed this up by serializing the entire array instead of values one by one
for(const T& cur : val) {
Traits<T>::serialize(stream, cur);
}
Expand All @@ -103,6 +104,7 @@ namespace stor {
static type deserialize(std::iostream& stream) {
type val;
std::size_t size = Traits<std::size_t>::deserialize(stream);
// TODO: speed this up by deserializing the entire array instead of values one by one
for(std::size_t ind = 0; ind < size; ind++) {
val.push_back(Traits<T>::deserialize(stream));
}
Expand All @@ -115,20 +117,40 @@ namespace stor {
using type = std::array<T, n>;

static void serialize(std::iostream& stream, const type& val) {
// TODO: speed this up by serializing the entire array instead of values one by one
for(const T& cur : val) {
Traits<T>::serialize(stream, cur);
}
}

// Reads the array elements from `stream` in index order, delegating each
// element to Traits<T>, and returns the filled array.
static type deserialize(std::iostream& stream) {
  type result;
  // TODO: speed this up by deserializing the entire array instead of values one by one
  for(T& slot : result) {
    slot = Traits<T>::deserialize(stream);
  }
  return result;
}
};

template <>
struct Traits<std::string> {
using type = std::string;

static void serialize(std::iostream& stream, const type& val) {
std::size_t num_chars = val.size();
Traits<std::size_t>::serialize(stream, num_chars);
stream.write(val.data(), num_chars);
}

static type deserialize(std::iostream& stream) {
std::size_t num_chars = Traits<std::size_t>::deserialize(stream);
std::vector<char> string_data(num_chars);
stream.read(string_data.data(), num_chars);
return std::string(string_data.begin(), string_data.end());
}
};

class Serializer {

public:
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# I/O tests (builds the testDistributedNDArray executable)
add_subdirectory(io)
6 changes: 6 additions & 0 deletions tests/io/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Build settings for the I/O test executables in this directory.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED YES)
# Append to CMAKE_CXX_FLAGS instead of overwriting it, so that flags supplied on
# the command line, by a toolchain file or by a parent directory are preserved.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -ftree-vectorize -ffast-math -ftree-vectorizer-verbose=2 -funroll-loops -march=native")

# Exercises DistributedNDArray and the ChunkMetadata serialization round trip.
add_executable(testDistributedNDArray testDistributedNDArray.cxx)
target_link_libraries(testDistributedNDArray eisvogel)
51 changes: 51 additions & 0 deletions tests/io/testDistributedNDArray.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "Eisvogel/DistributedNDArray.hh"
#include "Eisvogel/NDArray.hh"

#include <fstream>

// Smoke test for DistributedNDArray and for the serialization of ChunkMetadata.
// Registers two 2x2 chunks, then writes one ChunkMetadata record to
// "meta_ser.bin", reads it back and prints it.
// Returns 0 on success and 1 if a file cannot be opened or the filename read
// back differs from the one written.
int main(int argc, char* argv[]) {

  // first chunk, positioned at index (0, 0)
  DenseNDArray<float, 2> chunk1({2, 2}, 0.0);
  chunk1(0, 0) = 1.0;
  chunk1(0, 1) = 2.0;
  chunk1(1, 1) = 4.0;
  chunk1.print();

  IndexVector start_ind1({0, 0});
  start_ind1.print();

  // second chunk, positioned at index (2, 0)
  DenseNDArray<float, 2> chunk2({2, 2}, 0.0);
  chunk2(0, 0) = -1.0;
  chunk2(0, 1) = -2.0;
  chunk2(1, 1) = -4.0;
  chunk2.print();

  IndexVector start_ind2({2, 0});
  start_ind2.print();

  DistributedNDArray<float, 2> arr("./distarr/", 10);
  arr.RegisterChunk(chunk1, start_ind1);
  arr.RegisterChunk(chunk2, start_ind2);

  ChunkMetadata chunk_meta1("bla", start_ind1, {1, 1});

  // write the metadata record
  std::fstream ofs;
  ofs.open("meta_ser.bin", std::ios::out | std::ios::binary);
  if(!ofs.is_open()) {
    std::cerr << "could not open meta_ser.bin for writing" << std::endl;
    return 1;
  }
  stor::Serializer oser(ofs);

  oser.serialize(chunk_meta1);
  ofs.close();

  // read it back
  std::fstream ifs;
  ifs.open("meta_ser.bin", std::ios::in | std::ios::binary);
  if(!ifs.is_open()) {
    std::cerr << "could not open meta_ser.bin for reading" << std::endl;
    return 1;
  }
  stor::Serializer iser(ifs);
  ChunkMetadata read_meta = iser.deserialize<ChunkMetadata>();

  std::cout << read_meta.filename << std::endl;
  read_meta.start_ind.print();
  read_meta.stop_ind.print();

  // make a failed round trip visible through the exit code
  if(read_meta.filename != chunk_meta1.filename) {
    std::cerr << "filename changed during serialization round trip" << std::endl;
    return 1;
  }

  return 0;
}

0 comments on commit d5eb152

Please sign in to comment.