Skip to content

Commit

Permalink
Merge pull request #208 from GovernmentCommunicationsHeadquarters/rel…
Browse files Browse the repository at this point in the history
…ease-0.3.5

Release 0.3.5
  • Loading branch information
p013570 committed May 16, 2016
2 parents a75bd03 + dfeb668 commit a6c3279
Show file tree
Hide file tree
Showing 180 changed files with 8,312 additions and 609 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
language: java
jdk:
- oraclejdk8
install: mvn install -PreducedMemory -Dfindbugs.skip=true
script: mvn findbugs:findbugs && mvn javadoc:javadoc
8 changes: 8 additions & 0 deletions NOTICES
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ Gaffer2 contains two classes which are modified versions of classes from Apache
Copyright 2011-2015 The Apache Software Foundation.


The Gaffer implementation of TinkerPop was originally based on several classes from TinkerGraph.
These classes were copied and adjusted to integrate with Gaffer.

Gaffer2 is built using maven. This process will automatically pull in dependencies. We list Gaffer2's dependencies,
and their licenses, below. For information on the dependencies of these dependencies, see the websites of the
projects below.
Expand Down Expand Up @@ -134,3 +137,8 @@ FindBugs (org.codehaus.mojo:findbugs-maven-plugin:3.0.3):
FindBugs Annotations (com.google.code.findbugs:annotations:3.0.1):

- GNU Lesser Public License


TinkerPop (org.apache.tinkerpop:gremlin-core,gremlin-groovy,gremlin-driver:3.2.0-incubating):

- Apache License, Version 2.0
2 changes: 1 addition & 1 deletion accumulo-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>gaffer</groupId>
<artifactId>gaffer2</artifactId>
<version>0.3.4</version>
<version>0.3.5</version>
</parent>

<artifactId>accumulo-store</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import gaffer.accumulostore.key.exception.AccumuloElementConversionException;
import gaffer.accumulostore.operation.handler.AddElementsHandler;
import gaffer.accumulostore.operation.handler.GetAdjacentEntitySeedsHandler;
import gaffer.accumulostore.operation.handler.GetAllElementsHandler;
import gaffer.accumulostore.operation.handler.GetElementsBetweenSetsHandler;
import gaffer.accumulostore.operation.handler.GetElementsHandler;
import gaffer.accumulostore.operation.handler.GetElementsInRangesHandler;
Expand All @@ -52,6 +53,7 @@
import gaffer.operation.data.EntitySeed;
import gaffer.operation.impl.add.AddElements;
import gaffer.operation.impl.get.GetAdjacentEntitySeeds;
import gaffer.operation.impl.get.GetAllElements;
import gaffer.operation.impl.get.GetElements;
import gaffer.operation.simple.hdfs.AddElementsFromHdfs;
import gaffer.store.Store;
Expand All @@ -70,9 +72,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* An Accumulo Implementation of the Gaffer Framework
Expand All @@ -85,7 +87,7 @@
*/
public class AccumuloStore extends Store {
private static final Logger LOGGER = LoggerFactory.getLogger(AccumuloStore.class);
private static final List<StoreTrait> TRAITS = Arrays.asList(AGGREGATION, FILTERING, TRANSFORMATION, STORE_VALIDATION);
private static final Set<StoreTrait> TRAITS = new HashSet<>(Arrays.asList(AGGREGATION, FILTERING, TRANSFORMATION, STORE_VALIDATION));
private AccumuloKeyPackage keyPackage;
private Connector connection = null;

Expand Down Expand Up @@ -150,6 +152,11 @@ protected OperationHandler<GetElements<ElementSeed, Element>, Iterable<Element>>
return new GetElementsHandler();
}

@Override
protected OperationHandler<GetAllElements<Element>, Iterable<Element>> getGetAllElementsHandler() {
return new GetAllElementsHandler();
}

@Override
protected OperationHandler<? extends GetAdjacentEntitySeeds, Iterable<EntitySeed>> getAdjacentEntitySeedsHandler() {
return new GetAdjacentEntitySeedsHandler();
Expand All @@ -161,7 +168,7 @@ protected OperationHandler<? extends AddElements, Void> getAddElementsHandler()
}

