From 491988f6093beeece825ed33fc0e1b0975a1adba Mon Sep 17 00:00:00 2001 From: Timo Michna Date: Sun, 17 Jul 2016 23:24:20 +0200 Subject: [PATCH 1/6] fixed parameter name --- src/Tidal/WampWatch/Stub/ClientSessionStub.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Tidal/WampWatch/Stub/ClientSessionStub.php b/src/Tidal/WampWatch/Stub/ClientSessionStub.php index 1d90d5f..4f4ba50 100644 --- a/src/Tidal/WampWatch/Stub/ClientSessionStub.php +++ b/src/Tidal/WampWatch/Stub/ClientSessionStub.php @@ -76,11 +76,11 @@ public function subscribe($topicName, callable $callback, $options = null) * * @param $topicName * @param $requestId - * @param $sessionId + * @param $subscriptionId * * @throws UnknownTopicException if the topic is unknown */ - public function completeSubscription($topicName, $requestId = 1, $sessionId = 1) + public function completeSubscription($topicName, $requestId = 1, $subscriptionId = 1) { if (!isset($this->subscriptions[$topicName])) { throw new UnknownTopicException($topicName); @@ -88,7 +88,7 @@ public function completeSubscription($topicName, $requestId = 1, $sessionId = 1) /* @var $futureResult Deferred */ $futureResult = $this->subscriptions[$topicName]; - $result = new SubscribedMessage($requestId, $sessionId); + $result = new SubscribedMessage($requestId, $subscriptionId); $futureResult->resolve($result); } From 97a75e94f68611aed81b4bf2beb1b02d057459be Mon Sep 17 00:00:00 2001 From: Timo Michna Date: Mon, 18 Jul 2016 00:20:36 +0200 Subject: [PATCH 2/6] added SubscriptionCollection --- .../WampWatch/Subscription/Collection.php | 169 ++++++++++++++++ tests/unit/SubscriptionCollectionTest.php | 187 ++++++++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 src/Tidal/WampWatch/Subscription/Collection.php create mode 100644 tests/unit/SubscriptionCollectionTest.php diff --git a/src/Tidal/WampWatch/Subscription/Collection.php b/src/Tidal/WampWatch/Subscription/Collection.php new file mode 100644 index 0000000..bbb52d7 --- /dev/null +++ b/src/Tidal/WampWatch/Subscription/Collection.php @@ -0,0 +1,169 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + */ + +namespace Tidal\WampWatch\Subscription; + +use Thruway\Message\SubscribedMessage; +use React\Promise\Deferred; +use React\Promise\Promise; +use Tidal\WampWatch\ClientSessionInterface as ClientSession; +use Tidal\WampWatch\Util; + + +class Collection +{ + /** + * The collection's WAMP client session. + * + * @var ClientSession + */ + private $session; + + /** + * @var array list of subscriptions with topic as key and subscription-id as value + */ + private $subscriptions = []; + + /** + * @var array list of subscriptions callbacks with topic as key + */ + private $subscriptionCallbacks = []; + + /** + * @var bool if the collection is successfully subscribed to all topics + */ + private $isSubscribed = false; + + /** + * @var bool if the collection is currently trying to subscribe to all topics + */ + private $isSubscribing = false; + + + /** + * @var Deferred + */ + private $subscriptionPromise; + + + /** + * Collection constructor. + * + * @param \Tidal\WampWatch\ClientSessionInterface $session + */ + public function __construct(ClientSession $session) + { + $this->session = $session; + } + + /** + * @param string $topic the topic the subscription is for + * @param callable $callback the callback for the topic + */ + public function addSubscription($topic, callable $callback) + { + $this->subscriptions[$topic] = 0; + $this->subscriptionCallbacks[$topic] = $callback; + } + + /** + * Subscribe to all topics added with 'addSubscription'. + * Returns false if already subscribed or curretly subscribing. + * + * @return \React\Promise\Promise + */ + public function subscribe() + { + if (!$this->isSubscribed() && !$this->isSubscribing()) { + + $this->isSubscribing = true; + $this->subscriptionPromise = new Deferred(); + $this->doSubscribe(); + } + + return $this->subscriptionPromise->promise(); + } + + /** + * + */ + protected function doSubscribe() + { + foreach (array_keys($this->subscriptions) as $topic) { + $this->session->subscribe($topic, $this->subscriptionCallbacks[$topic]) + ->done(function (SubscribedMessage $msg) use ($topic) { + + $this->subscriptions[$topic] = $msg->getSubscriptionId(); + $this->subscriptionPromise->notify($topic); + + $this->checkSubscribed(); + }); + } + } + + /** + * @return \React\Promise\Promise|\React\Promise\PromiseInterface + */ + public function unsubscribe() + { + + $resolver = function (callable $resolve) { + $resolve(); + }; + $promise = new Promise($resolver); + + if ($this->isSubscribed()) { + foreach ($this->subscriptions as $topic => $subId) { + Util::unsubscribe($this->session, $subId); + } + + $this->isSubscribed = false; + } + + return $promise; + } + + /** + * @return bool + */ + public function isSubscribed() + { + return $this->isSubscribed; + } + + /** + * @return bool + */ + public function isSubscribing() + { + return $this->isSubscribing; + } + + + /** + * Check if all subscriptions have been successfully confirmed. + */ + protected function checkSubscribed() + { + foreach ($this->subscriptions as $topic => $subId) { + if ($subId === 0) { + + return false; + } + } + $this->isSubscribed = true; + $this->isSubscribing = false; + $this->subscriptionPromise->resolve($this->subscriptions); + + return true; + } + +} \ No newline at end of file diff --git a/tests/unit/SubscriptionCollectionTest.php b/tests/unit/SubscriptionCollectionTest.php new file mode 100644 index 0000000..a480506 --- /dev/null +++ b/tests/unit/SubscriptionCollectionTest.php @@ -0,0 +1,187 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + */ + +namespace Tidal\WampWatch\tests\unit; + + +use React\Promise\Promise; +use Tidal\WampWatch\Stub\ClientSessionStub; +use Tidal\WampWatch\Subscription\Collection; + +class SubscriptionCollectionTest extends \PHPUnit_Framework_TestCase +{ + + /** + * @var ClientSessionStub + */ + private $session; + + /** + * @var Collection + */ + private $collection; + + public function setup() + { + $this->session = new ClientSessionStub(); + $this->collection = new Collection($this->session); + } + + + // initial state tests + + public function test_at_construction_is_not_subscribed() + { + + $this->assertFalse($this->collection->isSubscribed()); + + } + + public function test_at_construction_is_not_subscribing() + { + + $this->assertFalse($this->collection->isSubscribing()); + + } + + // subscription tests + + public function test_subscribe_returns_promise() + { + + $this->assertInstanceOf(Promise::class, $this->collection->subscribe()); + + } + + public function test_is_not_subscribed_if_not_all_subscriptions_returned() + { + + $this->collection->addSubscription('foo', $this->getEmptyFunc()); + $this->collection->addSubscription('bar', $this->getEmptyFunc()); + + $this->collection->subscribe(); + + $this->session->completeSubscription('foo'); + + $this->assertFalse($this->collection->isSubscribed()); + + } + + + public function test_is_subscribing_if_not_all_subscriptions_returned() + { + + $this->collection->addSubscription('foo', $this->getEmptyFunc()); + $this->collection->addSubscription('bar', $this->getEmptyFunc()); + + $this->collection->subscribe(); + + $this->session->completeSubscription('foo'); + + $this->assertTrue($this->collection->isSubscribing()); + + } + + public function test_is_subscribed_if_all_subscriptions_returned() + { + + $this->collection->addSubscription('foo', $this->getEmptyFunc()); + $this->collection->addSubscription('bar', $this->getEmptyFunc()); + + $this->collection->subscribe(); + + $this->session->completeSubscription('foo', 1, 1); + $this->session->completeSubscription('bar', 2, 2); + + $this->assertTrue($this->collection->isSubscribed(), "Collection should be subscribed."); + + } + + public function test_subscribing_updates_promise() + { + + $callbackNr = 0; + + $this->collection->addSubscription('foo', $this->getEmptyFunc()); + $this->collection->addSubscription('bar', $this->getEmptyFunc()); + + $this->collection->subscribe()->done($this->getEmptyFunc(), $this->getEmptyFunc(), function () use (&$callbackNr) { + $callbackNr++; + }); + + $this->session->completeSubscription('foo', 1, 1); + $this->session->completeSubscription('bar', 2, 2); + + $this->assertEquals(2, $callbackNr, "Collection should update 2 times."); + + } + + public function test_promise_update_returns_topic() + { + + $topic = ""; + + $this->collection->addSubscription('foo', $this->getEmptyFunc()); + + $this->collection->subscribe()->done($this->getEmptyFunc(), $this->getEmptyFunc(), function ($t) use (&$topic) { + $topic = $t; + }); + + $this->session->completeSubscription('foo', 1, 1); + + $this->assertEquals('foo', $topic, "Subscription update should return topic."); + + } + + public function test_promise_resolved_returns_subscriptions_list() + { + + $subs = []; + + $this->collection->addSubscription('foo', $this->getEmptyFunc()); + $this->collection->addSubscription('bar', $this->getEmptyFunc()); + + $this->collection->subscribe()->done(function ($s) use (&$subs) { + $subs = $s; + }); + + $this->session->completeSubscription('foo', 1, 1); + $this->session->completeSubscription('bar', 2, 2); + + $this->assertEquals(['foo' => 1, 'bar' => 2], $subs, "Subscription resolve should return list of subscriptions"); + + } + + public function test_is_not_subscribed_after_unsubscribe() + { + + $subscribed = true; + + $this->collection->addSubscription('foo', $this->getEmptyFunc()); + + $this->collection->subscribe(); + $this->session->completeSubscription('foo', 1, 1); + + $this->collection->unsubscribe()->done(function () use (&$subscribed) { + $subscribed = $this->collection->isSubscribed(); + }); + + $this->assertFalse($subscribed, "Collection should not be subscribed after unsubscribe"); + + } + + + private function getEmptyFunc() + { + return function () { + }; + } + +} From 032388997637e96aef8b9307926bf77a922ac970 Mon Sep 17 00:00:00 2001 From: Timo Michna Date: Mon, 18 Jul 2016 00:31:22 +0200 Subject: [PATCH 3/6] cs fixes --- src/Tidal/WampWatch/Subscription/Collection.php | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/Tidal/WampWatch/Subscription/Collection.php b/src/Tidal/WampWatch/Subscription/Collection.php index bbb52d7..04114c5 100644 --- a/src/Tidal/WampWatch/Subscription/Collection.php +++ b/src/Tidal/WampWatch/Subscription/Collection.php @@ -17,7 +17,6 @@ use Tidal\WampWatch\ClientSessionInterface as ClientSession; use Tidal\WampWatch\Util; - class Collection { /** @@ -47,13 +46,11 @@ class Collection */ private $isSubscribing = false; - /** * @var Deferred */ private $subscriptionPromise; - /** * Collection constructor. * @@ -83,7 +80,6 @@ public function addSubscription($topic, callable $callback) public function subscribe() { if (!$this->isSubscribed() && !$this->isSubscribing()) { - $this->isSubscribing = true; $this->subscriptionPromise = new Deferred(); $this->doSubscribe(); @@ -114,7 +110,6 @@ protected function doSubscribe() */ public function unsubscribe() { - $resolver = function (callable $resolve) { $resolve(); }; @@ -147,7 +142,6 @@ public function isSubscribing() return $this->isSubscribing; } - /** * Check if all subscriptions have been successfully confirmed. */ @@ -155,7 +149,6 @@ protected function checkSubscribed() { foreach ($this->subscriptions as $topic => $subId) { if ($subId === 0) { - return false; } } @@ -165,5 +158,4 @@ protected function checkSubscribed() return true; } - -} \ No newline at end of file +} From 90d19554dd9b4407d6cab3c3fe0d6dba9e48eafe Mon Sep 17 00:00:00 2001 From: Timo Michna Date: Mon, 18 Jul 2016 00:33:59 +0200 Subject: [PATCH 4/6] fixed spelling --- src/Tidal/WampWatch/MonitorTrait.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Tidal/WampWatch/MonitorTrait.php b/src/Tidal/WampWatch/MonitorTrait.php index c1241f7..b5eae70 100644 --- a/src/Tidal/WampWatch/MonitorTrait.php +++ b/src/Tidal/WampWatch/MonitorTrait.php @@ -31,7 +31,7 @@ trait MonitorTrait protected $session; /** - * Wether the monitor is running. + * if the monitor is running. * * @var bool */ @@ -60,7 +60,7 @@ public function start() /** * Stop the monitor. - * Returns boolean wether the monitor could be started. + * Returns boolean if the monitor could be started. * * @return bool */ From f12bf9c74e5b65688f3ca9e307b6f19081b72116 Mon Sep 17 00:00:00 2001 From: Timo Michna Date: Mon, 18 Jul 2016 02:40:53 +0200 Subject: [PATCH 5/6] monitor trait refactoring part 1 --- src/Tidal/WampWatch/MonitorTrait.php | 74 ++++++++++++++++++++++ src/Tidal/WampWatch/SessionMonitor.php | 88 ++++++++++---------------- 2 files changed, 106 insertions(+), 56 deletions(-) diff --git a/src/Tidal/WampWatch/MonitorTrait.php b/src/Tidal/WampWatch/MonitorTrait.php index b5eae70..7699a95 100644 --- a/src/Tidal/WampWatch/MonitorTrait.php +++ b/src/Tidal/WampWatch/MonitorTrait.php @@ -12,7 +12,9 @@ namespace Tidal\WampWatch; use Evenement\EventEmitterTrait; +use React\Promise\Promise; use Tidal\WampWatch\ClientSessionInterface as ClientSession; +use Tidal\WampWatch\Subscription\Collection as SubscriptionCollection; /** * Description of MonitorTrait. @@ -37,6 +39,26 @@ trait MonitorTrait */ protected $isRunning = false; + /** + * @var SubscriptionCollection + */ + protected $subscriptionCollection; + + /** + * @var string + */ + protected $initialCallProcedure; + + /** + * @var callable + */ + protected $initialCallCallback; + + /** + * @var bool + */ + protected $initialCallDone = false; + /** * @param ClientSession $session */ @@ -69,6 +91,9 @@ public function stop() if (!$this->isRunning()) { return false; } + + $this->getSubscriptionCollection()->unsubscribe(); + $this->isRunning = false; $this->emit('stop', [$this]); @@ -99,4 +124,53 @@ public function isRunning() { return $this->isRunning; } + + /** + * @return \Tidal\WampWatch\Subscription\Collection + */ + protected function getSubscriptionCollection() + { + return isset($this->subscriptionCollection) + ? $this->subscriptionCollection + : $this->subscriptionCollection = new SubscriptionCollection($this->session); + } + + protected function setInitialCall($pocedure, callable $callback) + { + $this->initialCallProcedure = (string)$pocedure; + $this->initialCallCallback = $callback; + } + + /** + * @return \React\Promise\PromiseInterface + */ + protected function callInitialProcedure() + { + if (!isset($this->initialCallProcedure) || !isset($this->initialCallCallback)) { + $resolver = function (callable $resolve) { + $resolve(); + }; + + return new Promise($resolver); + } + + return $this->session->call($this->initialCallProcedure)->then(function ($res) { + $this->initialCallDone = true; + + return $res; + }); + } + + /** + * Checks if all necessary subscriptions and calls have been responded to. + */ + protected function checkStarted() + { + if ($this->getSubscriptionCollection()->isSubscribed() && + $this->initialCallDone && + !$this->isRunning() + ) { + $this->doStart(); + } + } } diff --git a/src/Tidal/WampWatch/SessionMonitor.php b/src/Tidal/WampWatch/SessionMonitor.php index 2744ffb..3d9c8b7 100644 --- a/src/Tidal/WampWatch/SessionMonitor.php +++ b/src/Tidal/WampWatch/SessionMonitor.php @@ -13,7 +13,6 @@ use Evenement\EventEmitterInterface; use Tidal\WampWatch\ClientSessionInterface as ClientSession; -use Thruway\Message\SubscribedMessage; /** * Description of SessionMonitor. @@ -24,7 +23,6 @@ class SessionMonitor implements MonitorInterface, EventEmitterInterface { use MonitorTrait { start as doStart; - stop as doStop; } const SESSION_JOIN_TOPIC = 'wamp.session.on_join'; @@ -70,7 +68,10 @@ public function __construct(ClientSession $session) */ public function start() { - $this->startSubscriptions(); + $this->initSetupCalls(); + $this->getSubscriptionCollection()->subscribe()->done(function () { + $this->checkStarted(); + }); $this->retrieveSessionIds(); return true; @@ -81,8 +82,7 @@ public function start() */ protected function checkStarted() { - if ($this->joinSubscriptionId > 0 && - $this->leaveSubscriptionId > 0 && + if ($this->getSubscriptionCollection()->isSubscribed() && $this->calledList && !$this->isRunning() ) { @@ -90,20 +90,6 @@ protected function checkStarted() } } - /** - * Stop the monitor. - * Returns boolean wether the monitor could be started. - * - * @return bool - */ - public function stop() - { - $this->stopSubscriptions(); - $this->doStop(); - - return true; - } - /** * Retrieves the session-info for given sessionId * and populates it in via given callback. @@ -214,44 +200,28 @@ protected function validateSessionInfo($sessionInfo) /** * Initializes the subscription to the meta-events. */ - protected function startSubscriptions() + protected function initSetupCalls() { - // subscription to 'wamp.session.on_join' - $this->session->subscribe(self::SESSION_JOIN_TOPIC, function (array $res) { + // @var \Tidal\WampWatch\Subscription\Collection + $collection = $this->getSubscriptionCollection(); + + $collection->addSubscription(self::SESSION_JOIN_TOPIC, function (array $res) { $sessionInfo = $res[0]; if (!$this->validateSessionInfo($sessionInfo) || $this->hasSession($sessionInfo)) { return; } $this->addSession($sessionInfo); - })->then(function (SubscribedMessage $msg) { - $this->joinSubscriptionId = $msg->getSubscriptionId(); - $this->checkStarted(); }); - // subscription to 'wamp.session.on_leave' - $this->session->subscribe(self::SESSION_LEAVE_TOPIC, function (array $res) { + $collection->addSubscription(self::SESSION_LEAVE_TOPIC, function (array $res) { // @bug : wamp.session.on_leave is bugged as of crossbar.io 0.11.0 // will provide sessionID when Browser closes/reloads, // but not when calling connection.close(); $sessionId = (int) $res[0]; $this->removeSessionId($sessionId); - })->then(function (SubscribedMessage $msg) { - $this->leaveSubscriptionId = $msg->getSubscriptionId(); - $this->checkStarted(); }); - } - /** - * Unsubscribes from the meta-events. - */ - protected function stopSubscriptions() - { - if ($this->joinSubscriptionId > 0) { - Util::unsubscribe($this->session, $this->joinSubscriptionId); - } - if ($this->leaveSubscriptionId > 0) { - Util::unsubscribe($this->session, $this->leaveSubscriptionId); - } + $this->setInitialCall(self::SESSION_LIST_TOPIC, $this->getSessionIdRetrievalCallback()); } /** @@ -261,22 +231,28 @@ protected function stopSubscriptions() */ protected function retrieveSessionIds(callable $callback = null) { - $this->session->call(self::SESSION_LIST_TOPIC, [])->then( - function ($res) use ($callback) { - // remove our own sessionID from the tracked sessions - $sessionIds = $this->removeOwnSessionId($res[0]); - $this->setList($sessionIds); - $this->emit('list', [$this->getList()]); + $this->session->call(self::SESSION_LIST_TOPIC, []) + ->then( + $this->getSessionIdRetrievalCallback() + )->done(function ($res) use ($callback) { if ($callback !== null) { - $callback($this->sessionIds); + $callback($res); } - $this->calledList = true; - $this->checkStarted(); - }, - function ($error) { - $this->emit('error', [$error]); - } - ); + }); + } + + protected function getSessionIdRetrievalCallback() + { + return function ($res) { + // remove our own sessionID from the tracked sessions + $sessionIds = $this->removeOwnSessionId($res[0]); + $this->setList($sessionIds); + $this->emit('list', [$this->getList()]); + $this->calledList = true; + $this->checkStarted(); + + return $this->getList(); + }; } protected function setList($list) From 137d2c0fe6b70e5a7fe8317cb313e6929230f75f Mon Sep 17 00:00:00 2001 From: Timo Michna Date: Sun, 17 Jul 2016 20:52:28 -0400 Subject: [PATCH 6/6] Applied fixes from StyleCI --- src/Tidal/WampWatch/MonitorTrait.php | 2 +- src/Tidal/WampWatch/Subscription/Collection.php | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Tidal/WampWatch/MonitorTrait.php b/src/Tidal/WampWatch/MonitorTrait.php index 7699a95..3ab62b8 100644 --- a/src/Tidal/WampWatch/MonitorTrait.php +++ b/src/Tidal/WampWatch/MonitorTrait.php @@ -137,7 +137,7 @@ protected function getSubscriptionCollection() protected function setInitialCall($pocedure, callable $callback) { - $this->initialCallProcedure = (string)$pocedure; + $this->initialCallProcedure = (string) $pocedure; $this->initialCallCallback = $callback; } diff --git a/src/Tidal/WampWatch/Subscription/Collection.php b/src/Tidal/WampWatch/Subscription/Collection.php index 04114c5..7a08f2d 100644 --- a/src/Tidal/WampWatch/Subscription/Collection.php +++ b/src/Tidal/WampWatch/Subscription/Collection.php @@ -96,7 +96,6 @@ protected function doSubscribe() foreach (array_keys($this->subscriptions) as $topic) { $this->session->subscribe($topic, $this->subscriptionCallbacks[$topic]) ->done(function (SubscribedMessage $msg) use ($topic) { - $this->subscriptions[$topic] = $msg->getSubscriptionId(); $this->subscriptionPromise->notify($topic);