Skip to content

Commit

Permalink
Added statistics support (#11)
Browse files Browse the repository at this point in the history
* Added statistics support

* added delegating stats collector

* Added error counts to stats

* Added noop version of collector

* Made it doubles and added toMap

* javadoc warning

* batch load count is now the number of objects loaded via the batch function and batch invoke count the number of invocations of the batch function

* readme updates

* Made Statistics a class
  • Loading branch information
bbakerman authored Sep 20, 2017
1 parent 25047ce commit aa2994e
Show file tree
Hide file tree
Showing 17 changed files with 1,096 additions and 13 deletions.
32 changes: 32 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,38 @@ In some circumstances you may wish to clear the cache for these individual probl
});
```


## Statistics on what is happening

`DataLoader` keeps statistics on what is happening. It can tell you the number of objects asked for, the cache hit number, the number of objects
asked for via batching and so on.

Knowing the behaviour of your data is important: it tells you how efficiently you are serving the data via this pattern.


```java
Statistics statistics = userDataLoader.getStatistics();

System.out.println(format("load : %d", statistics.getLoadCount()));
System.out.println(format("batch load: %d", statistics.getBatchLoadCount()));
System.out.println(format("cache hit: %d", statistics.getCacheHitCount()));
System.out.println(format("cache hit ratio: %f", statistics.getCacheHitRatio()));

```

`DataLoaderRegistry` can also roll up the statistics for all data loaders inside it.

You can configure the statistics collector used when you build the data loader:

```java
DataLoaderOptions options = DataLoaderOptions.newOptions().setStatisticsCollector(() -> new ThreadLocalStatisticsCollector());
DataLoader<String,User> userDataLoader = DataLoader.newDataLoader(userBatchLoader,options);