@Override
protected Collection<StoreTrait> getTraits() {
public Set<StoreTrait> getTraits() {
return TRAITS;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected Entity getEntityFromKey(final Key key) throws AccumuloElementConversio
}

@Override
protected boolean getSourceAndDestinationFromRowKey(final byte[] rowKey, final byte[][] sourceValueDestinationValue,
protected boolean getSourceAndDestinationFromRowKey(final byte[] rowKey, final byte[][] sourceDestValues,
final Map<String, String> options) throws AccumuloElementConversionException {
// Get element class, sourceValue, destinationValue and directed flag from row key
// Expect to find 3 delimiters (4 fields)
Expand Down Expand Up @@ -161,35 +161,44 @@ protected boolean getSourceAndDestinationFromRowKey(final byte[] rowKey, final b
}
if (directionFlag == ByteEntityPositions.UNDIRECTED_EDGE) {
// Edge is undirected
sourceValueDestinationValue[0] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
sourceValueDestinationValue[1] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[1] + 1, positionsOfDelimiters[2]));
sourceDestValues[0] = getSourceBytes(rowKey, positionsOfDelimiters);
sourceDestValues[1] = getDestBytes(rowKey, positionsOfDelimiters);
return false;
} else if (directionFlag == ByteEntityPositions.CORRECT_WAY_DIRECTED_EDGE) {
// Edge is directed and the first identifier is the source of the edge
sourceValueDestinationValue[0] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
sourceValueDestinationValue[1] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[1] + 1, positionsOfDelimiters[2]));
sourceDestValues[0] = getSourceBytes(rowKey, positionsOfDelimiters);
sourceDestValues[1] = getDestBytes(rowKey, positionsOfDelimiters);
return true;
} else if (directionFlag == ByteEntityPositions.INCORRECT_WAY_DIRECTED_EDGE) {
// Edge is directed and the second identifier is the source of the edge
int src = 1;
int dst = 0;
if (options != null && options.containsKey(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE)
&& "true".equalsIgnoreCase(options.get(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE))) {
if (matchEdgeSource(options)) {
src = 0;
dst = 1;
}
sourceValueDestinationValue[src] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
sourceValueDestinationValue[dst] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[1] + 1, positionsOfDelimiters[2]));
sourceDestValues[src] = getSourceBytes(rowKey, positionsOfDelimiters);
sourceDestValues[dst] = getDestBytes(rowKey, positionsOfDelimiters);
return true;
} else {
throw new AccumuloElementConversionException(
"Invalid direction flag in row key - flag was " + directionFlag);
}
}

private byte[] getDestBytes(final byte[] rowKey, final int[] positionsOfDelimiters) {
return ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[1] + 1, positionsOfDelimiters[2]));
}

private byte[] getSourceBytes(final byte[] rowKey, final int[] positionsOfDelimiters) {
return ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
}

