Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): adoption for off-heap memory management in kout module #2704

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.hugegraph.HugeGraph;
Expand All @@ -34,6 +35,9 @@
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.core.GraphManager;
import org.apache.hugegraph.memory.MemoryManager;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.memory.pool.impl.TaskMemoryPool;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.traversal.algorithm.HugeTraverser;
import org.apache.hugegraph.traversal.algorithm.KoutTraverser;
Expand Down Expand Up @@ -93,27 +97,40 @@
"'{}', max degree '{}', capacity '{}' and limit '{}'",
graph, source, direction, edgeLabel, depth,
nearest, maxDegree, capacity, limit);
MemoryPool queryPool = MemoryManager.getInstance().addQueryMemoryPool();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we bind a MemoryPool for each graph

Optional.ofNullable(queryPool).ifPresent(pool -> {
MemoryPool currentTaskPool = pool.addChildPool("kout-main-task");
MemoryManager.getInstance()
.bindCorrespondingTaskMemoryPool(Thread.currentThread().getName(),

Check warning on line 104 in hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java#L102-L104

Added lines #L102 - L104 were not covered by tests
(TaskMemoryPool) currentTaskPool);
MemoryPool currentOperationPool = currentTaskPool.addChildPool("kout-main-operation");
});

Check warning on line 107 in hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java#L106-L107

Added lines #L106 - L107 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a method like graph.switchToMemoryPool("kout", "main")?


ApiMeasurer measure = new ApiMeasurer();
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer to add a wrapper method for MemoryPool init-and-gc, then call the original method?

ApiMeasurer measure = new ApiMeasurer();

Id sourceId = VertexAPI.checkAndParseVertexId(source);
Directions dir = Directions.convert(EdgeAPI.parseDirection(direction));
Id sourceId = VertexAPI.checkAndParseVertexId(source);
Directions dir = Directions.convert(EdgeAPI.parseDirection(direction));

HugeGraph g = graph(manager, graph);
HugeGraph g = graph(manager, graph);

Set<Id> ids;
try (KoutTraverser traverser = new KoutTraverser(g)) {
ids = traverser.kout(sourceId, dir, edgeLabel, depth,
nearest, maxDegree, capacity, limit);
measure.addIterCount(traverser.vertexIterCounter.get(),
traverser.edgeIterCounter.get());
}
Set<Id> ids;
try (KoutTraverser traverser = new KoutTraverser(g)) {
ids = traverser.kout(sourceId, dir, edgeLabel, depth,
nearest, maxDegree, capacity, limit);
measure.addIterCount(traverser.vertexIterCounter.get(),
traverser.edgeIterCounter.get());
}

