diff --git a/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh b/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh index badc3225266..2bc3b91f4bd 100644 --- a/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh +++ b/src/XrdApps/XrdClJCachePlugin/cache/Journal.hh @@ -29,8 +29,8 @@ #include #include #include +#include /*----------------------------------------------------------------------------*/ - class Journal { static constexpr uint64_t JOURNAL_MAGIC = 0xcafecafecafecafe; @@ -128,3 +128,40 @@ private: }; +class JournalManager { +private: + std::map> journals; + std::mutex jMutex; + +public: + JournalManager() {} + virtual ~JournalManager() {} + + // Attach method: creates or retrieves a Journal object by key + std::shared_ptr attach(const std::string &key) { + std::lock_guard guard(jMutex); + auto it = journals.find(key); + if (it == journals.end()) { + // Create a new Journal object if it doesn't exist + auto journal = std::make_shared(); + journals[key] = journal; + return journal; + } else { + // Return the existing Journal object + return it->second; + } + } + + // Detach method: checks reference count and removes Journal object if necessary + void detach(const std::string &key) { + std::lock_guard guard(jMutex); + auto it = journals.find(key); + if (it != journals.end()) { + if (it->second.use_count() == 1) { + // Only one reference exists, so erase the entry from the map + journals.erase(it); + } + // If more than one reference exists, do nothing + } + } +}; diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc index 9f53113281d..04485aef2f2 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.cc @@ -30,6 +30,7 @@ std::string XrdCl::JCacheFile::sCachePath=""; bool XrdCl::JCacheFile::sEnableJournalCache = true; bool XrdCl::JCacheFile::sEnableVectorCache = true; +JournalManager XrdCl::JCacheFile::sJournalManager; namespace XrdCl { @@ -128,7 +129,7 @@ JCacheFile::Close(ResponseHandler* handler, st = XRootDStatus(stOK, 0); } if (sEnableJournalCache) { - pJournal.detach(); + pJournal->detach(); } } else { st = XRootDStatus(stOK, 0); @@ -172,7 +173,7 @@ JCacheFile::Read(uint64_t offset, if (pFile) { if (sEnableJournalCache && AttachForRead()) { - auto rb = pJournal.pread(buffer, size, offset); + auto rb = pJournal->pread(buffer, size, offset); if (rb == size) { pStats.bytesCached += rb; pStats.readOps++; @@ -187,7 +188,7 @@ JCacheFile::Read(uint64_t offset, } } - auto jhandler = new JCacheReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?&pJournal:nullptr); + auto jhandler = new JCacheReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?pJournal.get():nullptr); pStats.readOps++; st = pFile->Read(offset, size, buffer, jhandler, timeout); } else { @@ -231,7 +232,7 @@ JCacheFile::PgRead( uint64_t offset, XRootDStatus st; if (pFile) { if (sEnableJournalCache && AttachForRead()) { - auto rb = pJournal.pread(buffer, size, offset); + auto rb = pJournal->pread(buffer, size, offset); if (rb == size) { pStats.bytesCached += rb; pStats.readOps++; @@ -246,7 +247,7 @@ JCacheFile::PgRead( uint64_t offset, } } - auto jhandler = new JCachePgReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?&pJournal:nullptr); + auto jhandler = new JCachePgReadHandler(handler, &pStats.bytesRead,sEnableJournalCache?pJournal.get():nullptr); pStats.readOps++; st = pFile->PgRead(offset, size, buffer, jhandler, timeout); } else { @@ -347,11 +348,50 @@ JCacheFile::VectorRead(const ChunkList& chunks, vResp = chunks; obj->Set(vReadInfo); handler->HandleResponse(ret_st, obj); + pStats.readVOps++; + pStats.readVreadOps += chunks.size(); + pStats.bytesCachedV += len; return st; } + } else { + if (sEnableJournalCache) { + bool inJournal = true; + size_t len = 0; + // try to get chunks from journal cache + for (auto it = chunks.begin(); it != chunks.end(); ++it) { + auto rb = pJournal->pread(it->buffer, it->length, it->offset); + if (rb != it->length) { + // interrupt if we miss a piece and go remote + inJournal = false; + break; + } else { + len += it->length; + } + } + if (inJournal) { + // we found everything in the journal + pStats.readVOps++; + pStats.readVreadOps += chunks.size(); + pStats.bytesCachedV += len; + XRootDStatus* ret_st = new XRootDStatus(st); + *ret_st = XRootDStatus(stOK, 0); + AnyObject* obj = new AnyObject(); + VectorReadInfo* vReadInfo = new VectorReadInfo(); + vReadInfo->SetSize(len); + ChunkList& vResp = vReadInfo->GetChunks(); + vResp = chunks; + obj->Set(vReadInfo); + handler->HandleResponse(ret_st, obj); + pStats.readVOps++; + pStats.readVreadOps += chunks.size(); + pStats.bytesCachedV += len; + return st; + } + } } + - auto jhandler = new JCacheReadVHandler(handler, &pStats.bytesReadV,sEnableJournalCache?&pJournal:nullptr, buffer?(char*)buffer:(char*)(chunks.begin()->buffer), sEnableVectorCache?sCachePath:"", pUrl); + auto jhandler = new JCacheReadVHandler(handler, &pStats.bytesReadV,sEnableJournalCache?pJournal.get():nullptr, buffer?(char*)buffer:(char*)(chunks.begin()->buffer), sEnableVectorCache?sCachePath:"", pUrl); pStats.readVOps++; pStats.readVreadOps += chunks.size(); @@ -453,10 +493,12 @@ JCacheFile::AttachForRead() if ((mFlags & OpenFlags::Flags::Read) == OpenFlags::Flags::Read) { // attach to a cache if (sEnableJournalCache && pFile) { + mLog->Info(1, "JCache : attaching via journalmanager to '%s'", pUrl.c_str()); + pJournal = sJournalManager.attach(pUrl); StatInfo* sinfo = 0; auto st = pFile->Stat(false, sinfo); if (sinfo) { - if (pJournal.attach(pJournalPath,sinfo->GetSize(),sinfo->GetModTime(),0)) { + if (pJournal->attach(pJournalPath,sinfo->GetSize(),sinfo->GetModTime(),0)) { mLog->Error(1, "JCache : failed to attach to cache directory: %s", pJournalPath.c_str()); mAttachedForRead = true; return false; diff --git a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh index 29f0288ce26..f835fc0d81c 100644 --- a/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh +++ b/src/XrdApps/XrdClJCachePlugin/file/XrdClJCacheFile.hh @@ -254,7 +254,8 @@ public: static std::string sCachePath; static bool sEnableVectorCache; static bool sEnableJournalCache; - + static JournalManager sJournalManager; + //---------------------------------------------------------------------------- //! @brief log cache hit statistics //---------------------------------------------------------------------------- @@ -286,13 +287,19 @@ public: {} double HitRate() { - return 100.0*(this->bytesCached.load()+1) /(this->bytesCached.load()+this->bytesRead.load()+1); + auto n = this->bytesCached.load()+this->bytesRead.load(); + if (!n) return 100.0; + return 100.0*(this->bytesCached.load()) / n; } double HitRateV() { - return 100.0*(this->bytesCachedV.load()+1) /(this->bytesCachedV.load()+this->bytesReadV.load()+1); + auto n = this->bytesCachedV.load()+this->bytesReadV.load(); + if (!n) return 100.0; + return 100.0*(this->bytesCachedV.load()) / n; } double CombinedHitRate() { - return 100.0*(this->bytesCached.load()+this->bytesCachedV.load()+1) /(this->bytesCached.load()+this->bytesRead.load()+this->bytesCachedV.load()+this->bytesReadV.load()+1); + auto n = (this->bytesCached.load()+this->bytesRead.load()+this->bytesCachedV.load()+this->bytesReadV.load()); + if (!n) return 100.0; + return 100.0*(this->bytesCached.load()+this->bytesCachedV.load()) / n; } std::atomic bytesRead; @@ -321,7 +328,7 @@ private: //! @brief URL of the remote file std::string pUrl; //! @brief instance of a local journal for this file - Journal pJournal; + std::shared_ptr pJournal; //! @brief path to the journal of this file std::string pJournalPath; //! @brief pointer to logging object