Skip to content

Commit

Permalink
refactor claim_strategy
Browse files Browse the repository at this point in the history
- added MultiThreadedStrategy and added corresponding tests.
- simplified methods with defaults
  • Loading branch information
fsaintjacques committed Jul 9, 2015
1 parent d751327 commit f1b3b5e
Show file tree
Hide file tree
Showing 4 changed files with 256 additions and 111 deletions.
122 changes: 90 additions & 32 deletions disruptor/claim_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,64 +33,122 @@

namespace disruptor {

/*
// Strategy employed by a {@link Publisher} to claim and publish sequences
// on the sequencer.
//
class ClaimStrategy {
public:
// Wait for the given sequence to be available for consumption.
//
// @param dependents dependent sequences to wait on (mostly consumers).
// @param delta sequences to claim [default: 1].
//
// @return last claimed sequence.
int64_t IncrementAndGet(const std::vector<Sequence*>& dependents,
size_t delta = 1);
// Verify in a non-blocking way that there exist claimable sequences.
//
// @param dependents dependent sequences to wait on (mostly consumers).
//
// @return true if there is capacity available to claim a sequence.
bool HasAvalaibleCapacity(const std::vector<Sequence*>& dependents);
void SynchronizePublishing(const int64_t& sequence, const Sequence& cursor,
const size_t& delta) {}
};
*/

template <size_t N>
class SingleThreadedStrategy;
using kDefaultClaimStrategy = SingleThreadedStrategy<kDefaultRingBufferSize>;

// Optimised strategy can be used when there is a single publisher thread
// claiming {@link AbstractEvent}s.
// Optimised strategy can be used when there is a single publisher thread.
template <size_t N = kDefaultRingBufferSize>
class SingleThreadedStrategy {
public:
SingleThreadedStrategy() {}

int64_t IncrementAndGet(const std::vector<Sequence*>& dependents) {
const int64_t next_sequence = sequence_.IncrementAndGet(1L);
WaitForFreeSlotAt(next_sequence, dependents);
return next_sequence;
}

int64_t IncrementAndGet(const int& delta,
const std::vector<Sequence*>& dependents) {
const int64_t next_sequence = sequence_.IncrementAndGet(delta);
WaitForFreeSlotAt(next_sequence, dependents);
SingleThreadedStrategy()
: last_claimed_sequence_(kInitialCursorValue),
last_consumer_sequence_(kInitialCursorValue) {}

int64_t IncrementAndGet(const std::vector<Sequence*>& dependents,
size_t delta = 1) {
const int64_t next_sequence = (last_claimed_sequence_ += delta);
const int64_t wrap_point = next_sequence - N;
if (last_consumer_sequence_ < wrap_point) {
while (GetMinimumSequence(dependents) < wrap_point) {
std::this_thread::yield();
}
}
return next_sequence;
}

bool HasAvalaibleCapacity(const std::vector<Sequence*>& dependents) {
const int64_t wrap_point = sequence_.sequence() + 1L - N;
if (wrap_point > min_gating_sequence_.sequence()) {
const int64_t wrap_point = last_claimed_sequence_ + 1L - N;
if (wrap_point > last_consumer_sequence_) {
const int64_t min_sequence = GetMinimumSequence(dependents);
min_gating_sequence_.set_sequence(min_sequence);
last_consumer_sequence_ = min_sequence;
if (wrap_point > min_sequence) return false;
}
return true;
}

void SetSequence(const int64_t& sequence,
const std::vector<Sequence*>& dependents) {
sequence_.set_sequence(sequence);
WaitForFreeSlotAt(sequence, dependents);
}

void SerialisePublishing(const int64_t& sequence, const Sequence& cursor,
const int64_t& batch_size) {}
void SynchronizePublishing(const int64_t& sequence, const Sequence& cursor,
const size_t& delta) {}

private:
void WaitForFreeSlotAt(const int64_t& sequence,
const std::vector<Sequence*>& dependents) {
const int64_t wrap_point = sequence - N;
if (min_gating_sequence_.sequence() < wrap_point) {
// We do not need to use atomic values since this function is called by a
// single publisher.
int64_t last_claimed_sequence_;
int64_t last_consumer_sequence_;

DISALLOW_COPY_MOVE_AND_ASSIGN(SingleThreadedStrategy);
};

// Optimised strategy can be used when there is a single publisher thread.
template <size_t N = kDefaultRingBufferSize>
class MultiThreadedStrategy {
public:
MultiThreadedStrategy() {}

int64_t IncrementAndGet(const std::vector<Sequence*>& dependents,
size_t delta = 1) {
const int64_t next_sequence = last_claimed_sequence_.IncrementAndGet(delta);
const int64_t wrap_point = next_sequence - N;
if (last_consumer_sequence_.sequence() < wrap_point) {
while (GetMinimumSequence(dependents) < wrap_point) {
std::this_thread::yield();
}
}
return next_sequence;
}

Sequence sequence_;
Sequence min_gating_sequence_;
bool HasAvalaibleCapacity(const std::vector<Sequence*>& dependents) {
const int64_t wrap_point = last_claimed_sequence_.sequence() + 1L - N;
if (wrap_point > last_consumer_sequence_.sequence()) {
const int64_t min_sequence = GetMinimumSequence(dependents);
last_consumer_sequence_.set_sequence(min_sequence);
if (wrap_point > min_sequence) return false;
}
return true;
}

DISALLOW_COPY_MOVE_AND_ASSIGN(SingleThreadedStrategy);
void SynchronizePublishing(const int64_t& sequence, const Sequence& cursor,
const size_t& delta) {
int64_t my_first_sequence = sequence - delta;

while (cursor.sequence() < my_first_sequence) {
// TODO: configurable yield strategy
std::this_thread::yield();
}
}

private:
Sequence last_claimed_sequence_;
Sequence last_consumer_sequence_;

DISALLOW_COPY_MOVE_AND_ASSIGN(MultiThreadedStrategy);
};

}; // namespace disruptor
Expand Down
4 changes: 3 additions & 1 deletion disruptor/sequence.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,12 @@ class Sequence {
}

private:
// padding
int64_t padding0_[ATOMIC_SEQUENCE_PADDING_LENGTH];
// members
std::atomic<int64_t> sequence_;
// padding
int64_t padding_[ATOMIC_SEQUENCE_PADDING_LENGTH];
int64_t padding1_[ATOMIC_SEQUENCE_PADDING_LENGTH];

DISALLOW_COPY_MOVE_AND_ASSIGN(Sequence);
};
Expand Down
68 changes: 7 additions & 61 deletions disruptor/sequencer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
#ifndef DISRUPTOR_SEQUENCER_H_ // NOLINT
#define DISRUPTOR_SEQUENCER_H_ // NOLINT

#include <condition_variable>
#include <mutex>
#include <vector>

#include "disruptor/batch_descriptor.h"
#include "disruptor/claim_strategy.h"
#include "disruptor/wait_strategy.h"
Expand All @@ -44,10 +40,6 @@ template <typename T, size_t N = kDefaultRingBufferSize,
class Sequencer {
public:
// Construct a Sequencer with the selected strategies.
//
// @param buffer_size over which sequences are valid.
// @param claim_strategy_option for those claiming sequences.
// @param wait_strategy_option for those waiting on sequences.
Sequencer(std::array<T, N> events) : ring_buffer_(events) {}

// Set the sequences that will gate publishers to prevent the buffer
Expand All @@ -67,15 +59,6 @@ class Sequencer {
return SequenceBarrier<W>(cursor_, dependents);
}

// Create a new {@link BatchDescriptor} that is the minimum of the
// requested size and the buffer_size.
//
// @param size for the new batch.
// @return the new {@link BatchDescriptor}.
BatchDescriptor NewBatchDescriptor(const int& size) {
return BatchDescriptor(size < N ? size : N);
}

// Get the value of the cursor indicating the published sequence.
//
// @return value of the cursor for events that have been published.
Expand All @@ -90,72 +73,35 @@ class Sequencer {
return claim_strategy_.HasAvalaibleCapacity(gating_sequences_);
}

// Claim the next event in sequence for publishing to the {@link RingBuffer}.
//
// @return the claimed sequence.
int64_t Next() { return claim_strategy_.IncrementAndGet(gating_sequences_); }

// Claim the next batch of sequence numbers for publishing.
//
// @param batch_descriptor to be updated for the batch range.
// @return the updated batch_descriptor.
BatchDescriptor* Next(BatchDescriptor* batch_descriptor) {
int64_t sequence = claim_strategy_->IncrementAndGet(
batch_descriptor->size(), gating_sequences_);
batch_descriptor->set_end(sequence);
return batch_descriptor;
}

// Claim a specific sequence when only one publisher is involved.
//
// @param sequence to be claimed.
// @return sequence just claimed.
int64_t Claim(const int64_t& sequence) {
claim_strategy_.SetSequence(sequence, gating_sequences_);
return sequence;
int64_t Next(size_t delta = 1) {
return claim_strategy_.IncrementAndGet(gating_sequences_, delta);
}

// Publish an event and make it visible to {@link EventProcessor}s.
//
// @param sequence to be published.
void Publish(const int64_t& sequence) { Publish(sequence, 1); }

// Publish the batch of events in sequence.
//
// @param sequence to be published.
void Publish(const BatchDescriptor& batch_descriptor) {
Publish(batch_descriptor.end(), batch_descriptor.size());
}

// Force the publication of a cursor sequence.
//
// Only use this method when forcing a sequence and you are sure only one
// publisher exists. This will cause the cursor to advance to this
// sequence.
//
// @param sequence to which is to be forced for publication.
void ForcePublish(const int64_t& sequence) {
void Publish(const int64_t& sequence, size_t delta = 1) {
claim_strategy_.SynchronizePublishing(sequence, cursor_, delta);
cursor_.set_sequence(sequence);
wait_strategy_.SignalAllWhenBlocking();
}

private:
// Helpers
void Publish(const int64_t& sequence, const int64_t& batch_size) {
claim_strategy_.SerialisePublishing(sequence, cursor_, batch_size);
cursor_.set_sequence(sequence);
wait_strategy_.SignalAllWhenBlocking();
}

// Members
RingBuffer<T, N> ring_buffer_;

Sequence cursor_;
std::vector<Sequence*> gating_sequences_;

C claim_strategy_;

W wait_strategy_;

std::vector<Sequence*> gating_sequences_;

DISALLOW_COPY_MOVE_AND_ASSIGN(Sequencer);
};

Expand Down
Loading

0 comments on commit f1b3b5e

Please sign in to comment.