Skip to content

Commit

Permalink
javadoc + better pool management + pool decorator for debug
Browse files Browse the repository at this point in the history
  • Loading branch information
mariofusco committed Aug 1, 2023
1 parent 9553cd8 commit 3a0f9e5
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,19 @@ protected int charBufferLength(int ix) {
protected char[] calloc(int size) { return new char[size]; }

BufferRecycler withPool(ObjectPool<BufferRecycler> pool) {
if (this._pool != null) {
throw new IllegalStateException();
}
this._pool = pool;
return this;
}

@Override
public void close() {
if (_pool != null) {
_pool.release(this);
ObjectPool<BufferRecycler> tempPool = _pool;
_pool = null;
tempPool.release(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

public class BufferRecyclerPool {

private static final ObjectPool<BufferRecycler> pool = ObjectPool.newObjectPool(new BufferRecycler()::withPool);
private static final ObjectPool<BufferRecycler> pool = ObjectPool.newObjectPool(BufferRecycler::new);

public static BufferRecycler acquireBufferRecycler() {
return pool.acquire();
return pool.acquire().withPool(pool);
}

public static void releaseBufferRecycler(BufferRecycler bufferRecycler) {
Expand Down
91 changes: 75 additions & 16 deletions src/main/java/com/fasterxml/jackson/core/util/ObjectPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,19 @@
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import java.util.function.Function;

import java.util.function.Supplier;

/**
* This is a utility class, whose main functionality is pooling object
* with a huge memory footprint and that are costly to be recreated at
* every usage like the {@link BufferRecycler}. It is intended for
* internal use only.
*
* @since 2.16
*/
public interface ObjectPool<T> extends AutoCloseable {

T acquire();
Expand All @@ -21,7 +31,22 @@ default void withPooledObject(Consumer<T> objectConsumer) {
}

enum Strategy {
CONCURRENT_DEQUEUE, LOCK_FREE
CONCURRENT_DEQUEUE(ConcurrentDequePool::new, false), LOCK_FREE(LockFreePool::new, false),
DEBUG_CONCURRENT_DEQUEUE(ConcurrentDequePool::new, true), DEBUG_LOCK_FREE(LockFreePool::new, true);

private final Function<Supplier, ObjectPool> constructor;

private final boolean debug;

Strategy(Function<Supplier, ObjectPool> constructor, boolean debug) {
this.constructor = constructor;
this.debug = debug;
}

<T> ObjectPool<T> newObjectPool(Supplier<T> factory) {
ObjectPool<T> pool = constructor.apply(factory);
return debug ? new DebugPoolDecorator<>(pool) : pool;
}
}

class StrategyHolder {
Expand All @@ -32,33 +57,29 @@ public static void setStrategy(String name) {
}
}

static <T> ObjectPool<T> newObjectPool(Function<ObjectPool<T>, T> factory) {
switch (StrategyHolder.strategy) {
case CONCURRENT_DEQUEUE: return new ConcurrentDequePool<>(factory);
case LOCK_FREE: return new LockFreePool<>(factory);
}
throw new UnsupportedOperationException();
static <T> ObjectPool<T> newObjectPool(Supplier<T> factory) {
return StrategyHolder.strategy.newObjectPool(factory);
}

class ConcurrentDequePool<T> implements ObjectPool<T> {
private final Function<ObjectPool<T>, T> factory;
private final Supplier<T> factory;
private final Consumer<T> destroyer;

private final Deque<T> pool = new ConcurrentLinkedDeque<>();

public ConcurrentDequePool(Function<ObjectPool<T>, T> factory) {
public ConcurrentDequePool(Supplier<T> factory) {
this(factory, null);
}

public ConcurrentDequePool(Function<ObjectPool<T>, T> factory, Consumer<T> destroyer) {
public ConcurrentDequePool(Supplier<T> factory, Consumer<T> destroyer) {
this.factory = factory;
this.destroyer = destroyer;
}

@Override
public T acquire() {
T t = pool.pollFirst();
return t != null ? t : factory.apply(this);
return t != null ? t : factory.get();
}

@Override
Expand All @@ -77,9 +98,9 @@ public void close() throws Exception {
class LockFreePool<T> implements ObjectPool<T> {
private final AtomicReference<Node<T>> head = new AtomicReference<>();

private final Function<ObjectPool<T>, T> factory;
private final Supplier<T> factory;

public LockFreePool(Function<ObjectPool<T>, T> factory) {
public LockFreePool(Supplier<T> factory) {
this.factory = factory;
}

Expand All @@ -88,14 +109,14 @@ public T acquire() {
for (int i = 0; i < 3; i++) {
Node<T> currentHead = head.get();
if (currentHead == null) {
return factory.apply(this);
return factory.get();
}
if (head.compareAndSet(currentHead, currentHead.next)) {
currentHead.next = null;
return currentHead.value;
}
}
return factory.apply(this);
return factory.get();
}

@Override
Expand Down Expand Up @@ -123,4 +144,42 @@ static class Node<T> {
}
}
}

class DebugPoolDecorator<T> implements ObjectPool<T> {

private final ObjectPool<T> pool;

private final LongAdder acquireCounter = new LongAdder();
private final LongAdder releaseCounter = new LongAdder();

public DebugPoolDecorator(ObjectPool<T> pool) {
this.pool = pool;
}

@Override
public T acquire() {
acquireCounter.increment();
return pool.acquire();
}

@Override
public void release(T t) {
releaseCounter.increment();
pool.release(t);
}

@Override
public void close() throws Exception {
System.out.println("Closing " + this);
pool.close();
}

@Override
public String toString() {
return "DebugPoolDecorator{" +
"acquires = " + acquireCounter.sum() +
", releases = " + releaseCounter.sum() +
'}';
}
}
}

0 comments on commit 3a0f9e5

Please sign in to comment.