Skip to content

Commit

Permalink
feat: added pool of buffers and timers
Browse files Browse the repository at this point in the history
  • Loading branch information
dfbakin committed Jul 8, 2024
1 parent b89b867 commit 0dce411
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 71 deletions.
65 changes: 63 additions & 2 deletions packages/camera/include/camera.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,54 @@
#include <string>
#include <vector>
#include <map>
#include <thread>

#include <boost/lockfree/queue.hpp>

using namespace std::chrono_literals;

namespace handy::camera {
class Writer {

struct StampedImageBuffer {
uint8_t* raw_buffer = nullptr;
uint8_t* bgr_buffer = nullptr;
tSdkFrameHead frame_info{};
int camera_idx;
uint32_t timestamp;
};

struct CameraPool {
public:
CameraPool() = default;
CameraPool(size_t height, size_t width, size_t frame_n) :
frame_n_(frame_n), raw_frame_size_(height * width), bgr_frame_size_(height * width * 3) {
raw_.resize(raw_frame_size_ * frame_n_);
bgr_.resize(bgr_frame_size_ * frame_n_);
}

uint8_t* getRawFrame(size_t frame_idx) { return raw_.data() + frame_idx * raw_frame_size_; }
uint8_t* getBgrFrame(size_t frame_idx) { return bgr_.data() + frame_idx * bgr_frame_size_; }

private:
size_t frame_n_ = 0;
size_t raw_frame_size_ = 0;
size_t bgr_frame_size_ = 0;
std::vector<uint8_t> raw_ = {};
std::vector<uint8_t> bgr_ = {};
};

class CameraRecorder {
public:
Writer(const char* param_file, const char* output_filename);
CameraRecorder(const char* param_file, const char* output_filename);
~CameraRecorder();

constexpr static int kMaxCameraNum = 4;
constexpr static int kQueueCapacity = 5;

private:
void handleFrame(CameraHandle handle, BYTE* raw_buffer, tSdkFrameHead* frame_info);
void triggerCamera();
void synchronizeQueues();
static int getCameraId(int camera_handle);
void applyParamsToCamera(int handle);

Expand All @@ -23,6 +61,19 @@ class Writer {

int width;
int height;

bool operator==(const Size& other) {
return width == other.width && height == other.height;
}

bool operator!=(const Size& other) {
return !(*this == other);
}
};

struct BufferPair {
uint8_t* first;
uint8_t* second;
};

struct Params {
Expand All @@ -36,6 +87,15 @@ class Writer {
} param_{};

struct State {
std::array<std::unique_ptr<boost::lockfree::queue<StampedImageBuffer>>, kMaxCameraNum>
camera_images;
std::array<
std::unique_ptr<boost::lockfree::queue<BufferPair>>,
kMaxCameraNum>
free_buffers;

std::atomic<bool> running = true;
std::vector<std::thread> threads; // trigger thread, queue handler thread
std::vector<std::atomic<int>> counters;
std::vector<std::atomic<size_t>> current_buffer_idx;
int camera_num = 2;
Expand All @@ -45,6 +105,7 @@ class Writer {
std::vector<std::mutex> file_mutexes;
std::vector<int> camera_handles;
std::vector<void*> alligned_buffers;
CameraPool buffers;
} state_{};
};
} // namespace handy::camera
191 changes: 122 additions & 69 deletions packages/camera/src/camera.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void abortIfNot(std::string_view msg, int camera_idx, int status) {
}
}

Writer::Writer(const char* param_file, const char* output_filename) {
CameraRecorder::CameraRecorder(const char* param_file, const char* output_filename) {
param_.param_file = {param_file};
param_.output_filename = {output_filename};

Expand Down Expand Up @@ -74,43 +74,57 @@ Writer::Writer(const char* param_file, const char* output_filename) {
abortIfNot(
"set icp", i, CameraSetIspOutFormat(state_.camera_handles[i], CAMERA_MEDIA_TYPE_BGR8));

// if node is launch in soft trigger mode
if (!param_.use_hardware_triger || state_.camera_handles[i] == param_.master_camera_id) {
CameraSetTriggerMode(state_.camera_handles[i], SOFT_TRIGGER);
} else {
CameraSetTriggerMode(state_.camera_handles[i], EXTERNAL_TRIGGER);
}

auto func = [](CameraHandle idx,
BYTE* raw_buffer,
tSdkFrameHead* frame_info,
PVOID camera_node_instance) -> void {
reinterpret_cast<Writer*>(camera_node_instance)
reinterpret_cast<CameraRecorder*>(camera_node_instance)
->handleFrame(idx, raw_buffer, frame_info);
};

// if node is launch in soft trigger mode
CameraSetTriggerMode(state_.camera_handles[i], SOFT_TRIGGER);

CameraSetCallbackFunction(state_.camera_handles[i], std::move(func), this, nullptr);

applyParamsToCamera(state_.camera_handles[i]);
abortIfNot("start", CameraPlay(state_.camera_handles[i]));
printf("inited API and started camera handle = %d\n", state_.camera_handles[i]);
}

boost::asio::io_service io;
boost::posix_time::milliseconds interval(
static_cast<int>(param_.latency.count() * 1000)); // 10 milliseconds
boost::asio::deadline_timer timer(io, interval);
// determine the largest frame_size, allocate pool and push all free buffers
Size max_frame_size = *std::max_element(
state_.frame_sizes.begin(), state_.frame_sizes.begin(), [](Size& first, Size& second) {
return first.area() < second.area();
});

for (int trigger_cnt = 0; trigger_cnt < param_.frames_to_take; ++trigger_cnt) {
timer.wait();
for (int i = 0; i < state_.camera_num; ++i) {
if (!param_.use_hardware_triger
|| state_.camera_handles[i] == param_.master_camera_id) {
CameraSoftTrigger(state_.camera_handles[i]);
}
state_.buffers =
CameraPool(max_frame_size.height, max_frame_size.width, kQueueCapacity * kMaxCameraNum);
printf("%d pools initialised", kQueueCapacity * kMaxCameraNum);

// init queues and push pointers to buffers
for (int i = 0; i < state_.camera_num; ++i) {
for (size_t j = 0; j < kQueueCapacity; ++j) {
state_.free_buffers[i]->push(
{state_.buffers.getRawFrame(i * kQueueCapacity + j),
state_.buffers.getBgrFrame(i * kQueueCapacity + j)});
}
timer.expires_at(timer.expires_at() + interval);
}

std::this_thread::sleep_for(param_.latency * 100);
for (int i = 0; i < state_.camera_num; ++i) {
state_.file_mutexes[i].lock();
// start trigger in a separate thread
state_.threads.emplace_back(&CameraRecorder::triggerCamera, this);
// start queue handler in a separate thread
state_.threads.emplace_back(&CameraRecorder::synchronizeQueues, this);
}

CameraRecorder::~CameraRecorder() {
state_.running = false;
// waiting for all threads to stop by flag state_.running
for (auto iter = state_.threads.begin(); iter != state_.threads.end(); ++iter) {
iter->join();
}

printf(
Expand Down Expand Up @@ -140,38 +154,77 @@ Writer::Writer(const char* param_file, const char* output_filename) {
}
}

void Writer::handleFrame(CameraHandle handle, BYTE* raw_buffer, tSdkFrameHead* frame_info) {
void CameraRecorder::triggerCamera() {
boost::asio::io_service io;
boost::posix_time::milliseconds interval(
static_cast<int>(param_.latency.count() * 1000)); // `latency` milliseconds
boost::asio::deadline_timer timer(io, interval);

while (state_.running) {
timer.wait();
for (int i = 0; i < state_.camera_num; ++i) {
if (!param_.use_hardware_triger
|| state_.camera_handles[i] == param_.master_camera_id) {
CameraSoftTrigger(state_.camera_handles[i]);
}
}
timer.expires_at(timer.expires_at() + interval);
}
}

void CameraRecorder::synchronizeQueues() {
while (state_.running) {

}
}

void CameraRecorder::handleFrame(CameraHandle handle, BYTE* raw_buffer, tSdkFrameHead* frame_info) {
// TODO: delete elapsed time?
auto start = std::chrono::high_resolution_clock::now();

const int camera_idx = state_.handle_to_idx[handle];
++state_.counters[camera_idx];
std::lock_guard<std::mutex> lock(state_.file_mutexes[camera_idx]);
size_t buffer_idx = state_.current_buffer_idx[camera_idx].fetch_add(1);
printf("%d buffer id %ld\n", handle, buffer_idx);
Size size{frame_info->iWidth, frame_info->iHeight};
if (size != state_.frame_sizes[state_.handle_to_idx[handle]]) {
printf(
"expected frame size (%d; %d), but got (%d, %d)",
state_.frame_sizes[handle].width,
state_.frame_sizes[handle].height,
size.width,
size.height);
exit(EXIT_FAILURE);
}
int frame_size_px = frame_info->iWidth * frame_info->iHeight;

std::memcpy(
static_cast<uint8_t*>(state_.alligned_buffers[camera_idx])
+ state_.frame_sizes[camera_idx].area() * buffer_idx,
raw_buffer,
frame_info->iWidth * frame_info->iHeight);
BufferPair free_buffers;
while (!state_.free_buffers[state_.handle_to_idx[handle]]->pop(free_buffers)) {
}
std::memcpy(free_buffers.first, raw_buffer, frame_size_px);
StampedImageBuffer stamped_buffer_to_add{
free_buffers.first, // raw buffer
free_buffers.second, // bgr buffer
*frame_info,
state_.handle_to_idx[handle],
frame_info->uiTimeStamp};
if (!state_.camera_images[state_.handle_to_idx[handle]]->push(stamped_buffer_to_add)) {
printf("unable to fit into queue! exiting");
exit(EXIT_FAILURE);
}
CameraReleaseImageBuffer(handle, raw_buffer);

// TODO: delete elapsed time?
auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> elapsed = end - start;

// std::cout << "Function took " << elapsed.count() << " ms to complete.\n";

if (elapsed.count() > 10) {
std::cout << "WARNING: Function took more than 10 ms " << elapsed.count() << '\n';
}
}

int Writer::getCameraId(int camera_handle) {
int CameraRecorder::getCameraId(int camera_handle) {
uint8_t camera_id;
abortIfNot("getting camera id", CameraLoadUserData(camera_handle, 0, &camera_id, 1));
return static_cast<int>(camera_id);
}

void Writer::applyParamsToCamera(int handle) {
void CameraRecorder::applyParamsToCamera(int handle) {
const int camera_idx = state_.handle_to_idx[handle];
const std::string camera_id_str = std::to_string(getCameraId(handle));
const YAML::Node camera_params = YAML::LoadFile(param_.param_file)["parameters"][camera_id_str];
Expand All @@ -188,37 +241,37 @@ void Writer::applyParamsToCamera(int handle) {
resolution_data->iHeight);
state_.frame_sizes[camera_idx] = {resolution_data->iWidth, resolution_data->iHeight};

const std::string path_to_file = param_.output_filename + "_" + camera_id_str + ".out";
state_.files[camera_idx] = open(path_to_file.c_str(), O_RDWR | O_CREAT, S_IWUSR | S_IRUSR);
if (state_.files[camera_idx] == -1) {
printf("failed to open %s\n", camera_id_str.c_str());
exit(EXIT_FAILURE);
}
int64_t page_size = sysconf(_SC_PAGE_SIZE);

if (ftruncate(
state_.files[camera_idx],
state_.frame_sizes[camera_idx].area() * param_.frames_to_take / page_size
* page_size
+ page_size)
== -1) {
close(state_.files[camera_idx]);
perror("Error resizing the file");
exit(1);
}

state_.alligned_buffers[camera_idx] = mmap(
nullptr,
state_.frame_sizes[camera_idx].area() * param_.frames_to_take / page_size * page_size
+ page_size,
PROT_WRITE,
MAP_SHARED,
state_.files[camera_idx],
0);
if (state_.alligned_buffers[camera_idx] == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
// const std::string path_to_file = param_.output_filename + "_" + camera_id_str + ".out";
// state_.files[camera_idx] = open(path_to_file.c_str(), O_RDWR | O_CREAT, S_IWUSR |
// S_IRUSR); if (state_.files[camera_idx] == -1) {
// printf("failed to open %s\n", camera_id_str.c_str());
// exit(EXIT_FAILURE);
// }
// int64_t page_size = sysconf(_SC_PAGE_SIZE);

// if (ftruncate(
// state_.files[camera_idx],
// state_.frame_sizes[camera_idx].area() * param_.frames_to_take / page_size
// * page_size
// + page_size)
// == -1) {
// close(state_.files[camera_idx]);
// perror("Error resizing the file");
// exit(1);
// }

// state_.alligned_buffers[camera_idx] = mmap(
// nullptr,
// state_.frame_sizes[camera_idx].area() * param_.frames_to_take / page_size * page_size
// + page_size,
// PROT_WRITE,
// MAP_SHARED,
// state_.files[camera_idx],
// 0);
// if (state_.alligned_buffers[camera_idx] == MAP_FAILED) {
// perror("mmap");
// exit(EXIT_FAILURE);
// }
}

{
Expand Down Expand Up @@ -303,7 +356,7 @@ int main(int argc, char* argv[]) {
printf("help\n./camera_bin <param_file> <output_filename>\n");
return 0;
}
handy::camera::Writer writer(argv[1], argv[2]);
handy::camera::CameraRecorder writer(argv[1], argv[2]);
printf("finished writing to file %s\n", argv[2]);

return 0;
Expand Down

0 comments on commit 0dce411

Please sign in to comment.