diff --git a/Foundation/Foundation_vs140.vcxproj b/Foundation/Foundation_vs140.vcxproj index 385a231908..1ce7d199bf 100644 --- a/Foundation/Foundation_vs140.vcxproj +++ b/Foundation/Foundation_vs140.vcxproj @@ -1385,6 +1385,7 @@ + true diff --git a/Foundation/Foundation_vs150.vcxproj b/Foundation/Foundation_vs150.vcxproj index 597ce98b1e..960d00aba2 100644 --- a/Foundation/Foundation_vs150.vcxproj +++ b/Foundation/Foundation_vs150.vcxproj @@ -1385,6 +1385,7 @@ + true diff --git a/Foundation/Foundation_vs160.vcxproj b/Foundation/Foundation_vs160.vcxproj index 905849d71a..99854dff36 100644 --- a/Foundation/Foundation_vs160.vcxproj +++ b/Foundation/Foundation_vs160.vcxproj @@ -1391,6 +1391,7 @@ + true diff --git a/Foundation/Foundation_vs170.vcxproj b/Foundation/Foundation_vs170.vcxproj index b77d103e07..0eac76c958 100644 --- a/Foundation/Foundation_vs170.vcxproj +++ b/Foundation/Foundation_vs170.vcxproj @@ -1950,6 +1950,7 @@ + true diff --git a/Foundation/Makefile b/Foundation/Makefile index 16a45cd05a..447fa3629b 100644 --- a/Foundation/Makefile +++ b/Foundation/Makefile @@ -6,7 +6,7 @@ include $(POCO_BASE)/build/rules/global -objects = ArchiveStrategy Ascii ASCIIEncoding AsyncChannel \ +objects = ArchiveStrategy Ascii ASCIIEncoding AsyncChannel ActiveThreadPool\ Base32Decoder Base32Encoder Base64Decoder Base64Encoder \ BinaryReader BinaryWriter Bugcheck ByteOrder Channel Checksum Clock Configurable ConsoleChannel \ Condition CountingStream DateTime LocalDateTime DateTimeFormat DateTimeFormatter DateTimeParser \ diff --git a/Foundation/include/Poco/AbstractEvent.h b/Foundation/include/Poco/AbstractEvent.h index d3724c9095..032f1477d8 100644 --- a/Foundation/include/Poco/AbstractEvent.h +++ b/Foundation/include/Poco/AbstractEvent.h @@ -339,9 +339,8 @@ class AbstractEvent } NotifyAsyncParams params = par; - TArgs retArgs(params.args); - params.ptrStrat->notify(params.pSender, retArgs); - return retArgs; + params.ptrStrat->notify(params.pSender, params.args); + return params.args; } TStrategy _strategy; /// The strategy used to notify observers. diff --git a/Foundation/include/Poco/ActiveStarter.h b/Foundation/include/Poco/ActiveStarter.h index c1e2117384..66c300ac00 100644 --- a/Foundation/include/Poco/ActiveStarter.h +++ b/Foundation/include/Poco/ActiveStarter.h @@ -19,7 +19,7 @@ #include "Poco/Foundation.h" -#include "Poco/ThreadPool.h" +#include "Poco/ActiveThreadPool.h" #include "Poco/ActiveRunnable.h" @@ -36,7 +36,7 @@ class ActiveStarter public: static void start(OwnerType* /*pOwner*/, ActiveRunnableBase::Ptr pRunnable) { - ThreadPool::defaultPool().start(*pRunnable); + ActiveThreadPool::defaultPool().start(*pRunnable); pRunnable->duplicate(); // The runnable will release itself. } }; diff --git a/Foundation/include/Poco/ActiveThreadPool.h b/Foundation/include/Poco/ActiveThreadPool.h new file mode 100644 index 0000000000..5c04a8ff02 --- /dev/null +++ b/Foundation/include/Poco/ActiveThreadPool.h @@ -0,0 +1,145 @@ +// +// ActiveThreadPool.h +// +// Library: Foundation +// Package: Threading +// Module: ActiveThreadPool +// +// Definition of the ActiveThreadPool class. +// +// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#ifndef Foundation_ActiveThreadPool_INCLUDED +#define Foundation_ActiveThreadPool_INCLUDED + + +#include "Poco/Foundation.h" +#include "Poco/Thread.h" +#include "Poco/Mutex.h" +#include "Poco/Environment.h" +#include + + +namespace Poco { + + +class Runnable; +class ActiveThread; + + +class Foundation_API ActiveThreadPool + /// A thread pool always keeps a number of threads running, ready + /// to accept work. + /// Threads in an active thread pool are re-used + /// Every thread in the pool has own notification-queue with Runnable + /// Every Runnable executes on next thread (round-robin model) + /// The thread pool always keeps fixed number of threads running. + /// Use case for this pool is running many (more than os-max-thread-count) short live tasks + /// Round-robin model allow efficiently utilize cpu cores +{ +public: + ActiveThreadPool(int capacity = static_cast(Environment::processorCount()) + 1, + int stackSize = POCO_THREAD_STACK_SIZE); + /// Creates a thread pool with fixed capacity threads. + /// Threads are created with given stack size. + + ActiveThreadPool(std::string name, + int capacity = static_cast(Environment::processorCount()) + 1, + int stackSize = POCO_THREAD_STACK_SIZE); + /// Creates a thread pool with the given name and fixed capacity threads. + /// Threads are created with given stack size. + + ~ActiveThreadPool(); + /// Currently running threads will remain active + /// until they complete. + + int capacity() const; + /// Returns the capacity of threads. + + int getStackSize() const; + /// Returns the stack size used to create new threads. + + void start(Runnable& target); + /// Obtains a thread and starts the target. + + void start(Runnable& target, const std::string& name); + /// Obtains a thread and starts the target. + /// Assigns the given name to the thread. + + void startWithPriority(Thread::Priority priority, Runnable& target); + /// Obtains a thread, adjusts the thread's priority, and starts the target. + + void startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name); + /// Obtains a thread, adjusts the thread's priority, and starts the target. + /// Assigns the given name to the thread. + + void stopAll(); + /// Stops all running threads and waits for their completion. + /// + /// Will also delete all thread objects. + /// If used, this method should be the last action before + /// the thread pool is deleted. + /// + /// Note: If a thread fails to stop within 10 seconds + /// (due to a programming error, for example), the + /// underlying thread object will not be deleted and + /// this method will return anyway. This allows for a + /// more or less graceful shutdown in case of a misbehaving + /// thread. + + void joinAll(); + /// Waits for all threads to complete. + /// + /// Note that this will join() underlying + /// threads and restart them for next tasks. + + const std::string& name() const; + /// Returns the name of the thread pool, + /// or an empty string if no name has been + /// specified in the constructor. + + static ActiveThreadPool& defaultPool(); + /// Returns a reference to the default + /// thread pool. + +protected: + ActiveThread* getThread(); + ActiveThread* createThread(); + +private: + ActiveThreadPool(const ActiveThreadPool& pool); + ActiveThreadPool& operator = (const ActiveThreadPool& pool); + + typedef std::vector ThreadVec; + + std::string _name; + int _capacity; + int _serial; + int _stackSize; + ThreadVec _threads; + mutable FastMutex _mutex; + std::atomic _lastThreadIndex{0}; +}; + + +inline int ActiveThreadPool::getStackSize() const +{ + return _stackSize; +} + + +inline const std::string& ActiveThreadPool::name() const +{ + return _name; +} + + +} // namespace Poco + + +#endif // Foundation_ActiveThreadPool_INCLUDED diff --git a/Foundation/include/Poco/DefaultStrategy.h b/Foundation/include/Poco/DefaultStrategy.h index 905acacc4a..ede9d051e4 100644 --- a/Foundation/include/Poco/DefaultStrategy.h +++ b/Foundation/include/Poco/DefaultStrategy.h @@ -147,6 +147,11 @@ class DefaultStrategy: public NotificationStrategy: public NotificationStrategy { } + FIFOStrategy(FIFOStrategy&& s): + DefaultStrategy(std::move(s)) + { + } + ~FIFOStrategy() { } @@ -50,6 +55,12 @@ class FIFOStrategy: public DefaultStrategy DefaultStrategy::operator = (s); return *this; } + + FIFOStrategy& operator = (FIFOStrategy&& s) + { + DefaultStrategy::operator = (s); + return *this; + } }; diff --git a/Foundation/src/ActiveThreadPool.cpp b/Foundation/src/ActiveThreadPool.cpp new file mode 100644 index 0000000000..b7a9170fd7 --- /dev/null +++ b/Foundation/src/ActiveThreadPool.cpp @@ -0,0 +1,363 @@ +// +// ActiveThreadPool.cpp +// +// Library: Foundation +// Package: Threading +// Module: ActiveThreadPool +// +// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#include "Poco/ActiveThreadPool.h" +#include "Poco/Runnable.h" +#include "Poco/Thread.h" +#include "Poco/Event.h" +#include "Poco/ThreadLocal.h" +#include "Poco/ErrorHandler.h" +#include "Poco/NotificationQueue.h" +#include +#include +#include + +namespace Poco { + +class NewActionNotification: public Notification +{ +public: + NewActionNotification(Thread::Priority priority, Runnable &runnable, std::string name) : + _priority(priority), + _runnable(runnable), + _name(std::move(name)) + { } + + ~NewActionNotification() override = default; + + Runnable& runnable() const + { + return _runnable; + } + + Thread::Priority priotity() const + { + return _priority; + } + + const std::string &threadName() const + { + return _name; + } + + std::string threadFullName() const + { + std::string fullName(_name); + if (_name.empty()) + { + fullName = _name; + } + else + { + fullName.append(" ("); + fullName.append(_name); + fullName.append(")"); + } + return fullName; + } + +private: + Thread::Priority _priority; + Runnable &_runnable; + std::string _name; +}; + +class ActiveThread: public Runnable +{ +public: + ActiveThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE); + ~ActiveThread() override = default; + + void start(); + void start(Thread::Priority priority, Runnable& target); + void start(Thread::Priority priority, Runnable& target, const std::string& name); + void join(); + void release(); + void run() override; + +private: + NotificationQueue _pTargetQueue; + std::string _name; + Thread _thread; + Event _targetCompleted; + FastMutex _mutex; + const long JOIN_TIMEOUT = 10000; + std::atomic _needToStop{false}; +}; + + +ActiveThread::ActiveThread(const std::string& name, int stackSize): + _name(name), + _thread(name), + _targetCompleted(false) +{ + poco_assert_dbg (stackSize >= 0); + _thread.setStackSize(stackSize); +} + +void ActiveThread::start() +{ + _needToStop = false; + _thread.start(*this); +} + + +void ActiveThread::start(Thread::Priority priority, Runnable& target) +{ + _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, _name)); +} + + +void ActiveThread::start(Thread::Priority priority, Runnable& target, const std::string& name) +{ + _pTargetQueue.enqueueNotification(Poco::makeAuto(priority, target, name)); +} + +void ActiveThread::join() +{ + _pTargetQueue.wakeUpAll(); + if (!_pTargetQueue.empty()) + { + _targetCompleted.wait(); + } + +} + + +void ActiveThread::release() +{ + // In case of a statically allocated thread pool (such + // as the default thread pool), Windows may have already + // terminated the thread before we got here. + if (_thread.isRunning()) + { + _needToStop = true; + _pTargetQueue.wakeUpAll(); + if (!_pTargetQueue.empty()) + _targetCompleted.wait(JOIN_TIMEOUT); + } + + if (_thread.tryJoin(JOIN_TIMEOUT)) + { + delete this; + } +} + + +void ActiveThread::run() +{ + do { + auto *_pTarget = dynamic_cast(_pTargetQueue.waitDequeueNotification()); + while (_pTarget) + { + Runnable* pTarget = &_pTarget->runnable(); + _thread.setPriority(_pTarget->priotity()); + _thread.setName(_pTarget->name()); + try + { + pTarget->run(); + } + catch (Exception& exc) + { + ErrorHandler::handle(exc); + } + catch (std::exception& exc) + { + ErrorHandler::handle(exc); + } + catch (...) + { + ErrorHandler::handle(); + } + _pTarget->release(); + _thread.setName(_name); + _thread.setPriority(Thread::PRIO_NORMAL); + ThreadLocalStorage::clear(); + _pTarget = dynamic_cast(_pTargetQueue.waitDequeueNotification(1000)); + } + _targetCompleted.set(); + } + while (_needToStop == false); +} + + +ActiveThreadPool::ActiveThreadPool(int capacity, int stackSize): + _capacity(capacity), + _serial(0), + _stackSize(stackSize), + _lastThreadIndex(0) +{ + poco_assert (_capacity >= 1); + + _threads.reserve(_capacity); + + for (int i = 0; i < _capacity; i++) + { + ActiveThread* pThread = createThread(); + _threads.push_back(pThread); + pThread->start(); + } +} + + +ActiveThreadPool::ActiveThreadPool(std::string name, int capacity, int stackSize): + _name(std::move(name)), + _capacity(capacity), + _serial(0), + _stackSize(stackSize), + _lastThreadIndex(0) +{ + poco_assert (_capacity >= 1); + + _threads.reserve(_capacity); + + for (int i = 0; i < _capacity; i++) + { + ActiveThread* pThread = createThread(); + _threads.push_back(pThread); + pThread->start(); + } +} + + +ActiveThreadPool::~ActiveThreadPool() +{ + try + { + stopAll(); + } + catch (...) + { + poco_unexpected(); + } +} + + +int ActiveThreadPool::capacity() const +{ + return _capacity; +} + + +void ActiveThreadPool::start(Runnable& target) +{ + getThread()->start(Thread::PRIO_NORMAL, target); +} + + +void ActiveThreadPool::start(Runnable& target, const std::string& name) +{ + getThread()->start(Thread::PRIO_NORMAL, target, name); +} + + +void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target) +{ + getThread()->start(priority, target); +} + + +void ActiveThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name) +{ + getThread()->start(priority, target, name); +} + + +void ActiveThreadPool::stopAll() +{ + FastMutex::ScopedLock lock(_mutex); + + for (auto pThread: _threads) + { + pThread->release(); + } + _threads.clear(); +} + + +void ActiveThreadPool::joinAll() +{ + FastMutex::ScopedLock lock(_mutex); + + for (auto pThread: _threads) + { + pThread->join(); + } + + _threads.clear(); + _threads.reserve(_capacity); + + for (int i = 0; i < _capacity; i++) + { + ActiveThread* pThread = createThread(); + _threads.push_back(pThread); + pThread->start(); + } +} + +ActiveThread* ActiveThreadPool::getThread() +{ + auto thrSize = _threads.size(); + auto i = (_lastThreadIndex++) % thrSize; + ActiveThread* pThread = _threads[i]; + return pThread; +} + + +ActiveThread* ActiveThreadPool::createThread() +{ + std::ostringstream name; + name << _name << "[#active-thread-" << ++_serial << "]"; + return new ActiveThread(name.str(), _stackSize); +} + + +class ActiveThreadPoolSingletonHolder +{ +public: + ActiveThreadPoolSingletonHolder() = default; + ~ActiveThreadPoolSingletonHolder() + { + delete _pPool; + } + ActiveThreadPool* pool() + { + FastMutex::ScopedLock lock(_mutex); + + if (!_pPool) + { + _pPool = new ActiveThreadPool("default-active"); + } + return _pPool; + } + +private: + ActiveThreadPool* _pPool{nullptr}; + FastMutex _mutex; +}; + + +namespace +{ + static ActiveThreadPoolSingletonHolder sh; +} + + +ActiveThreadPool& ActiveThreadPool::defaultPool() +{ + return *sh.pool(); +} + + +} // namespace Poco diff --git a/Foundation/src/NotificationQueue.cpp b/Foundation/src/NotificationQueue.cpp index 88a182a382..c365baeab6 100644 --- a/Foundation/src/NotificationQueue.cpp +++ b/Foundation/src/NotificationQueue.cpp @@ -45,13 +45,13 @@ void NotificationQueue::enqueueNotification(Notification::Ptr pNotification) FastMutex::ScopedLock lock(_mutex); if (_waitQueue.empty()) { - _nfQueue.push_back(pNotification); + _nfQueue.push_back(std::move(pNotification)); } else { WaitInfo* pWI = _waitQueue.front(); _waitQueue.pop_front(); - pWI->pNf = pNotification; + pWI->pNf = std::move(pNotification); pWI->nfAvailable.set(); } } @@ -63,13 +63,13 @@ void NotificationQueue::enqueueUrgentNotification(Notification::Ptr pNotificatio FastMutex::ScopedLock lock(_mutex); if (_waitQueue.empty()) { - _nfQueue.push_front(pNotification); + _nfQueue.push_front(std::move(pNotification)); } else { WaitInfo* pWI = _waitQueue.front(); _waitQueue.pop_front(); - pWI->pNf = pNotification; + pWI->pNf = std::move(pNotification); pWI->nfAvailable.set(); } } diff --git a/Foundation/testsuite/Makefile-Driver b/Foundation/testsuite/Makefile-Driver index a7a7814faa..e5230f1a5a 100644 --- a/Foundation/testsuite/Makefile-Driver +++ b/Foundation/testsuite/Makefile-Driver @@ -28,7 +28,7 @@ objects = ActiveMethodTest ActivityTest ActiveDispatcherTest \ StreamsTestSuite StringTest StringTokenizerTest TaskTestSuite TaskTest \ TaskManagerTest TestChannel TeeStreamTest UTF8StringTest \ TextConverterTest TextIteratorTest TextBufferIteratorTest TextTestSuite TextEncodingTest \ - ThreadLocalTest ThreadPoolTest ThreadTest ThreadingTestSuite TimerTest \ + ThreadLocalTest ThreadPoolTest ActiveThreadPoolTest ThreadTest ThreadingTestSuite TimerTest \ TimespanTest TimestampTest TimezoneTest URIStreamOpenerTest URITest \ URITestSuite UUIDGeneratorTest UUIDTest UUIDTestSuite ZLibTest \ TestPlugin DummyDelegate BasicEventTest FIFOEventTest PriorityEventTest EventTestSuite \ diff --git a/Foundation/testsuite/TestSuite_vs140.vcxproj b/Foundation/testsuite/TestSuite_vs140.vcxproj index 089e45fc9d..fa71aeb291 100644 --- a/Foundation/testsuite/TestSuite_vs140.vcxproj +++ b/Foundation/testsuite/TestSuite_vs140.vcxproj @@ -723,6 +723,7 @@ + @@ -864,6 +865,7 @@ + @@ -887,4 +889,4 @@ - \ No newline at end of file + diff --git a/Foundation/testsuite/TestSuite_vs150.vcxproj b/Foundation/testsuite/TestSuite_vs150.vcxproj index c67f7c8bf9..03036de2e1 100644 --- a/Foundation/testsuite/TestSuite_vs150.vcxproj +++ b/Foundation/testsuite/TestSuite_vs150.vcxproj @@ -723,6 +723,7 @@ + @@ -864,6 +865,7 @@ + @@ -887,4 +889,4 @@ - \ No newline at end of file + diff --git a/Foundation/testsuite/TestSuite_vs160.vcxproj b/Foundation/testsuite/TestSuite_vs160.vcxproj index 65082cfb7e..ed453e07a5 100644 --- a/Foundation/testsuite/TestSuite_vs160.vcxproj +++ b/Foundation/testsuite/TestSuite_vs160.vcxproj @@ -730,6 +730,7 @@ + @@ -871,6 +872,7 @@ + @@ -894,4 +896,4 @@ - \ No newline at end of file + diff --git a/Foundation/testsuite/TestSuite_vs170.vcxproj b/Foundation/testsuite/TestSuite_vs170.vcxproj index 754b4a1402..b476e428f4 100644 --- a/Foundation/testsuite/TestSuite_vs170.vcxproj +++ b/Foundation/testsuite/TestSuite_vs170.vcxproj @@ -1021,6 +1021,7 @@ + @@ -1162,6 +1163,7 @@ + @@ -1185,4 +1187,4 @@ - \ No newline at end of file + diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.cpp b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp new file mode 100644 index 0000000000..1bef49d0bb --- /dev/null +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.cpp @@ -0,0 +1,103 @@ +// +// ActiveThreadPoolTest.cpp +// +// Copyright (c) 2004-2023, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#include "ActiveThreadPoolTest.h" +#include "CppUnit/TestCaller.h" +#include "CppUnit/TestSuite.h" +#include "Poco/ActiveThreadPool.h" +#include "Poco/RunnableAdapter.h" +#include "Poco/Exception.h" +#include "Poco/Thread.h" +#include "Poco/Environment.h" + + +using Poco::ActiveThreadPool; +using Poco::RunnableAdapter; +using Poco::Thread; +using Poco::Environment; + + +ActiveThreadPoolTest::ActiveThreadPoolTest(const std::string& name): CppUnit::TestCase(name) +{ +} + + +ActiveThreadPoolTest::~ActiveThreadPoolTest() +{ +} + + +void ActiveThreadPoolTest::testActiveThreadPool() +{ + ActiveThreadPool pool; + + assertTrue (pool.capacity() == static_cast(Environment::processorCount()) + 1); + + RunnableAdapter ra(*this, &ActiveThreadPoolTest::count); + + try + { + for (int i = 0; i < 2000; ++i) + { + pool.start(ra); + } + } + catch (...) + { + failmsg("wrong exception thrown"); + } + + pool.joinAll(); + + assertTrue (_count == 2000); + + _count = 0; + try + { + for (int i = 0; i < 1000; ++i) + { + pool.start(ra); + } + } + catch (...) + { + failmsg("wrong exception thrown"); + } + pool.joinAll(); + + assertTrue (_count == 1000); +} + + +void ActiveThreadPoolTest::setUp() +{ + _count = 0; +} + + +void ActiveThreadPoolTest::tearDown() +{ +} + + +void ActiveThreadPoolTest::count() +{ + ++_count; +} + + +CppUnit::Test* ActiveThreadPoolTest::suite() +{ + CppUnit::TestSuite* pSuite = new CppUnit::TestSuite("ActiveThreadPoolTest"); + + CppUnit_addTest(pSuite, ActiveThreadPoolTest, testActiveThreadPool); + + return pSuite; +} diff --git a/Foundation/testsuite/src/ActiveThreadPoolTest.h b/Foundation/testsuite/src/ActiveThreadPoolTest.h new file mode 100644 index 0000000000..51df837355 --- /dev/null +++ b/Foundation/testsuite/src/ActiveThreadPoolTest.h @@ -0,0 +1,44 @@ +// +// ActiveThreadPoolTest.h +// +// Definition of the ActiveThreadPoolTest class. +// +// Copyright (c) 2004-2023, Applied Informatics Software Engineering GmbH. +// and Contributors. +// +// SPDX-License-Identifier: BSL-1.0 +// + + +#ifndef ActiveThreadPoolTest_INCLUDED +#define ActiveThreadPoolTest_INCLUDED + + +#include "Poco/Foundation.h" +#include "CppUnit/TestCase.h" +#include "Poco/Event.h" +#include "Poco/AtomicCounter.h" + + +class ActiveThreadPoolTest: public CppUnit::TestCase +{ +public: + ActiveThreadPoolTest(const std::string& name); + ~ActiveThreadPoolTest(); + + void testActiveThreadPool(); + + void setUp(); + void tearDown(); + + static CppUnit::Test* suite(); + +protected: + void count(); + +private: + Poco::AtomicCounter _count; +}; + + +#endif // ActiveThreadPoolTest_INCLUDED diff --git a/Foundation/testsuite/src/FIFOEventTest.cpp b/Foundation/testsuite/src/FIFOEventTest.cpp index 9f1bd73acd..7bc5c0d941 100644 --- a/Foundation/testsuite/src/FIFOEventTest.cpp +++ b/Foundation/testsuite/src/FIFOEventTest.cpp @@ -16,6 +16,8 @@ #include "Poco/Expire.h" #include "Poco/Thread.h" #include "Poco/Exception.h" +#include "Poco/Stopwatch.h" +#include using namespace Poco; @@ -347,6 +349,35 @@ void FIFOEventTest::testAsyncNotify() assertTrue (_count == LARGEINC); } +void FIFOEventTest::testAsyncNotifyBenchmark() +{ + Poco::FIFOEvent simple; + simple += delegate(this, &FIFOEventTest::onAsyncBench); + assertTrue (_count == 0); + const int cnt = 10000; + int runCount = 1000; + const Poco::Int64 allCount = cnt * runCount; + Poco::Stopwatch sw; + sw.restart(); + while (runCount-- > 0) + { + std::vector> vresult; + vresult.reserve(cnt); + for (int i = 0; i < cnt; ++i) + { + vresult.push_back(simple.notifyAsync(this, i)); + } + + for (int i = 0; i < cnt; ++i) + { + vresult[i].wait(); + assertTrue (vresult[i].data() == (i*2)); + } + } + sw.stop(); + std::cout << "notify and wait time = " << sw.elapsed() / 1000 << std::endl; + assertTrue (_count == allCount); +} void FIFOEventTest::onVoid(const void* pSender) { @@ -402,6 +433,11 @@ void FIFOEventTest::onAsync(const void* pSender, int& i) _count += LARGEINC ; } +void FIFOEventTest::onAsyncBench(const void* pSender, int& i) +{ + ++_count; + i *= 2; +} int FIFOEventTest::getCount() const { @@ -446,5 +482,6 @@ CppUnit::Test* FIFOEventTest::suite() CppUnit_addTest(pSuite, FIFOEventTest, testExpireReRegister); CppUnit_addTest(pSuite, FIFOEventTest, testOverwriteDelegate); CppUnit_addTest(pSuite, FIFOEventTest, testAsyncNotify); + CppUnit_addTest(pSuite, FIFOEventTest, testAsyncNotifyBenchmark); return pSuite; } diff --git a/Foundation/testsuite/src/FIFOEventTest.h b/Foundation/testsuite/src/FIFOEventTest.h index 84113cd78f..f66d6a802a 100644 --- a/Foundation/testsuite/src/FIFOEventTest.h +++ b/Foundation/testsuite/src/FIFOEventTest.h @@ -45,6 +45,7 @@ class FIFOEventTest: public CppUnit::TestCase void testReturnParams(); void testOverwriteDelegate(); void testAsyncNotify(); + void testAsyncNotifyBenchmark(); void setUp(); void tearDown(); @@ -60,10 +61,11 @@ class FIFOEventTest: public CppUnit::TestCase void onConstComplex(const void* pSender, const Poco::EventArgs*& i); void onConst2Complex(const void* pSender, const Poco::EventArgs * const & i); void onAsync(const void* pSender, int& i); + void onAsyncBench(const void* pSender, int& i); int getCount() const; private: - std::atomic _count; + std::atomic _count; }; diff --git a/Foundation/testsuite/src/ThreadingTestSuite.cpp b/Foundation/testsuite/src/ThreadingTestSuite.cpp index 18bc6fd1c3..11936a86ea 100644 --- a/Foundation/testsuite/src/ThreadingTestSuite.cpp +++ b/Foundation/testsuite/src/ThreadingTestSuite.cpp @@ -19,6 +19,7 @@ #include "ActiveMethodTest.h" #include "ActiveDispatcherTest.h" #include "ConditionTest.h" +#include "ActiveThreadPoolTest.h" CppUnit::Test* ThreadingTestSuite::suite() @@ -35,6 +36,7 @@ CppUnit::Test* ThreadingTestSuite::suite() pSuite->addTest(ActiveMethodTest::suite()); pSuite->addTest(ActiveDispatcherTest::suite()); pSuite->addTest(ConditionTest::suite()); + pSuite->addTest(ActiveThreadPoolTest::suite()); return pSuite; }