diff --git a/base/src/main/java/vproxy/base/util/time/impl/TimeElemImpl.java b/base/src/main/java/vproxy/base/util/time/impl/TimeElemImpl.java index acb5b3fe8..1fa64add1 100644 --- a/base/src/main/java/vproxy/base/util/time/impl/TimeElemImpl.java +++ b/base/src/main/java/vproxy/base/util/time/impl/TimeElemImpl.java @@ -18,7 +18,8 @@ public T get() { return elem; } + @Override public void removeSelf() { - queue.queue.remove(this); + queue.remove(this); } } diff --git a/base/src/main/java/vproxy/base/util/time/impl/TimeQueueImpl.java b/base/src/main/java/vproxy/base/util/time/impl/TimeQueueImpl.java index 8ca67246c..e5ffda05d 100644 --- a/base/src/main/java/vproxy/base/util/time/impl/TimeQueueImpl.java +++ b/base/src/main/java/vproxy/base/util/time/impl/TimeQueueImpl.java @@ -3,37 +3,130 @@ import vproxy.base.util.time.TimeElem; import vproxy.base.util.time.TimeQueue; -import java.util.PriorityQueue; +import java.util.*; public class TimeQueueImpl implements TimeQueue { - PriorityQueue> queue = new PriorityQueue<>((a, b) -> (int) (a.triggerTime - b.triggerTime)); + private static final int TIME_WHEEL_LEVEL = 4; + private static final int MAX_TIME_WHEEL_INTERVAL = 1 << (TIME_WHEEL_LEVEL * TimeWheel.WHEEL_SIZE_POWER); + + private final PriorityQueue> queue = new PriorityQueue<>(Comparator.comparingLong(x -> x.triggerTime)); + + private final ArrayList> timeWheels; + + private long lastTickTimestamp; + + public TimeQueueImpl() { + this(System.currentTimeMillis()); + } + + public TimeQueueImpl(long currentTimestamp) { + this.timeWheels = new ArrayList<>(TIME_WHEEL_LEVEL); + for (int i = 0; i < TIME_WHEEL_LEVEL; i++) { + this.timeWheels.add(new TimeWheel<>(1 << (i * TimeWheel.WHEEL_SIZE_POWER), currentTimestamp)); + } + this.lastTickTimestamp = currentTimestamp; + } @Override public TimeElem add(long currentTimestamp, int timeout, T elem) { - TimeElemImpl event = new TimeElemImpl<>(currentTimestamp + timeout, elem, this); - queue.add(event); + final TimeElemImpl event = new TimeElemImpl<>(currentTimestamp + timeout, elem, this); + addTimeElem(event, currentTimestamp); return event; } + private void addTimeElem(TimeElemImpl event, long currentTimestamp) { + long timeout = event.triggerTime - currentTimestamp; + if (timeout >= MAX_TIME_WHEEL_INTERVAL) { + queue.add(event); + } + // already timeout task put into the lowest time wheel + else if (timeout <= 0) { + this.timeWheels.get(0).add(event, currentTimestamp); + } + // long timeout task put into queue + else { + var index = findTimeWheelIndex(timeout); + this.timeWheels.get(index).add(event, currentTimestamp); + } + } + @Override public T poll() { - TimeElemImpl elem = queue.poll(); - if (elem == null) + TimeElem elem = timeWheels.get(0).poll(); + if (elem == null) { return null; - return elem.elem; + } + return elem.get(); } @Override public boolean isEmpty() { - return queue.isEmpty(); + for (TimeWheel timeWheel : timeWheels) { + if (!timeWheel.isEmpty()) { + return false; + } + } + return true; } @Override public int nextTime(long currentTimestamp) { + tickTimeWheel(currentTimestamp); + for (TimeWheel timeWheel : timeWheels) { + if (timeWheel.isEmpty()) { + continue; + } + return timeWheel.nextTime(currentTimestamp); + } + TimeElemImpl elem = queue.peek(); - if (elem == null) + if (elem == null) { return Integer.MAX_VALUE; + } long triggerTime = elem.triggerTime; return Math.max((int) (triggerTime - currentTimestamp), 0); } + + private void tickTimeWheel(long currentTimestamp) { + for (int i = TIME_WHEEL_LEVEL - 1; i > 0; i--) { + final var wheel = timeWheels.get(i); + while (wheel.tryTick(currentTimestamp)) { + final Collection> events = wheel.tick(currentTimestamp); + for (TimeElemImpl event : events) { + addTimeElem(event, currentTimestamp); + } + } + } + + // move elements from queue to time wheels + while (!queue.isEmpty()) { + final TimeElemImpl elem = queue.peek(); + long timeout = elem.triggerTime - currentTimestamp; + if (timeout >= MAX_TIME_WHEEL_INTERVAL) { + break; + } + + addTimeElem(elem, currentTimestamp); + queue.poll(); + } + + this.lastTickTimestamp = currentTimestamp; + } + + public void remove(TimeElemImpl elem) { + long timeout = elem.triggerTime - this.lastTickTimestamp; + if (timeout >= MAX_TIME_WHEEL_INTERVAL) { + queue.remove(elem); + } else { + timeWheels.get(findTimeWheelIndex(timeout)).remove(elem); + } + } + + private static int findTimeWheelIndex(long timeout) { + if (timeout <= 0) { + return 0; + } + int bits = 63 - Long.numberOfLeadingZeros(timeout); + return bits / TimeWheel.WHEEL_SIZE_POWER; + } } diff --git a/base/src/main/java/vproxy/base/util/time/impl/TimeWheel.java b/base/src/main/java/vproxy/base/util/time/impl/TimeWheel.java new file mode 100644 index 000000000..be501617d --- /dev/null +++ b/base/src/main/java/vproxy/base/util/time/impl/TimeWheel.java @@ -0,0 +1,119 @@ +package vproxy.base.util.time.impl; + +import vproxy.base.util.time.TimeElem; + +import java.util.*; + +public class TimeWheel { + public static final int WHEEL_SIZE_POWER = 5; + public static final int WHEEL_SIZE = 1 << WHEEL_SIZE_POWER; + + private final PriorityQueue>[] slots = new PriorityQueue[WHEEL_SIZE]; + /** + * min time unit in this time wheel + */ + public final long tickDuration; + /** + * the time wheel max time interval. interval = tickDuration * WHEEL_SIZE + */ + public final long interval; + public final long startTimestamp; + private int tickIndex; + private long elemNum; + private long currentTime; + + public TimeWheel(long tickDuration, long timestamp) { + this.tickDuration = tickDuration; + this.interval = this.tickDuration * WHEEL_SIZE; + this.startTimestamp = timestamp; + this.currentTime = timestamp; + this.tickIndex = 0; + this.elemNum = 0; + + for (int i = 0; i < slots.length; i++) { + slots[i] = new PriorityQueue<>(Comparator.comparingLong(x -> x.triggerTime)); + } + } + + public void add(TimeElemImpl elem, long timestamp) { + if (elem.triggerTime <= timestamp) { + slots[tickIndex].add(elem); + } else { + slots[findSlotIndex(elem.triggerTime)].add(elem); + } + elemNum++; + } + + private int findSlotIndex(long timestamp) { + long timeout = timestamp - startTimestamp; + return (int) ((timeout & (interval - 1)) / tickDuration); + } + + /** + * return true if it can move. + */ + public boolean tryTick(long timestamp) { + return timestamp - currentTime >= tickDuration; + } + + /** + * move the tick index to point the next slot. + */ + public Collection> tick(long timestamp) { + if (!tryTick(timestamp)) { + return Collections.emptyList(); + } + + int oldIndex = tickIndex; + int nextIndex = (oldIndex + 1) & (WHEEL_SIZE - 1); + if (!slots[oldIndex].isEmpty()) { + slots[nextIndex].addAll(slots[oldIndex]); + } + this.tickIndex = nextIndex; + final PriorityQueue> queue = slots[tickIndex]; + slots[tickIndex] = new PriorityQueue<>(Comparator.comparingLong(x -> x.triggerTime)); + + elemNum -= queue.size(); + currentTime += tickDuration; + return queue; + } + + public TimeElem poll() { + var elem = slots[tickIndex].poll(); + if (elem != null) { + elemNum--; + } + return elem; + } + + public boolean isEmpty() { + return elemNum == 0; + } + + public long size() { + return elemNum; + } + + public int nextTime(long timestamp) { + for (int i = tickIndex; i < tickIndex + WHEEL_SIZE; i++) { + final int index = i & (WHEEL_SIZE - 1); + if (slots[index].isEmpty()) { + continue; + } + + long triggerTime = slots[index].peek().triggerTime; + int nextTime = Math.max((int) (triggerTime - timestamp), 0); + if (nextTime == 0 && index != tickIndex){ + slots[tickIndex].add(slots[index].poll()); + } + return nextTime; + } + return Integer.MAX_VALUE; + } + + public void remove(TimeElemImpl elem) { + if (slots[findSlotIndex(elem.triggerTime)].remove(elem)) { + elemNum--; + } + } +}