Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] add new TimeQueueImpl based on TimeWheel #12

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl2/Bucket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package vproxybase.util.time.impl2;

public interface Bucket<T> {
void add(TimeElemImpl<T> elem);

void remove(TimeElemImpl<T> elem);

void poll(long timeOffset);

boolean isEmpty();
}
37 changes: 37 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl2/RawBucket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package vproxybase.util.time.impl2;

import java.util.LinkedList;
import java.util.Queue;

public class RawBucket<T> implements Bucket<T> {
private final LinkedList<TimeElemImpl<T>> list = new LinkedList<>();
private final Queue<TimeElemImpl<T>> queue;

public RawBucket(Queue<TimeElemImpl<T>> queue) {
this.queue = queue;
}

@Override
public void add(TimeElemImpl<T> elem) {
elem.bucket = this;
list.add(elem);
}

@Override
public void remove(TimeElemImpl<T> elem) {
if (elem.bucket == this) {
list.remove(elem);
}
}

@Override
public void poll(long timeOffset) {
queue.addAll(list);
list.clear();
}

@Override
public boolean isEmpty() {
return list.isEmpty();
}
}
27 changes: 27 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl2/TimeElemImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package vproxybase.util.time.impl2;

import vproxybase.util.time.TimeElem;

public class TimeElemImpl<T> implements TimeElem<T> {
Bucket<T> bucket;

final long timeOffset;
private final T value;

public TimeElemImpl(long timeOffset, T value) {
this.timeOffset = timeOffset;
this.value = value;
}

@Override
public T get() {
return value;
}

@Override
public void removeSelf() {
if (bucket != null) {
bucket.remove(this);
}
}
}
47 changes: 47 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl2/TimeQueueImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package vproxybase.util.time.impl2;

import vproxybase.util.time.TimeQueue;

import java.util.LinkedList;
import java.util.Queue;

public class TimeQueueImpl<T> implements TimeQueue<T> {
private final Queue<T> resultQueue = new LinkedList<>();
private TimeWheel<T> timeWheel;
private Long firstTimestamp;
private final Queue<TimeElemImpl<T>> queue = new LinkedList<>();

public TimeElemImpl<T> add(long currentTimestamp, int timeout, T obj) {
if (firstTimestamp == null || timeWheel == null) {
firstTimestamp = currentTimestamp;
timeWheel = new TimeWheel<>(0, 1, 32, 1, queue);
}
TimeElemImpl<T> elem = new TimeElemImpl<>(currentTimestamp - firstTimestamp + timeout, obj);
timeWheel.add(elem);
return elem;
}

public T poll() {
if (firstTimestamp == null || timeWheel == null) {
return null;
}
long current = System.currentTimeMillis();
long offset = current - firstTimestamp;
timeWheel.poll(offset);
return resultQueue.poll();
}

@Override
public boolean isEmpty() {
return timeWheel.isEmpty();
}

@Override
public int nextTime(long current) {
TimeElemImpl<T> elem = queue.peek();
if (elem == null) {
return Integer.MAX_VALUE;
}
return Math.max((int) (elem.timeOffset + firstTimestamp - current), 0);
}
}
97 changes: 97 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl2/TimeWheel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package vproxybase.util.time.impl2;

import java.util.Arrays;
import java.util.List;
import java.util.Queue;

public class TimeWheel<T> {
private final long startMs;
private final long endMs;
private final long tickMs;
private final int wheelSize;
private final long interval;
private final List<Bucket<T>> buckets;
private final int layerNumber;
private TimeWheel<T> overflowWheel;
private final Queue<TimeElemImpl<T>> queue;
private int currentPosition = 0;

public TimeWheel(long startMs, long tickMs, int wheelSize, int layerNumber, Queue<TimeElemImpl<T>> queue) {
this.startMs = startMs;
this.tickMs = tickMs;
this.wheelSize = wheelSize;
this.endMs = startMs + tickMs * wheelSize;
this.queue = queue;
this.interval = tickMs * wheelSize;
this.buckets = Arrays.asList(new Bucket[wheelSize]);
this.layerNumber = layerNumber;
}

public void add(TimeElemImpl<T> t) {
long bucketNo = (t.timeOffset - startMs) / tickMs;
if (bucketNo > wheelSize - 1) {
addOverflow(t);
return;
}

Bucket<T> bucket = buckets.get((int) bucketNo);
if (bucket == null) {
boolean isBottom = layerNumber == 1;
if (isBottom) {
bucket = new RawBucket<>(queue);
} else {
long tickMs = this.tickMs / wheelSize;
long startMs = this.startMs + bucketNo * tickMs;
bucket = new WheelBucket<>(startMs, tickMs, wheelSize, layerNumber - 1, queue);
}
buckets.set((int) bucketNo, bucket);
}

bucket.add(t);
}

private void addOverflow(TimeElemImpl<T> t) {
if (overflowWheel == null) {
overflowWheel = new TimeWheel<>(startMs, interval, wheelSize, layerNumber + 1, queue);
}
overflowWheel.add(t);
}

public void poll(long timeOffset) {
if (timeOffset > endMs && overflowWheel != null) {
overflowWheel.poll(timeOffset);
return;
}

long length = (timeOffset - startMs) / tickMs;
if (length < currentPosition) {
return;
}

for (int i = currentPosition; i < length + 1; i++) {
Bucket<T> bucket = buckets.get(i);
if (bucket != null) {
bucket.poll(timeOffset);
}
}
currentPosition = (int) length;
}

public void remove(TimeElemImpl<T> obj) {
for (Bucket<T> bucket : buckets) {
if (bucket != null) {
bucket.remove(obj);
}
}
}

public boolean isEmpty() {
boolean isEmpty = true;
for (Bucket<T> bucket : buckets) {
if (bucket != null && !bucket.isEmpty()) {
isEmpty = false;
}
}
return isEmpty;
}
}
31 changes: 31 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl2/WheelBucket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package vproxybase.util.time.impl2;

import java.util.Queue;

public class WheelBucket<T> implements Bucket<T> {
private final TimeWheel<T> timeWheel;

public WheelBucket(long startMs, long tickMs, int wheelSize, int layerNumber, Queue<TimeElemImpl<T>> queue) {
this.timeWheel = new TimeWheel<T>(startMs, tickMs, wheelSize, layerNumber, queue);
}

@Override
public void add(TimeElemImpl<T> elem) {
timeWheel.add(elem);
}

@Override
public void remove(TimeElemImpl<T> elem) {
timeWheel.remove(elem);
}

@Override
public void poll(long timeOffset) {
timeWheel.poll(timeOffset);
}

@Override
public boolean isEmpty() {
return timeWheel.isEmpty();
}
}