if (count_only) {
return manager.serializer(g, measure.measures())
.writeMap(ImmutableMap.of("vertices_size", ids.size()));
if (count_only) {
return manager.serializer(g, measure.measures())
.writeMap(ImmutableMap.of("vertices_size", ids.size()));

Check warning on line 127 in hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/api/traversers/KoutAPI.java#L126-L127

Added lines #L126 - L127 were not covered by tests
}
return manager.serializer(g, measure.measures()).writeList("vertices", ids);
} finally {
Optional.ofNullable(queryPool)
.ifPresent(pool -> MemoryManager.getInstance().gcQueryMemoryPool(pool));
}
return manager.serializer(g, measure.measures()).writeList("vertices", ids);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
return QueryResults.emptyIterator();
}
if (needCacheVertex(vertex)) {
vertex.convertIdToOnHeapIfNeeded();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's ok to just call convert in HeapCache.update(), at the same time, we avoid modifying the code everywhere

this.verticesCache.update(vertex.id(), vertex);
}
return QueryResults.iterator(vertex);
Expand Down Expand Up @@ -295,6 +296,7 @@ private Iterator<HugeVertex> queryVerticesByIds(IdQuery query) {
for (HugeVertex vertex : listIterator.list()) {
// Skip large vertex
if (needCacheVertex(vertex)) {
vertex.convertIdToOnHeapIfNeeded();
this.verticesCache.update(vertex.id(), vertex);
}
}
Expand Down Expand Up @@ -353,6 +355,7 @@ protected Iterator<HugeEdge> queryEdgesFromBackend(Query query) {
if (edges.isEmpty()) {
this.edgesCache.update(cacheKey, Collections.emptyList());
} else if (edges.size() <= MAX_CACHE_EDGES_PER_QUERY) {
edges.forEach(HugeEdge::convertIdToOnHeapIfNeeded);
this.edgesCache.update(cacheKey, edges);
}

Expand All @@ -378,6 +381,7 @@ protected void commitMutation2Backend(BackendMutation... mutations) {
vertexIds[vertexOffset++] = vertex.id();
if (needCacheVertex(vertex)) {
// Update cache
vertex.convertIdToOnHeapIfNeeded();
this.verticesCache.updateIfPresent(vertex.id(), vertex);
} else {
// Skip large vertex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ private void clearCache(boolean notify) {
private void updateCache(SchemaElement schema) {
this.resetCachedAllIfReachedCapacity();

// convert schema.id to on heap if needed.
schema.convertIdToOnHeapIfNeeded();

// update id cache
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);
Expand Down Expand Up @@ -204,14 +207,20 @@ private void invalidateCache(HugeType type, Id id) {
this.arrayCaches.remove(type, id);
}

/**
* Ids used in cache must be on-heap object
*/
private static Id generateId(HugeType type, Id id) {
// NOTE: it's slower performance to use:
// String.format("%x-%s", type.code(), name)
return IdGenerator.of(type.string() + "-" + id.asString());
return new IdGenerator.StringId(type.string() + "-" + id.asString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a OnHeapIdGenerator.of() class?

}

/**
* Ids used in cache must be on-heap object
*/
private static Id generateId(HugeType type, String name) {
return IdGenerator.of(type.string() + "-" + name);
return new IdGenerator.StringId(type.string() + "-" + name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.collect.ImmutableSet;

public class CachedSchemaTransactionV2 extends SchemaTransactionV2 {

private final Cache<Id, Object> idCache;
private final Cache<Id, Object> nameCache;

Expand All @@ -51,8 +52,8 @@
private EventListener cacheEventListener;

public CachedSchemaTransactionV2(MetaDriver metaDriver,
String cluster,
HugeGraphParams graphParams) {
String cluster,
HugeGraphParams graphParams) {
super(metaDriver, cluster, graphParams);

final long capacity = graphParams.configuration()
Expand Down Expand Up @@ -223,6 +224,9 @@
private void updateCache(SchemaElement schema) {
this.resetCachedAllIfReachedCapacity();

// convert schema.id to on heap if needed.
schema.convertIdToOnHeapIfNeeded();

Check warning on line 228 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L228

Added line #L228 was not covered by tests

// update id cache
Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);
Expand Down Expand Up @@ -268,10 +272,12 @@
value = super.getSchema(type, id);
if (value != null) {
this.resetCachedAllIfReachedCapacity();
// convert schema.id to on heap if needed.
SchemaElement schema = (SchemaElement) value;
schema.convertIdToOnHeapIfNeeded();

Check warning on line 277 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L276-L277

Added lines #L276 - L277 were not covered by tests

this.idCache.update(prefixedId, value);
this.idCache.update(prefixedId, schema);

Check warning on line 279 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L279

Added line #L279 was not covered by tests

SchemaElement schema = (SchemaElement) value;
Id prefixedName = generateId(schema.type(), schema.name());
this.nameCache.update(prefixedName, schema);
}
Expand Down Expand Up @@ -321,6 +327,9 @@
if (results.size() <= free) {
// Update cache
for (T schema : results) {
// convert schema.id to on heap if needed.
schema.convertIdToOnHeapIfNeeded();

Check warning on line 331 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java#L331

Added line #L331 was not covered by tests

Id prefixedId = generateId(schema.type(), schema.id());
this.idCache.update(prefixedId, schema);

Expand Down Expand Up @@ -481,7 +490,7 @@
}

private static class CachedTypes
extends ConcurrentHashMap<HugeType, Boolean> {
extends ConcurrentHashMap<HugeType, Boolean> {

private static final long serialVersionUID = -2215549791679355996L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.hugegraph.backend.id.Id.IdType;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.memory.consumer.factory.IdFactory;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.LongEncoding;
Expand All @@ -35,19 +36,19 @@
public abstract Id generate(HugeVertex vertex);

public static Id of(String id) {
return new StringId(id);
return IdFactory.getInstance().newStringId(id);
}

public static Id of(UUID id) {
return new UuidId(id);
return IdFactory.getInstance().newUuidId(id);
}

public static Id of(String id, boolean uuid) {
return uuid ? new UuidId(id) : new StringId(id);
return uuid ? IdFactory.getInstance().newUuidId(id) : IdFactory.getInstance().newStringId(id);
}

public static Id of(long id) {
return new LongId(id);
return IdFactory.getInstance().newLongId(id);
}

public static Id of(Object id) {
Expand All @@ -66,11 +67,11 @@
public static Id of(byte[] bytes, IdType type) {
switch (type) {
case LONG:
return new LongId(bytes);
return IdFactory.getInstance().newLongId(bytes);

Check warning on line 70 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/id/IdGenerator.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/id/IdGenerator.java#L70

Added line #L70 was not covered by tests
case UUID:
return new UuidId(bytes);
return IdFactory.getInstance().newUuidId(bytes);
case STRING:
return new StringId(bytes);
return IdFactory.getInstance().newStringId(bytes);
default:
throw new AssertionError("Invalid id type " + type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.memory.MemoryManager;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.type.define.HugeKeys;
Expand Down Expand Up @@ -135,7 +137,16 @@

public abstract void clear(Session session);

public abstract Iterator<BackendEntry> query(Session session, Query query);
public Iterator<BackendEntry> query(Session session, Query query) {
Optional.ofNullable(MemoryManager.getInstance()
.getCorrespondingTaskMemoryPool(
Thread.currentThread().getName()))
.ifPresent(currentTaskPool -> {

Check warning on line 144 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/BackendTable.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/BackendTable.java#L141-L144

Added lines #L141 - L144 were not covered by tests
// Some system-query(not requested by user) don't have memory pool.
currentTaskPool.addChildPool("BackendTable-Iterator");
});
return null;

Check warning on line 148 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/BackendTable.java

View check run for this annotation

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/BackendTable.java#L146-L148

Added lines #L146 - L148 were not covered by tests
}

public Iterator<BackendEntry> queryOlap(Session session, Query query) {
throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,7 @@ public static synchronized CoreOptions instance() {
"memory.mode",
"The memory mode used for query in HugeGraph.",
disallowEmpty(),
"off-heap"
"disable"
);

public static final ConfigOption<Long> MAX_MEMORY_CAPACITY = new ConfigOption<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class MemoryManager {
private final MemoryArbitrator memoryArbitrator;
private final ExecutorService arbitrateExecutor;

private static MemoryMode MEMORY_MODE = MemoryMode.ENABLE_OFF_HEAP_MANAGEMENT;
private static MemoryMode MEMORY_MODE = MemoryMode.DISABLE_MEMORY_MANAGEMENT;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MEMORY_MODE style is only for const var, and if there is only a single MemoryManager, also remove static mark: private MemoryMode memoryMode


private MemoryManager() {
this.memoryArbitrator = new MemoryArbitratorImpl(this);
Expand All @@ -89,6 +89,10 @@ private MemoryManager() {
}

public MemoryPool addQueryMemoryPool() {
if (MEMORY_MODE == MemoryMode.DISABLE_MEMORY_MANAGEMENT) {
return null;
}

int count = queryMemoryPools.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expect this.xx stype

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between QueryMemoryPool and MemoryPool? if no difference, just naming MemoryPool is ok.
and can we make some special names for CorrespondingTaskMemoryPool and CurrentWorkingOperatorMemoryPool, such as RequestMemoryPool and RequestStageMemoryPool

String poolName =
QUERY_MEMORY_POOL_NAME_PREFIX + DELIMINATOR + count + DELIMINATOR +
Expand Down
Loading
Loading