Skip to content

Commit

Permalink
Merge pull request #38 from GolosChain/alfa
Browse files Browse the repository at this point in the history
Prepare commit for PR3
  • Loading branch information
afalaleev authored Jul 19, 2019
2 parents 1d6a048 + f18c544 commit f7d9b59
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 24 deletions.
4 changes: 3 additions & 1 deletion .buildkite/steps/build-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ set -euo pipefail

IMAGETAG=${BUILDKITE_BRANCH:-master}
BRANCHNAME=${BUILDKITE_BRANCH:-master}
COMPILETYPE=RelWithDebInfo

if [[ "${IMAGETAG}" == "master" ]]; then
BUILDTYPE="stable"
COMPILETYPE=Release
elif [[ "${IMAGETAG}" == "alfa" ]]; then
BUILDTYPE="alfa"
else
BUILDTYPE="latest"
fi

cd Docker
docker build -t cyberway/cyberway-notifier:${IMAGETAG} --build-arg=branch=${BRANCHNAME} --build-arg buildtype=${BUILDTYPE} .
docker build -t cyberway/cyberway-notifier:${IMAGETAG} --build-arg=branch=${BRANCHNAME} --build-arg buildtype=${BUILDTYPE} --build-arg compiletype=${COMPILETYPE} .
17 changes: 15 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ cmake_minimum_required(VERSION 2.8)

project(cyber-notifier)

if(APPLE)
include_directories(/usr/local/include)
link_directories(/usr/local/lib)
endif(APPLE)
# Uncomment to have the build process verbose
#set(CMAKE_VERBOSE_MAKEFILE TRUE)

Expand All @@ -22,10 +26,19 @@ endif(UNIX)


set(outname "cyber-notifier")
set(PROTOBUF_LIB "/opt/nats.c-1.8.0/pbuf/lib/linux/libprotobuf-c.so")
if(APPLE)
set(PROTOBUF_LIB "/usr/local/nats.c/pbuf/lib/darwin/libprotobuf-c.a")
else(APPLE)
set(PROTOBUF_LIB "/opt/nats.c-1.8.0/pbuf/lib/linux/libprotobuf-c.so")
endif(APPLE)

# Build the executable
add_executable(${outname} ${PROJECT_SOURCE_DIR}/notifier.cpp)

# Link
target_link_libraries(${outname} nats ${NATS_EXTRA_LIB} ${PROTOBUF_LIB})
if(APPLE)
target_link_libraries(${outname} nats ${PROTOBUF_LIB})
else(APPLE)
target_link_libraries(${outname} nats ${NATS_EXTRA_LIB} ${PROTOBUF_LIB})
endif(APPLE)

5 changes: 3 additions & 2 deletions Docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ ARG buildtype=stable
FROM cyberway/builder:$buildtype as builder

ARG branch=master
ARG compiletype=RelWithDebInfo

RUN cd /opt && wget https://github.com/nats-io/cnats/archive/v1.8.0.tar.gz && tar -xzf v1.8.0.tar.gz \
&& cd nats.c-1.8.0 && mkdir build && cd ./build && cmake .. && make && make install
&& cd nats.c-1.8.0 && mkdir build && cd ./build && cmake -DCMAKE_BUILD_TYPE=$compiletype .. && make && make install

RUN echo "/opt/cnats-1.8.0/pbuf/lib/linux/" > /etc/ld.so.conf.d/protobuf-c.conf && ldconfig

Expand All @@ -15,7 +16,7 @@ RUN cd /opt && git clone -b $branch https://github.com/GolosChain/cyberway.notif
RUN cd /opt/cyberway.notifier && mkdir build && cd build \
&& echo "$branch:$(git rev-parse HEAD)" > /etc/cyberway-notifier-version \
&& cmake \
-DCMAKE_BUILD_TYPE=Release \
-DCMAKE_BUILD_TYPE=$compiletype \
-DCMAKE_C_COMPILER=clang \
-DCMAKE_CXX_COMPILER=clang++ \
.. \
Expand Down
91 changes: 72 additions & 19 deletions notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "options.h"
#include <string>
#include <iostream>
#include <memory>
#include <signal.h>

static const char* usage =
Expand All @@ -20,7 +21,31 @@ static volatile bool done = false;
// const char* subj = "foo";
// const char* txt = "hello";


struct message final {
std::string subject;
std::string data;
}; // struct message