private boolean matchEdgeSource(final Map<String, String> options) {
return options != null
&& options.containsKey(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE)
&& "true".equalsIgnoreCase(options.get(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import gaffer.operation.GetOperation;
import gaffer.operation.GetOperation.IncludeEdgeType;
import gaffer.operation.GetOperation.IncludeIncomingOutgoingType;
import gaffer.operation.impl.get.GetAllElements;
import org.apache.accumulo.core.client.IteratorSetting;

public class ByteEntityIteratorSettingsFactory extends AbstractCoreKeyIteratorSettingsFactory {
Expand All @@ -38,14 +39,21 @@ public IteratorSetting getElementPropertyRangeQueryFilter(final GetOperation<?,
final boolean includeEntities = operation.isIncludeEntities();
final IncludeEdgeType includeEdgeType = operation.getIncludeEdges();
final IncludeIncomingOutgoingType includeIncomingOutgoingType = operation.getIncludeIncomingOutGoing();
final boolean deduplicateUndirectedEdges = operation instanceof GetAllElements;

if (includeEdgeType == IncludeEdgeType.ALL && includeIncomingOutgoingType == IncludeIncomingOutgoingType.BOTH
&& includeEntities) {
&& includeEntities && !deduplicateUndirectedEdges) {
return null;
}

return new IteratorSettingBuilder(AccumuloStoreConstants.RANGE_ELEMENT_PROPERTY_FILTER_ITERATOR_PRIORITY,
AccumuloStoreConstants.RANGE_ELEMENT_PROPERTY_FILTER_ITERATOR_NAME, RANGE_ELEMENT_PROPERTY_FILTER_ITERATOR).all()
.includeIncomingOutgoing(includeIncomingOutgoingType).includeEdges(includeEdgeType)
.includeEntities(includeEntities).build();
AccumuloStoreConstants.RANGE_ELEMENT_PROPERTY_FILTER_ITERATOR_NAME, RANGE_ELEMENT_PROPERTY_FILTER_ITERATOR)
.all()
.includeIncomingOutgoing(includeIncomingOutgoingType)
.includeEdges(includeEdgeType)
.includeEntities(includeEntities)
.deduplicateUndirectedEdges(deduplicateUndirectedEdges)
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,32 @@

package gaffer.accumulostore.key.core.impl.byteEntity;

import java.io.IOException;
import java.util.Map;

import gaffer.accumulostore.key.exception.AccumuloElementConversionException;
import gaffer.accumulostore.utils.AccumuloStoreConstants;
import gaffer.accumulostore.utils.ByteUtils;
import gaffer.accumulostore.utils.IteratorOptionsBuilder;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.Filter;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

import gaffer.accumulostore.utils.AccumuloStoreConstants;
import gaffer.accumulostore.utils.IteratorOptionsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;

public class ByteEntityRangeElementPropertyFilterIterator extends Filter {
private static final Logger LOGGER = LoggerFactory.getLogger(ByteEntityRangeElementPropertyFilterIterator.class);

// This converter does not have the schema so not all converter methods can be used.
private ByteEntityAccumuloElementConverter converter = new ByteEntityAccumuloElementConverter(null);
private boolean edges = false;
private boolean entities = false;
private boolean unDirectedEdges = false;
private boolean directedEdges = false;
private boolean incomingEdges = false;
private boolean outgoingEdges = false;
private boolean deduplicateUndirectedEdges = false;

@Override
public boolean accept(final Key key, final Value value) {
Expand All @@ -46,22 +52,51 @@ public boolean accept(final Key key, final Value value) {
} else if (!entities && !isEdge) {
return false;
}
return checkEdge(flag);
return !isEdge || checkEdge(flag, key);
}

private byte getFlag(final Key key) {
final byte[] rowID = key.getRowData().getBackingArray();
return rowID[rowID.length - 1];
}

private boolean checkEdge(final byte flag) {
private boolean checkEdge(final byte flag, final Key key) {
final boolean isUndirected = flag == ByteEntityPositions.UNDIRECTED_EDGE;
if (unDirectedEdges) {
return flag == ByteEntityPositions.UNDIRECTED_EDGE;
} else if (directedEdges) {
return flag != ByteEntityPositions.UNDIRECTED_EDGE && checkDirection(flag);
} else {
return checkDirection(flag);
// Only undirected edges
if (isUndirected) {
if (deduplicateUndirectedEdges) {
return checkForDuplicateUndirectedEdge(key);
}
return true;
}
return false;
}

if (directedEdges) {
// Only directed edges
return !isUndirected && checkDirection(flag);
}

// All edge types
if (isUndirected && deduplicateUndirectedEdges) {
return checkForDuplicateUndirectedEdge(key);
}

return checkDirection(flag);
}

private boolean checkForDuplicateUndirectedEdge(final Key key) {
boolean isCorrect = false;
try {
final byte[][] sourceDestValues = new byte[3][];
converter.getSourceAndDestinationFromRowKey(key.getRowData().getBackingArray(), sourceDestValues, null);
isCorrect = ByteUtils.compareBytes(sourceDestValues[0], sourceDestValues[1]) <= 0;
} catch (AccumuloElementConversionException e) {
LOGGER.warn(e.getMessage(), e);
}

return isCorrect;
}

private boolean checkDirection(final byte flag) {
Expand All @@ -79,7 +114,7 @@ private boolean checkDirection(final byte flag) {

@Override
public void init(final SortedKeyValueIterator<Key, Value> source, final Map<String, String> options,
final IteratorEnvironment env) throws IOException {
final IteratorEnvironment env) throws IOException {
super.init(source, options, env);
validateOptions(options);
}
Expand Down Expand Up @@ -113,6 +148,9 @@ public boolean validateOptions(final Map<String, String> options) {
if (!options.containsKey(AccumuloStoreConstants.NO_EDGES)) {
edges = true;
}
if (options.containsKey(AccumuloStoreConstants.DEDUPLICATE_UNDIRECTED_EDGES)) {
deduplicateUndirectedEdges = true;
}
return true;
}

Expand All @@ -127,6 +165,7 @@ public IteratorOptions describeOptions() {
.addNamedOption(AccumuloStoreConstants.INCOMING_EDGE_ONLY, "Optional: Set if only incoming edges should be returned")
.addNamedOption(AccumuloStoreConstants.OUTGOING_EDGE_ONLY, "Optional: Set if only outgoing edges should be returned")
.addNamedOption(AccumuloStoreConstants.NO_EDGES, "Optional: Set if no edges should be returned")
.addNamedOption(AccumuloStoreConstants.DEDUPLICATE_UNDIRECTED_EDGES, "Optional: Set if undirected edges should be deduplicated")
.setIteratorName(AccumuloStoreConstants.RANGE_ELEMENT_PROPERTY_FILTER_ITERATOR_NAME)
.setIteratorDescription(
"Only returns Entities or Edges that are directed undirected incoming or outgoing as specified by the user's options")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ protected Entity getEntityFromKey(final Key key) throws AccumuloElementConversio
}

@Override
protected boolean getSourceAndDestinationFromRowKey(final byte[] rowKey, final byte[][] sourceValueDestinationValue,
protected boolean getSourceAndDestinationFromRowKey(final byte[] rowKey, final byte[][] sourceDestValue,
final Map<String, String> options) throws AccumuloElementConversionException {
// Get sourceValue, destinationValue and directed flag from row key
// Expect to find 2 delimiters (3 fields)
Expand All @@ -155,35 +155,44 @@ protected boolean getSourceAndDestinationFromRowKey(final byte[] rowKey, final b
final int directionFlag = rowKey[rowKey.length - 1];
if (directionFlag == ClassicBytePositions.UNDIRECTED_EDGE) {
// Edge is undirected
sourceValueDestinationValue[0] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
sourceValueDestinationValue[1] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[0] + 1, positionsOfDelimiters[1]));
sourceDestValue[0] = getSourceBytes(rowKey, positionsOfDelimiters);
sourceDestValue[1] = getDestBytes(rowKey, positionsOfDelimiters);
return false;
} else if (directionFlag == ClassicBytePositions.CORRECT_WAY_DIRECTED_EDGE) {
// Edge is directed and the first identifier is the source of the edge
sourceValueDestinationValue[0] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
sourceValueDestinationValue[1] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[0] + 1, positionsOfDelimiters[1]));
sourceDestValue[0] = getSourceBytes(rowKey, positionsOfDelimiters);
sourceDestValue[1] = getDestBytes(rowKey, positionsOfDelimiters);
return true;
} else if (directionFlag == ClassicBytePositions.INCORRECT_WAY_DIRECTED_EDGE) {
// Edge is directed and the second identifier is the source of the edge
int src = 1;
int dst = 0;
if (options != null && options.containsKey(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE)
&& "true".equalsIgnoreCase(options.get(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE))) {
if (matchEdgeSource(options)) {
src = 0;
dst = 1;
}
sourceValueDestinationValue[src] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
sourceValueDestinationValue[dst] = ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[0] + 1, positionsOfDelimiters[1]));
sourceDestValue[src] = getSourceBytes(rowKey, positionsOfDelimiters);
sourceDestValue[dst] = getDestBytes(rowKey, positionsOfDelimiters);
return true;
} else {
throw new AccumuloElementConversionException(
"Invalid direction flag in row key - flag was " + directionFlag);
}
}

private byte[] getDestBytes(final byte[] rowKey, final int[] positionsOfDelimiters) {
return ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, positionsOfDelimiters[0] + 1, positionsOfDelimiters[1]));
}

private byte[] getSourceBytes(final byte[] rowKey, final int[] positionsOfDelimiters) {
return ByteArrayEscapeUtils
.unEscape(Arrays.copyOfRange(rowKey, 0, positionsOfDelimiters[0]));
}

private boolean matchEdgeSource(final Map<String, String> options) {
return options != null
&& options.containsKey(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE)
&& "true".equalsIgnoreCase(options.get(AccumuloStoreConstants.OPERATION_RETURN_MATCHED_SEEDS_AS_EDGE_SOURCE));
}
}
Loading

0 comments on commit a6c3279

Please sign in to comment.