Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cleanup(userspace/libsinsp): call sinsp_observer methods after an event has been processed by all parsers #2222

Merged
merged 6 commits into from
Jan 13, 2025
140 changes: 88 additions & 52 deletions userspace/libsinsp/parsers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1276,9 +1276,14 @@
return;
}

/* If there's a listener, invoke it */
//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
m_inspector->m_post_process_cbs.emplace(
[&new_child, tid_collision](sinsp_observer *observer, sinsp_evt *evt) {
FedeDP marked this conversation as resolved.
Show resolved Hide resolved
observer->on_clone(evt, new_child.get(), tid_collision);
});
}

/* If we had to erase a previous entry for this tid and rebalance the table,
Expand All @@ -1292,8 +1297,6 @@
new_child->m_comm.c_str());
}
/*=============================== ADD THREAD TO THE TABLE ===========================*/

return;
}

void sinsp_parser::parse_clone_exit_child(sinsp_evt *evt) {
Expand Down Expand Up @@ -1761,17 +1764,19 @@
evt->set_tinfo(new_child.get());

//
// If there's a listener, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_clone(evt, new_child.get(), tid_collision);
m_inspector->m_post_process_cbs.emplace(
[&new_child, tid_collision](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_clone(evt, new_child.get(), tid_collision);

Check warning on line 1772 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L1771-L1772

Added lines #L1771 - L1772 were not covered by tests
});
}

/* If we had to erase a previous entry for this tid and rebalance the table,
* make sure we reinitialize the child_tinfo pointer for this event, as the thread
* generating it might have gone away.
*/

if(tid_collision != -1) {
reset(evt);
/* Right now we have collisions only on the clone() caller */
Expand All @@ -1781,7 +1786,6 @@
}

/*=============================== CREATE NEW THREAD-INFO ===========================*/
return;
}

void sinsp_parser::parse_clone_exit(sinsp_evt *evt) {
Expand Down Expand Up @@ -2230,15 +2234,11 @@
evt->get_tinfo()->m_flags |= PPM_CL_NAME_CHANGED;

//
// Recompute the program hash
//
evt->get_tinfo()->compute_program_hash();

//
// If there's a listener, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_execve(evt);
m_inspector->m_post_process_cbs.emplace(
[](sinsp_observer *observer, sinsp_evt *evt) { observer->on_execve(evt); });
}

