Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rvc3 timesync improvements #76

Merged
merged 4 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/XLink/XLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#ifndef _XLINK_H
#define _XLINK_H
#include "XLinkPublicDefines.h"
#include "XLinkTime.h"

#ifdef __cplusplus
extern "C"
Expand Down Expand Up @@ -301,6 +302,8 @@ XLinkError_t XLinkCloseStream(streamId_t const streamId);
*/
XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer, int size);

XLinkError_t XLinkWriteData_(streamId_t streamId, const uint8_t* buffer, int size, XLinkTimespec* outTSend);

/**
* @brief Sends a package to initiate the writing of data to a remote stream
* @warning Actual size of the written data is ALIGN_UP(size, 64)
Expand Down
5 changes: 3 additions & 2 deletions include/XLink/XLinkDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extern "C"
#endif
typedef int (*getRespFunction) (xLinkEvent_t*, xLinkEvent_t*, bool);
typedef struct {
int (*eventSend) (xLinkEvent_t*);
int (*eventSend) (xLinkEvent_t*, XLinkTimespec*);
int (*eventReceive) (xLinkEvent_t*);
getRespFunction localGetResponse;
getRespFunction remoteGetResponse;
Expand All @@ -36,6 +36,7 @@ int DispatcherClean(xLinkDeviceHandle_t *deviceHandle);
int DispatcherDeviceFdDown(xLinkDeviceHandle_t *deviceHandle);

xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event);
xLinkEvent_t* DispatcherAddEvent_(xLinkEventOrigin_t origin, xLinkEvent_t *event, XLinkTimespec* outTime);
int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle, unsigned int timeoutMs);
int DispatcherWaitEventCompleteTimeout(xLinkDeviceHandle_t *deviceHandle, struct timespec abstime);

Expand All @@ -52,4 +53,4 @@ int DispatcherServeEvent(eventId_t id,
}
#endif

#endif
#endif
2 changes: 1 addition & 1 deletion include/XLink/XLinkDispatcherImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include "XLinkPrivateDefines.h"
#include <stdbool.h>

int dispatcherEventSend (xLinkEvent_t*);
int dispatcherEventSend (xLinkEvent_t*, XLinkTimespec* sendTime);
int dispatcherEventReceive (xLinkEvent_t*);
int dispatcherLocalEventGetResponse (xLinkEvent_t*, xLinkEvent_t*, bool);
int dispatcherRemoteEventGetResponse (xLinkEvent_t*, xLinkEvent_t*, bool);
Expand Down
3 changes: 2 additions & 1 deletion include/XLink/XLinkTime.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef _XLINK_TIME_H
#define _XLINK_TIME_H

#include <stdint.h>
#ifdef __cplusplus
extern "C"
{
Expand All @@ -16,4 +17,4 @@ void getMonotonicTimestamp(XLinkTimespec* ts);
#ifdef __cplusplus
}
#endif
#endif
#endif
33 changes: 24 additions & 9 deletions src/shared/XLinkData.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
static XLinkError_t checkEventHeader(xLinkEventHeader_t header);
static float timespec_diff(struct timespec *start, struct timespec *stop);
static XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs);
static XLinkError_t addEvent_(xLinkEvent_t *event, unsigned int timeoutMs, XLinkTimespec* outTime);
static XLinkError_t addEventWithPerf(xLinkEvent_t *event, float* opTime, unsigned int timeoutMs);
static XLinkError_t addEventWithPerf_(xLinkEvent_t *event, float* opTime, unsigned int timeoutMs, XLinkTimespec* outTime);
static XLinkError_t addEventWithPerfTimeout(xLinkEvent_t *event, float* opTime, unsigned int msTimeout);
static XLinkError_t getLinkByStreamId(streamId_t streamId, xLinkDesc_t** out_link);

Expand Down Expand Up @@ -111,8 +113,8 @@ XLinkError_t XLinkCloseStream(streamId_t const streamId)
return X_LINK_SUCCESS;
}

XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer,
int size)
XLinkError_t XLinkWriteData_(streamId_t streamId, const uint8_t* buffer,
int size, XLinkTimespec* outTSend)
{
XLINK_RET_IF(buffer == NULL);

Expand All @@ -125,9 +127,9 @@ XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer,
XLINK_INIT_EVENT(event, streamIdOnly, XLINK_WRITE_REQ,
size,(void*)buffer, link->deviceHandle);

XLINK_RET_IF(addEventWithPerf(&event, &opTime, XLINK_NO_RW_TIMEOUT));
XLINK_RET_IF(addEventWithPerf_(&event, &opTime, XLINK_NO_RW_TIMEOUT, outTSend));

if (glHandler->profEnable) {
if( glHandler->profEnable) {
glHandler->profilingData.totalWriteBytes += size;
glHandler->profilingData.totalWriteTime += opTime;
}
Expand All @@ -137,6 +139,12 @@ XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer,
return X_LINK_SUCCESS;
}

XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer,
int size)
{
return XLinkWriteData_(streamId, buffer, size, NULL);
}

XLinkError_t XLinkWriteData2(streamId_t streamId, const uint8_t* buffer1, int buffer1Size, const uint8_t* buffer2, int buffer2Size)
{
ASSERT_XLINK(buffer1);
Expand Down Expand Up @@ -436,11 +444,11 @@ float timespec_diff(struct timespec *start, struct timespec *stop)
return start->tv_nsec/ 1000000000.0f + start->tv_sec;
}

XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs)
XLinkError_t addEvent_(xLinkEvent_t *event, unsigned int timeoutMs, XLinkTimespec* outTime)
{
ASSERT_XLINK(event);

xLinkEvent_t* ev = DispatcherAddEvent(EVENT_LOCAL, event);
xLinkEvent_t* ev = DispatcherAddEvent_(EVENT_LOCAL, event, outTime);
if(ev == NULL) {
mvLog(MVLOG_ERROR, "Dispatcher failed on adding event. type: %s, id: %d, stream name: %s\n",
TypeToStr(event->header.type), event->header.id, event->header.streamName);
Expand Down Expand Up @@ -478,28 +486,35 @@ XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs)
return X_LINK_TIMEOUT;
}
}

XLINK_RET_ERR_IF(
event->header.flags.bitField.ack != 1,
X_LINK_COMMUNICATION_FAIL);

return X_LINK_SUCCESS;
}
XLinkError_t addEvent(xLinkEvent_t *event, unsigned int timeoutMs)
{
return addEvent_(event, timeoutMs, NULL);
}

XLinkError_t addEventWithPerf(xLinkEvent_t *event, float* opTime, unsigned int timeoutMs)
XLinkError_t addEventWithPerf_(xLinkEvent_t *event, float* opTime, unsigned int timeoutMs, XLinkTimespec* outTime)
{
ASSERT_XLINK(opTime);

struct timespec start, end;
clock_gettime(CLOCK_REALTIME, &start);

XLINK_RET_IF_FAIL(addEvent(event, timeoutMs));
XLINK_RET_IF_FAIL(addEvent_(event, timeoutMs, outTime));

clock_gettime(CLOCK_REALTIME, &end);
*opTime = timespec_diff(&start, &end);

return X_LINK_SUCCESS;
}
XLinkError_t addEventWithPerf(xLinkEvent_t *event, float* opTime, unsigned int timeoutMs)
{
return addEventWithPerf_(event, opTime, timeoutMs, NULL);
}

XLinkError_t addEventTimeout(xLinkEvent_t *event, struct timespec abstime)
{
Expand Down
33 changes: 26 additions & 7 deletions src/shared/XLinkDispatcher.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
///
#ifndef _GNU_SOURCE
#define _GNU_SOURCE // fix for warning: implicit declaration of function 'pthread_setname_np'
#include "XLinkTime.h"
#endif

#include <errno.h>
Expand Down Expand Up @@ -58,6 +59,7 @@ typedef struct xLinkEventPriv_t {
xLinkEvent_t *retEv;
xLinkEventState_t isServed;
xLinkEventOrigin_t origin;
XLinkTimespec* sendTime;
XLink_sem_t* sem;
void* data;
} xLinkEventPriv_t;
Expand Down Expand Up @@ -165,6 +167,11 @@ static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q );
static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
eventQueueHandler_t *q, xLinkEvent_t* event,
XLink_sem_t* sem, xLinkEventOrigin_t o);
static xLinkEvent_t* addNextQueueElemToProc_(xLinkSchedulerState_t* curr,
eventQueueHandler_t *q, xLinkEvent_t* event,
XLink_sem_t* sem, xLinkEventOrigin_t o,
XLinkTimespec* outTime);


static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr);

Expand Down Expand Up @@ -351,7 +358,7 @@ int DispatcherDeviceFdDown(xLinkDeviceHandle_t *deviceHandle){
return dispatcherDeviceFdDown(curr);
}

xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
xLinkEvent_t* DispatcherAddEvent_(xLinkEventOrigin_t origin, xLinkEvent_t *event, XLinkTimespec* outTime)
{
xLinkSchedulerState_t* curr = findCorrespondingScheduler(event->deviceHandle.xLinkFD);
XLINK_RET_ERR_IF(curr == NULL, NULL);
Expand Down Expand Up @@ -387,9 +394,9 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
const uint32_t tmpMoveSem = event->header.flags.bitField.moveSemantic;
event->header.flags.raw = 0;
event->header.flags.bitField.moveSemantic = tmpMoveSem;
ev = addNextQueueElemToProc(curr, &curr->lQueue, event, sem, origin);
ev = addNextQueueElemToProc_(curr, &curr->lQueue, event, sem, origin, outTime);
} else {
ev = addNextQueueElemToProc(curr, &curr->rQueue, event, NULL, origin);
ev = addNextQueueElemToProc_(curr, &curr->rQueue, event, NULL, origin, outTime);
}
if (XLink_sem_post(&curr->addEventSem)) {
mvLog(MVLOG_ERROR,"can't post semaphore\n");
Expand All @@ -399,6 +406,10 @@ xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
}
return ev;
}
xLinkEvent_t* DispatcherAddEvent(xLinkEventOrigin_t origin, xLinkEvent_t *event)
{
return DispatcherAddEvent_(origin, event, NULL);
}

int DispatcherWaitEventComplete(xLinkDeviceHandle_t *deviceHandle, unsigned int timeoutMs)
{
Expand Down Expand Up @@ -959,10 +970,10 @@ static xLinkEventPriv_t* getNextQueueElemToProc(eventQueueHandler_t *q ){
* @brief Add event to Queue
* @note It called from dispatcherAddEvent
*/
static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
static xLinkEvent_t* addNextQueueElemToProc_(xLinkSchedulerState_t* curr,
eventQueueHandler_t *q, xLinkEvent_t* event,
XLink_sem_t* sem, xLinkEventOrigin_t o)
{
XLink_sem_t* sem, xLinkEventOrigin_t o,
XLinkTimespec* outTime) {
xLinkEvent_t* ev;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, NULL);
xLinkEventPriv_t* eventP = getNextElementWithState(q->base, q->end, q->cur, EVENT_SERVED);
Expand All @@ -980,15 +991,23 @@ static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
if (o == EVENT_LOCAL) {
// XLink API caller provided buffer for return the final result to
eventP->retEv = event;
eventP->sendTime = outTime;
}else{
eventP->retEv = NULL;
eventP->sendTime = NULL;
}
q->cur = eventP;
eventP->isServed = EVENT_ALLOCATED;
CIRCULAR_INCREMENT_BASE(q->cur, q->end, q->base);
XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, NULL);
return ev;
}
static xLinkEvent_t* addNextQueueElemToProc(xLinkSchedulerState_t* curr,
eventQueueHandler_t *q, xLinkEvent_t* event,
XLink_sem_t* sem, xLinkEventOrigin_t o)
{
return addNextQueueElemToProc_(curr, q, event, sem, o, NULL);
}

