Skip to content

Commit

Permalink
feat: add time wheel
Browse files Browse the repository at this point in the history
  • Loading branch information
nintha committed Feb 20, 2021
1 parent 5ff73ae commit 2b98437
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ public T get() {
return elem;
}

@Override
public void removeSelf() {
queue.queue.remove(this);
queue.remove(this);
}
}
111 changes: 102 additions & 9 deletions base/src/main/java/vproxybase/util/time/impl/TimeQueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,130 @@
import vproxybase.util.time.TimeElem;
import vproxybase.util.time.TimeQueue;

import java.util.PriorityQueue;
import java.util.*;

public class TimeQueueImpl<T> implements TimeQueue<T> {
PriorityQueue<TimeElemImpl<T>> queue = new PriorityQueue<>((a, b) -> (int) (a.triggerTime - b.triggerTime));
private static final int TIME_WHEEL_LEVEL = 4;
private static final int MAX_TIME_WHEEL_INTERVAL = 1 << (TIME_WHEEL_LEVEL * TimeWheel.WHEEL_SIZE_POWER);

private final PriorityQueue<TimeElemImpl<T>> queue = new PriorityQueue<>(Comparator.comparingLong(x -> x.triggerTime));

private final ArrayList<TimeWheel<T>> timeWheels;

private long lastTickTimestamp;

public TimeQueueImpl() {
this(System.currentTimeMillis());
}

public TimeQueueImpl(long currentTimestamp) {
this.timeWheels = new ArrayList<>(TIME_WHEEL_LEVEL);
for (int i = 0; i < TIME_WHEEL_LEVEL; i++) {
this.timeWheels.add(new TimeWheel<>(1 << (i * TimeWheel.WHEEL_SIZE_POWER), currentTimestamp));
}
this.lastTickTimestamp = currentTimestamp;
}

@Override
public TimeElem<T> add(long currentTimestamp, int timeout, T elem) {
TimeElemImpl<T> event = new TimeElemImpl<>(currentTimestamp + timeout, elem, this);
queue.add(event);
final TimeElemImpl<T> event = new TimeElemImpl<>(currentTimestamp + timeout, elem, this);
addTimeElem(event, currentTimestamp);
return event;
}

private void addTimeElem(TimeElemImpl<T> event, long currentTimestamp) {
long timeout = event.triggerTime - currentTimestamp;
if (timeout >= MAX_TIME_WHEEL_INTERVAL) {
queue.add(event);
}
// already timeout task put into the lowest time wheel
else if (timeout <= 0) {
this.timeWheels.get(0).add(event, currentTimestamp);
}
// long timeout task put into queue
else {
var index = findTimeWheelIndex(timeout);
this.timeWheels.get(index).add(event, currentTimestamp);
}
}

@Override
public T poll() {
TimeElemImpl<T> elem = queue.poll();
if (elem == null)
TimeElem<T> elem = timeWheels.get(0).poll();
if (elem == null) {
return null;
return elem.elem;
}
return elem.get();
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
for (TimeWheel<T> timeWheel : timeWheels) {
if (!timeWheel.isEmpty()) {
return false;
}
}
return true;
}

@Override
public int nextTime(long currentTimestamp) {
forward(currentTimestamp);
for (TimeWheel<T> timeWheel : timeWheels) {
if (timeWheel.isEmpty()) {
continue;
}
return timeWheel.nextTime(currentTimestamp);
}

TimeElemImpl<T> elem = queue.peek();
if (elem == null)
if (elem == null) {
return Integer.MAX_VALUE;
}
long triggerTime = elem.triggerTime;
return Math.max((int) (triggerTime - currentTimestamp), 0);
}

private void forward(long currentTimestamp) {
for (int i = TIME_WHEEL_LEVEL - 1; i > 0; i--) {
final var wheel = timeWheels.get(i);
while (wheel.tryTick(currentTimestamp)) {
final Collection<TimeElemImpl<T>> events = wheel.tick(currentTimestamp);
for (TimeElemImpl<T> event : events) {
addTimeElem(event, currentTimestamp);
}
}
}

// move elements from queue to time wheels
while (!queue.isEmpty()) {
final TimeElemImpl<T> elem = queue.peek();
long timeout = elem.triggerTime - currentTimestamp;
if (timeout >= MAX_TIME_WHEEL_INTERVAL) {
break;
}

addTimeElem(elem, currentTimestamp);
queue.poll();
}

this.lastTickTimestamp = currentTimestamp;
}

public void remove(TimeElemImpl<T> elem) {
long timeout = elem.triggerTime - this.lastTickTimestamp;
if (timeout >= MAX_TIME_WHEEL_INTERVAL) {
queue.remove(elem);
} else {
timeWheels.get(findTimeWheelIndex(timeout)).remove(elem);
}
}

private static int findTimeWheelIndex(long timeout) {
if (timeout <= 0) {
return 0;
}
int bits = 63 - Long.numberOfLeadingZeros(timeout);
return bits / TimeWheel.WHEEL_SIZE_POWER;
}
}
119 changes: 119 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl/TimeWheel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package vproxybase.util.time.impl;

