Skip to content

Commit

Permalink
XrdApps::JCache: re-format code and add a missing <atomic> include
Browse files Browse the repository at this point in the history
  • Loading branch information
apeters1971 committed Jun 19, 2024
1 parent 0079635 commit 869f0b7
Show file tree
Hide file tree
Showing 13 changed files with 205 additions and 168 deletions.
32 changes: 17 additions & 15 deletions src/XrdApps/XrdClJCachePlugin/cache/Journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@
#include "Journal.hh"
/*----------------------------------------------------------------------------*/
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <fcntl.h>
#include <iostream>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <iostream>
#include <cstring>
#include <cerrno>
/*----------------------------------------------------------------------------*/

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -61,7 +60,8 @@ Journal::~Journal() {

lock.l_type = F_UNLCK; // Unlock the file
if (fcntl(fd, F_SETLK, &lock) == -1) {
std::cerr << "error: failed to unlock journal: " << std::strerror(errno) << std::endl;
std::cerr << "error: failed to unlock journal: " << std::strerror(errno)
<< std::endl;
}

int rc = close(fd);
Expand All @@ -79,7 +79,7 @@ Journal::~Journal() {
//------------------------------------------------------------------------------
void Journal::read_jheader() {
jheader_t fheader;
bool exists=false;
bool exists = false;
auto hr = ::pread64(fd, &fheader, sizeof(jheader), 0);
if ((hr > 0) &&
((hr != sizeof(jheader)) || (fheader.magic != JOURNAL_MAGIC))) {
Expand All @@ -90,7 +90,7 @@ void Journal::read_jheader() {
return;
}

exists = (hr==sizeof(jheader));
exists = (hr == sizeof(jheader));

// TODO: understand why the mtime is +-1s
if (jheader.mtime) {
Expand All @@ -100,16 +100,17 @@ void Journal::read_jheader() {
(fheader.mtime_nsec != jheader.mtime_nsec) ||
(jheader.filesize && (fheader.filesize != jheader.filesize))) {
std::cerr << "warning: remote file change detected - purging path:"
<< path << std::endl;
std::cerr << fheader.mtime << ":" << jheader.mtime << " "
<< fheader.mtime_nsec << ":" << jheader.mtime_nsec << " "
<< fheader.filesize << ":" << jheader.filesize << std::endl;
<< path << std::endl;
std::cerr << fheader.mtime << ":" << jheader.mtime << " "
<< fheader.mtime_nsec << ":" << jheader.mtime_nsec << " "
<< fheader.filesize << ":" << jheader.filesize << std::endl;
reset();
return;
}
}
} else {
// we assume the contents referenced in the header is ok to allow disconnected ops
// we assume the contents referenced in the header is ok to allow
// disconnected ops
jheader.mtime = fheader.mtime;
jheader.filesize = fheader.filesize;
}
Expand Down Expand Up @@ -217,14 +218,15 @@ int Journal::attach(const std::string &lpath, uint64_t mtime,

if (fcntl(fd, F_SETLK, &lock) == -1) {
if (errno == EACCES || errno == EAGAIN) {
std::cerr << "error: journal file is already locked by another process."
<< std::endl;
std::cerr
<< "error: journal file is already locked by another process."
<< std::endl;
close(fd);
fd = -1;
return -errno;
} else {
std::cerr << "error: failed to lock journal file: " << std::strerror(errno)
<< std::endl;
std::cerr << "error: failed to lock journal file: "
<< std::strerror(errno) << std::endl;
close(fd);
fd = -1;
return -errno;
Expand Down
4 changes: 2 additions & 2 deletions src/XrdApps/XrdClJCachePlugin/cache/Journal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public:

// base class interface
int attach(const std::string &path, uint64_t mtime, uint64_t mtime_nsec,
uint64_t size, bool ifexists=false);
uint64_t size, bool ifexists = false);
int detach();
int unlink();

Expand All @@ -105,7 +105,7 @@ public:
return jheader.filesize;
}

off_t getHeaderMtime() {
off_t getHeaderMtime() {
std::lock_guard<std::mutex> guard(mtx);
return jheader.mtime;
}
Expand Down
6 changes: 3 additions & 3 deletions src/XrdApps/XrdClJCachePlugin/cache/RbTree.hh
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ protected:

template <typename PTR> // make it a template so it works both for constant
// and mutable pointers
static PTR &find_in(const K &key, PTR &node) {
static PTR &find_in(const K &key, PTR &node) {
if (!node) {
return null_node;
}
Expand All @@ -372,7 +372,7 @@ protected:

template <typename PTR> // make it a template so it works both for constant
// and mutable pointers
static PTR &find_min(PTR &node) {
static PTR &find_min(PTR &node) {
if (!node) {
return null_node;
}
Expand All @@ -386,7 +386,7 @@ protected:

template <typename PTR> // make it a template so it works both for constant
// and mutable pointers
static PTR &find_successor(PTR &node) {
static PTR &find_successor(PTR &node) {
if (!node) {
return null_node;
}
Expand Down
20 changes: 12 additions & 8 deletions src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ long long Cleaner::getDirectorySize(const fs::path &directory, bool scan) {
struct statfs stat;

if (statfs(directory.c_str(), &stat) != 0) {
mLog->Error(1,"JCache:Cleaner: failed to get directory size using statfs.");
return 0;
mLog->Error(1,
"JCache:Cleaner: failed to get directory size using statfs.");
return 0;
}
}
return totalSize;
Expand Down Expand Up @@ -108,8 +109,10 @@ void Cleaner::cleanDirectory(const fs::path &directory, long long highWatermark,
long long currentSize = getDirectorySize(directory);
if (currentSize <= highWatermark) {
/*----------------------------------------------------------------------------*/
mLog->Info(1,"JCache:Cleaner: Directory size is within the limit (%lu/%lu). No action needed.",
currentSize, highWatermark);
mLog->Info(1,
"JCache:Cleaner: Directory size is within the limit (%lu/%lu). "
"No action needed.",
currentSize, highWatermark);
return;
}

Expand All @@ -130,12 +133,13 @@ void Cleaner::cleanDirectory(const fs::path &directory, long long highWatermark,
std::error_code ec;
fs::remove_all(parentDir, ec);
if (ec) {
mLog->Error(1, "JCache::Cleaner: error deleting directory '%s'", parentDir.c_str());
mLog->Error(1, "JCache::Cleaner: error deleting directory '%s'",
parentDir.c_str());
}
mLog->Info(1, "JCache:Cleaner : deleted '%s' (Size: %lu bytes)", filePath.c_str(),
fileSize);
mLog->Info(1, "JCache:Cleaner : deleted '%s' (Size: %lu bytes)",
filePath.c_str(), fileSize);
} catch (const std::exception &e) {
mLog->Error(1,"JCache::Cleaner error deleting '%'", filePath.c_str());
mLog->Error(1, "JCache::Cleaner error deleting '%'", filePath.c_str());
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions src/XrdApps/XrdClJCachePlugin/cleaner/Cleaner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ public:
bool scan, size_t interval)
: lowWatermark(lowWatermark), highWatermark(highWatermark), subtree(path),
scan(scan), interval(interval), stopFlag(false) {
mLog = XrdCl::DefaultEnv::GetLog();
}
mLog = XrdCl::DefaultEnv::GetLog();
}

Cleaner()
: lowWatermark(0), highWatermark(0), subtree(""),
scan(true), interval(60), stopFlag(false) {
mLog = XrdCl::DefaultEnv::GetLog();
}
Cleaner()
: lowWatermark(0), highWatermark(0), subtree(""), scan(true),
interval(60), stopFlag(false) {
mLog = XrdCl::DefaultEnv::GetLog();
}

// Method to start the cleaning process in a separate thread
void run() {
Expand All @@ -79,22 +79,22 @@ public:
~Cleaner() { stop(); }

// Method to Define Cleaner size
void SetSize(uint64_t size, const std::string& path) {
void SetSize(uint64_t size, const std::string &path) {
stop();
if (size > 1024ll*1024ll*1024ll) {
if (size > 1024ll * 1024ll * 1024ll) {
subtree = path;
highWatermark = size;
lowWatermark = size * 0.9;
run();
} else {
mLog->Error(1, "JCache:Cleaner : the size given to the cleaner is less than 1GB - cleaning is disabled!");
mLog->Error(1, "JCache:Cleaner : the size given to the cleaner is less "
"than 1GB - cleaning is disabled!");
}
}

// Method to set the scan option (true means scan, false means don't scan but use statfs!)
void SetScan(bool sc) {
scan = sc;
}
// Method to set the scan option (true means scan, false means don't scan but
// use statfs!)
void SetScan(bool sc) { scan = sc; }

private:
// Private methods
Expand Down Expand Up @@ -132,7 +132,7 @@ private:
auto duration =
std::chrono::duration_cast<std::chrono::seconds>(end - start);

if ( (size_t)duration.count() < interval) {
if ((size_t)duration.count() < interval) {
auto s = std::chrono::seconds(interval) - duration;
std::this_thread::sleep_for(s);
}
Expand Down
2 changes: 1 addition & 1 deletion src/XrdApps/XrdClJCachePlugin/file/Art.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public:
minValue = 0;

// we round the axis to clean 100
maxValue = (int)(maxValue+9) / 10 * 10;
maxValue = (int)(maxValue + 9) / 10 * 10;

const int plotHeight = 10; // Number of lines in the plot
const int plotWidth = 40; // Width of the plot in characters
Expand Down
49 changes: 27 additions & 22 deletions src/XrdApps/XrdClJCachePlugin/file/CacheStats.hh
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ namespace JCache {
struct CacheStats {
CacheStats(bool doe = false)
: bytesRead(0), bytesReadV(0), bytesCached(0), bytesCachedV(0),
readOps(0), readVOps(0), readVreadOps(0), nreadfiles(0), totaldatasize(0),
dumponexit(doe), peakrate(0) {
readOps(0), readVOps(0), readVreadOps(0), nreadfiles(0),
totaldatasize(0), dumponexit(doe), peakrate(0) {
// Get the current real time
struct timeval now;
gettimeofday(&now, nullptr);
startTime = now.tv_sec + now.tv_usec / 1000000.0;
dumperInterval=0;
dumperInterval = 0;
}

~CacheStats() {
Expand Down Expand Up @@ -82,7 +82,8 @@ struct CacheStats {
XrdCl::JCacheFile::sStats.persistToJson(jsonpath, name);
}
if (XrdCl::JCacheFile::sEnableSummary) {
std::cerr << CacheStats::GlobalStats(XrdCl::JCacheFile::sStats, XrdCl::JCacheFile::sEnableBypass);
std::cerr << CacheStats::GlobalStats(XrdCl::JCacheFile::sStats,
XrdCl::JCacheFile::sEnableBypass);
}
std::vector<uint64_t> bins = XrdCl::JCacheFile::sStats.bench.GetBins(40);
JCache::Art art;
Expand All @@ -102,7 +103,7 @@ struct CacheStats {
}

void Reset() {
bytesRead = 0 ;
bytesRead = 0;
bytesReadV = 0;
bytesCached = 0;
bytesCachedV = 0;
Expand Down Expand Up @@ -287,16 +288,16 @@ struct CacheStats {
<< std::endl;
oss << "# JCache : cache read hit rate : " << std::fixed
<< std::setprecision(2) << (!sStats.ReadOpBytes() ? "\033[9m" : "")
<< sStats.HitRate() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "")
<< std::endl;
<< sStats.HitRate() << " %"
<< (!sStats.ReadOpBytes() ? "\033[0m" : "") << std::endl;
oss << "# JCache : cache readv hit rate : " << std::fixed
<< std::setprecision(2) << (!sStats.ReadVOpBytes() ? "\033[9m" : "")
<< sStats.HitRateV() << " %" << (!sStats.ReadOpBytes() ? "\033[0m" : "")
<< std::endl;
<< sStats.HitRateV() << " %"
<< (!sStats.ReadOpBytes() ? "\033[0m" : "") << std::endl;
}
oss << "# "
"-------------------------------------------------------------------"
"---- #"
"-------------------------------------------------------------------"
"---- #"
<< std::endl;

oss << "# JCache : total bytes read : "
Expand All @@ -321,8 +322,8 @@ struct CacheStats {
<< std::endl;
oss << "# JCache : open unique f. read : " << sStats.UniqueUrls()
<< std::endl;
oss << "# JCache : time to open files (s) : " << std::setprecision(3) << sStats.opentime.load()
<< std::endl;
oss << "# JCache : time to open files (s) : " << std::setprecision(3)
<< sStats.opentime.load() << std::endl;
oss << "# "
"-------------------------------------------------------------------"
"---- #"
Expand Down Expand Up @@ -369,7 +370,7 @@ struct CacheStats {
std::atomic<uint64_t> readVreadOps;
std::atomic<uint64_t> nreadfiles;
std::atomic<uint64_t> totaldatasize;
std::atomic<double> opentime;
std::atomic<double> opentime;

std::atomic<bool> dumponexit;
std::set<std::string> urls;
Expand All @@ -386,26 +387,30 @@ struct CacheStats {
std::thread dumperThread;
std::atomic<bool> stopFlag;


// Loop to regulary dump the statistics and to reset global counters
static void dumper(CacheStats* stats) {
static void dumper(CacheStats *stats) {
while (!stats->stopFlag.load()) {
if (stats->dumperInterval) {
std::this_thread::sleep_for(std::chrono::seconds(stats->dumperInterval));
std::this_thread::sleep_for(
std::chrono::seconds(stats->dumperInterval));
} else {
break;
}
if (XrdCl::JCacheFile::sEnableSummary) {
XrdCl::JCacheFile::sStats.GetTimes();
XrdCl::JCacheFile::sStats.bytes_per_second =
XrdCl::JCacheFile::sStats.bench.GetBins((int)(XrdCl::JCacheFile::sStats.realTime));
XrdCl::JCacheFile::sStats.peakrate =
*(std::max_element(XrdCl::JCacheFile::sStats.bytes_per_second.begin(),
XrdCl::JCacheFile::sStats.bench.GetBins(
(int)(XrdCl::JCacheFile::sStats.realTime));
XrdCl::JCacheFile::sStats.peakrate = *(
std::max_element(XrdCl::JCacheFile::sStats.bytes_per_second.begin(),
XrdCl::JCacheFile::sStats.bytes_per_second.end()));
if (XrdCl::JCacheFile::sStats.realTime < 1) {
XrdCl::JCacheFile::sStats.peakrate = XrdCl::JCacheFile::sStats.ReadBytes() / XrdCl::JCacheFile::sStats.realTime;
XrdCl::JCacheFile::sStats.peakrate =
XrdCl::JCacheFile::sStats.ReadBytes() /
XrdCl::JCacheFile::sStats.realTime;
}
std::string st = CacheStats::GlobalStats(XrdCl::JCacheFile::sStats, XrdCl::JCacheFile::sEnableBypass);
std::string st = CacheStats::GlobalStats(
XrdCl::JCacheFile::sStats, XrdCl::JCacheFile::sEnableBypass);
std::cerr << st << std::endl;
}
}
Expand Down
Loading

0 comments on commit 869f0b7

Please sign in to comment.