diff --git a/base/src/main/java/vproxybase/util/time/impl2/Bucket.java b/base/src/main/java/vproxybase/util/time/impl2/Bucket.java new file mode 100644 index 000000000..42f8c207a --- /dev/null +++ b/base/src/main/java/vproxybase/util/time/impl2/Bucket.java @@ -0,0 +1,11 @@ +package vproxybase.util.time.impl2; + +public interface Bucket { + void add(TimeElemImpl elem); + + void remove(TimeElemImpl elem); + + void poll(long timeOffset); + + boolean isEmpty(); +} diff --git a/base/src/main/java/vproxybase/util/time/impl2/RawBucket.java b/base/src/main/java/vproxybase/util/time/impl2/RawBucket.java new file mode 100644 index 000000000..d17a0135d --- /dev/null +++ b/base/src/main/java/vproxybase/util/time/impl2/RawBucket.java @@ -0,0 +1,37 @@ +package vproxybase.util.time.impl2; + +import java.util.LinkedList; +import java.util.Queue; + +public class RawBucket implements Bucket { + private final LinkedList> list = new LinkedList<>(); + private final Queue> queue; + + public RawBucket(Queue> queue) { + this.queue = queue; + } + + @Override + public void add(TimeElemImpl elem) { + elem.bucket = this; + list.add(elem); + } + + @Override + public void remove(TimeElemImpl elem) { + if (elem.bucket == this) { + list.remove(elem); + } + } + + @Override + public void poll(long timeOffset) { + queue.addAll(list); + list.clear(); + } + + @Override + public boolean isEmpty() { + return list.isEmpty(); + } +} diff --git a/base/src/main/java/vproxybase/util/time/impl2/TimeElemImpl.java b/base/src/main/java/vproxybase/util/time/impl2/TimeElemImpl.java new file mode 100644 index 000000000..62740a830 --- /dev/null +++ b/base/src/main/java/vproxybase/util/time/impl2/TimeElemImpl.java @@ -0,0 +1,27 @@ +package vproxybase.util.time.impl2; + +import vproxybase.util.time.TimeElem; + +public class TimeElemImpl implements TimeElem { + Bucket bucket; + + final long timeOffset; + private final T value; + + public TimeElemImpl(long timeOffset, T value) { + this.timeOffset = timeOffset; + this.value = value; + } + + @Override + public T get() { + return value; + } + + @Override + public void removeSelf() { + if (bucket != null) { + bucket.remove(this); + } + } +} diff --git a/base/src/main/java/vproxybase/util/time/impl2/TimeQueueImpl.java b/base/src/main/java/vproxybase/util/time/impl2/TimeQueueImpl.java new file mode 100644 index 000000000..78621cf39 --- /dev/null +++ b/base/src/main/java/vproxybase/util/time/impl2/TimeQueueImpl.java @@ -0,0 +1,47 @@ +package vproxybase.util.time.impl2; + +import vproxybase.util.time.TimeQueue; + +import java.util.LinkedList; +import java.util.Queue; + +public class TimeQueueImpl implements TimeQueue { + private final Queue resultQueue = new LinkedList<>(); + private TimeWheel timeWheel; + private Long firstTimestamp; + private final Queue> queue = new LinkedList<>(); + + public TimeElemImpl add(long currentTimestamp, int timeout, T obj) { + if (firstTimestamp == null || timeWheel == null) { + firstTimestamp = currentTimestamp; + timeWheel = new TimeWheel<>(0, 1, 32, 1, queue); + } + TimeElemImpl elem = new TimeElemImpl<>(currentTimestamp - firstTimestamp + timeout, obj); + timeWheel.add(elem); + return elem; + } + + public T poll() { + if (firstTimestamp == null || timeWheel == null) { + return null; + } + long current = System.currentTimeMillis(); + long offset = current - firstTimestamp; + timeWheel.poll(offset); + return resultQueue.poll(); + } + + @Override + public boolean isEmpty() { + return timeWheel.isEmpty(); + } + + @Override + public int nextTime(long current) { + TimeElemImpl elem = queue.peek(); + if (elem == null) { + return Integer.MAX_VALUE; + } + return Math.max((int) (elem.timeOffset + firstTimestamp - current), 0); + } +} diff --git a/base/src/main/java/vproxybase/util/time/impl2/TimeWheel.java b/base/src/main/java/vproxybase/util/time/impl2/TimeWheel.java new file mode 100644 index 000000000..0c0235b0b --- /dev/null +++ b/base/src/main/java/vproxybase/util/time/impl2/TimeWheel.java @@ -0,0 +1,97 @@ +package vproxybase.util.time.impl2; + +import java.util.Arrays; +import java.util.List; +import java.util.Queue; + +public class TimeWheel { + private final long startMs; + private final long endMs; + private final long tickMs; + private final int wheelSize; + private final long interval; + private final List> buckets; + private final int layerNumber; + private TimeWheel overflowWheel; + private final Queue> queue; + private int currentPosition = 0; + + public TimeWheel(long startMs, long tickMs, int wheelSize, int layerNumber, Queue> queue) { + this.startMs = startMs; + this.tickMs = tickMs; + this.wheelSize = wheelSize; + this.endMs = startMs + tickMs * wheelSize; + this.queue = queue; + this.interval = tickMs * wheelSize; + this.buckets = Arrays.asList(new Bucket[wheelSize]); + this.layerNumber = layerNumber; + } + + public void add(TimeElemImpl t) { + long bucketNo = (t.timeOffset - startMs) / tickMs; + if (bucketNo > wheelSize - 1) { + addOverflow(t); + return; + } + + Bucket bucket = buckets.get((int) bucketNo); + if (bucket == null) { + boolean isBottom = layerNumber == 1; + if (isBottom) { + bucket = new RawBucket<>(queue); + } else { + long tickMs = this.tickMs / wheelSize; + long startMs = this.startMs + bucketNo * tickMs; + bucket = new WheelBucket<>(startMs, tickMs, wheelSize, layerNumber - 1, queue); + } + buckets.set((int) bucketNo, bucket); + } + + bucket.add(t); + } + + private void addOverflow(TimeElemImpl t) { + if (overflowWheel == null) { + overflowWheel = new TimeWheel<>(startMs, interval, wheelSize, layerNumber + 1, queue); + } + overflowWheel.add(t); + } + + public void poll(long timeOffset) { + if (timeOffset > endMs && overflowWheel != null) { + overflowWheel.poll(timeOffset); + return; + } + + long length = (timeOffset - startMs) / tickMs; + if (length < currentPosition) { + return; + } + + for (int i = currentPosition; i < length + 1; i++) { + Bucket bucket = buckets.get(i); + if (bucket != null) { + bucket.poll(timeOffset); + } + } + currentPosition = (int) length; + } + + public void remove(TimeElemImpl obj) { + for (Bucket bucket : buckets) { + if (bucket != null) { + bucket.remove(obj); + } + } + } + + public boolean isEmpty() { + boolean isEmpty = true; + for (Bucket bucket : buckets) { + if (bucket != null && !bucket.isEmpty()) { + isEmpty = false; + } + } + return isEmpty; + } +} diff --git a/base/src/main/java/vproxybase/util/time/impl2/WheelBucket.java b/base/src/main/java/vproxybase/util/time/impl2/WheelBucket.java new file mode 100644 index 000000000..60683e91d --- /dev/null +++ b/base/src/main/java/vproxybase/util/time/impl2/WheelBucket.java @@ -0,0 +1,31 @@ +package vproxybase.util.time.impl2; + +import java.util.Queue; + +public class WheelBucket implements Bucket { + private final TimeWheel timeWheel; + + public WheelBucket(long startMs, long tickMs, int wheelSize, int layerNumber, Queue> queue) { + this.timeWheel = new TimeWheel(startMs, tickMs, wheelSize, layerNumber, queue); + } + + @Override + public void add(TimeElemImpl elem) { + timeWheel.add(elem); + } + + @Override + public void remove(TimeElemImpl elem) { + timeWheel.remove(elem); + } + + @Override + public void poll(long timeOffset) { + timeWheel.poll(timeOffset); + } + + @Override + public boolean isEmpty() { + return timeWheel.isEmpty(); + } +}