Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MESH-299 | Audit Search Logs #3932

Open
wants to merge 7 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ public String toString() {
sb.append(", detail=").append(detail);
sb.append(", created=").append(created);
sb.append(", headers=").append(headers);


sb.append('}');

return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.atlas.model.Clearable;
import org.apache.atlas.model.instance.AtlasEntityHeader;

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
Expand All @@ -30,6 +31,7 @@ public EntityAuditSearchResult() { }
private Map<String, Object> aggregations;
private int count;
private int totalCount;
private Map<String, AtlasEntityHeader> linkedEntities;

public List<EntityAuditEventV2> getEntityAudits() {
return entityAudits;
Expand Down Expand Up @@ -63,6 +65,10 @@ public void setTotalCount(int totalCount) {
this.totalCount = totalCount;
}

public Map<String, AtlasEntityHeader> getLinkedEntities() { return linkedEntities; }

public void setLinkedEntities(Map<String, AtlasEntityHeader> linkedEntities) { this.linkedEntities = linkedEntities; }

@Override
public boolean equals(Object o) {
if (this == o) { return true; }
Expand All @@ -85,6 +91,7 @@ public String toString() {
final StringBuilder sb = new StringBuilder("EntityAuditSearchResult{");
sb.append("entityAudits='").append(entityAudits).append('\'');
sb.append(", aggregations='").append(aggregations).append('\'');
sb.append(", linkedEntities='").append(linkedEntities).append('\'');
sb.append(", count=").append(count);
sb.append(", totalCount=").append(totalCount);
sb.append('}');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.audit.EntityAuditEventV2;
import org.apache.atlas.model.audit.EntityAuditSearchResult;
import org.apache.atlas.model.instance.AtlasEntityHeader;
import org.apache.atlas.type.AtlasType;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.configuration.Configuration;
Expand All @@ -47,6 +48,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;

import javax.inject.Inject;
import javax.inject.Singleton;
Expand All @@ -60,6 +62,7 @@
import java.util.*;

import static java.nio.charset.Charset.defaultCharset;
import static org.apache.atlas.repository.Constants.DOMAIN_GUIDS;
import static org.springframework.util.StreamUtils.copyToString;

/**
Expand All @@ -85,6 +88,7 @@ public class ESBasedAuditRepository extends AbstractStorageBasedAuditRepository
private static final String DETAIL = "detail";
private static final String ENTITY = "entity";
private static final String bulkMetadata = String.format("{ \"index\" : { \"_index\" : \"%s\" } }%n", INDEX_NAME);
private static final Set<String> linkedAttributes = new HashSet<>(Arrays.asList(DOMAIN_GUIDS));

/*
* created → event creation time
Expand All @@ -94,13 +98,14 @@ public class ESBasedAuditRepository extends AbstractStorageBasedAuditRepository

private RestClient lowLevelClient;
private final Configuration configuration;
private EntityGraphRetriever entityGraphRetriever;

@Inject
public ESBasedAuditRepository(Configuration configuration) {
public ESBasedAuditRepository(Configuration configuration, EntityGraphRetriever entityGraphRetriever) {
this.configuration = configuration;
this.entityGraphRetriever = entityGraphRetriever;
}


@Override
public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException {

Expand Down Expand Up @@ -220,6 +225,12 @@ private EntityAuditSearchResult getResultFromResponse(String responseString) thr
Map<String, Object> responseMap = AtlasType.fromJson(responseString, Map.class);
Map<String, Object> hits_0 = (Map<String, Object>) responseMap.get("hits");
List<LinkedHashMap> hits_1 = (List<LinkedHashMap>) hits_0.get("hits");
Map<String, AtlasEntityHeader> existingLinkedEntities = searchResult.getLinkedEntities();

if (existingLinkedEntities == null) {
existingLinkedEntities = new HashMap<>();
}

for (LinkedHashMap hit : hits_1) {
Map source = (Map) hit.get("_source");
String entityGuid = (String) source.get(ENTITYID);
Expand All @@ -243,6 +254,32 @@ private EntityAuditSearchResult getResultFromResponse(String responseString) thr
eventKey = event.getEntityId() + ":" + event.getTimestamp();
}

Map<String, Object> detail = event.getDetail();
if (detail != null && detail.containsKey("attributes")) {
Map<String, Object> attributes = (Map<String, Object>) detail.get("attributes");

for (Map.Entry<String, Object> entry: attributes.entrySet()) {
if (linkedAttributes.contains(entry.getKey())) {
List<String> guids = (List<String>) entry.getValue();

if (guids != null && !guids.isEmpty()){
for (String guid: guids){
if(!existingLinkedEntities.containsKey(guid)){
try {
AtlasEntityHeader entityHeader = fetchAtlasEntityHeader(guid);
if (entityHeader != null) {
existingLinkedEntities.put(guid, entityHeader);
}
} catch (AtlasBaseException e) {
throw new AtlasBaseException(e);
}
}
}
}
}
}
}

event.setHeaders((Map<String, String>) source.get("headers"));

event.setEventKey(eventKey);
Expand All @@ -252,12 +289,22 @@ private EntityAuditSearchResult getResultFromResponse(String responseString) thr
Map<String, Object> countObject = (Map<String, Object>) hits_0.get("total");
int totalCount = (int) countObject.get("value");
searchResult.setEntityAudits(entityAudits);
searchResult.setLinkedEntities(existingLinkedEntities);
searchResult.setAggregations(aggregationsMap);
searchResult.setTotalCount(totalCount);
searchResult.setCount(entityAudits.size());
return searchResult;
}

private AtlasEntityHeader fetchAtlasEntityHeader(String domainGUID) throws AtlasBaseException {
try {
AtlasEntityHeader entityHeader = entityGraphRetriever.toAtlasEntityHeader(domainGUID);
return entityHeader;
} catch (AtlasBaseException e) {
throw new AtlasBaseException(e);
}
}

private String performSearchOnIndex(String queryString) throws IOException {
HttpEntity entity = new NStringEntity(queryString, ContentType.APPLICATION_JSON);
String endPoint = INDEX_NAME + "/_search";
Expand Down
Loading