Skip to content

Commit

Permalink
can rebuild index
Browse files Browse the repository at this point in the history
  • Loading branch information
philippwindischhofer committed Jan 30, 2024
1 parent b5f7f8f commit a28adf0
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 39 deletions.
31 changes: 6 additions & 25 deletions include/Eisvogel/DistributedNDArray.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,6 @@ struct ChunkMetadata {
IndexVector stop_ind;
};

namespace stor {

  // Binary (de)serialization rules for ChunkMetadata.
  // The three fields are written in a fixed order (filename, start_ind,
  // stop_ind); deserialize() must read them back in exactly that order.
  template <>
  struct Traits<ChunkMetadata> {
    using type = ChunkMetadata;

    // Writes the metadata fields to `stream`, one after the other.
    static void serialize(std::iostream& stream, const type& val) {
      Traits<std::string>::serialize(stream, val.filename);
      Traits<IndexVector>::serialize(stream, val.start_ind);
      Traits<IndexVector>::serialize(stream, val.stop_ind);
    }

    // Reads the fields back in the same order serialize() wrote them.
    // NOTE: these must stay as named locals (not inlined into the
    // constructor call) because C++ does not fix the evaluation order of
    // function arguments.
    static type deserialize(std::iostream& stream) {
      std::string filename = Traits<std::string>::deserialize(stream);
      IndexVector start_ind = Traits<IndexVector>::deserialize(stream);
      IndexVector stop_ind = Traits<IndexVector>::deserialize(stream);
      return ChunkMetadata(filename, start_ind, stop_ind);
    }
  };
}

// ----

template <class T, std::size_t dims>
class DistributedNDArray : public NDArray<T, dims> {

Expand All @@ -57,8 +35,10 @@ public:
using chunk_t = DenseNDArray<T, dims>;

// For assembling a distributed array
void RegisterChunk(const chunk_t& chunk, const IndexVector start_ind);
void Flush();
void RegisterChunk(const chunk_t& chunk, const IndexVector start_ind, bool require_nonoverlapping = false);
void FlushIndex();

void rebuildIndex();

// For accessing a distributed array
T operator()(IndexVector& inds);
Expand All @@ -69,6 +49,7 @@ private:
std::size_t getChunkIndex(const IndexVector& inds);
chunk_t& retrieveChunk(std::size_t chunk_ind);
void calculateShape();

bool isGloballyContiguous(IndexVector& global_start_inds, IndexVector& global_stop_inds);

IndexVector& getGlobalStartInd();
Expand All @@ -83,7 +64,7 @@ private:

// Keeps track of the chunks this DistributedNDArray is composed of
using index_t = std::vector<ChunkMetadata>;
index_t m_chunk_index;
index_t m_chunk_index; // TODO: maybe later when we need fancier things (e.g. predictive loading of additional neighbouring chunks), can think about turning this into a class

// Data structures for caching of frequently-accessed elements of the array
std::map<std::size_t, chunk_t> m_chunk_cache; // key is index of chunk in m_chunk_index
Expand Down
65 changes: 56 additions & 9 deletions src/DistributedNDArray.hxx
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
namespace stor {

  // Serialization traits for ChunkMetadata: the three fields travel in a
  // fixed order -- filename first, then the start and stop index vectors.
  template <>
  struct Traits<ChunkMetadata> {
    using type = ChunkMetadata;

    // Write all metadata fields to `stream` in the canonical order.
    static void serialize(std::iostream& stream, const type& val) {
      Traits<std::string>::serialize(stream, val.filename);
      Traits<IndexVector>::serialize(stream, val.start_ind);
      Traits<IndexVector>::serialize(stream, val.stop_ind);
    }

