Skip to content

Commit

Permalink
Merge pull request #82 from luxonis/shared-memory-on-localhost
Browse files Browse the repository at this point in the history
FD sharing protocol on localhost
  • Loading branch information
TheMutta authored Jul 5, 2024
2 parents 0617093 + 62909dc commit ddfc42c
Show file tree
Hide file tree
Showing 20 changed files with 1,021 additions and 38 deletions.
7 changes: 7 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,10 @@ add_example(xlink_server xlink_server.cpp)
add_example(xlink_server2 xlink_server2.cpp)
# Boot firmware
add_example(device_connect_reset device_connect_reset.cpp)

# Local shared memory example
if (UNIX)
add_example(xlink_server_local xlink_server_local.cpp)
add_example(xlink_client_local xlink_client_local.cpp)
endif (UNIX)

109 changes: 109 additions & 0 deletions examples/xlink_client_local.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#include <cstring>
#include <cstddef>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/stat.h>
#include <unistd.h>
#include <cassert>

#include "XLink/XLink.h"
#include "XLink/XLinkPublicDefines.h"
#include "XLink/XLinkLog.h"

const long MAXIMUM_SHM_SIZE = 4096;
const char *SHARED_MEMORY_NAME = "/xlink_shared_memory_b";

XLinkGlobalHandler_t xlinkGlobalHandler = {};

int main(int argc, const char** argv){
xlinkGlobalHandler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM;

mvLogDefaultLevelSet(MVLOG_ERROR);

printf("Initializing XLink...\n");
auto status = XLinkInitialize(&xlinkGlobalHandler);
if(X_LINK_SUCCESS != status) {
printf("Initializing wasn't successful\n");
return 1;
}

XLinkHandler_t handler;
handler.devicePath = "127.0.0.1";
handler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM;
status = XLinkConnect(&handler);
if(X_LINK_SUCCESS != status) {
printf("Connecting wasn't successful\n");
return 1;
}

streamPacketDesc_t *packet;

auto s = XLinkOpenStream(0, "test", 1024);
assert(s != INVALID_STREAM_ID);

// Read the data packet containing the FD
auto r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}

// Read and print the message from shared memory
printf("Message from Process A: %s\n", static_cast<char *>(sharedMemAddr));

const char *normalMessage = "Normal message from Process B";
auto w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1);
assert(w == X_LINK_SUCCESS);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
if (shmFd < 0) {
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
const char *message = "Shared message from Process B!";
memcpy(addr, message, strlen(message) + 1);

// Send the FD through the XLinkWriteFd function
w = XLinkWriteFd(s, shmFd);
assert(w == X_LINK_SUCCESS);

r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);

printf("Message from Process A: %s\n", (char *)(packet->data));


munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
unlink(shmName);

return 0;
}
106 changes: 106 additions & 0 deletions examples/xlink_server_local.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include <cstring>
#include <cstddef>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <cassert>

#include "XLink/XLink.h"
#include "XLink/XLinkPublicDefines.h"
#include "XLink/XLinkLog.h"

const long MAXIMUM_SHM_SIZE = 4096;
const char *SHARED_MEMORY_NAME = "/xlink_shared_memory_a";

XLinkGlobalHandler_t xlinkGlobalHandler = {};

int main(int argc, const char** argv){
xlinkGlobalHandler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM;

mvLogDefaultLevelSet(MVLOG_ERROR);

printf("Initializing XLink...\n");
auto status = XLinkInitialize(&xlinkGlobalHandler);
if(X_LINK_SUCCESS != status) {
printf("Initializing wasn't successful\n");
return 1;
}

XLinkHandler_t handler;
handler.devicePath = "0.0.0.0";
handler.protocol = X_LINK_TCP_IP_OR_LOCAL_SHDMEM;
status = XLinkServerOnly(&handler);
if(X_LINK_SUCCESS != status) {
printf("Connecting wasn't successful\n");
return 1;
}

auto s = XLinkOpenStream(0, "test", 1024);
assert(s != INVALID_STREAM_ID);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
if (shmFd < 0) {
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
const char *message = "Shared message from Process A!";
memcpy(addr, message, strlen(message) + 1);

// Send the FD through the XLinkWriteFd function
auto w = XLinkWriteFd(s, shmFd);
assert(w == X_LINK_SUCCESS);

streamPacketDesc_t *packet;
auto r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);

printf("Message from Process B: %s\n", (char *)(packet->data));

// Read the data packet containing the FD
r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}

// Read and print the message from shared memory
printf("Message from Process B: %s\n", static_cast<char *>(sharedMemAddr));

const char *normalMessage = "Normal message from Process A";
w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1);
assert(w == X_LINK_SUCCESS);

munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
unlink(shmName);

