
Commit 0138975: Merge branch 'release/0.5.0.0'
MayerRoman committed May 5, 2017
2 parents aae3302 + fafe57b
Showing 111 changed files with 3,199 additions and 1,257 deletions.
4 changes: 2 additions & 2 deletions cassandra/pom.xml
@@ -19,13 +19,13 @@
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>lagerta</artifactId>
<groupId>com.epam</groupId>
<version>1.0-SNAPSHOT</version>
<version>0.5.0.0</version>
</parent>
<artifactId>cassandra</artifactId>
<version>1.0-SNAPSHOT</version>

<repositories>
<repository>
5 changes: 3 additions & 2 deletions core/pom.xml
@@ -18,12 +18,13 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>lagerta</artifactId>
<groupId>com.epam</groupId>
<version>1.0-SNAPSHOT</version>
<version>0.5.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>core</artifactId>

19 changes: 14 additions & 5 deletions lagerta-core/pom.xml
@@ -23,7 +23,7 @@
<parent>
<artifactId>lagerta</artifactId>
<groupId>com.epam</groupId>
<version>1.0-SNAPSHOT</version>
<version>0.5.0.0</version>
</parent>

<artifactId>lagerta-core</artifactId>
@@ -57,6 +57,7 @@
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
@@ -74,10 +75,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
@@ -125,5 +122,17 @@
<version>2.6.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-slf4j</artifactId>
</dependency>
</dependencies>
</project>
EntityDescriptor.java
@@ -24,45 +24,44 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class EntityDescriptor<T> {

// todo mark this names as restriction for users
public static final String KEY_FIELD_NAME = "key";
public static final String VAL_FIELD_NAME = "val";

private final Class<T> clazz;
private final String tableName;
private final Map<String, FieldDescriptor> fieldDescriptors;
private final List<FieldDescriptor> fieldDescriptors;
private final String upsertQuery;
private final String selectQuery;

public EntityDescriptor(String tableName, String keyField, List<FieldDescriptor> fieldDescriptors) {
this(null, tableName, keyField, fieldDescriptors.stream().collect(Collectors
.toMap(FieldDescriptor::getName, descriptor -> descriptor)));
}

public EntityDescriptor(Class<T> clazz, String tableName, String keyField, List<FieldDescriptor> fieldDescriptors) {
this(clazz, tableName, keyField, fieldDescriptors.stream().collect(Collectors
.toMap(FieldDescriptor::getName, descriptor -> descriptor)));
}
public EntityDescriptor(Class<T> clazz, String tableName, List<FieldDescriptor> fieldDescriptors) {
Objects.requireNonNull(clazz, "class in " + EntityDescriptor.class + " was not set");

public EntityDescriptor(Class<T> clazz, String tableName, String keyField,
Map<String, FieldDescriptor> fieldDescriptors) {
this.clazz = clazz;
this.tableName = tableName;
this.fieldDescriptors = fieldDescriptors;

List<String> sortedColumns = fieldDescriptors.entrySet()
List<String> sortedColumns = this.fieldDescriptors
.stream()
.sorted((e1, e2) -> Integer.valueOf(e1.getValue().getIndex()).compareTo(e2.getValue().getIndex()))
.map(Map.Entry::getKey)
.sorted((e1, e2) -> Integer.valueOf(e1.getIndex()).compareTo(e2.getIndex()))
.map(FieldDescriptor::getName)
.collect(Collectors.toList());

String columnNames = String.join(",", sortedColumns);
String maskFields = fieldDescriptors.entrySet().stream()
String maskFields = this.fieldDescriptors.stream()
.map(i -> "?")
.collect(Collectors.joining(", "));
//maybe customization sql syntax for different dialect in future
upsertQuery = "MERGE INTO " + tableName + " (" + columnNames + ") KEY(" + keyField + ")" +
" VALUES (" + maskFields + ")";
upsertQuery = String.format("MERGE INTO %s (%s) KEY(%s) VALUES (%s)",
tableName, columnNames, KEY_FIELD_NAME, maskFields);
// specific IN semantic for h2
selectQuery = "SELECT " + columnNames + " FROM " + tableName + " WHERE array_contains(?, " + keyField + ")";
selectQuery = String.format("SELECT %s FROM %s WHERE array_contains(?, %s)",
columnNames, tableName, KEY_FIELD_NAME);
}

public String getTableName() {
@@ -79,9 +78,8 @@ public String getSelectQuery() {

public void addValuesToBatch(Object key, Object value, PreparedStatement statement) throws SQLException {
Map<String, Object> parametersValue = JDBCKeyValueMapper.keyValueMap(key, value);
for (Map.Entry<String, FieldDescriptor> fieldNameAndDescriptor : fieldDescriptors.entrySet()) {
Object valueForField = parametersValue.get(fieldNameAndDescriptor.getKey());
FieldDescriptor descriptor = fieldNameAndDescriptor.getValue();
for (FieldDescriptor descriptor : fieldDescriptors) {
Object valueForField = parametersValue.get(descriptor.getName());
if (valueForField == null) {
statement.setObject(descriptor.getIndex(), null);
} else {
@@ -96,9 +94,9 @@ public <K> Map<K, T> transform(ResultSet resultSet) throws Exception {
Map<K, T> result = new HashMap<>();
while (resultSet.next()) {
Map<String, Object> objectParameters = new HashMap<>(fieldDescriptors.size());
for (Map.Entry<String, FieldDescriptor> entry : fieldDescriptors.entrySet()) {
FieldDescriptor descriptor = entry.getValue();
objectParameters.put(entry.getKey(), descriptor.getTransformer().get(resultSet, descriptor.getIndex()));
for (FieldDescriptor descriptor : fieldDescriptors) {
Object value = descriptor.getTransformer().get(resultSet, descriptor.getIndex());
objectParameters.put(descriptor.getName(), value);
}
JDBCKeyValueMapper.KeyAndValue<T> keyAndValue = JDBCKeyValueMapper.getObject(objectParameters, clazz);
result.put((K) keyAndValue.getKey(), keyAndValue.getValue());
@@ -108,6 +106,6 @@ public <K> Map<K, T> transform(ResultSet resultSet) throws Exception {

@Override
public String toString() {
return "Entity {" + ", table '" + tableName + "'\', fields " + fieldDescriptors.values() + '}';
return "Entity {" + ", table '" + tableName + "'\', fields " + fieldDescriptors + '}';
}
}
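The refactoring above replaces the Map<String, FieldDescriptor> with a List ordered by column index, drops the caller-supplied keyField in favor of the reserved "key" column, and requires a non-null entity class. A minimal construction sketch; Person is a hypothetical entity class, while the FieldDescriptor and EntityDescriptor signatures and the transformer constants are taken from this diff:

    // Columns are sorted by their index when the queries are built.
    List<FieldDescriptor> fields = Arrays.asList(
            new FieldDescriptor(1, "key", SimpleValueTransformer.OBJECT),
            new FieldDescriptor(2, "name", SimpleValueTransformer.STRING));
    EntityDescriptor<Person> descriptor =
            new EntityDescriptor<>(Person.class, "PERSON", fields);
    // Queries the constructor derives from the descriptors:
    //   MERGE INTO PERSON (key,name) KEY(key) VALUES (?, ?)
    //   SELECT key,name FROM PERSON WHERE array_contains(?, key)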
SimpleValueTransformer.java
@@ -30,6 +30,7 @@

public enum SimpleValueTransformer implements ValueTransformer {
DUMMY(null, null),
OBJECT(ResultSet::getObject, PreparedStatement::setObject, Object.class),

STRING(ResultSet::getString, (Setter<String>) PreparedStatement::setString, String.class),
BOOLEAN(ResultSet::getBoolean, (Setter<Boolean>) PreparedStatement::setBoolean, Boolean.class, Boolean.TYPE),
@@ -76,7 +77,7 @@ public void set(PreparedStatement preparedStatement, int index, Object value) th
setter.set(preparedStatement, index, value);
}

public static ValueTransformer get(Class<?> clazz) {
public static ValueTransformer of(Class<?> clazz) {
return MATCH.computeIfAbsent(clazz, FIND);
}

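Two small API changes land here: a catch-all OBJECT transformer wired to ResultSet::getObject and PreparedStatement::setObject, and the lookup factory renamed from get(Class) to of(Class). A short sketch of the renamed lookup; the return values assume the enum's matching works on the exact classes listed in each constant:

    ValueTransformer byType = SimpleValueTransformer.of(String.class);   // STRING
    ValueTransformer catchAll = SimpleValueTransformer.of(Object.class); // OBJECT, new in this commit
    // Types with no matching constant resolve to DUMMY; FieldDescriptorHelper
    // (added below) swaps DUMMY for a blob transformer.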
JDBCCommitter.java
@@ -27,6 +27,8 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.function.Function.identity;

public class JDBCCommitter implements Committer {

private static final int BATCH_SIZE = 50_000;
@@ -36,7 +38,7 @@ public class JDBCCommitter implements Committer {

public JDBCCommitter(DataSource dataSource, List<EntityDescriptor> descriptors) {
this(dataSource, descriptors.stream().collect(Collectors
.toMap(EntityDescriptor::getTableName, descriptor -> descriptor)));
.toMap(EntityDescriptor::getTableName, identity())));
}

public JDBCCommitter(DataSource dataSource, Map<String, EntityDescriptor> entityDescriptors) {
FieldDescriptorHelper.java (new file)
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2017. EPAM Systems
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.epam.lagerta.base.util;

import com.epam.lagerta.base.BlobValueTransformer;
import com.epam.lagerta.base.EnumValueTransformer;
import com.epam.lagerta.base.FieldDescriptor;
import com.epam.lagerta.base.SimpleValueTransformer;
import com.epam.lagerta.base.ValueTransformer;
import com.epam.lagerta.util.Serializer;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

import static com.epam.lagerta.base.EntityDescriptor.KEY_FIELD_NAME;
import static com.epam.lagerta.base.EntityDescriptor.VAL_FIELD_NAME;

public class FieldDescriptorHelper {

private final BlobValueTransformer blobValueTransformer;

public FieldDescriptorHelper(Serializer serializer) {
this.blobValueTransformer = new BlobValueTransformer(serializer);
}

public <T> List<FieldDescriptor> parseFields(Class<T> clazz) {
int[] index = new int[]{1};
List<FieldDescriptor> descriptors = new ArrayList<>();
ReflectionUtils.doWithFields(clazz, field -> {
ReflectionUtils.makeAccessible(field);
Class<?> type = field.getType();
ValueTransformer transformer = identifyTransformer(type);
descriptors.add(new FieldDescriptor(index[0]++, field.getName(), transformer));
}, field -> {
int modifiers = field.getModifiers();
return !Modifier.isTransient(modifiers) && !Modifier.isStatic(modifiers);
});
return addDefaultDescriptors(descriptors);
}

private ValueTransformer identifyTransformer(Class<?> type) {
ValueTransformer transformer;
if (type.isEnum()) {
transformer = EnumValueTransformer.of(type);
} else {
transformer = SimpleValueTransformer.of(type);
if (transformer == SimpleValueTransformer.DUMMY) {
transformer = blobValueTransformer;
}
}
return transformer;
}

/**
* adds default fieldDescriptors at the end of table columns by indexes
*/
public List<FieldDescriptor> addDefaultDescriptors(List<FieldDescriptor> fieldDescriptors) {
int lastIndex = fieldDescriptors
.stream()
.mapToInt(FieldDescriptor::getIndex)
.max().orElse(0);

if (fieldDescriptors.stream().noneMatch(field -> field.getName().equals(KEY_FIELD_NAME))) {
fieldDescriptors.add(new FieldDescriptor(++lastIndex, KEY_FIELD_NAME, SimpleValueTransformer.OBJECT));
}
if (fieldDescriptors.stream().noneMatch(field -> field.getName().equals(VAL_FIELD_NAME))) {
fieldDescriptors.add(new FieldDescriptor(++lastIndex, VAL_FIELD_NAME, blobValueTransformer));
}
return fieldDescriptors;
}
}
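The new helper assigns column indexes in field declaration order, routes enums through EnumValueTransformer, falls back to a blob transformer for types SimpleValueTransformer cannot handle, and appends the reserved "key"/"val" descriptors when the class does not declare them. A minimal usage sketch; SerializerImpl stands in for whatever Serializer implementation the project provides, and Person is again a hypothetical entity:

    FieldDescriptorHelper helper = new FieldDescriptorHelper(new SerializerImpl());
    List<FieldDescriptor> descriptors = helper.parseFields(Person.class);
    // Non-transient, non-static fields of Person become columns 1..n,
    // followed by default "key" and "val" descriptors if absent.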
DataCapturerBus.java
@@ -59,9 +59,6 @@ public class DataCapturerBus<K, V> implements CacheStore<K, V> {
*/
private static final String BUFFER_PROPERTY_NAME = "BUFFER_PROPERTY_NAME";

/** */
private static final String ON_DR_FLAG_PROPERTY_NAME = "ON_DR_FLAG_PROPERTY_NAME";

/**
* Auto-injected store session.
*/
JDBCDataCapturerLoader.java
@@ -29,14 +29,21 @@
import java.util.Map;
import java.util.stream.Collectors;

import static java.util.function.Function.identity;

public class JDBCDataCapturerLoader implements DataCapturerLoader {

private final DataSource dataSource;
private final Map<String, EntityDescriptor> entityDescriptors;

/**
* @param dataSource
* @param descriptors which have the same table name with cache name.
* Otherwise they must be identified explicitly
*/
public JDBCDataCapturerLoader(DataSource dataSource, List<EntityDescriptor> descriptors) {
this(dataSource, descriptors.stream().collect(Collectors
.toMap(EntityDescriptor::getTableName, descriptor -> descriptor)));
.toMap(EntityDescriptor::getTableName, identity())));
}

public JDBCDataCapturerLoader(DataSource dataSource, Map<String, EntityDescriptor> entityDescriptors) {
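The new javadoc makes the convention explicit: the list-based constructor keys descriptors by table name, so it only suffices when cache names match table names; otherwise the map-based constructor is required. A sketch of both, assuming a prepared dataSource and a personDescriptor whose table is PERSON:

    // Implicit mapping: cache name must equal EntityDescriptor.getTableName().
    DataCapturerLoader byTable = new JDBCDataCapturerLoader(dataSource,
            Collections.singletonList(personDescriptor));
    // Explicit mapping for a cache whose name differs from its table.
    DataCapturerLoader byCache = new JDBCDataCapturerLoader(dataSource,
            Collections.singletonMap("personCache", personDescriptor));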
TransactionalProducer.java
@@ -23,5 +23,9 @@
import java.util.concurrent.Future;

public interface TransactionalProducer {

Future<RecordMetadata> send(long transactionId, Map<String, Collection<Cache.Entry<?, ?>>> updates);

void close();

}
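With close() now part of the contract, callers own the producer lifecycle. A minimal sketch, as a fragment using only the two methods this diff declares:

    static void sendAndClose(TransactionalProducer producer, long txId,
                             Map<String, Collection<Cache.Entry<?, ?>>> updates) throws Exception {
        try {
            producer.send(txId, updates).get(); // block until acknowledged
        } finally {
            producer.close(); // added to the interface in this commit
        }
    }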