```

Which collector you use is up to you. The library ships with the following: `SimpleStatisticsCollector`, `ThreadLocalStatisticsCollector`, `DelegatingStatisticsCollector`
and `NoOpStatisticsCollector`.

## The scope of a data loader is important

If you are serving web requests then the data can be specific to the user requesting it. If you have user specific data
Expand Down
47 changes: 37 additions & 10 deletions src/main/java/org/dataloader/DataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
package org.dataloader;

import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.stats.Statistics;
import org.dataloader.stats.StatisticsCollector;

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -64,6 +67,7 @@ public class DataLoader<K, V> {
private final DataLoaderOptions loaderOptions;
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final Map<K, CompletableFuture<V>> loaderQueue;
private final StatisticsCollector stats;

/**
* Creates new DataLoader with the specified batch loader function and default options
Expand Down Expand Up @@ -153,6 +157,7 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
this.futureCache = determineCacheMap(loaderOptions);
// order of keys matter in data loader
this.loaderQueue = new LinkedHashMap<>();
this.stats = nonNull(this.loaderOptions.getStatisticsCollector());
}

@SuppressWarnings("unchecked")
Expand All @@ -173,8 +178,11 @@ private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptio
*/
public CompletableFuture<V> load(K key) {
Object cacheKey = getCacheKey(nonNull(key));
stats.incrementLoadCount();

synchronized (futureCache) {
if (loaderOptions.cachingEnabled() && futureCache.containsKey(cacheKey)) {
stats.incrementCacheHitCount();
return futureCache.get(cacheKey);
}
}
Expand All @@ -185,6 +193,7 @@ public CompletableFuture<V> load(K key) {
loaderQueue.put(key, future);
}
} else {
stats.incrementBatchLoadCountBy(1);
// immediate execution of batch function
CompletableFuture<List<V>> batchedLoad = batchLoadFunction
.load(singletonList(key))
Expand Down Expand Up @@ -291,7 +300,14 @@ private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<

@SuppressWarnings("unchecked")
private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<CompletableFuture<V>> queuedFutures) {
return batchLoadFunction.load(keys)
stats.incrementBatchLoadCountBy(keys.size());
CompletionStage<List<V>> batchLoad;
try {
batchLoad = nonNull(batchLoadFunction.load(keys), "Your batch loader function MUST return a non null CompletionStage promise");
} catch (Exception e) {
batchLoad = CompletableFutureKit.failedFuture(e);
}
return batchLoad
.toCompletableFuture()
.thenApply(values -> {
assertState(keys.size() == values.size(), "The size of the promised values MUST be the same size as the key list");
Expand All @@ -300,20 +316,28 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Complet
Object value = values.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
if (value instanceof Throwable) {
stats.incrementLoadErrorCount();
future.completeExceptionally((Throwable) value);
// we don't clear the cached view of this entry to avoid
// frequently loading the same error
} else if (value instanceof Try) {
// we allow the batch loader to return a Try so we can better represent a computation
// that might have worked or not.
handleTry((Try<V>) value, future);
Try<V> tryValue = (Try<V>) value;
if (tryValue.isSuccess()) {
future.complete(tryValue.get());
} else {
stats.incrementLoadErrorCount();
future.completeExceptionally(tryValue.getThrowable());
}
} else {
V val = (V) value;
future.complete(val);
}
}
return values;
}).exceptionally(ex -> {
stats.incrementBatchLoadExceptionCount();
for (int idx = 0; idx < queuedFutures.size(); idx++) {
K key = keys.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
Expand All @@ -325,14 +349,6 @@ private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Complet
});
}

/**
 * Transfers the outcome held by a {@link Try} onto a future.
 *
 * @param vTry   the outcome reported for a single key
 * @param future the future to complete with the value of a successful outcome, or to complete
 *               exceptionally with the throwable of a failed one
 */
private void handleTry(Try<V> vTry, CompletableFuture<V> future) {
    if (!vTry.isSuccess()) {
        // failed outcome: hand its throwable to whoever is waiting on the future
        future.completeExceptionally(vTry.getThrowable());
        return;
    }
    future.complete(vTry.get());
}

/**
* Normally {@link #dispatch()} is an asynchronous operation but this version will 'join' on the
* results of dispatch and wait for them to complete. If the {@link CompletableFuture} callbacks make more
Expand Down Expand Up @@ -441,4 +457,15 @@ public Object getCacheKey(K key) {
return loaderOptions.cacheKeyFunction().isPresent() ?
loaderOptions.cacheKeyFunction().get().getKey(key) : key;
}

/**
 * Returns the statistics gathered for this data loader by its
 * {@link org.dataloader.stats.StatisticsCollector}, the collector that was obtained from
 * {@link DataLoaderOptions#getStatisticsCollector()} when the loader was constructed.
 *
 * @return statistics for this data loader
 */
public Statistics getStatistics() {
    return this.stats.getStatistics();
}

}
34 changes: 32 additions & 2 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.dataloader;

import org.dataloader.impl.Assertions;
import org.dataloader.stats.SimpleStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;

import java.util.Optional;
import java.util.function.Supplier;

import static org.dataloader.impl.Assertions.nonNull;

/**
* Configuration options for {@link DataLoader} instances.
Expand All @@ -32,6 +36,7 @@ public class DataLoaderOptions {
private CacheKey cacheKeyFunction;
private CacheMap cacheMap;
private int maxBatchSize;
private Supplier<StatisticsCollector> statisticsCollector;

/**
* Creates a new data loader options with default settings.
Expand All @@ -40,6 +45,7 @@ public DataLoaderOptions() {
batchingEnabled = true;
cachingEnabled = true;
maxBatchSize = -1;
statisticsCollector = SimpleStatisticsCollector::new;
}

/**
Expand All @@ -48,12 +54,13 @@ public DataLoaderOptions() {
* @param other the other options instance
*/
public DataLoaderOptions(DataLoaderOptions other) {
    // fail fast on a null source; a single check is sufficient, and the statically imported
    // form matches how the rest of this class validates its arguments
    nonNull(other);
    // field-by-field copy: the references themselves (cache key function, cache map and
    // statistics collector supplier) are copied as-is, not cloned
    this.batchingEnabled = other.batchingEnabled;
    this.cachingEnabled = other.cachingEnabled;
    this.cacheKeyFunction = other.cacheKeyFunction;
    this.cacheMap = other.cacheMap;
    this.maxBatchSize = other.maxBatchSize;
    this.statisticsCollector = other.statisticsCollector;
}

/**
Expand Down Expand Up @@ -173,4 +180,27 @@ public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
return this;
}

/**
 * Obtains a statistics collector by invoking the configured supplier. The supplier is called
 * each time this method is called, so whether successive calls yield the same collector
 * depends on the supplier that was configured.
 *
 * @return the statistics collector to use with these options, never null
 *
 * @throws NullPointerException if the configured supplier produces a null collector
 */
public StatisticsCollector getStatisticsCollector() {
    // the supplier is caller provided code, so name the culprit rather than failing
    // with a generic null message
    return nonNull(this.statisticsCollector.get(), "The statistics collector supplier MUST return a non null StatisticsCollector");
}

/**
 * Configures how these options obtain their statistics collector. The supplier is invoked by
 * {@link #getStatisticsCollector()}, so it may hand out a new collector on every call or keep
 * returning one common instance.
 *
 * @param statisticsCollector the supplier of the statistics collector to use, must not be null
 *
 * @return the data loader options for fluent coding
 */
public DataLoaderOptions setStatisticsCollector(Supplier<StatisticsCollector> statisticsCollector) {
    // reject a null supplier before touching the current setting
    nonNull(statisticsCollector);
    this.statisticsCollector = statisticsCollector;
    return this;
}


}
14 changes: 14 additions & 0 deletions src/main/java/org/dataloader/DataLoaderRegistry.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.dataloader;

import org.dataloader.stats.Statistics;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -91,4 +93,16 @@ public Set<String> getKeys() {
public void dispatchAll() {
    // ask each data loader handed back by getDataLoaders() to dispatch, one after another
    getDataLoaders().forEach(dataLoader -> dataLoader.dispatch());
}

/**
 * Rolls the statistics of the data loaders in this registry up into one result.
 *
 * @return a single {@link Statistics} built by starting from {@code new Statistics()} and
 * combining into it, one after another, the statistics of each registered data loader
 */
public Statistics getStatistics() {
    // NOTE(review): this walks dataLoaders.values() directly, so a loader that is present under
    // several keys would contribute once per key - confirm that this is intended
    return dataLoaders.values().stream()
            .map(loader -> loader.getStatistics())
            .reduce(new Statistics(), (rolledUp, next) -> rolledUp.combine(next));
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/dataloader/impl/Assertions.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public static <T> T nonNull(T t) {
return Objects.requireNonNull(t, "nonNull object required");
}

/**
 * Checks that a reference is not null, reporting a caller supplied message when it is.
 *
 * @param t       the reference to check
 * @param message the detail message of the {@link NullPointerException} raised for a null reference
 * @param <T>     the type of the reference
 *
 * @return {@code t}, when it is not null
 */
public static <T> T nonNull(T t, String message) {
    Objects.requireNonNull(t, message);
    return t;
}

private static class AssertionException extends IllegalStateException {
public AssertionException(String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.dataloader.stats;

import static org.dataloader.impl.Assertions.nonNull;

/**
 * A {@link StatisticsCollector} that records every event twice: once in a collector of its own
 * and once in a delegate collector supplied at construction time. This lets you keep one
 * specific set of statistics while the same calls are also passed on to another collector.
 */
public class DelegatingStatisticsCollector implements StatisticsCollector {

    // caller supplied collector; every increment is forwarded to it first
    private final StatisticsCollector delegateCollector;
    // this instance's own tally; the increment methods return its values
    private final StatisticsCollector localCollector = new SimpleStatisticsCollector();

    /**
     * @param delegateCollector a non null delegate collector
     */
    public DelegatingStatisticsCollector(StatisticsCollector delegateCollector) {
        nonNull(delegateCollector);
        this.delegateCollector = delegateCollector;
    }

    /**
     * Forwards to the delegate, then records the load in this collector.
     *
     * @return the value returned by this collector's own tally (not the delegate's)
     */
    @Override
    public long incrementLoadCount() {
        this.delegateCollector.incrementLoadCount();
        return this.localCollector.incrementLoadCount();
    }

    /**
     * Forwards to the delegate, then records the load error in this collector.
     *
     * @return the value returned by this collector's own tally (not the delegate's)
     */
    @Override
    public long incrementLoadErrorCount() {
        this.delegateCollector.incrementLoadErrorCount();
        return this.localCollector.incrementLoadErrorCount();
    }

    /**
     * Forwards the delta to the delegate, then applies it to this collector.
     *
     * @param delta the amount passed on unchanged to both collectors
     *
     * @return the value returned by this collector's own tally (not the delegate's)
     */
    @Override
    public long incrementBatchLoadCountBy(long delta) {
        this.delegateCollector.incrementBatchLoadCountBy(delta);
        return this.localCollector.incrementBatchLoadCountBy(delta);
    }

    /**
     * Forwards to the delegate, then records the batch load exception in this collector.
     *
     * @return the value returned by this collector's own tally (not the delegate's)
     */
    @Override
    public long incrementBatchLoadExceptionCount() {
        this.delegateCollector.incrementBatchLoadExceptionCount();
        return this.localCollector.incrementBatchLoadExceptionCount();
    }

    /**
     * Forwards to the delegate, then records the cache hit in this collector.
     *
     * @return the value returned by this collector's own tally (not the delegate's)
     */
    @Override
    public long incrementCacheHitCount() {
        this.delegateCollector.incrementCacheHitCount();
        return this.localCollector.incrementCacheHitCount();
    }

    /**
     * @return the statistics of this collector (and not those of its delegate)
     */
    @Override
    public Statistics getStatistics() {
        return this.localCollector.getStatistics();
    }

    /**
     * @return the statistics of the delegate
     */
    public Statistics getDelegateStatistics() {
        return this.delegateCollector.getStatistics();
    }

}
39 changes: 39 additions & 0 deletions src/main/java/org/dataloader/stats/NoOpStatisticsCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.dataloader.stats;

/**
 * A statistics collector that does nothing: every increment call is ignored and answers
 * {@code 0}, and the statistics it reports are always the same shared instance.
 */
public class NoOpStatisticsCollector implements StatisticsCollector {

    // one instance, created with the no-arg constructor, handed out by every collector of this type
    private static final Statistics SHARED_STATS = new Statistics();

    /**
     * @return always {@code 0}; nothing is recorded
     */
    @Override
    public long incrementLoadCount() {
        return 0L;
    }

    /**
     * @return always {@code 0}; nothing is recorded
     */
    @Override
    public long incrementCacheHitCount() {
        return 0L;
    }

    /**
     * @param delta ignored
     *
     * @return always {@code 0}; nothing is recorded
     */
    @Override
    public long incrementBatchLoadCountBy(long delta) {
        return 0L;
    }

    /**
     * @return always {@code 0}; nothing is recorded
     */
    @Override
    public long incrementLoadErrorCount() {
        return 0L;
    }

    /**
     * @return always {@code 0}; nothing is recorded
     */
    @Override
    public long incrementBatchLoadExceptionCount() {
        return 0L;
    }

    /**
     * @return the shared statistics instance, which this collector never updates
     */
    @Override
    public Statistics getStatistics() {
        return SHARED_STATS;
    }
}
Loading

0 comments on commit aa2994e

Please sign in to comment.