Skip to content

Commit

Permalink
Changed server log to have shared read access so that we don't have t…
Browse files Browse the repository at this point in the history
…o terminate the server before looking at the log. Add a logFlush() function and do a few more log flush on occasion to limit buffered log getting big.
  • Loading branch information
ben-may committed May 2, 2023
1 parent 763095d commit 77cdc0b
Show file tree
Hide file tree
Showing 7 changed files with 3,099 additions and 3,032 deletions.
7 changes: 4 additions & 3 deletions include/EACopyClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace eacopy

enum : uint {
ClientMajorVersion = 1,
ClientMinorVersion = 14
ClientMinorVersion = 15
};

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -103,6 +103,7 @@ struct ClientStats
u64 netCreateDirCount = 0;
u64 netFileInfoTime = 0;
u64 netFileInfoCount = 0;
u64 processedByServerCount = 0;

u64 readLinkDbTime = 0;
u64 readLinkDbEntries = 0;
Expand Down Expand Up @@ -224,10 +225,10 @@ class Client::Connection
~Connection();
bool sendCommand(const Command& cmd);
bool sendTextCommand(const wchar_t* text);
bool sendWriteFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outWritten, bool& outLinked, NetworkCopyContext& copyContext);
bool sendWriteFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outWritten, bool& outLinked, NetworkCopyContext& copyContext, bool &processedByServer);

enum ReadFileResult { ReadFileResult_Error, ReadFileResult_Success, ReadFileResult_ServerBusy };
ReadFileResult sendReadFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outRead, NetworkCopyContext& copyContext);
ReadFileResult sendReadFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outRead, NetworkCopyContext& copyContext, bool& processedByServer);


bool sendCreateDirectoryCommand(const wchar_t* directory, FilesSet& outCreatedDirs);
Expand Down
2 changes: 1 addition & 1 deletion include/EACopyServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace eacopy

enum : uint {
ServerMajorVersion = 1,
ServerMinorVersion = 8,
ServerMinorVersion = 9,
};

