Skip to content

Commit

Permalink
teach navcat to serve zstd compressed files
Browse files Browse the repository at this point in the history
  • Loading branch information
berthubert committed Nov 22, 2022
1 parent 4394b71 commit e2260a2
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions navcat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <dirent.h>
#include <inttypes.h>
#include "navmon.hh"

// #include <execution>
#include "CLI/CLI.hpp"
#include "version.hh"

Expand All @@ -27,17 +27,18 @@ using namespace std;
extern const char* g_gitHash;

// get all stations (numerical) from a directory
vector<uint64_t> getSources(string_view dirname)
static vector<uint64_t> getSources(string_view dirname)
{
DIR *dir = opendir(&dirname[0]);
if(!dir)
shared_ptr<DIR> dir;
if(auto ptr = opendir(&dirname[0]))
dir=shared_ptr<DIR>(ptr, closedir);
else
unixDie("Listing metrics from statistics storage "+(string)dirname);
struct dirent *result=0;
vector<uint64_t> ret;
for(;;) {
errno=0;
if(!(result = readdir(dir))) {
closedir(dir);
if(!(result = readdir(dir.get()))) {
if(errno)
unixDie("Reading directory entry "+(string)dirname);
else
Expand Down Expand Up @@ -67,7 +68,6 @@ void sendProtobuf(const vector<string>& dirs, vector<uint64_t> stations, time_t
start.tv_nsec = 0;

// so we have a ton of files, and internally these are not ordered
map<string,uint32_t> fpos;
vector<pair<timespec,string> > rnmms;

for(;;) {
Expand All @@ -79,34 +79,41 @@ void sendProtobuf(const vector<string>& dirs, vector<uint64_t> stations, time_t
int count=0;
for(const auto& src: srcs) {
string fname = getPath(dir, start.tv_sec, src);
FILE* fp = fopen(fname.c_str(), "r");
if(!fp)
continue;
uint32_t offset= fpos[fname];
if(fseek(fp, offset, SEEK_SET) < 0) {
cerr<<"Error seeking: "<<strerror(errno) <<endl;
fclose(fp);
continue;
FILE* ptr = fopen(fname.c_str(), "r");
shared_ptr<FILE> fp;
if(ptr) {
fp = shared_ptr<FILE>(ptr, fclose);
}
else {
fname+=".zst";
struct stat statbuf;
if(stat(fname.c_str(), &statbuf) == 0) {
ptr = popen(("zstdcat "+fname).c_str(), "r");
if(!ptr)
continue;
fp = shared_ptr<FILE>(ptr, pclose);
}
else
continue;
}
// cerr <<"Seeked to position "<<fpos[fname]<<" of "<<fname<<endl;

string msg;
struct timespec ts;
while(getRawNMM(fp, ts, msg, offset)) {
unsigned int offset=0;
while(getRawNMM(fp.get(), ts, msg, offset)) {
// don't drop data that is only 5 seconds too old
if(make_pair(ts.tv_sec + 5, ts.tv_nsec) >= make_pair(start.tv_sec, start.tv_nsec)) {
rnmms.push_back({ts, msg});
++count;
}
}
// cerr<<"Harvested "<<rnmms.size()<<" events out of "<<looked<<endl;

fpos[fname]=offset;
fclose(fp);
// fp will close itself
}
cerr<<" added "<<count<<endl;
}
// cerr<<"Sorting data"<<endl;
// sort(execution::par, rnmms.begin(), rnmms.end(), [](const auto& a, const auto& b)
sort(rnmms.begin(), rnmms.end(), [](const auto& a, const auto& b)
{
return std::tie(a.first.tv_sec, a.first.tv_nsec)
Expand Down

0 comments on commit e2260a2

Please sign in to comment.