Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClickHouse #41

Merged
merged 13 commits into from
Jun 5, 2024
2 changes: 2 additions & 0 deletions Source/moja.modules.cbm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(PROJECT_MODULE_HEADERS
include/moja/modules/${PACKAGE}/ageclasshelper.h
include/moja/modules/${PACKAGE}/cbmageindicators.h
include/moja/modules/${PACKAGE}/cbmaggregatorcsvwriter.h
include/moja/modules/${PACKAGE}/cbmaggregatorhybridlibpqxxwriter.h
include/moja/modules/${PACKAGE}/cbmaggregatorlandunitdata.h
include/moja/modules/${PACKAGE}/cbmaggregatorlibpqxxwriter.h
include/moja/modules/${PACKAGE}/cbmaggregatorpostgresqlwriter.h
Expand Down Expand Up @@ -106,6 +107,7 @@ set(PROJECT_MODULE_SOURCES
src/ageclasshelper.cpp
src/cbmageindicators.cpp
src/cbmaggregatorcsvwriter.cpp
src/cbmaggregatorhybridlibpqxxwriter.cpp
src/cbmaggregatorlandunitdata.cpp
src/cbmaggregatorlibpqxxwriter.cpp
src/cbmaggregatorpostgresqlwriter.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#ifndef MOJA_MODULES_CBM_CBMAggregatorHybridLibPQXXWriter_H_
#define MOJA_MODULES_CBM_CBMAggregatorHybridLibPQXXWriter_H_

#include "moja/modules/cbm/_modules.cbm_exports.h"
#include "moja/modules/cbm/flatrecord.h"
#include "moja/modules/cbm/cbmmodulebase.h"

#include <moja/flint/spatiallocationinfo.h>

#include <pqxx/pqxx>
#include <vector>

namespace moja {
namespace flint {
template<class TPersistable, class TRecord>
class RecordAccumulatorWithMutex2;
}

namespace modules {
namespace cbm {

/**
 * System module that loads the flat records held by the shared accumulators
 * into database tables when the simulation shuts down.
 *
 * Two libpqxx connections are used by the implementation: one built from the
 * configured connection string (job bookkeeping) and one built from a second
 * connection string derived from it in configure() (raw result tables).
 */
class CBM_API CBMAggregatorHybridLibPQXXWriter : public CBMModuleBase {
public:
    /**
     * @param fluxDimension        shared accumulator of flux records
     * @param poolDimension        shared accumulator of pool records
     * @param errorDimension       shared accumulator of error records
     * @param ageDimension         shared accumulator of age/area records
     * @param disturbanceDimension shared accumulator of disturbance records
     * @param classifierNames      shared list of classifier names used to build column headers
     * @param isPrimary            when false, doSystemShutdown() returns without loading anything
     */
    CBMAggregatorHybridLibPQXXWriter(
        std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatFluxRecord>> fluxDimension,
        std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatPoolRecord>> poolDimension,
        std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatErrorRecord>> errorDimension,
        std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatAgeAreaRecord>> ageDimension,
        std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatDisturbanceRecord>> disturbanceDimension,
        std::shared_ptr<std::vector<std::string>> classifierNames,
        bool isPrimary = false)
        : CBMModuleBase(),
          _fluxDimension(fluxDimension),
          _poolDimension(poolDimension),
          _errorDimension(errorDimension),
          _ageDimension(ageDimension),
          _disturbanceDimension(disturbanceDimension),
          _classifierNames(classifierNames),
          _isPrimaryAggregator(isPrimary) {}

    virtual ~CBMAggregatorHybridLibPQXXWriter() = default;

    /// Reads "connection_string" and "schema" from the module configuration.
    void configure(const DynamicObject& config) override;

    /// Registers for the LocalDomainInit and SystemShutdown signals.
    void subscribe(NotificationCenter& notificationCenter) override;

    flint::ModuleTypes moduleType() override { return flint::ModuleTypes::System; }

    /// Picks up the optional "job_id" variable (0 when it is not defined).
    void doLocalDomainInit() override;

    /// Performs the database load (primary aggregator only).
    void doSystemShutdown() override;

private:
    // Record sources shared with the module(s) that accumulate the data.
    std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatFluxRecord>> _fluxDimension;
    std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatPoolRecord>> _poolDimension;
    std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatErrorRecord>> _errorDimension;
    std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatAgeAreaRecord>> _ageDimension;
    std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatDisturbanceRecord>> _disturbanceDimension;
    std::shared_ptr<std::vector<std::string>> _classifierNames;

    // NOTE(review): not referenced by the implementation visible alongside this header - confirm it is needed.
    std::shared_ptr<const flint::SpatialLocationInfo> _spatialLocationInfo;

    std::string _pgConnectionString;  // connection string taken verbatim from the configuration
    std::string _chConnectionString;  // derived from _pgConnectionString in configure()
    std::string _schema;              // schema used for search_path and to qualify the raw tables

    // Initialized so that doSystemShutdown() never formats an indeterminate value into SQL
    // when doLocalDomainInit() has not run; 0 is also the fallback used when "job_id" is absent.
    Int64 _jobId = 0;
    bool _isPrimaryAggregator;

    /// Inserts every record of dataDimension into `table` using batched INSERT statements.
    template<typename TAccumulator>
    void load(pqxx::work& tx,
              const std::string& table,
              std::shared_ptr<TAccumulator> dataDimension);

    /// Runs one statement in its own transaction; failures are ignored when `optional` is true.
    void doIsolated(pqxx::connection_base& conn, std::string sql, bool optional = false);

    /// Runs several statements in a single transaction; failures are ignored when `optional` is true.
    void doIsolated(pqxx::connection_base& conn, std::vector<std::string> sql, bool optional = false);
};

}}} // namespace moja::modules::cbm

#endif // MOJA_MODULES_CBM_CBMAggregatorHybridLibPQXXWriter_H_
12 changes: 6 additions & 6 deletions Source/moja.modules.cbm/include/moja/modules/cbm/flatrecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace cbm {
class CBM_API FlatRecordHelper {
public:
static const std::string BuildClassifierNamesString(const std::vector<std::string>& classifierNames, const std::string& suffix = "");
static const std::string BuildClassifierValueString(const std::vector<Poco::Nullable<std::string>>& classifierValues);
static const std::string BuildClassifierValueString(const std::vector<Poco::Nullable<std::string>>& classifierValues, bool csvFormat = true);
};

class CBM_API FlatFluxRecord {
Expand All @@ -31,7 +31,7 @@ namespace cbm {
bool operator==(const FlatFluxRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatFluxRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -67,7 +67,7 @@ namespace cbm {
bool operator==(const FlatPoolRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatPoolRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -97,7 +97,7 @@ namespace cbm {
bool operator==(const FlatErrorRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatErrorRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -126,7 +126,7 @@ namespace cbm {
bool operator==(const FlatAgeAreaRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatAgeAreaRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -160,7 +160,7 @@ namespace cbm {
bool operator==(const FlatDisturbanceRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatDisturbanceRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down
166 changes: 166 additions & 0 deletions Source/moja.modules.cbm/src/cbmaggregatorhybridlibpqxxwriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
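/**
* @file
* The CBMAggregatorHybridLibPQXXWriter module writes the stand-level information gathered
* by CBMAggregatorLandUnitData using two libpqxx connections: completed job ids are recorded
* through the configured PostgreSQL connection, while the raw result tables are loaded through
* a second connection whose connection string is derived from the PostgreSQL one (presumably
* a ClickHouse server - confirm against the deployment). It is designed mainly for
* distributed runs where the simulation is divided up and each portion of work is loaded
* into a separate set of tables before being merged together with a post-processing script,
* although this module can also be used for a standard simulation.
********/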

#include "moja/modules/cbm/cbmaggregatorhybridlibpqxxwriter.h"

#include <moja/flint/recordaccumulatorwithmutex.h>
#include <moja/flint/ilandunitdatawrapper.h>
#include <moja/flint/iflintdata.h>
#include <moja/flint/ivariable.h>

#include <moja/logging.h>
#include <moja/signals.h>
#include <moja/notificationcenter.h>
#include <moja/hash.h>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/format.hpp>

using namespace pqxx;
using Poco::format;
using Poco::NotFoundException;

namespace moja {
namespace modules {
namespace cbm {

/**
 * Reads the module settings from `config`.
 *
 * "connection_string" is stored unchanged as the first connection string. The
 * second connection string is the same text with the first occurrence of "5432"
 * replaced by "5430" and the first occurrence of "dbname=postgres" replaced by
 * "dbname=default". "schema" names the schema the results are loaded into.
 */
void CBMAggregatorHybridLibPQXXWriter::configure(const DynamicObject& config) {
    _pgConnectionString = config["connection_string"].convert<std::string>();

    // NOTE(review): this is a plain textual substitution, so a "5432" appearing
    // earlier in the string (e.g. inside a host name or password) would be
    // rewritten instead of the port - confirm the expected connection string shape.
    const std::string withAlternatePort =
        boost::replace_first_copy(_pgConnectionString, "5432", "5430");
    _chConnectionString =
        boost::replace_first_copy(withAlternatePort, "dbname=postgres", "dbname=default");

    _schema = config["schema"].convert<std::string>();
}

/**
 * Hooks this module up to the two signals it reacts to: LocalDomainInit and
 * SystemShutdown.
 */
void CBMAggregatorHybridLibPQXXWriter::subscribe(NotificationCenter& notificationCenter) {
    using Self = CBMAggregatorHybridLibPQXXWriter;

    notificationCenter.subscribe(signals::LocalDomainInit, &Self::onLocalDomainInit, *this);
    notificationCenter.subscribe(signals::SystemShutdown, &Self::onSystemShutdown, *this);
}

/**
 * Stores the value of the "job_id" variable as the job id, or 0 when the
 * variable is not defined.
 */
void CBMAggregatorHybridLibPQXXWriter::doLocalDomainInit() {
    if (!_landUnitData->hasVariable("job_id")) {
        _jobId = 0;
        return;
    }

    _jobId = _landUnitData->getVariable("job_id")->value().convert<Int64>();
}

void CBMAggregatorHybridLibPQXXWriter::doSystemShutdown() {
if (!_isPrimaryAggregator) {
return;
}

if (_classifierNames->empty()) {
MOJA_LOG_INFO << "No data to load.";
return;
}

MOJA_LOG_INFO << (boost::format("Loading results into %1% on server: %2%")
% _schema % _pgConnectionString).str();

connection pgConn(_pgConnectionString);
connection chConn(_chConnectionString);
doIsolated(pgConn, (boost::format("SET search_path = %1%;") % _schema).str());

bool resultsPreviouslyLoaded = perform([&pgConn, this] {
return !nontransaction(pgConn).exec((boost::format(
"SELECT 1 FROM CompletedJobs WHERE id = %1%;"
) % _jobId).str()).empty();
});

if (resultsPreviouslyLoaded) {
MOJA_LOG_INFO << "Results previously loaded for jobId " << _jobId << " - skipping.";
return;
}

perform([&pgConn, &chConn, this] {
work pgTx(pgConn);
work chTx(chConn);

// First, try to insert into the completed jobs table - if this is a duplicate, the transaction
// will fail immediately.
pgTx.exec((boost::format("INSERT INTO CompletedJobs VALUES (%1%);") % _jobId).str());

load(chTx, (boost::format("%1%.raw_fluxes") % _schema).str(), _fluxDimension);
load(chTx, (boost::format("%1%.raw_pools") % _schema).str(), _poolDimension);
load(chTx, (boost::format("%1%.raw_errors") % _schema).str(), _errorDimension);
load(chTx, (boost::format("%1%.raw_ages") % _schema).str(), _ageDimension);
load(chTx, (boost::format("%1%.raw_disturbances") % _schema).str(), _disturbanceDimension);

pgTx.commit();
chTx.commit();
});

MOJA_LOG_INFO << "PostgreSQL insert complete." << std::endl;
}

/**
 * Executes a single SQL statement in its own transaction on `conn`.
 *
 * When `optional` is true any failure is ignored; otherwise it propagates to
 * the caller. Implemented as the one-statement case of the multi-statement
 * overload, which opens one transaction, executes the statement and commits.
 */
void CBMAggregatorHybridLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::string sql, bool optional) {
    doIsolated(conn, std::vector<std::string>(1, sql), optional);
}

/**
 * Executes the given SQL statements, in order, inside one transaction on `conn`
 * and commits it.
 *
 * @param conn     connection to run the statements on
 * @param sql      statements to execute
 * @param optional when true, any exception raised while executing or committing
 *                 is swallowed (best-effort); when false it is rethrown
 */
void CBMAggregatorHybridLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::vector<std::string> sql, bool optional) {
    // perform() runs the callback synchronously before returning, so the
    // statements can be captured by reference instead of copying the vector.
    perform([&conn, &sql, optional] {
        try {
            work tx(conn);
            // Iterate by const reference: no per-statement string copy.
            for (const auto& stmt : sql) {
                tx.exec(stmt);
            }

            tx.commit();
        } catch (...) {
            if (!optional) {
                throw;
            }
        }
    });
}

/**
 * Inserts every record held by `dataDimension` into `table` within `tx`.
 *
 * The column list comes from the first record's header (with its first newline
 * removed). Rows are sent as multi-row INSERT statements of at most 10000 rows
 * each. Nothing is executed or logged when there are no records.
 */
template<typename TAccumulator>
void CBMAggregatorHybridLibPQXXWriter::load(
    pqxx::work& tx,
    const std::string& table,
    std::shared_ptr<TAccumulator> dataDimension) {

    auto records = dataDimension->records();
    if (records.empty()) {
        return;
    }

    auto columns = records[0].header(*_classifierNames);
    boost::replace_first(columns, "\n", "");
    MOJA_LOG_INFO << (boost::format("Loading %1% (%2%)") % table % columns).str();

    const std::size_t maxRowsPerInsert = 10000;
    std::vector<std::string> pendingRows;

    // Sends the pending rows as one INSERT; the rows are joined with "),(" so
    // that each becomes its own parenthesised VALUES tuple.
    const auto flush = [&] {
        const auto insertStmt = (boost::format("INSERT INTO %1% (%2%) VALUES (%3%)")
            % table % columns % boost::join(pendingRows, "),(")).str();
        pendingRows.clear();
        tx.exec(insertStmt);
    };

    for (auto& record : records) {
        if (pendingRows.size() == maxRowsPerInsert) {
            flush();
        }

        pendingRows.push_back(record.asPersistable(false));
    }

    if (!pendingRows.empty()) {
        flush();
    }
}

}}} // namespace moja::modules::cbm
Loading
Loading