Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FileDescriptorActivity locking and robustness #309

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
156 changes: 92 additions & 64 deletions rtt/extras/FileDescriptorActivity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,9 @@ FileDescriptorActivity::FileDescriptorActivity(int priority, RunnableInterface*
, m_period(0)
, m_has_error(false)
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_update_sets(false)
, m_has_ioready(false)
{
clearCommandFlags();
FD_ZERO(&m_fd_set);
FD_ZERO(&m_fd_work);
m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
Expand All @@ -107,10 +106,9 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Runn
, m_period(0)
, m_has_error(false)
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_update_sets(false)
, m_has_ioready(false)
{
clearCommandFlags();
FD_ZERO(&m_fd_set);
FD_ZERO(&m_fd_work);
m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
Expand All @@ -123,10 +121,9 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco
, m_period(period >= 0.0 ? period : 0.0) // intended period
, m_has_error(false)
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_update_sets(false)
, m_has_ioready(false)
{
clearCommandFlags();
FD_ZERO(&m_fd_set);
FD_ZERO(&m_fd_work);
m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
Expand All @@ -139,10 +136,9 @@ FileDescriptorActivity::FileDescriptorActivity(int scheduler, int priority, Seco
, m_period(period >= 0.0 ? period : 0.0) // intended period
, m_has_error(false)
, m_has_timeout(false)
, m_break_loop(false)
, m_trigger(false)
, m_update_sets(false)
, m_has_ioready(false)
{
clearCommandFlags();
FD_ZERO(&m_fd_set);
FD_ZERO(&m_fd_work);
m_interrupt_pipe[0] = m_interrupt_pipe[1] = -1;
Expand Down Expand Up @@ -186,7 +182,7 @@ void FileDescriptorActivity::setTimeout_us(int timeout_us)
}
}
void FileDescriptorActivity::watch(int fd)
{ RTT::os::MutexLock lock(m_lock);
{ RTT::os::MutexLock lock(m_fd_lock);
if (fd < 0)
{
log(Error) << "negative file descriptor given to FileDescriptorActivity::watch" << endlog();
Expand All @@ -198,33 +194,39 @@ void FileDescriptorActivity::watch(int fd)
triggerUpdateSets();
}
void FileDescriptorActivity::unwatch(int fd)
{ RTT::os::MutexLock lock(m_lock);
{ RTT::os::MutexLock lock(m_fd_lock);
m_watched_fds.erase(fd);
FD_CLR(fd, &m_fd_set);
triggerUpdateSets();
}
void FileDescriptorActivity::clearAllWatches()
{ RTT::os::MutexLock lock(m_lock);
{ RTT::os::MutexLock lock(m_fd_lock);
m_watched_fds.clear();
FD_ZERO(&m_fd_set);
triggerUpdateSets();
}
void FileDescriptorActivity::triggerUpdateSets()
{
{ RTT::os::MutexLock lock(m_command_mutex);
m_update_sets = true;
}
oro_atomic_inc(&m_update_sets);
int unused; (void)unused;
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
}

void FileDescriptorActivity::clearCommandFlags()
{
oro_atomic_set(&m_break_loop, 0);
oro_atomic_set(&m_trigger, 0);
oro_atomic_set(&m_update_sets, 0);
}

bool FileDescriptorActivity::isUpdated(int fd) const
{ return FD_ISSET(fd, &m_fd_work); }
bool FileDescriptorActivity::hasError() const
{ return m_has_error; }
bool FileDescriptorActivity::hasTimeout() const
{ return m_has_timeout; }
bool FileDescriptorActivity::isWatched(int fd) const
{ RTT::os::MutexLock lock(m_lock);
{ RTT::os::MutexLock lock(m_fd_lock);
return FD_ISSET(fd, &m_fd_set); }

bool FileDescriptorActivity::start()
Expand Down Expand Up @@ -254,10 +256,8 @@ bool FileDescriptorActivity::start()
}
#endif

// reset flags
m_break_loop = false;
m_trigger = false;
m_update_sets = false;
// clear command flags
clearCommandFlags();

if (!Activity::start())
{
Expand All @@ -273,9 +273,8 @@ bool FileDescriptorActivity::start()
bool FileDescriptorActivity::trigger()
{
if (isActive() ) {
{ RTT::os::MutexLock lock(m_command_mutex);
m_trigger = true;
}
if (oro_atomic_read(&m_trigger) > 0) return true;
oro_atomic_inc(&m_trigger);
int unused; (void)unused;
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
return true;
Expand All @@ -288,7 +287,6 @@ bool FileDescriptorActivity::timeout()
return false;
}


struct fd_watch {
int& fd;
fd_watch(int& fd) : fd(fd) {}
Expand All @@ -305,11 +303,12 @@ void FileDescriptorActivity::loop()
int pipe = m_interrupt_pipe[0];
fd_watch watch_pipe_0(m_interrupt_pipe[0]);
fd_watch watch_pipe_1(m_interrupt_pipe[1]);
timeval timeout = { 0, 0 };

while(true)
{
int max_fd;
{ RTT::os::MutexLock lock(m_lock);
{ RTT::os::MutexLock lock(m_fd_lock);
if (m_watched_fds.empty())
max_fd = pipe;
else
Expand All @@ -319,24 +318,35 @@ void FileDescriptorActivity::loop()
}
FD_SET(pipe, &m_fd_work);

int ret;
m_running = false;
int ret = -1;
if (m_timeout_us == 0)
{
ret = select(max_fd + 1, &m_fd_work, NULL, NULL, NULL);
}
else
{
static const int USECS_PER_SEC = 1000000;
timeval timeout = { m_timeout_us / USECS_PER_SEC,
m_timeout_us % USECS_PER_SEC};
// only rearm the timer if the previous call was not a pure command pipe event
if (m_has_timeout || m_has_ioready || m_has_error ||
(timeout.tv_sec == 0 && timeout.tv_usec == 0)) {
static const int USECS_PER_SEC = 1000000;
timeout.tv_sec = m_timeout_us / USECS_PER_SEC;
timeout.tv_usec = m_timeout_us % USECS_PER_SEC;
}
Comment on lines +329 to +334
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to add a unit test for this one.

ret = select(max_fd + 1, &m_fd_work, NULL, NULL, &timeout);
}

m_has_error = false;
m_has_timeout = false;
if (ret == -1)
m_has_ioready = false;
if (ret < 0)
{
if (errno == EINTR)
{
// A signal was caught; see signal(7). We should not handle this
// here and simply continue waiting. Could be as trivial as
// a SIGWINCH (Window resize signal).
continue;
}
log(Error) << "FileDescriptorActivity: error in select(), errno = " << errno << endlog();
m_has_error = true;
}
Expand All @@ -345,6 +355,11 @@ void FileDescriptorActivity::loop()
log(Error) << "FileDescriptorActivity: timeout in select()" << endlog();
m_has_timeout = true;
}
else
{
// do not trigger an IOReady event if the only file descriptor that was active is the command pipe
m_has_ioready = !(ret == 1 && FD_ISSET(pipe, &m_fd_work));
}

// Empty all commands queued in the pipe
if (ret > 0 && FD_ISSET(pipe, &m_fd_work)) // breakLoop or trigger requests
Expand All @@ -369,37 +384,56 @@ void FileDescriptorActivity::loop()
}

// We check the flags after the command queue was emptied as we could miss commands otherwise:
bool do_trigger = true;
bool user_trigger = false;
{ RTT::os::MutexLock lock(m_command_mutex);
// This section should be really fast to not block threads calling trigger(), breakLoop() or watch().
if (m_trigger) {
do_trigger = true;
user_trigger = true;
m_trigger = false;
}
if (m_update_sets) {
m_update_sets = false;
do_trigger = false;
}
if (m_break_loop) {
m_break_loop = false;
break;
}
bool do_trigger = false;
if (oro_atomic_read(&m_trigger) > 0) {
oro_atomic_set(&m_trigger, 0);
do_trigger = true;
}
if (oro_atomic_read(&m_update_sets) > 0) {
oro_atomic_set(&m_update_sets, 0);

// Check if file descriptors that have work also have been removed in the current cycle and
// already ignore them. This is a corner case and still triggering an IOReady event would
// probably be fine in real-world applications, but it can break the task-test unit test
// depending on timing.
//
// The simple solution would be to continue without triggering the component, but we might miss
// triggers or activities on other file descriptors then.
//
// Disabled for now and patched in the unit test.
//
// { RTT::os::MutexLock lock(m_fd_lock);
// fd_set copy;
// FD_ZERO(&copy);
// m_has_ioready = false;
// for(int i = 0; i <= max_fd; ++i) {
// if (FD_ISSET(i, &m_fd_set) && FD_ISSET(i, &m_fd_work)) {
// FD_SET(i, &copy);
// m_has_ioready = true;
// }
// }
// m_fd_work = copy;
// }
}
if (oro_atomic_read(&m_break_loop) > 0) {
oro_atomic_set(&m_break_loop, 0);
break;
}

if (do_trigger)
// Execute activity...
if (m_has_timeout || m_has_ioready || do_trigger)
{
try
{
m_running = true;
step();
if (m_has_timeout)
work(RunnableInterface::TimeOut);
else if ( user_trigger )
if ( do_trigger )
work(RunnableInterface::Trigger);
else
if ( m_has_timeout )
work(RunnableInterface::TimeOut);
if ( m_has_ioready )
work(RunnableInterface::IOReady);

m_running = false;
}
catch(...)
Expand All @@ -413,28 +447,22 @@ void FileDescriptorActivity::loop()

bool FileDescriptorActivity::breakLoop()
{
{ RTT::os::MutexLock lock(m_command_mutex);
m_break_loop = true;
}
if (oro_atomic_read(&m_break_loop) > 0) return true;
oro_atomic_inc(&m_break_loop);
int unused; (void)unused;
unused = write(m_interrupt_pipe[1], &CMD_ANY_COMMAND, 1);
return true;
}

void FileDescriptorActivity::step()
{
m_running = true;
if (runner != 0)
runner->step();
m_running = false;
}

void FileDescriptorActivity::work(base::RunnableInterface::WorkReason reason) {
m_running = true;
if (runner != 0)
runner->work(reason);
m_running = false;

}

bool FileDescriptorActivity::stop()
Expand Down
15 changes: 10 additions & 5 deletions rtt/extras/FileDescriptorActivity.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,28 @@ namespace RTT { namespace extras {
int m_timeout_us; //! timeout in microseconds
Seconds m_period; //! intended period
/** Lock that protects the access to m_fd_set and m_watched_fds */
mutable RTT::os::Mutex m_lock;
mutable RTT::os::Mutex m_fd_lock;
fd_set m_fd_set;
fd_set m_fd_work;
bool m_has_error;
bool m_has_timeout;
bool m_has_ioready;

static const char CMD_ANY_COMMAND = 0;
RTT::os::Mutex m_command_mutex;
bool m_break_loop;
bool m_trigger;
bool m_update_sets;
mutable oro_atomic_t m_break_loop;
mutable oro_atomic_t m_trigger;
mutable oro_atomic_t m_update_sets;

/** Internal method that makes sure loop() takes into account
* modifications in the set of watched FDs
*/
void triggerUpdateSets();

/** Internal method to clear the command (trigger, break loop,
* update sets) flags.
*/
void clearCommandFlags();

public:
/**
* Create a FileDescriptorActivity with a given priority and base::RunnableInterface
Expand Down
3 changes: 3 additions & 0 deletions tests/taskthread_fd_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ BOOST_AUTO_TEST_CASE(testFileDescriptor_Write )
mtask->unwatch(mcomp.fd[0]);
BOOST_CHECK( mtask->isWatched(mcomp.fd[0]) == false );

// sleep to give the FileDescriptorActivity some time to update the internal file descriptor set
usleep(100000/10);

++ch;
rc = write(mcomp.fd[1], &ch, sizeof(ch));
if (1 != rc) std::cerr << "rc=" << rc << " errno=" << errno << ":" << strerror(errno) << std::endl;
Expand Down