Skip to content

Commit

Permalink
add v1.3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
junstor committed Oct 28, 2022
1 parent d98f434 commit cbb4c0d
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 30 deletions.
8 changes: 0 additions & 8 deletions base/FPTimer.epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,6 @@ bool Timer::initEpoll()

void Timer::stopEpoll()
{
if (_eventNotifyFds[0])
{
close(_eventNotifyFds[1]);
close(_eventNotifyFds[0]);
_eventNotifyFds[0] = 0;
_eventNotifyFds[1] = 0;
}

if (_epoll_fd)
{
close(_epoll_fd);
Expand Down
13 changes: 12 additions & 1 deletion base/FPTimer.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,18 @@ namespace fpnn {
TimerPtr timer(new Timer());
return timer->init(threadPool) ? timer : nullptr;
}
~Timer() { stop(); }
~Timer()
{
stop();

if (_eventNotifyFds[0])
{
close(_eventNotifyFds[1]);
close(_eventNotifyFds[0]);
_eventNotifyFds[0] = 0;
_eventNotifyFds[1] = 0;
}
}

bool init(ITaskThreadPoolPtr threadPool);

Expand Down
10 changes: 1 addition & 9 deletions base/FPTimer.kqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,15 +202,7 @@ bool Timer::initKqueue()
}

void Timer::stopKqueue()
{
if (_eventNotifyFds[0])
{
close(_eventNotifyFds[1]);
close(_eventNotifyFds[0]);
_eventNotifyFds[0] = 0;
_eventNotifyFds[1] = 0;
}

{
if (_kqueue_fd)
{
close(_kqueue_fd);
Expand Down
9 changes: 9 additions & 0 deletions changes.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
### v.1.3.1

#### 修复

* 满尺寸的UDP包,因为重发默认是组装包格式,所以因剩余空间小2个字节,导致无法重发的问题。


-----------

### v.1.3.0

#### 增加
Expand Down
2 changes: 1 addition & 1 deletion core/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace fpnn
{
#define FPNN_SERVER_VERSION "1.3.0"
#define FPNN_SERVER_VERSION "1.3.1"

//in second
#define FPNN_DEFAULT_QUEST_TIMEOUT (5)
Expand Down
13 changes: 12 additions & 1 deletion core/UDP.v2/UDPIOBuffer.v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,22 @@ bool UDPIOBuffer::prepareResentPackage_normalMode()
UDPPackage* package;
if (_protocolVersion > 1)
{
if (_unconformedMap.prepareSendingBuffer(_MTU, _resendThreshold, _currentSendingBuffer))
bool requireSingleResending;
if (_unconformedMap.prepareSendingBuffer(_MTU, _resendThreshold, _currentSendingBuffer, requireSingleResending))
{
_resentCount -= (int)(_currentSendingBuffer->assembledPackages.size());
return true;
}
else if (requireSingleResending)
{
package = _unconformedMap.fetchFirstResendPackage(_resendThreshold, seqNum);
if (package)
{
_currentSendingBuffer->resendPackage(seqNum, package);
_resentCount -= 1;
return true;
}
}
}
else
{
Expand Down
29 changes: 24 additions & 5 deletions core/UDP.v2/UDPUnconformedMap.v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ void UDPUnconformedMap::insert(uint32_t seqNum, UDPPackage* package)
_sentQueue.push_back(node);
}

void UDPUnconformedMap::fetchResendPackages(int freeSpace, int64_t threshold, std::list<UDPUnconformedMap::PackageNode*>& canbeAssembledPackages)
void UDPUnconformedMap::fetchResendPackages(int freeSpace, int64_t threshold, bool& checkRequireSingleResending, std::list<UDPUnconformedMap::PackageNode*>& canbeAssembledPackages)
{
const int assembledSectionExtraBytes = ARQConstant::AssembledPackageLengthFieldSize - 1; //-- 1: version field size.
const int mimimumSpaceRequire = assembledSectionExtraBytes + ARQConstant::PackageMimimumLength;

bool requireChekSingleResending = checkRequireSingleResending;
checkRequireSingleResending = false;

if (freeSpace < ARQConstant::PackageMimimumLength + assembledSectionExtraBytes)
return;

Expand All @@ -98,7 +101,15 @@ void UDPUnconformedMap::fetchResendPackages(int freeSpace, int64_t threshold, st
break;
}
else
{
if (requireChekSingleResending && canbeAssembledPackages.empty())
{
checkRequireSingleResending = true;
return;
}

it++;
}
}
else
break;
Expand Down Expand Up @@ -127,6 +138,7 @@ void UDPUnconformedMap::assemblePackages(UDPPackage* package,
sendingBuffer->assemblePackage(node->package);
}

/*
void UDPUnconformedMap::assemblePackages(std::set<UDPUnconformedMap::PackageNode*>& selectedPackages,
std::list<UDPUnconformedMap::PackageNode*>& supplementaryPackages, CurrentSendingBuffer* sendingBuffer)
{
Expand All @@ -139,28 +151,35 @@ void UDPUnconformedMap::assemblePackages(std::set<UDPUnconformedMap::PackageNode
for (auto node: supplementaryPackages)
sendingBuffer->assemblePackage(node->package);
}
*/

bool UDPUnconformedMap::prepareSendingBuffer(int MTU, int64_t threshold, UDPPackage* package, CurrentSendingBuffer* sendingBuffer)
{
int freeSpace = MTU - ARQConstant::AssembledPackageHeaderSize - (int)(package->len);

bool checkRequireSingleResending = false;
std::list<UDPUnconformedMap::PackageNode*> canbeAssembledPackages;
fetchResendPackages(freeSpace, threshold, canbeAssembledPackages);
fetchResendPackages(freeSpace, threshold, checkRequireSingleResending, canbeAssembledPackages);
if (canbeAssembledPackages.empty())
return false;

assemblePackages(package, canbeAssembledPackages, sendingBuffer);
return true;
}

bool UDPUnconformedMap::prepareSendingBuffer(int MTU, int64_t threshold, CurrentSendingBuffer* sendingBuffer)
bool UDPUnconformedMap::prepareSendingBuffer(int MTU, int64_t threshold, CurrentSendingBuffer* sendingBuffer, bool& requireSingleResending)
{
int freeSpace = MTU - ARQConstant::AssembledPackageHeaderSize;

requireSingleResending = false;
bool checkRequireSingleResending = true;
std::list<UDPUnconformedMap::PackageNode*> canbeAssembledPackages;
fetchResendPackages(freeSpace, threshold, canbeAssembledPackages);
fetchResendPackages(freeSpace, threshold, checkRequireSingleResending, canbeAssembledPackages);
if (canbeAssembledPackages.empty())
{
requireSingleResending = checkRequireSingleResending;
return false;
}

assemblePackages(NULL, canbeAssembledPackages, sendingBuffer);
return true;
Expand Down Expand Up @@ -218,7 +237,7 @@ void UDPUnconformedMap::cleanByAcks(const std::unordered_set<uint32_t>& acks, in
}
}

UDPPackage* UDPUnconformedMap::v1_fetchResentPackage_normalMode(int64_t threshold, uint32_t& seqNum)
UDPPackage* UDPUnconformedMap::fetchFirstResendPackage(int64_t threshold, uint32_t& seqNum)
{
PackageNode* target = NULL;
for (auto it = _sentQueue.begin(); it != _sentQueue.end(); )
Expand Down
17 changes: 12 additions & 5 deletions core/UDP.v2/UDPUnconformedMap.v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ namespace fpnn
ResendTracer _resendTracer;
bool _enableExpireCheck;

void fetchResendPackages(int freeSpace, int64_t threshold, std::list<PackageNode*>& canbeAssembledPackages);
/*
* checkRequireSingleResending:
in: check first resendable package is full size or not. If which is full size, it requiring to be single resent.
out: if in parameter is set true: first resendable package is required to be single resent or not;
when in parameter is set false, out patameter will be ignored.
*/
void fetchResendPackages(int freeSpace, int64_t threshold, bool& checkRequireSingleResending, std::list<PackageNode*>& canbeAssembledPackages);
void assemblePackages(UDPPackage* package, std::list<PackageNode*>& canbeAssembledPackages,
CurrentSendingBuffer* sendingBuffer);
void assemblePackages(std::set<PackageNode*>& selectedPackages,
std::list<PackageNode*>& supplementaryPackages, CurrentSendingBuffer* sendingBuffer);
//void assemblePackages(std::set<PackageNode*>& selectedPackages,
// std::list<PackageNode*>& supplementaryPackages, CurrentSendingBuffer* sendingBuffer);

public:
UDPUnconformedMap(): _enableExpireCheck(true) {}
Expand All @@ -67,13 +73,14 @@ namespace fpnn
//-- Only can insert reliable package.
void insert(uint32_t seqNum, UDPPackage* package);
bool prepareSendingBuffer(int MTU, int64_t threshold, UDPPackage* package, CurrentSendingBuffer* sendingBuffer);
bool prepareSendingBuffer(int MTU, int64_t threshold, CurrentSendingBuffer* sendingBuffer);
bool prepareSendingBuffer(int MTU, int64_t threshold, CurrentSendingBuffer* sendingBuffer, bool& requireSingleResending);

void cleanByUNA(uint32_t una, int64_t now, int &count, int64_t &totalDelay);
void cleanByAcks(const std::unordered_set<uint32_t>& acks, int64_t now, int &count, int64_t &totalDelay);

UDPPackage* fetchFirstResendPackage(int64_t threshold, uint32_t& seqNum);
//-- Compatible for protocol version 1.
UDPPackage* v1_fetchResentPackage_normalMode(int64_t threshold, uint32_t& seqNum);
UDPPackage* v1_fetchResentPackage_normalMode(int64_t threshold, uint32_t& seqNum) { return fetchFirstResendPackage(threshold, seqNum); }
};
}

Expand Down

0 comments on commit cbb4c0d

Please sign in to comment.