Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLT-2568 optimise vertex fetch from janusgraph #3740

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,8 @@
import org.apache.atlas.query.executors.ScriptEngineBasedExecutor;
import org.apache.atlas.query.executors.TraversalBasedExecutor;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.audit.ESBasedAuditRepository;
import org.apache.atlas.repository.graph.GraphBackedSearchIndexer;
import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.audit.ESBasedAuditRepository;
import org.apache.atlas.repository.graphdb.*;
import org.apache.atlas.repository.graphdb.AtlasIndexQuery.Result;
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
Expand Down Expand Up @@ -76,22 +74,25 @@
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

import static org.apache.atlas.AtlasErrorCode.*;
import static org.apache.atlas.SortOrder.ASCENDING;
import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE;
import static org.apache.atlas.model.instance.AtlasEntity.Status.DELETED;
import static org.apache.atlas.repository.Constants.*;
import static org.apache.atlas.repository.util.AccessControlUtils.ACCESS_READ_DOMAIN;
import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.BASIC_SEARCH_STATE_FILTER;
import static org.apache.atlas.util.AtlasGremlinQueryProvider.AtlasGremlinQuery.TO_RANGE_LIST;

@Component
public class EntityDiscoveryService implements AtlasDiscoveryService {
private static final Logger LOG = LoggerFactory.getLogger(EntityDiscoveryService.class);
private static final String DEFAULT_SORT_ATTRIBUTE_NAME = "name";
private static final int AVAILABLEPROCESSORS = Runtime.getRuntime().availableProcessors();
private static final ForkJoinPool CUSTOMTHREADPOOL = new ForkJoinPool(AVAILABLEPROCESSORS/2); // Use half of available cores
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

once we test and check that things are intact and this change brings value, we can parameterize and better this change


private final AtlasGraph graph;
private final EntityGraphRetriever entityRetriever;
Expand Down Expand Up @@ -1083,83 +1084,113 @@ public SearchLogSearchResult searchLogs(SearchLogSearchParams searchParams) thro
}
}

@SuppressWarnings("rawtypes")
private void prepareSearchResult(AtlasSearchResult ret, DirectIndexQueryResult indexQueryResult, Set<String> resultAttributes, boolean fetchCollapsedResults) throws AtlasBaseException {
SearchParams searchParams = ret.getSearchParameters();
try {
if(LOG.isDebugEnabled()){
LOG.debug("Preparing search results for ({})", ret.getSearchParameters());
}
Iterator<Result> iterator = indexQueryResult.getIterator();
boolean showSearchScore = searchParams.getShowSearchScore();
if (iterator == null) {
return;
}
boolean showSearchScore = searchParams.getShowSearchScore();
List<Result> results = new ArrayList<>();

while (iterator.hasNext()) {
Result result = iterator.next();
AtlasVertex vertex = result.getVertex();

if (vertex == null) {
LOG.warn("vertex in null");
continue;
}
// Collect results for batch processing
Iterator<Result> iterator = indexQueryResult.getIterator();
while (iterator != null && iterator.hasNext()) {
results.add(iterator.next());
}

AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if(RequestContext.get().includeClassifications()){
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}
if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse = new HashMap<>();

Set<String> collapseKeys = result.getCollapseKeys();
for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(ret.getSearchParameters());

Set<String> collapseResultAttributes = new HashSet<>();
if (searchParams.getCollapseAttributes() != null) {
collapseResultAttributes.addAll(searchParams.getCollapseAttributes());
} else {
collapseResultAttributes = resultAttributes;
// Batch fetch vertices
List<AtlasVertex> vertices = results.stream()
.map(Result::getVertex)
.filter(Objects::nonNull)
.collect(Collectors.toList());

// Use ConcurrentHashMap for thread-safe access
ConcurrentHashMap<String, AtlasEntityHeader> headers = new ConcurrentHashMap<>();

// Run vertex processing in limited parallel threads
CompletableFuture.runAsync(() -> CUSTOMTHREADPOOL.submit(() ->
vertices.parallelStream().forEach(vertex -> {
String guid = vertex.getProperty("guid", String.class);
headers.computeIfAbsent(guid, k -> {
try {
AtlasEntityHeader header = entityRetriever.toAtlasEntityHeader(vertex, resultAttributes);
if (RequestContext.get().includeClassifications()) {
header.setClassifications(entityRetriever.getAllClassifications(vertex));
}
return header;
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
}
});
})
).join(), CUSTOMTHREADPOOL);

if (searchParams.getCollapseRelationAttributes() != null) {
RequestContext.get().getRelationAttrsForSearch().clear();
RequestContext.get().setRelationAttrsForSearch(searchParams.getCollapseRelationAttributes());
}
// Process results and handle collapse in parallel
results.parallelStream().forEach(result -> {
AtlasVertex vertex = result.getVertex();
if (vertex == null) return;

DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());
prepareSearchResult(collapseRet, indexQueryCollapsedResult, collapseResultAttributes, false);
String guid = vertex.getProperty("guid", String.class);
AtlasEntityHeader header = headers.get(guid);

collapseRet.setSearchParameters(null);
collapse.put(collapseKey, collapseRet);
}
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
if (showSearchScore) {
ret.addEntityScore(header.getGuid(), result.getScore());
}

if (fetchCollapsedResults) {
Map<String, AtlasSearchResult> collapse;
try {
collapse = processCollapseResults(result, searchParams, resultAttributes);
} catch (AtlasBaseException e) {
throw new RuntimeException(e);
}
if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
if (!collapse.isEmpty()) {
header.setCollapse(collapse);
}
}

ret.addEntity(header);
if (searchParams.getShowSearchMetadata()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
ret.addSort(header.getGuid(), result.getSort());
} else if (searchParams.getShowHighlights()) {
ret.addHighlights(header.getGuid(), result.getHighLights());
}
} catch (Exception e) {
throw e;
}

ret.addEntity(header);
});

if (!searchParams.getEnableFullRestriction()) {
scrubSearchResults(ret, searchParams.getSuppressLogs());
}
}

// Non-recursive collapse processing
private Map<String, AtlasSearchResult> processCollapseResults(Result result, SearchParams searchParams, Set<String> resultAttributes) throws AtlasBaseException {
Map<String, AtlasSearchResult> collapse = new HashMap<>();
Set<String> collapseKeys = result.getCollapseKeys();

for (String collapseKey : collapseKeys) {
AtlasSearchResult collapseRet = new AtlasSearchResult();
collapseRet.setSearchParameters(searchParams);
Set<String> collapseResultAttributes = new HashSet<>(Optional.ofNullable(searchParams.getCollapseAttributes()).orElse(resultAttributes));
DirectIndexQueryResult indexQueryCollapsedResult = result.getCollapseVertices(collapseKey);
collapseRet.setApproximateCount(indexQueryCollapsedResult.getApproximateCount());

// Directly iterate over collapse vertices
Iterator<Result> iterator = indexQueryCollapsedResult.getIterator();
while (iterator != null && iterator.hasNext()) {
Result collapseResult = iterator.next();
AtlasVertex collapseVertex = collapseResult.getVertex();
if (collapseVertex == null) continue;

AtlasEntityHeader collapseHeader = entityRetriever.toAtlasEntityHeader(collapseVertex, collapseResultAttributes);
collapseRet.addEntity(collapseHeader);
}

collapse.put(collapseKey, collapseRet);
}

return collapse;
}

private Map<String, Object> getMap(String key, Object value) {
Map<String, Object> map = new HashMap<>();
map.put(key, value);
Expand Down
Loading