Skip to content

Commit

Permalink
feat: added image queue handler and syncronization logic
Browse files Browse the repository at this point in the history
  • Loading branch information
dfbakin committed Jul 9, 2024
1 parent 0dce411 commit d04f763
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 32 deletions.
54 changes: 28 additions & 26 deletions packages/camera/include/camera.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,39 @@ using namespace std::chrono_literals;

namespace handy::camera {

class CameraRecorder;

struct Size {
size_t area() const { return static_cast<size_t>(width * height); };

int width;
int height;

bool operator==(const Size& other) { return width == other.width && height == other.height; }

bool operator!=(const Size& other) { return !(*this == other); }
};

struct BufferPair {
uint8_t* first;
uint8_t* second;
};

struct StampedImageBuffer {
uint8_t* raw_buffer = nullptr;
uint8_t* bgr_buffer = nullptr;
tSdkFrameHead frame_info{};
int camera_idx;
int frame_id;
uint32_t timestamp;
};

struct SynchronizedFrameBuffers {
std::vector<BufferPair> images;
std::vector<Size> image_sizes;
uint32_t timestamp; // in 0.1 milliseconds
};

struct CameraPool {
public:
CameraPool() = default;
Expand Down Expand Up @@ -56,26 +81,6 @@ class CameraRecorder {
static int getCameraId(int camera_handle);
void applyParamsToCamera(int handle);

struct Size {
size_t area() const { return static_cast<size_t>(width * height); };

int width;
int height;

bool operator==(const Size& other) {
return width == other.width && height == other.height;
}

bool operator!=(const Size& other) {
return !(*this == other);
}
};

struct BufferPair {
uint8_t* first;
uint8_t* second;
};

struct Params {
std::chrono::duration<double> latency{50.0}; // in milliseconds
std::string param_file;
Expand All @@ -89,14 +94,11 @@ class CameraRecorder {
struct State {
std::array<std::unique_ptr<boost::lockfree::queue<StampedImageBuffer>>, kMaxCameraNum>
camera_images;
std::array<
std::unique_ptr<boost::lockfree::queue<BufferPair>>,
kMaxCameraNum>
free_buffers;
std::array<std::unique_ptr<boost::lockfree::queue<BufferPair>>, kMaxCameraNum> free_buffers;

std::atomic<bool> running = true;
std::vector<std::thread> threads; // trigger thread, queue handler thread
std::vector<std::atomic<int>> counters;
std::vector<std::thread> threads; // trigger thread, queue handler thread
std::vector<std::atomic<int>> frame_ids;
std::vector<std::atomic<size_t>> current_buffer_idx;
int camera_num = 2;
std::vector<int> files;
Expand Down
67 changes: 61 additions & 6 deletions packages/camera/src/camera.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@

using namespace std::chrono_literals;

namespace {

template<typename T>
T getUnsignedDifference(T a, T b) {
return (a > b) ? (a - b) : (b - a);
}

} // namespace

namespace handy::camera {

void abortIfNot(std::string_view msg, int status) {
Expand Down Expand Up @@ -48,7 +57,7 @@ CameraRecorder::CameraRecorder(const char* param_file, const char* output_filena
state_.frame_sizes.resize(state_.camera_num);
state_.files.resize(state_.camera_num);
state_.camera_handles.resize(state_.camera_num);
state_.counters = std::vector<std::atomic<int>>(state_.camera_num);
state_.frame_ids = std::vector<std::atomic<int>>(state_.camera_num);
state_.current_buffer_idx = std::vector<std::atomic<size_t>>(state_.camera_num);
state_.alligned_buffers = std::vector<void*>(state_.camera_num);

Expand Down Expand Up @@ -127,10 +136,10 @@ CameraRecorder::~CameraRecorder() {
iter->join();
}

printf(
"Got and written from cameras: %d %d\n",
state_.counters[0].load(),
state_.counters[1].load());
// printf(
// "Got and written from cameras: %d %d\n",
// state_.counters[0].load(),
// state_.counters[1].load());

for (int i = 0; i < state_.camera_num; ++i) {
abortIfNot("camera " + std::to_string(i) + " stop", CameraStop(state_.camera_handles[i]));
Expand Down Expand Up @@ -173,8 +182,53 @@ void CameraRecorder::triggerCamera() {
}

void CameraRecorder::synchronizeQueues() {
constexpr int num_of_sync_buffers = 20;
std::vector<std::vector<StampedImageBuffer>> recieved_buffers(num_of_sync_buffers);
while (state_.running) {

for (int i = 0; i < state_.camera_num; ++i) {
StampedImageBuffer new_recieved_buffer;
while (!state_.camera_images[i]->pop(new_recieved_buffer)) {
}
int recieved_buffers_idx = new_recieved_buffer.frame_id % num_of_sync_buffers;
recieved_buffers[recieved_buffers_idx].push_back(new_recieved_buffer);
if (recieved_buffers[recieved_buffers_idx].size() == state_.camera_num) {
// make single structure as a shared ptr
auto custom_deleter = [this](SynchronizedFrameBuffers* sync_buffers_ptr) {
for (int i = 0; i < sync_buffers_ptr->images.size(); ++i) {
while (!this->state_.free_buffers[i]->push(sync_buffers_ptr->images[i])) {
}
}
printf("deleter: pushed all ptr back to free queue\n");
free(sync_buffers_ptr);
};
std::shared_ptr<SynchronizedFrameBuffers> sync_buffers(
new SynchronizedFrameBuffers, custom_deleter);
sync_buffers->image_sizes.resize(state_.camera_num);
sync_buffers->timestamp = recieved_buffers[recieved_buffers_idx][0].timestamp;
bool timestamps_are_close = true;
for (int i = 0; i < state_.camera_num; ++i) {
StampedImageBuffer& current_recieved_buffer =
recieved_buffers[recieved_buffers_idx][i];
sync_buffers->images[current_recieved_buffer.camera_idx] = BufferPair{
current_recieved_buffer.raw_buffer, current_recieved_buffer.bgr_buffer};

timestamps_are_close &=
(getUnsignedDifference(
current_recieved_buffer.timestamp, sync_buffers->timestamp)
< 2);
}

recieved_buffers[recieved_buffers_idx].clear();
if (!timestamps_are_close) {
printf(
"failed check for close timestamps inside the vector of %u\n",
state_.camera_num);
continue;
}
// send to either to all registered consumers or to save handler
printf("%p\n", sync_buffers.get());
}
}
}
}

Expand Down Expand Up @@ -203,6 +257,7 @@ void CameraRecorder::handleFrame(CameraHandle handle, BYTE* raw_buffer, tSdkFram
free_buffers.second, // bgr buffer
*frame_info,
state_.handle_to_idx[handle],
state_.frame_ids[state_.handle_to_idx[handle]].fetch_add(1),
frame_info->uiTimeStamp};
if (!state_.camera_images[state_.handle_to_idx[handle]]->push(stamped_buffer_to_add)) {
printf("unable to fit into queue! exiting");
Expand Down

0 comments on commit d04f763

Please sign in to comment.