enum : uint { DefaultHistorySize = 500000 }; // Number of files
Expand Down
3 changes: 2 additions & 1 deletion include/EACopyShared.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ bool deleteDirectory(const wchar_t* directory, IOStats& ioStats, bool errorO
bool deleteAllFiles(const wchar_t* directory, IOStats& ioStats, bool errorOnMissingFile = true);
bool isAbsolutePath(const wchar_t* path);
bool openFileRead(const wchar_t* fullPath, FileHandle& outFile, IOStats& ioStats, bool useBufferedIO, _OVERLAPPED* overlapped = nullptr, bool isSequentialScan = true, bool sharedRead = true);
bool openFileWrite(const wchar_t* fullPath, FileHandle& outFilee, IOStats& ioStats, bool useBufferedIO, _OVERLAPPED* overlapped = nullptr, bool hidden = false, bool createAlways = true);
bool openFileWrite(const wchar_t* fullPath, FileHandle& outFilee, IOStats& ioStats, bool useBufferedIO, _OVERLAPPED* overlapped = nullptr, bool hidden = false, bool createAlways = true, bool sharedRead = false);
bool writeFile(const wchar_t* fullPath, FileHandle& file, const void* data, u64 dataSize, IOStats& ioStats, _OVERLAPPED* overlapped = nullptr);
bool readFile(const wchar_t* fullPath, FileHandle& file, void* destData, u64 toRead, u64& read, IOStats& ioStats);
bool setFileLastWriteTime(const wchar_t* fullPath, FileHandle& file, FileTime lastWriteTime, IOStats& ioStats);
Expand Down Expand Up @@ -381,6 +381,7 @@ class HashBuilder
// Logging

void logErrorf(const wchar_t* fmt, ...);
void logFlush();
void logInfo(const wchar_t* str);
void logInfof(const wchar_t* fmt, ...);
void logInfoLinef(const wchar_t* fmt, ...);
Expand Down
3 changes: 3 additions & 0 deletions source/EACopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,10 @@ int main(int argc, char* argv_[])


if (stats.destServerUsed || stats.sourceServerUsed)
{
logInfoLinef(L" Server found (%ls)", stats.info.c_str());
logInfoLinef(L" Number of files (copied/linked/skipped) processed by server: %lu", stats.processedByServerCount);
}
else if (stats.serverAttempt && !stats.destServerUsed)
logInfoLinef(L" Server not found (Spent ~%ls trying to connect. Use /NOSERVER to disable attempt)", toHourMinSec(stats.connectTime/max(1, (int)settings.threadCount)).c_str());
else
Expand Down
51 changes: 43 additions & 8 deletions source/EACopyClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ Client::process(Log& log, ClientStats& outStats)
outStats.linkCount += threadStats.linkCount;
outStats.linkSize += threadStats.linkSize;
outStats.serverAttempt |= threadStats.serverAttempt;
outStats.processedByServerCount += threadStats.processedByServerCount;

outStats.copyTime = max(outStats.copyTime, threadStats.copyTime);
outStats.skipTime = max(outStats.skipTime, threadStats.skipTime);
Expand Down Expand Up @@ -553,9 +554,10 @@ Client::processFile(LogContext& logContext, Connection* sourceConnection, Connec
u64 size;
u64 written;
bool linked;
bool processedByServer;

// Send file to server (might be skipped if server already has it).. returns false if it fails
if (destConnection->sendWriteFileCommand(entry.src.c_str(), entry.dst.c_str(), entry.srcInfo, size, written, linked, copyContext))
if (destConnection->sendWriteFileCommand(entry.src.c_str(), entry.dst.c_str(), entry.srcInfo, size, written, linked, copyContext, processedByServer))
{
if (written)
{
Expand All @@ -569,14 +571,19 @@ Client::processFile(LogContext& logContext, Connection* sourceConnection, Connec
{
reportSkip();
}
if (processedByServer)
{
++stats.processedByServerCount;
}
return true;
}
}
else if (isValid(sourceConnection))
{
u64 size;
u64 read;
switch (sourceConnection->sendReadFileCommand(entry.src.c_str(), entry.dst.c_str(), entry.srcInfo, size, read, copyContext))
bool processedByServer;
switch (sourceConnection->sendReadFileCommand(entry.src.c_str(), entry.dst.c_str(), entry.srcInfo, size, read, copyContext, processedByServer))
{
case Connection::ReadFileResult_Success:
if (read)
Expand All @@ -595,6 +602,10 @@ Client::processFile(LogContext& logContext, Connection* sourceConnection, Connec
++stats.skipCount;
stats.skipSize += size;
}
if (processedByServer)
{
++stats.processedByServerCount;
}
return true;

case Connection::ReadFileResult_Error:
Expand Down Expand Up @@ -1293,9 +1304,10 @@ Client::handleFilesOrWildcardsFromFile(LogContext& logContext, ClientStats& stat
FileInfo srcInfo;
uint srcAttributes;
uint error;
bool processedByServer;
if (!m_sourceConnection->sendGetFileAttributes(fileName.c_str(), srcInfo, srcAttributes, error))
continue;
if (!m_sourceConnection->sendReadFileCommand(originalFullPath.c_str(), fileName.c_str(), srcInfo, fileSize, read, m_copyContext))
if (!m_sourceConnection->sendReadFileCommand(originalFullPath.c_str(), fileName.c_str(), srcInfo, fileSize, read, m_copyContext, processedByServer))
continue;
fullPath = destPath + fileName;
}
Expand Down Expand Up @@ -2050,11 +2062,12 @@ Client::Connection::sendTextCommand(const wchar_t* text)
}

bool
Client::Connection::sendWriteFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outWritten, bool& outLinked, NetworkCopyContext& copyContext)
Client::Connection::sendWriteFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outWritten, bool& outLinked, NetworkCopyContext& copyContext, bool &processedByServer)
{
outSize = 0;
outWritten = 0;
outLinked = false;
processedByServer = false;

WriteFileType writeType = m_settings.compressionLevel != 0 ? WriteFileType_Compressed : WriteFileType_Send;

Expand Down Expand Up @@ -2094,18 +2107,23 @@ Client::Connection::sendWriteFileCommand(const wchar_t* src, const wchar_t* dst,
do
{
if (writeResponse == WriteResponse_Skip)
{
processedByServer = true;
return true;
}

if (writeResponse == WriteResponse_Link)
{
outWritten = cmd.info.fileSize;
outLinked = true;
processedByServer = true;
return true;
}

if (writeResponse == WriteResponse_Odx)
{
outWritten = cmd.info.fileSize;
processedByServer = true;
return true;
}

Expand All @@ -2126,7 +2144,7 @@ Client::Connection::sendWriteFileCommand(const wchar_t* src, const wchar_t* dst,
}
++m_stats.netWriteResponseCount[writeResponse];
m_stats.netWriteResponseTime[writeResponse] += netWriteResponseTime;

} while (true);

if (writeResponse == WriteResponse_Copy)
Expand All @@ -2152,6 +2170,8 @@ Client::Connection::sendWriteFileCommand(const wchar_t* src, const wchar_t* dst,
}

outWritten = cmd.info.fileSize;
processedByServer = true;

return true;
}

Expand Down Expand Up @@ -2208,10 +2228,11 @@ Client::Connection::sendWriteFileCommand(const wchar_t* src, const wchar_t* dst,
}

Client::Connection::ReadFileResult
Client::Connection::sendReadFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outRead, NetworkCopyContext& copyContext)
Client::Connection::sendReadFileCommand(const wchar_t* src, const wchar_t* dst, const FileInfo& srcInfo, u64& outSize, u64& outRead, NetworkCopyContext& copyContext, bool& processedByServer)
{
outSize = 0;
outRead = 0;
processedByServer = false;

// Make src a local path to server
src += m_settings.sourceDirectory.size();
Expand Down Expand Up @@ -2272,8 +2293,18 @@ Client::Connection::sendReadFileCommand(const wchar_t* src, const wchar_t* dst,
if (readResponse == ReadResponse_Hash)
{
Hash hash;
if (!getFileHash(hash, fullDest.c_str(), copyContext, m_stats.ioStats, m_hashContext, m_stats.hashTime))
return ReadFileResult_Error;
// Test file exists before actually calculating hash or we will crash the program and leave the server hanging waiting for the hash input.
// If file doesn't exist, we will just return empty hash info and the server will test for invalid hash.
FILE *file;
if (_wfopen_s(&file, fullDest.c_str(), L"r") == 0)
{
if (file != nullptr)
{
fclose(file);
if (!getFileHash(hash, fullDest.c_str(), copyContext, m_stats.ioStats, m_hashContext, m_stats.hashTime))
return ReadFileResult_Error;
}
}
if (!sendData(m_socket, &hash, sizeof(hash)))
return ReadFileResult_Error;
if (!receiveData(m_socket, &readResponse, sizeof(readResponse)))
Expand All @@ -2284,6 +2315,7 @@ Client::Connection::sendReadFileCommand(const wchar_t* src, const wchar_t* dst,
if (readResponse == ReadResponse_Skip)
{
outSize = cmd.info.fileSize;
processedByServer = true;
return ReadFileResult_Success;
}

Expand Down Expand Up @@ -2314,6 +2346,7 @@ Client::Connection::sendReadFileCommand(const wchar_t* src, const wchar_t* dst,

outRead = newFileSize;
outSize = newFileSize;
processedByServer = success;
return success ? ReadFileResult_Success : ReadFileResult_Error;
}
else if (readResponse == ReadResponse_CopyUsingSmb)
Expand All @@ -2326,6 +2359,7 @@ Client::Connection::sendReadFileCommand(const wchar_t* src, const wchar_t* dst,
return ReadFileResult_Error;
outRead = written;
outSize = written;
processedByServer = true;
return ReadFileResult_Success;
}
else // ReadResponse_CopyDelta
Expand All @@ -2340,6 +2374,7 @@ Client::Connection::sendReadFileCommand(const wchar_t* src, const wchar_t* dst,
return ReadFileResult_Error;
outSize = newFileSize;
outRead = newFileSize;
processedByServer = true;
m_stats.recvTime += recvStats.recvTime;
m_stats.recvSize += recvStats.recvSize;
m_stats.decompressTime += recvStats.decompressTime;
Expand Down
Loading

0 comments on commit 77cdc0b

Please sign in to comment.