import vproxybase.util.time.TimeElem;

import java.util.*;

public class TimeWheel<T> {
public static final int WHEEL_SIZE_POWER = 5;
public static final int WHEEL_SIZE = 1 << WHEEL_SIZE_POWER;

private final PriorityQueue<TimeElemImpl<T>>[] slots = new PriorityQueue[WHEEL_SIZE];
/**
* min time unit in this time wheel
*/
public final long tickDuration;
/**
* the time wheel max time interval. interval = tickDuration * WHEEL_SIZE
*/
public final long interval;
public final long startTimestamp;
private int tickIndex;
private long elemNum;
private long currentTime;

public TimeWheel(long tickDuration, long timestamp) {
this.tickDuration = tickDuration;
this.interval = this.tickDuration * WHEEL_SIZE;
this.startTimestamp = timestamp;
this.currentTime = timestamp;
this.tickIndex = 0;
this.elemNum = 0;

for (int i = 0; i < slots.length; i++) {
slots[i] = new PriorityQueue<>(Comparator.comparingLong(x -> x.triggerTime));
}
}

public void add(TimeElemImpl<T> elem, long timestamp) {
if (elem.triggerTime <= timestamp) {
slots[tickIndex].add(elem);
} else {
slots[findSlotIndex(elem.triggerTime)].add(elem);
}
elemNum++;
}

private int findSlotIndex(long timestamp) {
long timeout = timestamp - startTimestamp;
return (int) ((timeout & (interval - 1)) / tickDuration);
}

/**
* return true if it can move.
*/
public boolean tryTick(long timestamp) {
return timestamp - currentTime >= tickDuration;
}

/**
* move the tick index to point the next slot.
*/
public Collection<TimeElemImpl<T>> tick(long timestamp) {
if (!tryTick(timestamp)) {
return Collections.emptyList();
}

int oldIndex = tickIndex;
int nextIndex = (oldIndex + 1) & (WHEEL_SIZE - 1);
if (!slots[oldIndex].isEmpty()) {
slots[nextIndex].addAll(slots[oldIndex]);
}
this.tickIndex = nextIndex;
final PriorityQueue<TimeElemImpl<T>> queue = slots[tickIndex];
slots[tickIndex] = new PriorityQueue<>(Comparator.comparingLong(x -> x.triggerTime));

elemNum -= queue.size();
currentTime += tickDuration;
return queue;
}

public TimeElem<T> poll() {
var elem = slots[tickIndex].poll();
if (elem != null) {
elemNum--;
}
return elem;
}

public boolean isEmpty() {
return elemNum == 0;
}

public long size() {
return elemNum;
}

public int nextTime(long timestamp) {
for (int i = tickIndex; i < tickIndex + WHEEL_SIZE; i++) {
final int index = i & (WHEEL_SIZE - 1);
if (slots[index].isEmpty()) {
continue;
}

long triggerTime = slots[index].peek().triggerTime;
int nextTime = Math.max((int) (triggerTime - timestamp), 0);
if (nextTime == 0 && index != tickIndex){
slots[tickIndex].add(slots[index].poll());
}
return nextTime;
}
return Integer.MAX_VALUE;
}

public void remove(TimeElemImpl<T> elem) {
if (slots[findSlotIndex(elem.triggerTime)].remove(elem)) {
elemNum--;
}
}
}

0 comments on commit 2b98437

Please sign in to comment.