Skip to content

Commit

Permalink
Add the TSDB.OperationMode to track the mode of the tsd, adding a write-
Browse files Browse the repository at this point in the history
only mode.
Add the option to have a Guava LRU cache for UIDs to limit the size of
UID caches.
Add an option to only write the one or the other of the UID caches
if the TSD is in read-only or write-only mode.

Signed-off-by: Chris Larsen <[email protected]>
  • Loading branch information
manolama committed Oct 8, 2017
1 parent 607f7d8 commit bbb687a
Show file tree
Hide file tree
Showing 4 changed files with 444 additions and 35 deletions.
27 changes: 27 additions & 0 deletions src/core/TSDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,19 @@ public final class TSDB {
private static short TAG_VALUE_WIDTH = 3;
private static final int MIN_HISTOGRAM_BYTES = 1;

/** The operation mode (role) of the TSD. */
public enum OperationMode {
READWRITE,
READONLY,
WRITEONLY
}

/** Client for the HBase cluster to use. */
final HBaseClient client;

/** The operation mode (role) of the TSD. */
final OperationMode mode;

/** Name of the table in which timeseries are stored. */
final byte[] table;
/** Name of the table in which UID information is stored. */
Expand Down Expand Up @@ -218,6 +228,17 @@ public TSDB(final HBaseClient client, final Config config) {
this.client = client;
}

String string_mode = config.getString("tsd.mode");
if (Strings.isNullOrEmpty(string_mode)) {
mode = OperationMode.READWRITE;
} else if (string_mode.toLowerCase().equals("ro")) {
mode = OperationMode.READONLY;
} else if (string_mode.toLowerCase().equals("wo")) {
mode = OperationMode.WRITEONLY;
} else {
mode = OperationMode.READWRITE;
}

// SALT AND UID WIDTHS
// Users really wanted this to be set via config instead of having to
// compile. Hopefully they know NOT to change these after writing data.
Expand Down Expand Up @@ -2112,6 +2133,12 @@ public QueryLimitOverride getQueryByteLimits() {
return query_limits;
}

/** @return The mode of operation for this TSD.
* @since 2.4 */
public OperationMode getMode() {
return mode;
}

private final boolean isHistogram(final byte[] qualifier) {
return (qualifier.length & 0x1) == 1;
}
Expand Down
187 changes: 157 additions & 30 deletions src/uid/UniqueId.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

import javax.xml.bind.DatatypeConverter;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;

Expand All @@ -45,6 +48,7 @@
import net.opentsdb.core.Const;
import net.opentsdb.core.Internal;
import net.opentsdb.core.TSDB;
import net.opentsdb.core.TSDB.OperationMode;
import net.opentsdb.meta.UIDMeta;

/**
Expand Down Expand Up @@ -98,12 +102,17 @@ public enum UniqueIdType {
private final boolean randomize_id;

/** Cache for forward mappings (name to ID). */
private final ConcurrentHashMap<String, byte[]> name_cache =
new ConcurrentHashMap<String, byte[]>();
private final ConcurrentHashMap<String, byte[]> name_cache;
/** Cache for backward mappings (ID to name).
* The ID in the key is a byte[] converted to a String to be Comparable. */
private final ConcurrentHashMap<String, String> id_cache =
new ConcurrentHashMap<String, String>();
private final ConcurrentHashMap<String, String> id_cache;

/** Cache for forward mappings (name to ID). */
private final Cache<String, byte[]> lru_name_cache;
/** Cache for backward mappings (ID to name).
* The ID in the key is a byte[] converted to a String to be Comparable. */
private final Cache<String, String> lru_id_cache;

/** Map of pending UID assignments */
private final HashMap<String, Deferred<byte[]>> pending_assignments =
new HashMap<String, Deferred<byte[]>>();
Expand All @@ -121,6 +130,15 @@ public enum UniqueIdType {
/** How many times assignments have been rejected by the UID filter */
private volatile int rejected_assignments;

/** The mode of operation for this TSD. */
private OperationMode mode;

/** Whether or not to use the mode for caching IDs. */
private boolean use_mode;

/** Whether or not to use the Guava LRU cache for IDs. */
private boolean use_lru;

/** TSDB object used for filtering and/or meta generation. */
private TSDB tsdb;

Expand Down Expand Up @@ -163,6 +181,12 @@ public UniqueId(final HBaseClient client, final byte[] table, final String kind,
}
this.id_width = (short) width;
this.randomize_id = randomize_id;
mode = OperationMode.READWRITE;
name_cache = new ConcurrentHashMap<String, byte[]>();
id_cache = new ConcurrentHashMap<String, String>();
lru_name_cache = null;
lru_id_cache = null;
use_lru = false;
}

/**
Expand Down Expand Up @@ -191,6 +215,24 @@ public UniqueId(final TSDB tsdb, final byte[] table, final String kind,
}
this.id_width = (short) width;
this.randomize_id = randomize_id;
mode = tsdb.getMode();
use_mode = tsdb.getConfig().getBoolean("tsd.uid.use_mode");
use_lru = tsdb.getConfig().getBoolean("tsd.uid.lru.enable");
if (use_lru) {
name_cache = null;
id_cache = null;
lru_name_cache = CacheBuilder.newBuilder()
.maximumSize(tsdb.getConfig().getInt("tsd.uid.lru.name.size"))
.build();
lru_id_cache = CacheBuilder.newBuilder()
.maximumSize(tsdb.getConfig().getInt("tsd.uid.lru.id.size"))
.build();
} else {
name_cache = new ConcurrentHashMap<String, byte[]>();
id_cache = new ConcurrentHashMap<String, String>();
lru_name_cache = null;
lru_id_cache = null;
}
}

/** The number of times we avoided reading from HBase thanks to the cache. */
Expand All @@ -205,6 +247,9 @@ public int cacheMisses() {

/** Returns the number of elements stored in the internal cache. */
public int cacheSize() {
if (use_lru) {
return (int) (lru_name_cache.size() + lru_id_cache.size());
}
return name_cache.size() + id_cache.size();
}

Expand All @@ -229,6 +274,8 @@ public short width() {
/** @param tsdb Whether or not to track new UIDMeta objects */
public void setTSDB(final TSDB tsdb) {
this.tsdb = tsdb;
mode = tsdb.getMode();
use_mode = tsdb.getConfig().getBoolean("tsd.uid.use_mode");
}

/** The largest possible ID given the number of bytes the IDs are
Expand All @@ -244,8 +291,13 @@ public long maxPossibleId() {
* @since 1.1
*/
public void dropCaches() {
name_cache.clear();
id_cache.clear();
if (use_lru) {
lru_name_cache.invalidateAll();
lru_id_cache.invalidateAll();
} else {
name_cache.clear();
id_cache.clear();
}
}

/**
Expand Down Expand Up @@ -300,16 +352,30 @@ public String call(final String name) {
if (name == null) {
throw new NoSuchUniqueId(kind(), id);
}
addNameToCache(id, name);
addIdToCache(name, id);
if (use_mode) {
switch(mode) {
case READONLY:
addNameToCache(id, name);
break;
case WRITEONLY:
break;
default:
addNameToCache(id, name);
addIdToCache(name, id);
}
} else {
addNameToCache(id, name);
addIdToCache(name, id);
}
return name;
}
}
return getNameFromHBase(id).addCallback(new GetNameCB());
}

private String getNameFromCache(final byte[] id) {
return id_cache.get(fromBytes(id));
return use_lru ? lru_id_cache.getIfPresent(fromBytes(id)) :
id_cache.get(fromBytes(id));
}

private Deferred<String> getNameFromHBase(final byte[] id) {
Expand All @@ -323,9 +389,13 @@ public String call(final byte[] name) {

private void addNameToCache(final byte[] id, final String name) {
final String key = fromBytes(id);
String found = id_cache.get(key);
String found = use_lru ? lru_id_cache.getIfPresent(key) : id_cache.get(key);
if (found == null) {
found = id_cache.putIfAbsent(key, name);
if (use_lru) {
lru_id_cache.put(key, name);
} else {
found = id_cache.putIfAbsent(key, name);
}
}
if (found != null && !found.equals(name)) {
throw new IllegalStateException("id=" + Arrays.toString(id) + " => name="
Expand Down Expand Up @@ -360,8 +430,21 @@ public byte[] call(final byte[] id) {
+ " which is != " + id_width
+ " required for '" + kind() + '\'');
}
addIdToCache(name, id);
addNameToCache(id, name);
if (use_mode) {
switch(mode) {
case READONLY:
break;
case WRITEONLY:
addIdToCache(name, id);
break;
default:
addNameToCache(id, name);
addIdToCache(name, id);
}
} else {
addIdToCache(name, id);
addNameToCache(id, name);
}
return id;
}
}
Expand All @@ -370,21 +453,26 @@ public byte[] call(final byte[] id) {
}

private byte[] getIdFromCache(final String name) {
return name_cache.get(name);
return use_lru ? lru_name_cache.getIfPresent(name) : name_cache.get(name);
}

private Deferred<byte[]> getIdFromHBase(final String name) {
return hbaseGet(toBytes(name), ID_FAMILY);
}

private void addIdToCache(final String name, final byte[] id) {
byte[] found = name_cache.get(name);
byte[] found = use_lru ? lru_name_cache.getIfPresent(name) :
name_cache.get(name);
if (found == null) {
found = name_cache.putIfAbsent(name,
// Must make a defensive copy to be immune
// to any changes the caller may do on the
// array later on.
Arrays.copyOf(id, id.length));
if (use_lru) {
lru_name_cache.put(name, Arrays.copyOf(id, id.length));
} else {
found = name_cache.putIfAbsent(name,
// Must make a defensive copy to be immune
// to any changes the caller may do on the
// array later on.
Arrays.copyOf(id, id.length));
}
}
if (found != null && !Arrays.equals(found, id)) {
throw new IllegalStateException("name=" + name + " => id="
Expand Down Expand Up @@ -939,7 +1027,8 @@ public Object call(final ArrayList<ArrayList<KeyValue>> rows) {
final byte[] key = row.get(0).key();
final String name = fromBytes(key);
final byte[] id = row.get(0).value();
final byte[] cached_id = name_cache.get(name);
final byte[] cached_id = use_lru ? lru_name_cache.getIfPresent(name) :
name_cache.get(name);
if (cached_id == null) {
cacheMapping(name, id);
} else if (!Arrays.equals(id, cached_id)) {
Expand Down Expand Up @@ -1042,8 +1131,13 @@ public void rename(final String oldname, final String newname) {

// Update cache.
addIdToCache(newname, row); // add new name -> ID
id_cache.put(fromBytes(row), newname); // update ID -> new name
name_cache.remove(oldname); // remove old name -> ID
if (use_lru) {
lru_id_cache.put(fromBytes(row), newname);
lru_name_cache.invalidate(oldname);
} else {
id_cache.put(fromBytes(row), newname); // update ID -> new name
name_cache.remove(oldname); // remove old name -> ID
}

// Delete the old forward mapping.
try {
Expand Down Expand Up @@ -1103,8 +1197,13 @@ public Deferred<Object> deleteAsync(final String name) {
class ErrCB implements Callback<Object, Exception> {
@Override
public Object call(final Exception ex) throws Exception {
name_cache.remove(name);
id_cache.remove(fromBytes(uid));
if (use_lru) {
lru_name_cache.invalidate(name);
lru_id_cache.invalidate(fromBytes(uid));
} else {
name_cache.remove(name);
id_cache.remove(fromBytes(uid));
}
LOG.error("Failed to delete " + fromBytes(kind) + " UID " + name
+ " but still cleared the cache", ex);
return ex;
Expand All @@ -1116,8 +1215,13 @@ class GroupCB implements Callback<Deferred<Object>, ArrayList<Object>> {
@Override
public Deferred<Object> call(final ArrayList<Object> response)
throws Exception {
name_cache.remove(name);
id_cache.remove(fromBytes(uid));
if (use_lru) {
lru_name_cache.invalidate(name);
lru_id_cache.invalidate(fromBytes(uid));
} else {
name_cache.remove(name);
id_cache.remove(fromBytes(uid));
}
LOG.info("Successfully deleted " + fromBytes(kind) + " UID " + name);
return Deferred.fromResult(null);
}
Expand Down Expand Up @@ -1146,7 +1250,8 @@ public Deferred<Object> call(final byte[] stored_uid) throws Exception {
}
}

final byte[] cached_uid = name_cache.get(name);
final byte[] cached_uid = use_lru ? lru_name_cache.getIfPresent(name) :
name_cache.get(name);
if (cached_uid == null) {
return getIdFromHBase(name).addCallbackDeferring(new LookupCB())
.addErrback(new ErrCB());
Expand Down Expand Up @@ -1663,8 +1768,10 @@ public static void preloadUidCache(final TSDB tsdb,
for (UniqueId unique_id_table : uid_cache_map.values()) {
LOG.info("After preloading, uid cache '{}' has {} ids and {} names.",
unique_id_table.kind(),
unique_id_table.id_cache.size(),
unique_id_table.name_cache.size());
unique_id_table.use_lru ? unique_id_table.lru_id_cache.size() :
unique_id_table.id_cache.size(),
unique_id_table.use_lru ? unique_id_table.lru_name_cache.size() :
unique_id_table.name_cache.size());
}
} catch (Exception e) {
if (e instanceof HBaseException) {
Expand All @@ -1680,4 +1787,24 @@ public static void preloadUidCache(final TSDB tsdb,
}
}
}

@VisibleForTesting
Map<String, byte[]> nameCache() {
return name_cache;
}

@VisibleForTesting
Map<String, String> idCache() {
return id_cache;
}

@VisibleForTesting
Cache<String, byte[]> lruNameCache() {
return lru_name_cache;
}

@VisibleForTesting
Cache<String, String> lruIdCache() {
return lru_id_cache;
}
}
Loading

0 comments on commit bbb687a

Please sign in to comment.