    // Read the fields back in the same order serialize() emitted them.
    // Named locals keep the stream reads correctly sequenced (argument
    // evaluation order in a constructor call would be unspecified).
    static type deserialize(std::iostream& stream) {
      std::string chunk_filename = Traits<std::string>::deserialize(stream);
      IndexVector chunk_start = Traits<IndexVector>::deserialize(stream);
      IndexVector chunk_stop = Traits<IndexVector>::deserialize(stream);
      return ChunkMetadata(chunk_filename, chunk_start, chunk_stop);
    }
  };
}

template <class T, std::size_t dims>
DistributedNDArray<T, dims>::DistributedNDArray(std::string dirpath, std::size_t max_cache_size) :
NDArray<T, dims>(), m_dirpath(dirpath), m_indexpath(dirpath + "/index.bin"), m_max_cache_size(max_cache_size) {
Expand All @@ -7,33 +27,39 @@ DistributedNDArray<T, dims>::DistributedNDArray(std::string dirpath, std::size_t
std::filesystem::create_directory(m_dirpath);
}

// Load index if there is one
if(std::filesystem::exists(m_indexpath)) {
// Load index
// Load index if there is one
std::fstream ifs;
ifs.open(m_indexpath, std::ios::in | std::ios::binary);
stor::Serializer iser(ifs);
m_chunk_index = iser.deserialize<index_t>();
ifs.close();
}
else {
// Attempt to rebuild index
rebuildIndex();
}

// Calculate global shape of this array
calculateShape();
}

template <class T, std::size_t dims>
DistributedNDArray<T, dims>::~DistributedNDArray() {
Flush();
FlushIndex();
}

template <class T, std::size_t dims>
void DistributedNDArray<T, dims>::RegisterChunk(const DenseNDArray<T, dims>& chunk, const IndexVector start_ind) {
void DistributedNDArray<T, dims>::RegisterChunk(const DenseNDArray<T, dims>& chunk, const IndexVector start_ind, bool require_nonoverlapping) {

// make sure this chunk does not overlap with any that we already have and crash if it does
IndexVector stop_ind = start_ind + chunk.shape();
for(auto& chunk_meta : m_chunk_index) {
if(chunkContainsInds(chunk_meta, start_ind) || chunkContainsInds(chunk_meta, stop_ind)) {
throw std::runtime_error("Trying to add a chunk that overlaps with an already existing one!");

if(require_nonoverlapping) {
// make sure this chunk does not overlap with any that we already have and crash if it does
for(auto& chunk_meta : m_chunk_index) {
if(chunkContainsInds(chunk_meta, start_ind) || chunkContainsInds(chunk_meta, stop_ind)) {
throw std::runtime_error("Trying to add a chunk that overlaps with an already existing one!");
}
}
}

Expand All @@ -47,6 +73,7 @@ void DistributedNDArray<T, dims>::RegisterChunk(const DenseNDArray<T, dims>& chu
std::fstream ofs;
ofs.open(chunk_path, std::ios::out | std::ios::binary);
stor::Serializer oser(ofs);
oser.serialize<ChunkMetadata>(meta);
oser.serialize<chunk_t>(chunk);
ofs.close();

Expand All @@ -55,7 +82,7 @@ void DistributedNDArray<T, dims>::RegisterChunk(const DenseNDArray<T, dims>& chu
}

template <class T, std::size_t dims>
void DistributedNDArray<T, dims>::Flush() {
void DistributedNDArray<T, dims>::FlushIndex() {
// Update index on disk
std::fstream ofs;
ofs.open(m_indexpath, std::ios::out | std::ios::binary);
Expand All @@ -64,6 +91,25 @@ void DistributedNDArray<T, dims>::Flush() {
ofs.close();
}

template <class T, std::size_t dims>
void DistributedNDArray<T, dims>::rebuildIndex() {
  // Reconstructs m_chunk_index by scanning the storage directory and reading
  // the ChunkMetadata header that RegisterChunk writes at the front of every
  // chunk file. Used when no serialized index is available on disk.
  m_chunk_index.clear();

  // With the index gone, also the cache is now out of scope
  m_chunk_cache.clear();
  m_cache_queue = {};

  const std::filesystem::path index_filename = std::filesystem::path(m_indexpath).filename();

  for(auto const& dir_entry : std::filesystem::directory_iterator(m_dirpath)) {

    // Only chunk files carry a metadata header: skip subdirectories and any
    // (possibly stale) serialized index, which would otherwise be misparsed
    // as chunk metadata.
    if(!dir_entry.is_regular_file() || dir_entry.path().filename() == index_filename) {
      continue;
    }

    std::fstream ifs;
    ifs.open(dir_entry.path(), std::ios::in | std::ios::binary);
    stor::Serializer iser(ifs);
    ChunkMetadata meta = iser.deserialize<ChunkMetadata>();
    ifs.close();

    m_chunk_index.push_back(meta);
  }
}

template <class T, std::size_t dims>
T DistributedNDArray<T, dims>::operator()(IndexVector& inds) {

Expand Down Expand Up @@ -116,6 +162,7 @@ DistributedNDArray<T, dims>::chunk_t& DistributedNDArray<T, dims>::retrieveChunk
std::cout << "Loading chunk from " + chunk_path + " ... ";
ifs.open(chunk_path, std::ios::in | std::ios::binary);
stor::Serializer iser(ifs);
iser.deserialize<ChunkMetadata>(); // skip metadata
m_chunk_cache.insert({chunk_ind, iser.deserialize<chunk_t>()});
m_cache_queue.push(chunk_ind);
ifs.close();
Expand Down
8 changes: 4 additions & 4 deletions src/DistributedWeightingField.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ void DistributedWeightingField::Flush() {
oser.serialize(*m_start_coords);
oser.serialize(*m_end_coords);

// Flush the actual data
m_E_r -> Flush();
m_E_z -> Flush();
m_E_phi -> Flush();
// Make the indices persistent
m_E_r -> FlushIndex();
m_E_z -> FlushIndex();
m_E_phi -> FlushIndex();
}
2 changes: 1 addition & 1 deletion tests/io/testDistributedNDArray.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int main(int argc, char* argv[]) {
DistributedNDArray<float, 2> darr_save("./distarr/", 10);
darr_save.RegisterChunk(chunk1, start_ind1);
darr_save.RegisterChunk(chunk2, start_ind2);
darr_save.Flush();
darr_save.FlushIndex();

DistributedNDArray<float, 2> darr_load("./distarr/", 10);

Expand Down

0 comments on commit a28adf0

Please sign in to comment.