Skip to content

Commit

Permalink
Fixed global init/shutdown order
Browse files Browse the repository at this point in the history
  • Loading branch information
SChernykh committed Dec 4, 2024
1 parent ecdaa83 commit ff9e378
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 74 deletions.
4 changes: 2 additions & 2 deletions src/console_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ static void do_status(p2pool *m_pool, const char * /* args */)

m_pool->print_merge_mining_status();

bkg_jobs_tracker.print_status();
bkg_jobs_tracker->print_status();

if (p2p) {
p2p->check_for_updates(true);
Expand Down Expand Up @@ -313,7 +313,7 @@ static void do_stop_mining(p2pool* m_pool, const char* /*args*/)

static void do_exit(p2pool *m_pool, const char * /* args */)
{
bkg_jobs_tracker.wait();
bkg_jobs_tracker->wait();
m_pool->stop();
}

Expand Down
22 changes: 15 additions & 7 deletions src/crypto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ class RandomBytes
public:
RandomBytes() : rng(RandomDeviceSeed::instance), dist(0, 255)
{
if (uv_mutex_init(&m) != 0) {
abort();
}
uv_mutex_init_checked(&m);

// Diffuse the initial state in case it has low quality
rng.discard(10000);
Expand All @@ -66,7 +64,7 @@ class RandomBytes
std::uniform_int_distribution<> dist;
};

static RandomBytes randomBytes;
static RandomBytes* randomBytes = nullptr;

}