std::string get_subject(const std::string& data) {
std::string subject;
static const auto start = "{\"msg_channel\":\""; // ok, it's ugly. TODO: ?parse json
static const auto start_len = strlen(start);
if (!data.size()) {
subject = "bad.empty";
} else if (0 != data.find(start)) {
subject = "bad.start";
} else {
auto end = data.find('"', start_len);
subject = std::string::npos == end
? "bad.end"
: data.substr(start_len, end - start_len).c_str(); // TODO: there are forbidden symbols in NATS
}
return subject;
}

static void _publish_ack_cb(const char* guid, const char* error, void* closure) {
std::unique_ptr<message> msg(static_cast<message*>(closure));
// TODO: delete object from waiting list, so we can check if some object didn't published for a long time
//std::cout << "#Ack#, " << guid << std::endl;
// myPubMsgInfo* pubMsg = (myPubMsgInfo*)closure;
Expand All @@ -38,18 +63,34 @@ static void sigusr1_handler(int signum) {
print = !print;
}

static void sig_int_term_handler(int signum) {
if (signum == SIGINT)
std::cout << "Interrupt signal (" << signum << ") received." << std::endl;
else if (signum == SIGTERM)
std::cout << "Termination signal (" << signum << ") received." << std::endl;
done = true;
}

int main(int argc, char** argv) {
opts = parseArgs(argc, argv, usage);
std::cout << "Sending pipe messages" << std::endl;

signal(SIGUSR1, sigusr1_handler);
signal(SIGINT, sig_int_term_handler);
signal(SIGTERM, sig_int_term_handler);

// Now create STAN Connection Options and set the NATS Options.
stanConnOptions* connOpts;
natsStatus s = stanConnOptions_Create(&connOpts);
if (s == NATS_OK) {
s = stanConnOptions_SetNATSOptions(connOpts, opts);
}
if (s == NATS_OK) {
s = stanConnOptions_SetPings(connOpts, 5 /* seconds */, 24 /* maximum missed pings */);
}
if (s == NATS_OK) {
s = stanConnOptions_SetPubAckWait(connOpts, 120 * 1000 /* ms */);
}
// Create the Connection using the STAN Connection Options
stanConnection* sc;
if (s == NATS_OK) {
Expand All @@ -59,29 +100,28 @@ int main(int argc, char** argv) {
natsOptions_Destroy(opts);
stanConnOptions_Destroy(connOpts);

std::string line;
while (!done && s == NATS_OK) {
static const auto start = "{\"msg_type\":\""; // ok, it's ugly. TODO: ?parse json
static const auto start_len = strlen(start);
std::getline(std::cin, line);
bool is_warn = false;
while (s == NATS_OK) {
auto msg = std::make_unique<message>();
std::getline(std::cin, msg->data);
if (done) {
if (msg->data.size()) {
if (!is_warn) {
std::cerr << "WARNING! Pipe hasn't empty." << std::endl;
is_warn = true;
}
} else
break;
}
if (std::cin.eof()) {
nats_Sleep(50);
continue;
} else {
if (print) {
std::cout << line << std::endl;
std::cout << msg->data << std::endl;
}
}
if (!line.size()) {
subj = "bad.empty";
} else if (0 != line.find(start)) {
subj = "bad.start";
} else {
auto end = line.find('"', start_len);
subj = std::string::npos == end
? "bad.end"
: line.substr(start_len, end - start_len).c_str(); // TODO: there are forbidden symbols in NATS
}
msg->subject = get_subject(msg->data);

// TODO: create object to check in ack
// TODO: cpp
Expand All @@ -95,11 +135,24 @@ int main(int argc, char** argv) {
// }
// if (s == NATS_OK) {
// s = stanConnection_PublishAsync(sc, subj, pubMsg->payload, pubMsg->size, _pubAckHandler, (void*)pubMsg);
s = stanConnection_PublishAsync(sc, subj, line.c_str(), line.size(), _publish_ack_cb, NULL);
for (int i = 0; i < 24 * 1000; ++i) {
if (async) {
s = stanConnection_PublishAsync(sc, msg->subject.c_str(), msg->data.c_str(), msg->data.size(), _publish_ack_cb, msg.get());
} else {
s = stanConnection_Publish(sc, msg->subject.c_str(), msg->data.c_str(), msg->data.size());
}
if (s == NATS_TIMEOUT) {
nats_Sleep(50);
continue;
}
break;
}

// Note that if this call fails, then we need to free the pubMsg object here since it won't be passed to the ack handler.
// if (s != NATS_OK)
// free(pubMsg);
if (s == NATS_OK && async) {
msg.release();
}

}

if (s != NATS_OK) {
Expand Down

0 comments on commit f7d9b59

Please sign in to comment.