/* If any of the threads in a thread group performs an
Expand Down Expand Up @@ -2787,10 +2787,11 @@
evt->get_fd_info()->m_name = evt->get_param_as_str(1, &parstr, sinsp_evt::PF_SIMPLE);

//
// If there's a listener callback, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_bind(evt);
m_inspector->m_post_process_cbs.emplace(
[](sinsp_observer *observer, sinsp_evt *evt) { observer->on_bind(evt); });

Check warning on line 2794 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L2793-L2794

Added lines #L2793 - L2794 were not covered by tests
}
}

Expand Down Expand Up @@ -2982,7 +2983,6 @@

void sinsp_parser::parse_connect_exit(sinsp_evt *evt) {
const sinsp_evt_param *parinfo;
uint8_t *packed_data;
int64_t retval;
int64_t fd;
bool force_overwrite_stale_data = false;
Expand Down Expand Up @@ -3048,15 +3048,18 @@
return;
}

packed_data = (uint8_t *)parinfo->m_val;
uint8_t *packed_data = (uint8_t *)parinfo->m_val;

fill_client_socket_info(evt, packed_data, force_overwrite_stale_data);

//
// If there's a listener callback, invoke it
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_connect(evt, packed_data);
m_inspector->m_post_process_cbs.emplace(
[&packed_data](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_connect(evt, packed_data);

Check warning on line 3061 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3059-L3061

Added lines #L3059 - L3061 were not covered by tests
});
}
}

Expand Down Expand Up @@ -3144,8 +3147,14 @@
fdi->m_name = evt->get_param_as_str(1, &parstr, sinsp_evt::PF_SIMPLE);
fdi->m_flags = 0;

//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_accept(evt, fd, packed_data, fdi.get());
m_inspector->m_post_process_cbs.emplace(
[fd, &packed_data](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_accept(evt, fd, packed_data, evt->get_fd_info());

Check warning on line 3156 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3155-L3156

Added lines #L3155 - L3156 were not covered by tests
});
}

//
Expand Down Expand Up @@ -3205,8 +3214,14 @@
m_inspector->get_fds_to_remove().push_back(params->m_fd);
}

//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_erase_fd(params);
m_inspector->m_post_process_cbs.emplace(
[&params](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_erase_fd(params);

Check warning on line 3223 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3221-L3223

Added lines #L3221 - L3223 were not covered by tests
});
}
}

Expand Down Expand Up @@ -3669,8 +3684,6 @@
}

if(eflags & EF_READS_FROM_FD) {
const char *data;
uint32_t datalen;
int32_t tupleparam = -1;

if(etype == PPME_SOCKET_RECVFROM_X) {
Expand Down Expand Up @@ -3729,20 +3742,23 @@
parinfo = evt->get_param(1);
}

datalen = parinfo->m_len;
data = parinfo->m_val;
uint32_t datalen = parinfo->m_len;
const char *data = parinfo->m_val;

//
// If there's an fd listener, call it now
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_read(evt,
tid,
evt->get_tinfo()->m_lastevent_fd,
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
m_inspector->m_post_process_cbs.emplace(
[tid, &data, retval, datalen](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_read(evt,

Check warning on line 3754 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3752-L3754

Added lines #L3752 - L3754 were not covered by tests
tid,
evt->get_tinfo()->m_lastevent_fd,

Check warning on line 3756 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3756

Added line #L3756 was not covered by tests
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
});
}

//
Expand Down Expand Up @@ -3791,7 +3807,6 @@
#endif

} else {
const char *data;
uint32_t datalen;
int32_t tupleparam = -1;

Expand Down Expand Up @@ -3849,19 +3864,22 @@
//
parinfo = evt->get_param(1);
datalen = parinfo->m_len;
data = parinfo->m_val;
const char *data = parinfo->m_val;

//
// If there's an fd listener, call it now
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_write(evt,
tid,
evt->get_tinfo()->m_lastevent_fd,
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
m_inspector->m_post_process_cbs.emplace(
[tid, &data, retval, datalen](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_write(evt,

Check warning on line 3875 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3873-L3875

Added lines #L3873 - L3875 were not covered by tests
tid,
evt->get_tinfo()->m_lastevent_fd,

Check warning on line 3877 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3877

Added line #L3877 was not covered by tests
evt->get_fd_info(),
data,
(uint32_t)retval,
datalen);
});
}

// perform syslog decoding if applicable
Expand All @@ -3873,8 +3891,14 @@
if(evt->get_fd_info()->m_type == SCAP_FD_IPV4_SOCK ||
evt->get_fd_info()->m_type == SCAP_FD_IPV6_SOCK) {
evt->get_fd_info()->set_socket_failed();
//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_socket_status_changed(evt);
m_inspector->m_post_process_cbs.emplace(
[](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_socket_status_changed(evt);

Check warning on line 3900 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3898-L3900

Added lines #L3898 - L3900 were not covered by tests
});
}
}
}
Expand All @@ -3897,7 +3921,6 @@
//
if(retval >= 0) {
sinsp_evt *enter_evt = &m_tmp_evt;
int64_t fdin;

if(!retrieve_enter_event(enter_evt, evt)) {
return;
Expand All @@ -3906,13 +3929,16 @@
//
// Extract the in FD
//
fdin = enter_evt->get_param(1)->as<int64_t>();
int64_t fdin = enter_evt->get_param(1)->as<int64_t>();

//
// If there's an fd listener, call it now
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_sendfile(evt, fdin, (uint32_t)retval);
m_inspector->m_post_process_cbs.emplace(
[fdin, retval](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_sendfile(evt, fdin, (uint32_t)retval);

Check warning on line 3940 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L3938-L3940

Added lines #L3938 - L3940 were not covered by tests
});
}
}
}
Expand Down Expand Up @@ -4068,8 +4094,13 @@
return;
}

//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_socket_shutdown(evt);
m_inspector->m_post_process_cbs.emplace([](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_socket_shutdown(evt);

Check warning on line 4102 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L4101-L4102

Added lines #L4101 - L4102 were not covered by tests
});
}
}
}
Expand Down Expand Up @@ -5048,8 +5079,13 @@
} else {
evt->get_fd_info()->set_socket_connected();
}
//
// If there's a listener, add a callback to later invoke it.
//
if(m_inspector->get_observer()) {
m_inspector->get_observer()->on_socket_status_changed(evt);
m_inspector->m_post_process_cbs.emplace([](sinsp_observer *observer, sinsp_evt *evt) {
observer->on_socket_status_changed(evt);

Check warning on line 5087 in userspace/libsinsp/parsers.cpp

View check run for this annotation

Codecov / codecov/patch

userspace/libsinsp/parsers.cpp#L5086-L5087

Added lines #L5086 - L5087 were not covered by tests
});
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions userspace/libsinsp/sinsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1316,6 +1316,20 @@ int32_t sinsp::next(sinsp_evt** puevt) {
// todo(jason): should we log parsing errors here?
pp.process_event(evt, m_event_sources);
}

// Once we processed all events, make sure to call all
// requested post-process callbacks.
// At this point, any plugin could have modified state tables,
// thus we can guarantee that any post-process callback
// will see the full post-event-processed state.
// NOTE: we don't use a RAII object because
// we cannot guarantee that no exception will be thrown by the callbacks.
if(m_observer != nullptr) {
for(; !m_post_process_cbs.empty(); m_post_process_cbs.pop()) {
auto cb = m_post_process_cbs.front();
cb(m_observer, evt);
}
}
}

// Finally set output evt;
Expand Down
3 changes: 3 additions & 0 deletions userspace/libsinsp/sinsp.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ limitations under the License.
#include <string>
#include <unordered_set>
#include <vector>
#include <queue>

#define ONE_SECOND_IN_NS 1000000000LL

Expand Down Expand Up @@ -1242,6 +1243,8 @@ class SINSP_PUBLIC sinsp : public capture_stats_source {

sinsp_observer* m_observer{nullptr};

std::queue<std::function<void(sinsp_observer* observer, sinsp_evt* evt)>> m_post_process_cbs{};

bool m_inited;
static std::atomic<int> instance_count;
};
Expand Down
Loading
Loading