Expand All @@ -86,7 +84,7 @@ static FORCEINLINE bool less32(const uint8_t* k0, const uint8_t* k1)
void generate_keys(hash& pub, hash& sec)
{
do {
do { randomBytes(sec.h); } while (!less32(sec.h, limit));
do { (*randomBytes)(sec.h); } while (!less32(sec.h, limit));
sc_reduce32(sec.h);
} while (!sc_isnonzero(sec.h));

Expand Down Expand Up @@ -472,16 +470,26 @@ void derive_view_tag(const hash& derivation, size_t output_index, uint8_t& view_

void init_crypto_cache()
{
if (!randomBytes) {
randomBytes = new RandomBytes();
}

if (!cache) {
cache = new Cache();
}
}

void destroy_crypto_cache()
{
if (cache) {
delete cache;
{
auto p = randomBytes;
randomBytes = nullptr;
delete p;
}
{
auto p = cache;
cache = nullptr;
delete p;
}
}

Expand Down
13 changes: 5 additions & 8 deletions src/keccak.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,14 @@ NOINLINE void keccakf_plain(std::array<uint64_t, 25>& st)
}
}

void (*keccakf)(std::array<uint64_t, 25>&) = keccakf_plain;

keccakf_func keccakf = []() {
#if defined(__x86_64__) || defined(_M_AMD64)
static struct KeccakBMI_Check {
KeccakBMI_Check() {
if (randomx::Cpu().hasBmi()) {
keccakf = keccakf_bmi;
}
if (randomx::Cpu().hasBmi()) {
return keccakf_bmi;
}
} keccak_bmi_check;
#endif
return keccakf_plain;
}();

NOINLINE void keccak_step(const uint8_t* &in, int &inlen, std::array<uint64_t, 25>& st)
{
Expand Down
4 changes: 3 additions & 1 deletion src/keccak.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ enum KeccakParams {
};

extern const uint64_t keccakf_rndc[24];
extern void (*keccakf)(std::array<uint64_t, 25>& st);

typedef void (*keccakf_func)(std::array<uint64_t, 25>&);
extern keccakf_func keccakf;

void keccakf_plain(std::array<uint64_t, 25>& st);
void keccakf_bmi(std::array<uint64_t, 25>& st);
Expand Down
57 changes: 22 additions & 35 deletions src/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,10 @@ class Worker
FORCEINLINE Worker()
: m_writePos(0)
, m_readPos(0)
, m_started{ false }
, m_stopped(false)
{
#if defined(_WIN32) && defined(_MSC_VER) && !defined(NDEBUG)
SetUnhandledExceptionFilter(UnhandledExceptionFilter);
SymInitialize(GetCurrentProcess(), NULL, TRUE);
#endif

set_main_thread();
Expand All @@ -133,22 +131,9 @@ class Worker

m_buf.resize(BUF_SIZE);

// Create default loop here
uv_default_loop();

uv_cond_init(&m_cond);
uv_mutex_init(&m_mutex);

const int err = uv_thread_create(&m_worker, run_wrapper, this);
if (err) {
fprintf(stderr, "failed to start logger thread (%s), aborting\n", uv_err_name(err));
abort();
}

while (m_started.load() == false) {
std::this_thread::yield();
}

#ifdef _WIN32
SetConsoleMode(hStdIn, ENABLE_ECHO_INPUT | ENABLE_LINE_INPUT | ENABLE_PROCESSED_INPUT | ENABLE_EXTENDED_FLAGS);

Expand All @@ -163,13 +148,17 @@ class Worker
CONSOLE_COLORS = false;
}

LOGINFO(0, "started");

if (!m_logFile.is_open()) {
LOGERR(0, "failed to open " << log_file_name);
fprintf(stderr, "failed to open %s\n", log_file_name);
}

init_uv_threadpool();

const int err = uv_thread_create(&m_worker, run_wrapper, this);
if (err) {
fprintf(stderr, "failed to start logger thread (%s), aborting\n", uv_err_name(err));
abort();
}
}

~Worker()
Expand All @@ -196,17 +185,8 @@ class Worker
uv_thread_join(&m_worker);
uv_cond_destroy(&m_cond);
uv_mutex_destroy(&m_mutex);
uv_loop_close(uv_default_loop());

#if ((UV_VERSION_MAJOR > 1) || ((UV_VERSION_MAJOR == 1) && (UV_VERSION_MINOR >= 38)))
uv_library_shutdown();
#endif

m_logFile.close();

#if defined(_WIN32) && defined(_MSC_VER) && !defined(NDEBUG)
SymCleanup(GetCurrentProcess());
#endif
}

FORCEINLINE void write(const char* buf, uint32_t size)
Expand Down Expand Up @@ -252,13 +232,13 @@ class Worker
int err = putenv(buf);
if (err != 0) {
err = errno;
LOGWARN(0, "Couldn't set UV thread pool size to " << N << " threads, putenv returned error " << err);
fprintf(stderr, "Couldn't set UV thread pool size to %u threads, putenv returned error %d\n", N, err);
}

static uv_work_t dummy;
err = uv_queue_work(uv_default_loop_checked(), &dummy, [](uv_work_t*) {}, nullptr);
if (err) {
LOGERR(0, "init_uv_threadpool: uv_queue_work failed, error " << uv_err_name(err));
fprintf(stderr, "init_uv_threadpool: uv_queue_work failed, error %s\n", uv_err_name(err));
}
}

Expand All @@ -267,8 +247,6 @@ class Worker

