Skip to content

Commit

Permalink
make a time event abstraction
Browse files Browse the repository at this point in the history
  • Loading branch information
wkgcass committed Jan 6, 2021
1 parent 1843264 commit 74ef345
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 61 deletions.
1 change: 1 addition & 0 deletions base/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
exports vproxybase.util.promise;
exports vproxybase.util.exception;
exports vproxybase.util.table;
exports vproxybase.util.time;
exports vproxybase.util.objectpool;
exports vproxybase.processor;
exports vproxybase.processor.http1;
Expand Down
11 changes: 6 additions & 5 deletions base/src/main/java/vproxybase/selector/SelectorEventLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import vproxybase.selector.wrap.WrappedSelector;
import vproxybase.util.*;
import vproxybase.util.promise.Promise;
import vproxybase.util.time.TimeQueue;

import java.io.IOException;
import java.nio.channels.CancelledKeyException;
Expand Down Expand Up @@ -48,7 +49,7 @@ public static SelectorEventLoop current() {

public final WrappedSelector selector;
public final FDs fds;
private final TimeQueue<Runnable> timeQueue = new TimeQueue<>();
private final TimeQueue<Runnable> timeQueue = TimeQueue.create();
private final ConcurrentLinkedQueue<Runnable> runOnLoopEvents = new ConcurrentLinkedQueue<>();

private final Lock channelRegisteringLock = Lock.create();
Expand Down Expand Up @@ -156,8 +157,8 @@ private void handleRunOnLoopEvents() {

private void handleTimeEvents() {
List<Runnable> toRun = new LinkedList<>();
while (timeQueue.nextTime() == 0) {
Runnable r = timeQueue.pop();
while (timeQueue.nextTime(Config.currentTimestamp) == 0) {
Runnable r = timeQueue.poll();
toRun.add(r);
}
for (Runnable r : toRun) {
Expand Down Expand Up @@ -289,7 +290,7 @@ public int onePoll() {
} else if (!channelsToBeRegisteredStep1.isEmpty() || !channelsToBeRegisteredStep2.isEmpty()) {
selected = selector.selectNow(); // immediately return when channels are going to be registered
} else {
int time = timeQueue.nextTime();
int time = timeQueue.nextTime(Config.currentTimestamp);
if (time == 0) {
selected = selector.selectNow(); // immediately return
} else {
Expand Down Expand Up @@ -391,7 +392,7 @@ public TimerEvent delay(int timeout, Runnable r) {
TimerEvent e = new TimerEvent(this);
// timeQueue is not thread safe
// modify it in the event loop's thread
nextTick(() -> e.setEvent(timeQueue.push(timeout, r)));
nextTick(() -> e.setEvent(timeQueue.add(Config.currentTimestamp, timeout, r)));
return e;
}

Expand Down
2 changes: 1 addition & 1 deletion base/src/main/java/vproxybase/selector/TimerEvent.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package vproxybase.selector;

import vproxybase.util.ThreadSafe;
import vproxybase.util.TimeElem;
import vproxybase.util.time.TimeElem;

public class TimerEvent {
private TimeElem event;
Expand Down
18 changes: 0 additions & 18 deletions base/src/main/java/vproxybase/util/TimeElem.java

This file was deleted.

37 changes: 0 additions & 37 deletions base/src/main/java/vproxybase/util/TimeQueue.java

This file was deleted.

10 changes: 10 additions & 0 deletions base/src/main/java/vproxybase/util/time/TimeElem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package vproxybase.util.time;

public interface TimeElem<T> {
T get();

/**
* this method should always be called on the event loop
*/
void removeSelf();
}
24 changes: 24 additions & 0 deletions base/src/main/java/vproxybase/util/time/TimeQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package vproxybase.util.time;

import vproxybase.util.time.impl.TimeQueueImpl;

public interface TimeQueue<T> {
static <T> TimeQueue<T> create() {
return new TimeQueueImpl<>();
}

TimeElem<T> add(long current, int timeout, T elem);

/**
* @return element of the nearest timeout event, or null if no elements. this method ignores whether the event is timed-out.
*/
T poll();

boolean isEmpty();

/**
* @param current current timestamp millis
* @return time left to the nearest timeout, or 0 if the timeout event triggers, must not < 0, Integer.MAX_VALUE means no timer event
*/
int nextTime(long current);
}
24 changes: 24 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl/TimeElemImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package vproxybase.util.time.impl;

import vproxybase.util.time.TimeElem;

public class TimeElemImpl<T> implements TimeElem<T> {
public final long triggerTime;
public final T elem;
private final TimeQueueImpl<T> queue;

TimeElemImpl(long triggerTime, T elem, TimeQueueImpl<T> queue) {
this.triggerTime = triggerTime;
this.elem = elem;
this.queue = queue;
}

@Override
public T get() {
return elem;
}

public void removeSelf() {
queue.queue.remove(this);
}
}
39 changes: 39 additions & 0 deletions base/src/main/java/vproxybase/util/time/impl/TimeQueueImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package vproxybase.util.time.impl;

import vproxybase.util.time.TimeElem;
import vproxybase.util.time.TimeQueue;

import java.util.PriorityQueue;

public class TimeQueueImpl<T> implements TimeQueue<T> {
PriorityQueue<TimeElemImpl<T>> queue = new PriorityQueue<>((a, b) -> (int) (a.triggerTime - b.triggerTime));

@Override
public TimeElem<T> add(long currentTimestamp, int timeout, T elem) {
TimeElemImpl<T> event = new TimeElemImpl<>(currentTimestamp + timeout, elem, this);
queue.add(event);
return event;
}

@Override
public T poll() {
TimeElemImpl<T> elem = queue.poll();
if (elem == null)
return null;
return elem.elem;
}

@Override
public boolean isEmpty() {
return queue.isEmpty();
}

@Override
public int nextTime(long currentTimestamp) {
TimeElemImpl<T> elem = queue.peek();
if (elem == null)
return Integer.MAX_VALUE;
long triggerTime = elem.triggerTime;
return Math.max((int) (triggerTime - currentTimestamp), 0);
}
}

0 comments on commit 74ef345

Please sign in to comment.