From e0957747fd35c70b6e3f64cbfd9199eaf97671db Mon Sep 17 00:00:00 2001 From: Milot Mirdita Date: Sat, 18 Jan 2025 19:16:03 +0900 Subject: [PATCH] gpu client waits for gpuserver to be ready --- src/commons/GpuUtil.h | 8 +++ src/commons/Parameters.cpp | 7 +++ src/commons/Parameters.h | 2 + src/prefiltering/PrefilteringIndexReader.cpp | 2 +- src/prefiltering/PrefilteringIndexReader.h | 2 +- src/prefiltering/ungappedprefilter.cpp | 56 ++++++++++++++++---- src/util/gpuserver.cpp | 10 ++-- 7 files changed, 68 insertions(+), 19 deletions(-) diff --git a/src/commons/GpuUtil.h b/src/commons/GpuUtil.h index 4995168a..deb354a9 100644 --- a/src/commons/GpuUtil.h +++ b/src/commons/GpuUtil.h @@ -2,6 +2,8 @@ #define GPUUTIL_H #include "Debug.h" +#include "FileUtil.h" +#include "PrefilteringIndexReader.h" #include "marv.h" #include #include @@ -37,6 +39,12 @@ struct GPUSharedMemory { sizeof(int8_t) * 20 * maxSeqLen; // Size for profile data } + static std::string getShmHash(const std::string& db) { + std::string dbpath = FileUtil::getRealPathFromSymLink(PrefilteringIndexReader::dbPathWithoutIndex(db)); + size_t hash = Util::hash(dbpath.c_str(), dbpath.length()); + return SSTR(hash); + } + // Allocate and initialize shared memory static GPUSharedMemory* alloc(const std::string& name, unsigned int maxSeqLen, unsigned int maxResListLen) { size_t shm_size = calculateSize(maxSeqLen, maxResListLen); diff --git a/src/commons/Parameters.cpp b/src/commons/Parameters.cpp index a0e6d321..3e4b9bd5 100644 --- a/src/commons/Parameters.cpp +++ b/src/commons/Parameters.cpp @@ -104,6 +104,7 @@ Parameters::Parameters(): // gpu PARAM_GPU(PARAM_GPU_ID, "--gpu", "Use GPU", "Use GPU (CUDA) if possible", typeid(int), (void *) &gpu, "^[0-1]{1}$", MMseqsParameter::COMMAND_COMMON), PARAM_GPU_SERVER(PARAM_GPU_SERVER_ID, "--gpu-server", "Use GPU server", "Use GPU server", typeid(int), (void *) &gpuServer, "^[0-1]{1}$", MMseqsParameter::COMMAND_COMMON), + PARAM_GPU_SERVER_WAIT_TIMEOUT(PARAM_GPU_SERVER_WAIT_TIMEOUT_ID, "--gpu-server-wait-timeout", "Wait for GPU server", "Wait for GPU server for 0: don't wait -1: no wait limit: >0 this many seconds", typeid(int), (void *) &gpuServerWaitTimeout, "^-?[0-9]+", MMseqsParameter::COMMAND_COMMON), // convertalignments PARAM_FORMAT_MODE(PARAM_FORMAT_MODE_ID, "--format-mode", "Alignment format", "Output format:\n0: BLAST-TAB\n1: SAM\n2: BLAST-TAB + query/db length\n3: Pretty HTML\n4: BLAST-TAB + column headers\nBLAST-TAB (0) and BLAST-TAB + column headers (4) support custom output formats (--format-output)", typeid(int), (void *) &formatAlignmentMode, "^[0-4]{1}$"), PARAM_FORMAT_OUTPUT(PARAM_FORMAT_OUTPUT_ID, "--format-output", "Format alignment output", "Choose comma separated list of output columns from: query,target,evalue,gapopen,pident,fident,nident,qstart,qend,qlen\ntstart,tend,tlen,alnlen,raw,bits,cigar,qseq,tseq,qheader,theader,qaln,taln,qframe,tframe,mismatch,qcov,tcov\nqset,qsetid,tset,tsetid,taxid,taxname,taxlineage,qorfstart,qorfend,torfstart,torfend,ppos", typeid(std::string), (void *) &outfmt, ""), @@ -455,6 +456,7 @@ Parameters::Parameters(): ungappedprefilter.push_back(&PARAM_PRELOAD_MODE); ungappedprefilter.push_back(&PARAM_GPU); ungappedprefilter.push_back(&PARAM_GPU_SERVER); + ungappedprefilter.push_back(&PARAM_GPU_SERVER_WAIT_TIMEOUT); ungappedprefilter.push_back(&PARAM_PREF_MODE); ungappedprefilter.push_back(&PARAM_THREADS); ungappedprefilter.push_back(&PARAM_COMPRESSED); @@ -1357,6 +1359,7 @@ Parameters::Parameters(): clusterworkflow = combineList(clusterworkflow, linclustworkflow); clusterworkflow = removeParameter(clusterworkflow, PARAM_GPU); clusterworkflow = removeParameter(clusterworkflow, PARAM_GPU_SERVER); + clusterworkflow = removeParameter(clusterworkflow, PARAM_GPU_SERVER_WAIT_TIMEOUT); // easyclusterworkflow easyclusterworkflow = combineList(clusterworkflow, createdb); @@ -1400,6 +1403,7 @@ Parameters::Parameters(): clusterUpdate.push_back(&PARAM_RECOVER_DELETED); clusterUpdate = removeParameter(clusterUpdate, PARAM_GPU); clusterUpdate = removeParameter(clusterUpdate, PARAM_GPU_SERVER); + clusterUpdate = removeParameter(clusterUpdate, PARAM_GPU_SERVER_WAIT_TIMEOUT); mapworkflow = combineList(prefilter, rescorediagonal); mapworkflow = combineList(mapworkflow, extractorfs); @@ -1410,6 +1414,7 @@ Parameters::Parameters(): mapworkflow.push_back(&PARAM_REMOVE_TMP_FILES); mapworkflow = removeParameter(mapworkflow, PARAM_GPU); mapworkflow = removeParameter(mapworkflow, PARAM_GPU_SERVER); + mapworkflow = removeParameter(mapworkflow, PARAM_GPU_SERVER_WAIT_TIMEOUT); enrichworkflow = combineList(searchworkflow, prefilter); enrichworkflow = combineList(enrichworkflow, subtractdbs); @@ -1418,6 +1423,7 @@ Parameters::Parameters(): enrichworkflow = combineList(enrichworkflow, result2profile); enrichworkflow = removeParameter(enrichworkflow, PARAM_GPU); enrichworkflow = removeParameter(enrichworkflow, PARAM_GPU_SERVER); + enrichworkflow = removeParameter(enrichworkflow, PARAM_GPU_SERVER_WAIT_TIMEOUT); databases.push_back(&PARAM_HELP); databases.push_back(&PARAM_HELP_LONG); @@ -2468,6 +2474,7 @@ void Parameters::setDefaults() { } #endif gpuServer = 0; + gpuServerWaitTimeout = 10 * 60; #ifdef HAVE_CUDA char* gpuServerEnv = getenv("MMSEQS_FORCE_GPUSERVER"); if (gpuServerEnv != NULL) { diff --git a/src/commons/Parameters.h b/src/commons/Parameters.h index efc1ebda..17414501 100644 --- a/src/commons/Parameters.h +++ b/src/commons/Parameters.h @@ -394,6 +394,7 @@ class Parameters { int verbosity; // log level int gpu; // use GPU int gpuServer; // use the gpu server + int gpuServerWaitTimeout; // wait for this many seconds until GPU server is ready int threads; // Amounts of threads int compressed; // compressed writer bool removeTmpFiles; // Do not delete temp files @@ -820,6 +821,7 @@ class Parameters { // gpu PARAMETER(PARAM_GPU) PARAMETER(PARAM_GPU_SERVER) + PARAMETER(PARAM_GPU_SERVER_WAIT_TIMEOUT) // format alignment PARAMETER(PARAM_FORMAT_MODE) PARAMETER(PARAM_FORMAT_OUTPUT) diff --git a/src/prefiltering/PrefilteringIndexReader.cpp b/src/prefiltering/PrefilteringIndexReader.cpp index dc0e4406..603448bc 100644 --- a/src/prefiltering/PrefilteringIndexReader.cpp +++ b/src/prefiltering/PrefilteringIndexReader.cpp @@ -593,7 +593,7 @@ std::string PrefilteringIndexReader::searchForIndex(const std::string &pathToDB) return ""; } -std::string PrefilteringIndexReader::dbPathWithoutIndex(std::string & dbname) { +std::string PrefilteringIndexReader::dbPathWithoutIndex(const std::string& dbname) { std::string rawname = dbname; // check for .idx size_t idxlastpos = dbname.rfind(".idx"); diff --git a/src/prefiltering/PrefilteringIndexReader.h b/src/prefiltering/PrefilteringIndexReader.h index b04f16b7..b2880b84 100644 --- a/src/prefiltering/PrefilteringIndexReader.h +++ b/src/prefiltering/PrefilteringIndexReader.h @@ -86,7 +86,7 @@ class PrefilteringIndexReader { static std::string searchForIndex(const std::string &pathToDB); - static std::string dbPathWithoutIndex(std::string &dbname); + static std::string dbPathWithoutIndex(const std::string &dbname); private: static void printMeta(int *meta); diff --git a/src/prefiltering/ungappedprefilter.cpp b/src/prefiltering/ungappedprefilter.cpp index 1640dd56..29347c38 100644 --- a/src/prefiltering/ungappedprefilter.cpp +++ b/src/prefiltering/ungappedprefilter.cpp @@ -18,6 +18,8 @@ #include #include +#include +#include #ifdef OPENMP #include @@ -60,14 +62,48 @@ void runFilterOnGpu(Parameters & par, BaseMatrix * subMat, memset(compositionBias, 0, compBufferSize); } - // hash the realpath of par.db2 - std::string tdbrName = par.db2; - std::string tdbrRelPath = FileUtil::getRealPathFromSymLink(PrefilteringIndexReader::dbPathWithoutIndex(tdbrName)); - size_t hash = Util::hash(tdbrRelPath.c_str(), tdbrRelPath.length()); - std::string shmFileInFile = (par.gpuServer == 0) ? "" : "/dev/shm/" + SSTR(hash); - if (shmFileInFile != "" && FileUtil::fileExists(shmFileInFile.c_str()) == false) { - Debug(Debug::ERROR) << "--gpu-server " << shmFileInFile << " does not exist"; - EXIT(EXIT_FAILURE); + std::string hash = ""; + if (par.gpuServer != 0) { + hash = GPUSharedMemory::getShmHash(par.db2); + std::string path = "/dev/shm/" + hash; + int waitTimeout = par.gpuServerWaitTimeout; + std::chrono::steady_clock::time_point startTime = std::chrono::steady_clock::now(); + bool statusPrinted = false; + while (true) { + size_t shmSize = FileUtil::getFileSize(path); + // server is ready once the shm file exists and is not 0 byte large + if (shmSize != (size_t)-1 && shmSize > 0) { + break; + } + + if (waitTimeout == 0) { + Debug(Debug::ERROR) + << "gpuserver for database " << par.db2 << " not found.\n" + << "Please start gpuserver with the same CUDA_VISIBLE_DEVICES\n"; + EXIT(EXIT_FAILURE); + } + + if (waitTimeout > 0) { + if (statusPrinted == false) { + Debug(Debug::INFO) << "Waiting for `gpuserver`"; + statusPrinted = true; + } else { + Debug(Debug::INFO) << "."; + } + std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); + auto elapsed = std::chrono::duration_cast(now - startTime).count(); + if (elapsed >= waitTimeout) { + Debug(Debug::ERROR) + << "gpuserver for database " << par.db2 << " not found after " << elapsed << "seconds.\n" + << "Please start gpuserver with the same CUDA_VISIBLE_DEVICES\n"; + EXIT(EXIT_FAILURE); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } + if (waitTimeout > 0 && statusPrinted) { + Debug(Debug::INFO) << "\n"; + } } size_t* offsetData = NULL; @@ -76,7 +112,7 @@ void runFilterOnGpu(Parameters & par, BaseMatrix * subMat, std::vector lengths; GPUSharedMemory* layout = NULL; pid_t pid = 0; // current process ID, only for server - if (FileUtil::fileExists(shmFileInFile.c_str()) == false) { + if (hash.empty()) { offsets.reserve(tdbr->getSize() + 1); lengths.reserve(tdbr->getSize()); for (size_t id = 0; id < tdbr->getSize(); id++) { @@ -88,7 +124,7 @@ void runFilterOnGpu(Parameters & par, BaseMatrix * subMat, lengthData = lengths.data(); } else { pid = getpid(); - layout = GPUSharedMemory::openSharedMemory(SSTR(hash)); + layout = GPUSharedMemory::openSharedMemory(hash); } const bool serverMode = par.gpuServer; diff --git a/src/util/gpuserver.cpp b/src/util/gpuserver.cpp index db4887f2..cb475fd8 100644 --- a/src/util/gpuserver.cpp +++ b/src/util/gpuserver.cpp @@ -48,13 +48,6 @@ int gpuserver(int argc, const char **argv, const Command& command) { offsets.emplace_back(offsets.back() + lengths.back()); int32_t maxTargetLength = lengths.back(); - std::string dbrName = par.db1; - std::string dbrRelPath = FileUtil::getRealPathFromSymLink(PrefilteringIndexReader::dbPathWithoutIndex(dbrName)); - size_t hash = Util::hash(dbrRelPath.c_str(), dbrRelPath.length()); - - std::string shmFile = SSTR(hash); - GPUSharedMemory* layout = GPUSharedMemory::alloc(shmFile, par.maxSeqLen , par.maxResListLen); // Adjust sizes as necessary - BaseMatrix *subMat; if (Parameters::isEqualDbtype(dbrIdx.sequenceReader->getDbtype(), Parameters::DBTYPE_NUCLEOTIDES)) { subMat = new NucleotideMatrix(par.scoringMatrixFile.values.nucleotide().c_str(), 1.0, 0.0); @@ -78,6 +71,9 @@ int gpuserver(int argc, const char **argv, const Command& command) { // Set up the handler for SIGINT and SIGTERM sigaction(SIGINT, &act, NULL); sigaction(SIGTERM, &act, NULL); + + std::string shmFile = GPUSharedMemory::getShmHash(par.db1); + GPUSharedMemory* layout = GPUSharedMemory::alloc(shmFile, par.maxSeqLen, par.maxResListLen); Debug(Debug::WARNING) << shmFile << "\n"; while (keepRunning) { while (layout->serverReady.load(std::memory_order_acquire) == 0 || layout->clientReady.load(std::memory_order_acquire) == 0) {