Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[c++ Wrapper] Use aeron_distinct_error_log to implement the ErrorLogReader.read functionality #1743

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion aeron-client/src/main/cpp_wrapper/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ SET(HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/atomic/Atomic64_gcc_cpp11.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/atomic/Atomic64_gcc_x86_64.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/atomic/Atomic64_msvc.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/errors/ErrorLogDescriptor.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/errors/ErrorLogReader.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/logbuffer/BufferClaim.h
${CMAKE_CURRENT_SOURCE_DIR}/concurrent/logbuffer/DataFrameHeader.h
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
#define AERON_CONCURRENT_ERROR_LOG_READER_H

#include <functional>
#include "util/BitUtil.h"
#include "concurrent/AtomicBuffer.h"
#include "concurrent/errors/ErrorLogDescriptor.h"
extern "C"
{
#include "concurrent/aeron_distinct_error_log.h"
}

namespace aeron { namespace concurrent { namespace errors {

Expand All @@ -35,40 +37,26 @@ typedef std::function<void(

inline static int read(AtomicBuffer &buffer, const error_consumer_t &consumer, std::int64_t sinceTimestamp)
{
int entries = 0;
int offset = 0;
const int capacity = buffer.capacity();

while (offset < capacity)
aeron_error_log_reader_func_t aeron_error_log_consumer = [](
int32_t observation_count,
int64_t first_observation_timestamp,
int64_t last_observation_timestamp,
const char *encoded_exception,
size_t length,
void *client_id)
{
const std::int32_t length = buffer.getInt32Volatile(offset + ErrorLogDescriptor::LENGTH_OFFSET);
if (0 == length)
{
break;
}

const std::int64_t lastObservationTimestamp =
buffer.getInt64Volatile(offset + ErrorLogDescriptor::LAST_OBSERVATION_TIMESTAMP_OFFSET);

if (lastObservationTimestamp >= sinceTimestamp)
{
auto &entry = buffer.overlayStruct<ErrorLogDescriptor::ErrorLogEntryDefn>(offset);

++entries;

consumer(
entry.observationCount,
entry.firstObservationTimestamp,
lastObservationTimestamp,
buffer.getStringWithoutLength(
offset + ErrorLogDescriptor::ENCODED_ERROR_OFFSET,
static_cast<std::size_t>(length - ErrorLogDescriptor::HEADER_LENGTH)));
}

offset += util::BitUtil::align(length, ErrorLogDescriptor::RECORD_ALIGNMENT);
}

return entries;
auto consumer = reinterpret_cast<error_consumer_t *>(client_id);
(*consumer)(observation_count,
first_observation_timestamp,
last_observation_timestamp,
std::string(encoded_exception, length));
};
return static_cast<int>(aeron_error_log_read(
buffer.buffer(),
buffer.capacity(),
aeron_error_log_consumer,
const_cast<void *>(reinterpret_cast<const void *>(&consumer)),
sinceTimestamp));
}

}}}}
Expand Down
Loading