Skip to content

Commit

Permalink
fix default event port behavior along with FileDescriptorActivity
Browse files Browse the repository at this point in the history
With the introduction of the work() method, which allowed to
be more precise on what gets called in which condition,
an event port under FileDescriptorActivity would not make
updateHook get called anymore.

This broke the Rock workflow with file descriptor activities.
(Un)fortunately, the effect has been subtle enough to not be
noticed readily. Or people started deploying tasks under the
normal Activity where before it would work with the FDA (I'm
guilty of that).

This is a rather convoluted code path (the characters in the
play are a FileDescriptorActivity 'FDA', ExecutionEngine 'EE'
and a TaskContext 'TC'

- the port signal calls FDA::trigger()
- FDA::trigger() wakes-up the FDA loop, which calls EE::work(Trigger)
- EE::work(Trigger) calls EE::processPortCallbacks but NOT
  EE::processHooks (expected from the work() refactoring)

At that point, we have to remember that TC::addEventPort registers
a port callback that by default calls activity->trigger(). The
comment at that point in the code itself says "default schedules
an updateHook" (which we're indeed expecting with a FDA)

- so, EE::processPortCallbacks in the end calls TC::trigger
- which, by default calls FDA::timeout

And FDA::timeout() was not implemented.
  • Loading branch information
doudou committed May 5, 2020
1 parent baaea50 commit 9827e7c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
27 changes: 24 additions & 3 deletions rtt/extras/FileDescriptorActivity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface*
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand All @@ -109,6 +110,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Runn
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand All @@ -125,6 +127,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand All @@ -141,6 +144,7 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_user_timeout(false)
, m_update_sets(false)
{
FD_ZERO(&m_fd_set);
Expand Down Expand Up @@ -257,6 +261,7 @@ bool FileDescriptorActivity::start()
// reset flags
m_break_loop = false;
m_trigger = false;
m_user_timeout = false;
m_update_sets = false;

if (!Activity::start())
Expand All @@ -271,7 +276,7 @@ bool FileDescriptorActivity::start()
}

bool FileDescriptorActivity::trigger()
{
{
if (isActive() ) {
{ RTT::os::MutexLock lock(m_command_mutex);
m_trigger = true;
Expand All @@ -285,7 +290,15 @@ bool FileDescriptorActivity::trigger()

bool FileDescriptorActivity::timeout()
{
return false;
if (isActive() ) {
{ RTT::os::MutexLock lock(m_command_mutex);
m_user_timeout = true;
}
int unused; (void)unused;
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
return true;
} else
return false;
}


Expand All @@ -294,7 +307,7 @@ struct fd_watch {
fd_watch(int& fd) : fd(fd) {}
~fd_watch()
{
if (fd != -1)
if (fd != -1)
close(fd);
fd = -1;
}
Expand Down Expand Up @@ -371,13 +384,19 @@ void FileDescriptorActivity::loop()
// We check the flags after the command queue was emptied as we could miss commands otherwise:
bool do_trigger = true;
bool user_trigger = false;
bool user_timeout = false;
{ RTT::os::MutexLock lock(m_command_mutex);
// This section should be really fast to not block threads calling trigger(), breakLoop() or watch().
if (m_trigger) {
do_trigger = true;
user_trigger = true;
m_trigger = false;
}
if (m_user_timeout) {
do_trigger = true;
user_timeout = true;
m_user_timeout = false;
}
if (m_update_sets) {
m_update_sets = false;
do_trigger = false;
Expand All @@ -396,6 +415,8 @@ void FileDescriptorActivity::loop()
step();
if (m_has_timeout)
work(RunnableInterface::TimeOut);
else if ( user_timeout )
work(RunnableInterface::TimeOut);
else if ( user_trigger )
work(RunnableInterface::Trigger);
else
Expand Down
3 changes: 2 additions & 1 deletion rtt/extras/FileDescriptorActivity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ namespace RTT { namespace extras {
RTT::os::Mutex m_command_mutex;
bool m_break_loop;
bool m_trigger;
bool m_user_timeout;
bool m_update_sets;

/** Internal method that makes sure loop() takes into account
Expand Down Expand Up @@ -278,7 +279,7 @@ namespace RTT { namespace extras {
virtual void loop();
virtual bool breakLoop();
virtual bool stop();

/** Called by loop() when data is available on the file descriptor. By
* default, it calls step() on the associated runner interface (if any)
*/
Expand Down
23 changes: 20 additions & 3 deletions tests/specialized_activities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ struct TestFDActivity : public FileDescriptorActivity
int fd, other_fd, result;

bool do_read;
base::RunnableInterface::WorkReason work_reason;

RTT::os::Mutex mutex;

TestFDActivity()
: FileDescriptorActivity(0), step_count(0), count(0), other_count(0), do_read(false) {}

void work(base::RunnableInterface::WorkReason reason)
{
work_reason = reason;
}
void step()
{
RTT::os::MutexLock lock(mutex);
Expand Down Expand Up @@ -97,22 +102,34 @@ BOOST_AUTO_TEST_CASE( testFileDescriptorActivity )
BOOST_CHECK_EQUAL(0, activity->count);
BOOST_CHECK_EQUAL(0, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
BOOST_CHECK_EQUAL(base::RunnableInterface::Trigger, activity->work_reason);

// Check timeout(). Disable reading as there won't be any data on the FD
activity->do_read = false;
BOOST_CHECK( activity->timeout() );
usleep(USLEEP);
BOOST_CHECK_EQUAL(2, activity->step_count);
BOOST_CHECK_EQUAL(0, activity->count);
BOOST_CHECK_EQUAL(0, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
BOOST_CHECK_EQUAL(base::RunnableInterface::TimeOut, activity->work_reason);

// Check normal operations. Re-enable reading.
activity->do_read = true;
int buffer, result;
result = write(writer, &buffer, 2);
BOOST_CHECK( result == 2 );
usleep(USLEEP);
BOOST_CHECK_EQUAL(3, activity->step_count);
BOOST_CHECK_EQUAL(4, activity->step_count);
BOOST_CHECK_EQUAL(2, activity->count);
BOOST_CHECK_EQUAL(0, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
BOOST_CHECK_EQUAL(base::RunnableInterface::IOReady, activity->work_reason);

result = write(other_writer, &buffer, 2);
BOOST_CHECK( result == 2 );
usleep(USLEEP);
BOOST_CHECK_EQUAL(5, activity->step_count);
BOOST_CHECK_EQUAL(6, activity->step_count);
BOOST_CHECK_EQUAL(2, activity->count);
BOOST_CHECK_EQUAL(2, activity->other_count);
BOOST_CHECK( !activity->isRunning() && activity->isActive() );
Expand Down Expand Up @@ -143,11 +160,11 @@ BOOST_AUTO_TEST_CASE( testFileDescriptorActivity )
// step is blocking now
// trigger another 65537 times
for(std::size_t i = 0; i < 65537; ++i) activity->trigger();
BOOST_CHECK_EQUAL(base::RunnableInterface::TimeOut, activity->work_reason);
activity->mutex.unlock();
sleep(1);
BOOST_CHECK_EQUAL(2, activity->step_count);
BOOST_CHECK( activity->stop() );

}


Expand Down

0 comments on commit 9827e7c

Please sign in to comment.