NOINLINE void run()
{
m_started.exchange(true);

do {
uv_mutex_lock(&m_mutex);
if (m_readPos == m_writePos.load()) {
Expand Down Expand Up @@ -392,13 +370,12 @@ class Worker
uv_mutex_t m_mutex;
uv_thread_t m_worker;

std::atomic<bool> m_started;
bool m_stopped;

std::ofstream m_logFile;
};

static Worker worker;
static Worker* worker = nullptr;

#endif // P2POOL_LOG_DISABLE

Expand Down Expand Up @@ -445,7 +422,16 @@ NOINLINE Writer::~Writer()
m_buf[2] = static_cast<uint8_t>(size >> 8);
m_buf[m_pos] = '\n';
#ifndef P2POOL_LOG_DISABLE
worker.write(m_buf, size);
worker->write(m_buf, size);
#endif
}

void start()
{
#ifndef P2POOL_LOG_DISABLE
worker = new Worker();

LOGINFO(0, "started");
#endif
}

Expand All @@ -458,7 +444,8 @@ void reopen()
void stop()
{
#ifndef P2POOL_LOG_DISABLE
worker.stop();
delete worker;
worker = nullptr;
#endif
}

Expand Down
1 change: 1 addition & 0 deletions src/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ struct DummyStream

#endif

void start();
void reopen();
void stop();

Expand Down
29 changes: 29 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@
#include "randomx.h"
#endif

#if defined(_WIN32) && defined(_MSC_VER) && !defined(NDEBUG)

#include <DbgHelp.h>

#pragma comment(lib, "Dbghelp.lib")

#endif

void p2pool_usage()
{
printf("P2Pool %s\n"
Expand Down Expand Up @@ -188,8 +196,17 @@ int main(int argc, char* argv[])
}
}

#if defined(_WIN32) && defined(_MSC_VER) && !defined(NDEBUG)
SymInitialize(GetCurrentProcess(), NULL, TRUE);
#endif

memory_tracking_start();

// Create default loop here
uv_default_loop();

p2pool::log::start();

p2pool::init_crypto_cache();

int result = static_cast<int>(curl_global_init_mem(CURL_GLOBAL_ALL, p2pool::malloc_hook, p2pool::free_hook, p2pool::realloc_hook, p2pool::strdup_hook, p2pool::calloc_hook));
Expand All @@ -209,9 +226,21 @@ int main(int argc, char* argv[])

p2pool::destroy_crypto_cache();

p2pool::log::stop();

uv_loop_close(uv_default_loop());

#if ((UV_VERSION_MAJOR > 1) || ((UV_VERSION_MAJOR == 1) && (UV_VERSION_MINOR >= 38)))
uv_library_shutdown();
#endif

if (!memory_tracking_stop()) {
result = 1;
}

#if defined(_WIN32) && defined(_MSC_VER) && !defined(NDEBUG)
SymCleanup(GetCurrentProcess());
#endif

return result;
}
17 changes: 11 additions & 6 deletions src/memory_leak_debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <atomic>
#include <type_traits>
#include <iostream>
#include <fstream>
#include <mutex>

#include <DbgHelp.h>

Expand Down Expand Up @@ -79,7 +81,7 @@ struct TrackedAllocation

static_assert(sizeof(TrackedAllocation) == 256, "");

uv_mutex_t allocation_lock;
std::mutex allocation_lock;
std::hash<void*> hasher;
uint32_t first[N];
uint32_t next[N];
Expand All @@ -98,7 +100,7 @@ void show_top_10_allocations()
const HANDLE h = GetCurrentProcess();

{
p2pool::MutexLock lock(allocation_lock);
std::lock_guard<std::mutex> lock(allocation_lock);

TrackedAllocation* end = buf;
for (size_t i = 0; i < N; ++i) {
Expand Down Expand Up @@ -173,7 +175,7 @@ FORCEINLINE static void add_alocation(void* p, size_t size)

const size_t index = hasher(p) & (N - 1);

p2pool::MutexLock lock(allocation_lock);
std::lock_guard<std::mutex> lock(allocation_lock);

++num_allocations;
if (num_allocations >= N / 2) {
Expand Down Expand Up @@ -204,7 +206,7 @@ FORCEINLINE static void remove_allocation(void* p)
return;
}

p2pool::MutexLock lock(allocation_lock);
std::lock_guard<std::mutex> lock(allocation_lock);

--num_allocations;

Expand Down Expand Up @@ -292,10 +294,14 @@ void memory_tracking_start()
// Trigger std::ostream initialization to avoid reporting it as leaks
std::cout << "Memory leak detection = " << 1 << std::endl;

// Trigger std::ofstream initialization to avoid reporting it as leaks
{
std::ofstream tmp("memory_tracking.tmp");
}

using namespace p2pool;

uv_replace_allocator(malloc_hook, realloc_hook, calloc_hook, free_hook);
uv_mutex_init_checked(&allocation_lock);
track_memory = true;
}

Expand All @@ -304,7 +310,6 @@ bool memory_tracking_stop()
using namespace p2pool;

track_memory = false;
uv_mutex_destroy(&allocation_lock);

const HANDLE h = GetCurrentProcess();

Expand Down
Loading

0 comments on commit ff9e378

Please sign in to comment.