Skip to content

Commit

Permalink
ClickHouse (#41)
Browse files Browse the repository at this point in the history
Adds a special output writer for ClickHouse database via libpqxx.
  • Loading branch information
mfellows authored Jun 5, 2024
1 parent bedc920 commit 4aa53c1
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 29 deletions.
2 changes: 2 additions & 0 deletions Source/moja.modules.cbm/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ set(PROJECT_MODULE_HEADERS
include/moja/modules/${PACKAGE}/ageclasshelper.h
include/moja/modules/${PACKAGE}/cbmageindicators.h
include/moja/modules/${PACKAGE}/cbmaggregatorcsvwriter.h
include/moja/modules/${PACKAGE}/cbmaggregatorhybridlibpqxxwriter.h
include/moja/modules/${PACKAGE}/cbmaggregatorlandunitdata.h
include/moja/modules/${PACKAGE}/cbmaggregatorlibpqxxwriter.h
include/moja/modules/${PACKAGE}/cbmaggregatorpostgresqlwriter.h
Expand Down Expand Up @@ -106,6 +107,7 @@ set(PROJECT_MODULE_SOURCES
src/ageclasshelper.cpp
src/cbmageindicators.cpp
src/cbmaggregatorcsvwriter.cpp
src/cbmaggregatorhybridlibpqxxwriter.cpp
src/cbmaggregatorlandunitdata.cpp
src/cbmaggregatorlibpqxxwriter.cpp
src/cbmaggregatorpostgresqlwriter.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#ifndef MOJA_MODULES_CBM_CBMAggregatorHybridLibPQXXWriter_H_
#define MOJA_MODULES_CBM_CBMAggregatorHybridLibPQXXWriter_H_

#include "moja/modules/cbm/_modules.cbm_exports.h"
#include "moja/modules/cbm/flatrecord.h"
#include "moja/modules/cbm/cbmmodulebase.h"

#include <moja/flint/spatiallocationinfo.h>

#include <pqxx/pqxx>
#include <vector>

namespace moja {
namespace flint {
template<class TPersistable, class TRecord>
class RecordAccumulatorWithMutex2;
}

namespace modules {
namespace cbm {

class CBM_API CBMAggregatorHybridLibPQXXWriter : public CBMModuleBase {
public:
CBMAggregatorHybridLibPQXXWriter(
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatFluxRecord>> fluxDimension,
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatPoolRecord>> poolDimension,
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatErrorRecord>> errorDimension,
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatAgeAreaRecord>> ageDimension,
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatDisturbanceRecord>> disturbanceDimension,
std::shared_ptr<std::vector<std::string>> classifierNames,
bool isPrimary = false)
: CBMModuleBase(),
_fluxDimension(fluxDimension),
_poolDimension(poolDimension),
_errorDimension(errorDimension),
_ageDimension(ageDimension),
_disturbanceDimension(disturbanceDimension),
_classifierNames(classifierNames),
_isPrimaryAggregator(isPrimary) {}

virtual ~CBMAggregatorHybridLibPQXXWriter() = default;

void configure(const DynamicObject& config) override;
void subscribe(NotificationCenter& notificationCenter) override;

flint::ModuleTypes moduleType() override { return flint::ModuleTypes::System; };

void doLocalDomainInit() override;
void doSystemShutdown() override;

private:
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatFluxRecord>> _fluxDimension;
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatPoolRecord>> _poolDimension;
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatErrorRecord>> _errorDimension;
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatAgeAreaRecord>> _ageDimension;
std::shared_ptr<flint::RecordAccumulatorWithMutex2<std::string, FlatDisturbanceRecord>> _disturbanceDimension;
std::shared_ptr<std::vector<std::string>> _classifierNames;

std::shared_ptr<const flint::SpatialLocationInfo> _spatialLocationInfo;

std::string _pgConnectionString;
std::string _chConnectionString;
std::string _schema;
Int64 _jobId;
bool _isPrimaryAggregator;

template<typename TAccumulator>
void load(pqxx::work& tx,
const std::string& table,
std::shared_ptr<TAccumulator> dataDimension);

void doIsolated(pqxx::connection_base& conn, std::string sql, bool optional = false);
void doIsolated(pqxx::connection_base& conn, std::vector<std::string> sql, bool optional = false);
};

}}} // namespace moja::modules::cbm

#endif // MOJA_MODULES_CBM_CBMAggregatorHybridLibPQXXWriter_H_
12 changes: 6 additions & 6 deletions Source/moja.modules.cbm/include/moja/modules/cbm/flatrecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace cbm {
class CBM_API FlatRecordHelper {
public:
static const std::string BuildClassifierNamesString(const std::vector<std::string>& classifierNames, const std::string& suffix = "");
static const std::string BuildClassifierValueString(const std::vector<Poco::Nullable<std::string>>& classifierValues);
static const std::string BuildClassifierValueString(const std::vector<Poco::Nullable<std::string>>& classifierValues, bool csvFormat = true);
};

class CBM_API FlatFluxRecord {
Expand All @@ -31,7 +31,7 @@ namespace cbm {
bool operator==(const FlatFluxRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatFluxRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -67,7 +67,7 @@ namespace cbm {
bool operator==(const FlatPoolRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatPoolRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -97,7 +97,7 @@ namespace cbm {
bool operator==(const FlatErrorRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatErrorRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -126,7 +126,7 @@ namespace cbm {
bool operator==(const FlatAgeAreaRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatAgeAreaRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down Expand Up @@ -160,7 +160,7 @@ namespace cbm {
bool operator==(const FlatDisturbanceRecord& other) const;
size_t hash() const;
std::string header(const std::vector<std::string>& classifierNames) const;
std::string asPersistable() const;
std::string asPersistable(bool csvFormat = true) const;
std::vector<std::optional<std::string>> asVector() const;
void merge(const FlatDisturbanceRecord& other);
void setId(Int64 id) { _id = id; }
Expand Down
166 changes: 166 additions & 0 deletions Source/moja.modules.cbm/src/cbmaggregatorhybridlibpqxxwriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/**
* @file
* The CBMAggregatorHybridLibPQXXWriter module writes the stand-level information gathered
* by CBMAggregatorLandUnitData into a PostgreSQL database. It is designed mainly for
* distributed runs where the simulation is divided up and each portion of work is loaded
* into a separate set of tables before being merged together with a post-processing script,
* although this module can also be used for a standard simulation
********/

#include "moja/modules/cbm/cbmaggregatorhybridlibpqxxwriter.h"

#include <moja/flint/recordaccumulatorwithmutex.h>
#include <moja/flint/ilandunitdatawrapper.h>
#include <moja/flint/iflintdata.h>
#include <moja/flint/ivariable.h>

#include <moja/logging.h>
#include <moja/signals.h>
#include <moja/notificationcenter.h>
#include <moja/hash.h>

#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/format.hpp>

using namespace pqxx;
using Poco::format;
using Poco::NotFoundException;

namespace moja {
namespace modules {
namespace cbm {

void CBMAggregatorHybridLibPQXXWriter::configure(const DynamicObject& config) {
_pgConnectionString = config["connection_string"].convert<std::string>();
_chConnectionString = _pgConnectionString;
boost::replace_first(_chConnectionString, "5432", "5430");
boost::replace_first(_chConnectionString, "dbname=postgres", "dbname=default");
_schema = config["schema"].convert<std::string>();
}

void CBMAggregatorHybridLibPQXXWriter::subscribe(NotificationCenter& notificationCenter) {
notificationCenter.subscribe(signals::LocalDomainInit, &CBMAggregatorHybridLibPQXXWriter::onLocalDomainInit, *this);
notificationCenter.subscribe(signals::SystemShutdown, &CBMAggregatorHybridLibPQXXWriter::onSystemShutdown, *this);
}

void CBMAggregatorHybridLibPQXXWriter::doLocalDomainInit() {
_jobId = _landUnitData->hasVariable("job_id")
? _landUnitData->getVariable("job_id")->value().convert<Int64>()
: 0;
}

void CBMAggregatorHybridLibPQXXWriter::doSystemShutdown() {
if (!_isPrimaryAggregator) {
return;
}

if (_classifierNames->empty()) {
MOJA_LOG_INFO << "No data to load.";
return;
}

MOJA_LOG_INFO << (boost::format("Loading results into %1% on server: %2%")
% _schema % _pgConnectionString).str();

connection pgConn(_pgConnectionString);
connection chConn(_chConnectionString);
doIsolated(pgConn, (boost::format("SET search_path = %1%;") % _schema).str());

bool resultsPreviouslyLoaded = perform([&pgConn, this] {
return !nontransaction(pgConn).exec((boost::format(
"SELECT 1 FROM CompletedJobs WHERE id = %1%;"
) % _jobId).str()).empty();
});

if (resultsPreviouslyLoaded) {
MOJA_LOG_INFO << "Results previously loaded for jobId " << _jobId << " - skipping.";
return;
}

perform([&pgConn, &chConn, this] {
work pgTx(pgConn);
work chTx(chConn);

// First, try to insert into the completed jobs table - if this is a duplicate, the transaction
// will fail immediately.
pgTx.exec((boost::format("INSERT INTO CompletedJobs VALUES (%1%);") % _jobId).str());

load(chTx, (boost::format("%1%.raw_fluxes") % _schema).str(), _fluxDimension);
load(chTx, (boost::format("%1%.raw_pools") % _schema).str(), _poolDimension);
load(chTx, (boost::format("%1%.raw_errors") % _schema).str(), _errorDimension);
load(chTx, (boost::format("%1%.raw_ages") % _schema).str(), _ageDimension);
load(chTx, (boost::format("%1%.raw_disturbances") % _schema).str(), _disturbanceDimension);

pgTx.commit();
chTx.commit();
});

MOJA_LOG_INFO << "PostgreSQL insert complete." << std::endl;
}

void CBMAggregatorHybridLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::string sql, bool optional) {
perform([&conn, sql, optional] {
try {
work tx(conn);
tx.exec(sql);
tx.commit();
} catch (...) {
if (!optional) {
throw;
}
}
});
}

void CBMAggregatorHybridLibPQXXWriter::doIsolated(pqxx::connection_base& conn, std::vector<std::string> sql, bool optional) {
perform([&conn, sql, optional] {
try {
work tx(conn);
for (auto stmt : sql) {
tx.exec(stmt);
}

tx.commit();
} catch (...) {
if (!optional) {
throw;
}
}
});
}

template<typename TAccumulator>
void CBMAggregatorHybridLibPQXXWriter::load(
pqxx::work& tx,
const std::string& table,
std::shared_ptr<TAccumulator> dataDimension) {

auto records = dataDimension->records();
if (!records.empty()) {
auto columns = records[0].header(*_classifierNames);
boost::replace_first(columns, "\n", "");
MOJA_LOG_INFO << (boost::format("Loading %1% (%2%)") % table % columns).str();
auto baseStmt = "INSERT INTO %1% (%2%) VALUES (%3%)";
std::vector<std::string> batch;
int batchRecords = 0;
for (auto& record : records) {
if (batchRecords == 10000) {
auto insertStmt = (boost::format(baseStmt) % table % columns % boost::join(batch, "),(")).str();
batch.clear();
batchRecords = 0;
tx.exec(insertStmt);
}

batch.push_back(record.asPersistable(false));
batchRecords++;
}

if (!batch.empty()) {
auto insertStmt = (boost::format(baseStmt) % table % columns % boost::join(batch, "),(")).str();
tx.exec(insertStmt);
}
}
}

}}} // namespace moja::modules::cbm
Loading

0 comments on commit 4aa53c1

Please sign in to comment.