return 0;
}
12 changes: 12 additions & 0 deletions include/XLink/XLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,18 @@ XLinkError_t XLinkWriteData(streamId_t const streamId, const uint8_t* buffer, in

XLinkError_t XLinkWriteData_(streamId_t streamId, const uint8_t* buffer, int size, XLinkTimespec* outTSend);

/**
* @brief Sends a package to initiate the writing of a file descriptor
* @warning Actual size of the written data is ALIGN_UP(size, 64)
* @param[in] streamId - stream link Id obtained from XLinkOpenStream call
* @param[in] buffer - FD to be transmitted
* @return Status code of the operation: X_LINK_SUCCESS (0) for success
*/
XLinkError_t XLinkWriteFd(streamId_t const streamId, const long fd);
XLinkError_t XLinkWriteFd_(streamId_t streamId, const long fd, XLinkTimespec* outTSend);
XLinkError_t XLinkWriteFdData(streamId_t streamId, const long fd, int fdSize, const uint8_t* dataBuffer, int dataSize);


/**
* @brief Sends a package to initiate the writing of data to a remote stream
* @warning Actual size of the written data is ALIGN_UP(size, 64)
Expand Down
8 changes: 5 additions & 3 deletions include/XLink/XLinkPlatform.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ typedef enum {
X_LINK_PLATFORM_DRIVER_NOT_LOADED = -128,
X_LINK_PLATFORM_USB_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_USB_VSC,
X_LINK_PLATFORM_TCP_IP_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_TCP_IP,
X_LINK_PLATFORM_LOCAL_SHDMEM_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_LOCAL_SHDMEM,
X_LINK_PLATFORM_PCIE_DRIVER_NOT_LOADED = X_LINK_PLATFORM_DRIVER_NOT_LOADED+X_LINK_PCIE,
} xLinkPlatformErrorCode_t;

Expand Down Expand Up @@ -64,10 +65,10 @@ xLinkPlatformErrorCode_t XLinkPlatformFindArrayOfDevicesNames(
xLinkPlatformErrorCode_t XLinkPlatformBootRemote(const deviceDesc_t* deviceDesc, const char* binaryPath);
xLinkPlatformErrorCode_t XLinkPlatformBootFirmware(const deviceDesc_t* deviceDesc, const char* firmware, size_t length);
xLinkPlatformErrorCode_t XLinkPlatformConnect(const char* devPathRead, const char* devPathWrite,
XLinkProtocol_t protocol, void** fd);
XLinkProtocol_t *protocol, void** fd);
xLinkPlatformErrorCode_t XLinkPlatformBootBootloader(const char* name, XLinkProtocol_t protocol);
xLinkPlatformErrorCode_t XLinkPlatformServer(const char* devPathRead, const char* devPathWrite,
XLinkProtocol_t protocol, void** fd);
XLinkProtocol_t *protocol, void** fd);

UsbSpeed_t get_usb_speed();
const char* get_mx_serial();
Expand All @@ -86,7 +87,8 @@ xLinkPlatformErrorCode_t XLinkPlatformCloseRemote(xLinkDeviceHandle_t* deviceHan
// ------------------------------------

int XLinkPlatformWrite(xLinkDeviceHandle_t *deviceHandle, void *data, int size);
int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size);
int XLinkPlatformWriteFd(xLinkDeviceHandle_t *deviceHandle, const long fd, void *data2, int size2);
int XLinkPlatformRead(xLinkDeviceHandle_t *deviceHandle, void *data, int size, long *fd);

void* XLinkPlatformAllocateData(uint32_t size, uint32_t alignment);
void XLinkPlatformDeallocateData(void *ptr, uint32_t size, uint32_t alignment);
Expand Down
13 changes: 11 additions & 2 deletions include/XLink/XLinkPrivateDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ typedef enum
XLINK_CLOSE_STREAM_REQ,
XLINK_PING_REQ,
XLINK_RESET_REQ,
XLINK_REQUEST_LAST,

XLINK_STATIC_REQUEST_LAST,
//note that is important to separate request and response
XLINK_WRITE_RESP,
XLINK_READ_RESP,
Expand All @@ -98,20 +99,28 @@ typedef enum
XLINK_CLOSE_STREAM_RESP,
XLINK_PING_RESP,
XLINK_RESET_RESP,
XLINK_RESP_LAST,

XLINK_STATIC_RESP_LAST,

/*X_LINK_IPC related events*/
IPC_WRITE_REQ,
IPC_READ_REQ,
IPC_CREATE_STREAM_REQ,
IPC_CLOSE_STREAM_REQ,

//
IPC_WRITE_RESP,
IPC_READ_RESP,
IPC_CREATE_STREAM_RESP,
IPC_CLOSE_STREAM_RESP,

XLINK_READ_REL_SPEC_REQ,
XLINK_WRITE_FD_REQ, // only for the shared mem protocol
XLINK_REQUEST_LAST,

XLINK_READ_REL_SPEC_RESP,
XLINK_WRITE_FD_RESP, // only for the shared mem protocol
XLINK_RESP_LAST,
} xLinkEventType_t;

typedef enum
Expand Down
4 changes: 4 additions & 0 deletions include/XLink/XLinkPublicDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ typedef enum{
X_LINK_NOT_IMPLEMENTED,
X_LINK_INIT_USB_ERROR,
X_LINK_INIT_TCP_IP_ERROR,
X_LINK_INIT_LOCAL_SHDMEM_ERROR,
X_LINK_INIT_PCIE_ERROR,
} XLinkError_t;

Expand All @@ -63,6 +64,8 @@ typedef enum{
X_LINK_PCIE,
X_LINK_IPC,
X_LINK_TCP_IP,
X_LINK_LOCAL_SHDMEM,
X_LINK_TCP_IP_OR_LOCAL_SHDMEM,
X_LINK_NMB_OF_PROTOCOLS,
X_LINK_ANY_PROTOCOL
} XLinkProtocol_t;
Expand Down Expand Up @@ -138,6 +141,7 @@ typedef struct streamPacketDesc_t
{
uint8_t* data;
uint32_t length;
int32_t fd; // file descriptor
XLinkTimespec tRemoteSent; /// remote timestamp of when the packet was sent. Related to remote clock. Note: not directly related to local clock
XLinkTimespec tReceived; /// local timestamp of when the packet was received. Related to local monotonic clock
} streamPacketDesc_t;
Expand Down
Loading

0 comments on commit ddfc42c

Please sign in to comment.