static xLinkEventPriv_t* dispatcherGetNextEvent(xLinkSchedulerState_t* curr)
{
Expand Down Expand Up @@ -1212,7 +1231,7 @@ static XLinkError_t sendEvents(xLinkSchedulerState_t* curr) {
}

XLINK_RET_ERR_IF(pthread_mutex_unlock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
if (glControlFunc->eventSend(toSend) != 0) {
if (glControlFunc->eventSend(toSend, event->sendTime) != 0) {
// Error out
curr->resetXLink = 1;
XLINK_RET_ERR_IF(pthread_mutex_lock(&(curr->queueMutex)) != 0, X_LINK_ERROR);
Expand Down
4 changes: 3 additions & 1 deletion src/shared/XLinkDispatcherImpl.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,15 @@ int writeEventMultipart(xLinkDeviceHandle_t* deviceHandle, void* data, int total
}

//adds a new event with parameters and returns event id
int dispatcherEventSend(xLinkEvent_t *event)
int dispatcherEventSend(xLinkEvent_t *event, XLinkTimespec* sendTime)
{
mvLog(MVLOG_DEBUG, "Send event: %s, size %d, streamId %ld.\n",
TypeToStr(event->header.type), event->header.size, event->header.streamId);

XLinkTimespec stime;
getMonotonicTimestamp(&stime);
if (sendTime != NULL) *sendTime = stime;

event->header.tsecLsb = (uint32_t)stime.tv_sec;
event->header.tsecMsb = (uint32_t)(stime.tv_sec >> 32);
event->header.tnsec = (uint32_t)stime.tv_nsec;
Expand Down
Loading