From 5647ad5dd420a0f974f48f2a535087ec1bfcce81 Mon Sep 17 00:00:00 2001 From: Victor Camargo Date: Mon, 19 Aug 2024 17:24:26 +0100 Subject: [PATCH] Revert "Add Support to Upsert/Insert Ignore on PDB" This reverts commit e100a516 --- .../sql/abstraction/batch/AbstractBatch.java | 61 +++------ .../abstraction/batch/AbstractPdbBatch.java | 31 +---- .../sql/abstraction/batch/PdbBatch.java | 12 +- .../batch/impl/MultithreadedBatch.java | 33 ++--- .../engine/AbstractDatabaseEngine.java | 120 ++++++------------ .../abstraction/engine/DatabaseEngine.java | 42 ++---- .../sql/abstraction/engine/MappedEntity.java | 39 +----- .../sql/abstraction/engine/impl/H2Engine.java | 64 +++------- .../engine/impl/PostgreSqlEngine.java | 17 +-- .../engine/pool/PooledDatabaseEngine.java | 10 -- .../engine/impl/abs/BatchUpdateTest.java | 109 +++++----------- .../engine/impl/abs/EngineCloseTest.java | 46 +++---- 12 files changed, 159 insertions(+), 425 deletions(-) diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java b/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java index e18164fc..8cb0ee51 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractBatch.java @@ -15,7 +15,22 @@ */ package com.feedzai.commons.sql.abstraction.batch; +import com.feedzai.commons.sql.abstraction.FailureListener; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException; +import com.feedzai.commons.sql.abstraction.entry.EntityEntry; +import com.feedzai.commons.sql.abstraction.listeners.BatchListener; +import com.feedzai.commons.sql.abstraction.listeners.MetricsListener; +import com.feedzai.commons.sql.abstraction.listeners.impl.NoopBatchListener; +import com.feedzai.commons.sql.abstraction.listeners.impl.NoopMetricsListener; import com.google.common.base.Strings; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +import javax.annotation.Nullable; import java.time.Duration; import java.util.Collections; import java.util.LinkedList; @@ -27,21 +42,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import javax.annotation.Nullable; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; - -import com.feedzai.commons.sql.abstraction.FailureListener; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException; -import com.feedzai.commons.sql.abstraction.entry.EntityEntry; -import com.feedzai.commons.sql.abstraction.listeners.BatchListener; -import com.feedzai.commons.sql.abstraction.listeners.MetricsListener; -import com.feedzai.commons.sql.abstraction.listeners.impl.NoopBatchListener; -import com.feedzai.commons.sql.abstraction.listeners.impl.NoopMetricsListener; /** * A Batch that periodically flushes pending insertions to the database. @@ -365,17 +365,6 @@ protected AbstractBatch(final DatabaseEngine de, final int batchSize, final long this(de, null, batchSize, batchTimeout, maxAwaitTimeShutdown); } - /** - * A functional interface to represent a {@link java.util.function.BiConsumer} that throws an exception. - * - * @param the type of the first argument to the operation. - * @param the type of the second argument to the operation. - */ - @FunctionalInterface - public interface ThrowingBiConsumer { - void accept(T t, R r) throws Exception; - } - /** * Starts the timer task. */ @@ -463,24 +452,6 @@ public void add(final String entityName, final EntityEntry ee) throws DatabaseEn * @implSpec Same as {@link #flush(boolean)} with {@code false}. */ public void flush() { - logger.trace("Start batch flushing entries."); - flush(this::processBatch); - } - - /** - * Flushes the pending batches ignoring duplicate entries. - */ - public void flushIgnore() { - logger.trace("Start batch flushing ignoring duplicated entries."); - flush((this::processBatchIgnoring)); - } - - /** - * Flushes the pending batches given a processing callback function. - * - * @param processBatch A (throwing) BiConsumer to process the batch entries. - */ - private void flush(final ThrowingBiConsumer> processBatch) { this.metricsListener.onFlushTriggered(); final long flushTriggeredMs = System.currentTimeMillis(); List temp; @@ -514,7 +485,7 @@ private void flush(final ThrowingBiConsumer> pr this.metricsListener.onFlushStarted(flushTriggeredMs, temp.size()); start = System.currentTimeMillis(); - processBatch.accept(de, temp); + processBatch(de, temp); onFlushFinished(flushTriggeredMs, temp, Collections.emptyList()); logger.trace("[{}] Batch flushed. Took {} ms, {} rows.", name, System.currentTimeMillis() - start, temp.size()); diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractPdbBatch.java b/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractPdbBatch.java index bcfb87d8..1ecb61e9 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractPdbBatch.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/batch/AbstractPdbBatch.java @@ -16,13 +16,13 @@ package com.feedzai.commons.sql.abstraction.batch; -import java.util.List; -import java.util.concurrent.CompletableFuture; - import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException; import com.feedzai.commons.sql.abstraction.entry.EntityEntry; +import java.util.List; +import java.util.concurrent.CompletableFuture; + /** * A abstract {@link PdbBatch} with useful default base methods for concrete implementations. * @@ -72,29 +72,4 @@ protected void processBatch(final DatabaseEngine de, final List batc de.flush(); de.commit(); } - - /** - * Processes all batch entries ignoring duplicate entries. - * - * @implSpec Same as {@link #processBatch(DatabaseEngine, List)}}. - * - * @param de The {@link DatabaseEngine} on which to perform the flush. - * @param batchEntries The list of batch entries to be flushed. - * @throws DatabaseEngineException If the operation failed. - */ - protected void processBatchIgnoring(final DatabaseEngine de, final List batchEntries) throws DatabaseEngineException { - /* - Begin transaction before the addBatch calls, in order to force the retry of the connection if it was lost during - or since the last batch. Otherwise, the addBatch call that uses a prepared statement will fail. - */ - de.beginTransaction(); - - for (final BatchEntry entry : batchEntries) { - de.addBatchIgnore(entry.getTableName(), entry.getEntityEntry()); - } - - de.flushIgnore(); - de.commit(); - } - } diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/batch/PdbBatch.java b/src/main/java/com/feedzai/commons/sql/abstraction/batch/PdbBatch.java index 48a4198b..64f8a1b4 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/batch/PdbBatch.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/batch/PdbBatch.java @@ -16,10 +16,10 @@ package com.feedzai.commons.sql.abstraction.batch; -import java.util.concurrent.CompletableFuture; - import com.feedzai.commons.sql.abstraction.entry.EntityEntry; +import java.util.concurrent.CompletableFuture; + /** * Interface specifying a batch that periodically flushes pending insertions to the database. * @@ -60,12 +60,4 @@ public interface PdbBatch extends AutoCloseable { * @return A void {@link CompletableFuture} that completes when the flush action finishes. */ CompletableFuture flushAsync() throws Exception; - - /** - * Flushes the pending batches ignoring duplicated key violations. - * - * @throws Exception If an error occurs while flushing. - */ - void flushIgnore() throws Exception; - } diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java b/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java index 51944473..e9a3dc70 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/batch/impl/MultithreadedBatch.java @@ -16,7 +16,21 @@ package com.feedzai.commons.sql.abstraction.batch.impl; +import com.feedzai.commons.sql.abstraction.batch.AbstractPdbBatch; +import com.feedzai.commons.sql.abstraction.batch.BatchEntry; +import com.feedzai.commons.sql.abstraction.batch.PdbBatch; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; +import com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties; +import com.feedzai.commons.sql.abstraction.listeners.BatchListener; +import com.feedzai.commons.sql.abstraction.listeners.MetricsListener; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +import javax.inject.Inject; import java.time.Duration; import java.util.Collections; import java.util.LinkedList; @@ -38,20 +52,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import javax.inject.Inject; -import org.apache.commons.lang3.time.DurationFormatUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; - -import com.feedzai.commons.sql.abstraction.batch.AbstractPdbBatch; -import com.feedzai.commons.sql.abstraction.batch.BatchEntry; -import com.feedzai.commons.sql.abstraction.batch.PdbBatch; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; -import com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties; -import com.feedzai.commons.sql.abstraction.listeners.BatchListener; -import com.feedzai.commons.sql.abstraction.listeners.MetricsListener; /** * A Batch that periodically flushes pending insertions to the database using multiple threads/connections. @@ -340,11 +340,6 @@ When done, the future removes itself (if done already, all this can be skipped). } } - @Override - public void flushIgnore() { - logger.trace("Flush ignoring not available for MultithreadedBatch. Skipping ..."); - } - /** * Flushes the given list batch entries to {@link DatabaseEngine} immediately. * diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/AbstractDatabaseEngine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/AbstractDatabaseEngine.java index 433a9a87..5fd2affa 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/AbstractDatabaseEngine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/AbstractDatabaseEngine.java @@ -15,6 +15,33 @@ */ package com.feedzai.commons.sql.abstraction.engine; +import com.feedzai.commons.sql.abstraction.FailureListener; +import com.feedzai.commons.sql.abstraction.batch.AbstractBatch; +import com.feedzai.commons.sql.abstraction.batch.BatchConfig; +import com.feedzai.commons.sql.abstraction.batch.DefaultBatch; +import com.feedzai.commons.sql.abstraction.batch.PdbBatch; +import com.feedzai.commons.sql.abstraction.ddl.DbColumn; +import com.feedzai.commons.sql.abstraction.ddl.DbColumnType; +import com.feedzai.commons.sql.abstraction.ddl.DbEntity; +import com.feedzai.commons.sql.abstraction.ddl.DbEntityType; +import com.feedzai.commons.sql.abstraction.ddl.DbFk; +import com.feedzai.commons.sql.abstraction.ddl.DbIndex; +import com.feedzai.commons.sql.abstraction.dml.Expression; +import com.feedzai.commons.sql.abstraction.dml.dialect.Dialect; +import com.feedzai.commons.sql.abstraction.dml.result.ResultColumn; +import com.feedzai.commons.sql.abstraction.dml.result.ResultIterator; +import com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties; +import com.feedzai.commons.sql.abstraction.engine.handler.ExceptionHandler; +import com.feedzai.commons.sql.abstraction.engine.handler.OperationFault; +import com.feedzai.commons.sql.abstraction.engine.handler.QueryExceptionHandler; +import com.feedzai.commons.sql.abstraction.entry.EntityEntry; +import com.feedzai.commons.sql.abstraction.exceptions.DatabaseEngineRetryableException; +import com.feedzai.commons.sql.abstraction.exceptions.DatabaseEngineRetryableRuntimeException; +import com.feedzai.commons.sql.abstraction.listeners.BatchListener; +import com.feedzai.commons.sql.abstraction.util.AESHelper; +import com.feedzai.commons.sql.abstraction.util.Constants; +import com.feedzai.commons.sql.abstraction.util.InitiallyReusableByteArrayOutputStream; +import com.feedzai.commons.sql.abstraction.util.PreparedStatementCapsule; import com.google.common.collect.ImmutableList; import com.google.inject.AbstractModule; import com.google.inject.Guice; @@ -22,6 +49,14 @@ import com.google.inject.Injector; import com.google.inject.Stage; import com.google.inject.util.Providers; +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; @@ -50,41 +85,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import javax.annotation.Nullable; -import org.apache.commons.lang3.NotImplementedException; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.slf4j.Marker; -import org.slf4j.MarkerFactory; - -import com.feedzai.commons.sql.abstraction.FailureListener; -import com.feedzai.commons.sql.abstraction.batch.AbstractBatch; -import com.feedzai.commons.sql.abstraction.batch.BatchConfig; -import com.feedzai.commons.sql.abstraction.batch.DefaultBatch; -import com.feedzai.commons.sql.abstraction.batch.PdbBatch; -import com.feedzai.commons.sql.abstraction.ddl.DbColumn; -import com.feedzai.commons.sql.abstraction.ddl.DbColumnType; -import com.feedzai.commons.sql.abstraction.ddl.DbEntity; -import com.feedzai.commons.sql.abstraction.ddl.DbEntityType; -import com.feedzai.commons.sql.abstraction.ddl.DbFk; -import com.feedzai.commons.sql.abstraction.ddl.DbIndex; -import com.feedzai.commons.sql.abstraction.dml.Expression; -import com.feedzai.commons.sql.abstraction.dml.dialect.Dialect; -import com.feedzai.commons.sql.abstraction.dml.result.ResultColumn; -import com.feedzai.commons.sql.abstraction.dml.result.ResultIterator; -import com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties; -import com.feedzai.commons.sql.abstraction.engine.handler.ExceptionHandler; -import com.feedzai.commons.sql.abstraction.engine.handler.OperationFault; -import com.feedzai.commons.sql.abstraction.engine.handler.QueryExceptionHandler; -import com.feedzai.commons.sql.abstraction.entry.EntityEntry; -import com.feedzai.commons.sql.abstraction.exceptions.DatabaseEngineRetryableException; -import com.feedzai.commons.sql.abstraction.exceptions.DatabaseEngineRetryableRuntimeException; -import com.feedzai.commons.sql.abstraction.listeners.BatchListener; -import com.feedzai.commons.sql.abstraction.util.AESHelper; -import com.feedzai.commons.sql.abstraction.util.Constants; -import com.feedzai.commons.sql.abstraction.util.InitiallyReusableByteArrayOutputStream; -import com.feedzai.commons.sql.abstraction.util.PreparedStatementCapsule; import static com.feedzai.commons.sql.abstraction.batch.AbstractBatch.DEFAULT_RETRY_INTERVAL; import static com.feedzai.commons.sql.abstraction.batch.AbstractBatch.NO_RETRY; @@ -503,7 +503,6 @@ private void closeMappedEntity(final MappedEntity mappedEntity) { final PreparedStatement insert = mappedEntity.getInsert(); final PreparedStatement insertReturning = mappedEntity.getInsertReturning(); final PreparedStatement insertWithAutoInc = mappedEntity.getInsertWithAutoInc(); - final PreparedStatement insertIgnoring = mappedEntity.getInsertIgnoring(); if (!insert.isClosed()) { insert.executeBatch(); @@ -517,10 +516,6 @@ private void closeMappedEntity(final MappedEntity mappedEntity) { insertWithAutoInc.executeBatch(); } - if (insertIgnoring != null && !insertIgnoring.isClosed()) { - insertIgnoring.executeBatch(); - } - } catch (final SQLException e) { logger.debug(String.format("Failed to flush before closing mapped entity '%s'", mappedEntity.getEntity().getName()), e); @@ -954,26 +949,6 @@ public synchronized void flush() throws DatabaseEngineException { } } - /** - * Flushes the batches for all the registered entities. - * - * @throws DatabaseEngineException If something goes wrong while persisting data. - */ - @Override - public synchronized void flushIgnore() throws DatabaseEngineException { - /* - * Reconnect on this method does not make sense since a new connection will have nothing to flush. - */ - - try { - for (MappedEntity me : entities.values()) { - me.getInsertIgnoring().executeBatch(); - } - } catch (final Exception ex) { - throw getQueryExceptionHandler().handleException(ex, "Something went wrong while flushing"); - } - } - /** * Commits the current transaction. You should only call this method if you've previously called {@link AbstractDatabaseEngine#beginTransaction()}. * @@ -1141,7 +1116,7 @@ protected void configure() { /** * Creates a new batch that periodically flushes a batch. A flush will also occur when the maximum number of statements in the batch is reached. *

- * Please be sure to call {@link AbstractBatch#destroy() } before closing the session with the database + * Please be sure to call {@link com.feedzai.commons.sql.abstraction.batch.AbstractBatch#destroy() } before closing the session with the database * * @param batchSize The batch size. * @param batchTimeout If inserts do not occur after the specified time, a flush will be performed. @@ -1340,27 +1315,6 @@ public synchronized void addBatch(final String name, final EntityEntry entry) th } - @Override - public synchronized void addBatchIgnore(final String name, final EntityEntry entry) throws DatabaseEngineException { - try { - - final MappedEntity me = entities.get(name); - - if (me == null) { - throw new DatabaseEngineException(String.format("Unknown entity '%s'", name)); - } - - PreparedStatement ps = me.getInsertIgnoring(); - - entityToPreparedStatementForBatch(me.getEntity(), ps, entry, true); - - ps.addBatch(); - } catch (final Exception ex) { - throw new DatabaseEngineException("Error adding to batch", ex); - } - - } - /** * Translates the given entry entity to the prepared statement when used in the context of batch updates. *

@@ -1940,7 +1894,7 @@ public Map getQueryMetadata(String query) throws DatabaseE /** * Maps the database type to {@link DbColumnType}. If there's no mapping a {@link DbColumnType#UNMAPPED} is returned. * - * @param type The SQL type from {@link Types}. + * @param type The SQL type from {@link java.sql.Types}. * @param typeName The native database type name. It provides additional information for * derived classes to resolve types unmapped here. * @return The {@link DbColumnType}. diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/DatabaseEngine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/DatabaseEngine.java index 6577a000..05ed81ce 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/DatabaseEngine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/DatabaseEngine.java @@ -15,15 +15,6 @@ */ package com.feedzai.commons.sql.abstraction.engine; -import java.sql.Connection; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; -import javax.annotation.Nullable; -import org.slf4j.Logger; - import com.feedzai.commons.sql.abstraction.FailureListener; import com.feedzai.commons.sql.abstraction.batch.AbstractBatch; import com.feedzai.commons.sql.abstraction.batch.BatchConfig; @@ -39,6 +30,15 @@ import com.feedzai.commons.sql.abstraction.engine.handler.ExceptionHandler; import com.feedzai.commons.sql.abstraction.entry.EntityEntry; import com.feedzai.commons.sql.abstraction.listeners.BatchListener; +import org.slf4j.Logger; + +import javax.annotation.Nullable; +import java.sql.Connection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; /** * Interface with the specific database implementation. @@ -73,14 +73,14 @@ public interface DatabaseEngine extends AutoCloseable { /** * Loads an entity into the engine. *

- * No DDL commands will be executed, only prepared statements will be created in order to {@link #persist(String, EntityEntry) persist} + * No DDL commands will be executed, only prepared statements will be created in order to {@link #persist(String, com.feedzai.commons.sql.abstraction.entry.EntityEntry) persist} * data into the entities. * * @param entity The entity to load into the connection. * @throws DatabaseEngineException If something goes wrong while loading the entity. * @implSpec The invocation of this method multiple times is allowed. If the entity already exists, the invocation is a no-op. - * @implNote The implementation is similar to the {@link #addEntity(DbEntity) addEntity} that configured with - * {@link PdbProperties#SCHEMA_POLICY SCHEMA_POLICY} of none. + * @implNote The implementation is similar to the {@link #addEntity(com.feedzai.commons.sql.abstraction.ddl.DbEntity) addEntity} that configured with + * {@link com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties#SCHEMA_POLICY SCHEMA_POLICY} of none. * @since 2.1.2 */ void loadEntity(DbEntity entity) throws DatabaseEngineException; @@ -90,7 +90,7 @@ public interface DatabaseEngine extends AutoCloseable { * Updates an entity in the engine. *

*

- * If the entity does not exists in the instance, the method {@link #addEntity(DbEntity)} will be invoked. + * If the entity does not exists in the instance, the method {@link #addEntity(com.feedzai.commons.sql.abstraction.ddl.DbEntity)} will be invoked. *

*

* The engine will compare the entity with the {@link #getMetadata(String)} information and update the schema of the table. @@ -187,13 +187,6 @@ public interface DatabaseEngine extends AutoCloseable { */ void flush() throws DatabaseEngineException; - /** - * Flushes the batches for all the registered entities ignoring duplicated entries. - * - * @throws DatabaseEngineException If something goes wrong while persisting data. - */ - void flushIgnore() throws DatabaseEngineException; - /** * Commits the current transaction. You should only call this method if you've previously called * {@link DatabaseEngine#beginTransaction()}. @@ -392,15 +385,6 @@ default AbstractBatch createBatch(final int batchSize, final long batchTimeout, */ void addBatch(final String name, final EntityEntry entry) throws DatabaseEngineException; - /** - * Adds an entry to the batch ignoring duplicate entries. - * - * @param name The entity name. - * @param entry The entry to persist. - * @throws DatabaseEngineException If something goes wrong while persisting data. - */ - void addBatchIgnore(final String name, final EntityEntry entry) throws DatabaseEngineException; - /** * Executes the given query. * diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/MappedEntity.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/MappedEntity.java index 884c02f1..838c6f1e 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/MappedEntity.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/MappedEntity.java @@ -15,13 +15,13 @@ */ package com.feedzai.commons.sql.abstraction.engine; -import java.sql.PreparedStatement; -import java.sql.SQLException; +import com.feedzai.commons.sql.abstraction.ddl.DbEntity; +import com.feedzai.commons.sql.abstraction.entry.EntityEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.feedzai.commons.sql.abstraction.ddl.DbEntity; -import com.feedzai.commons.sql.abstraction.entry.EntityEntry; +import java.sql.PreparedStatement; +import java.sql.SQLException; /** * Mapped entity contains information about an entity that has been mapped using the engine. @@ -50,10 +50,6 @@ public class MappedEntity implements AutoCloseable { * The prepared statement to insert new values. */ private PreparedStatement insertReturning = null; - /** - * The prepared statement to insert new values ignoring duplicated keys. - */ - private PreparedStatement insertIgnoring = null; /** * The auto increment column if exists; */ @@ -149,35 +145,13 @@ public PreparedStatement getInsertWithAutoInc() { * Sets the insert statement auto inc columns. * * @param insertWithAutoInc The insert statement with auto inc columns. - * @return This mapped entity; + * @return This mapped entity; * @see DatabaseEngine#persist(String, EntityEntry, boolean) */ public MappedEntity setInsertWithAutoInc(final PreparedStatement insertWithAutoInc) { closeQuietly(this.insertWithAutoInc); this.insertWithAutoInc = insertWithAutoInc; - - return this; - } - - /** - * Gets the prepared statement for inserts ignoring duplicated keys. - * - * @return The insert statement that allows ignoring duplicated keys. - */ - public PreparedStatement getInsertIgnoring() { - return insertIgnoring; - } - - /** - * Sets the insert that allows ignoring duplicated keys. - * - * @param insertIgnoring The insert statement that allows ignoring duplicated keys - * @return This mapped entity - */ - public MappedEntity setInsertIgnoring(final PreparedStatement insertIgnoring) { - closeQuietly(this.insertIgnoring); - this.insertIgnoring = insertIgnoring; - + return this; } @@ -241,6 +215,5 @@ public void close() throws Exception { closeQuietly(this.insert); closeQuietly(this.insertWithAutoInc); closeQuietly(this.insertReturning); - closeQuietly(this.insertIgnoring); } } diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java index bae334ee..7d603896 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/H2Engine.java @@ -15,21 +15,6 @@ */ package com.feedzai.commons.sql.abstraction.engine.impl; -import java.io.ByteArrayInputStream; -import java.io.Serializable; -import java.io.StringReader; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - import com.feedzai.commons.sql.abstraction.ddl.DbColumn; import com.feedzai.commons.sql.abstraction.ddl.DbColumnConstraint; import com.feedzai.commons.sql.abstraction.ddl.DbEntity; @@ -49,6 +34,21 @@ import com.feedzai.commons.sql.abstraction.engine.handler.QueryExceptionHandler; import com.feedzai.commons.sql.abstraction.engine.impl.h2.H2QueryExceptionHandler; +import java.io.ByteArrayInputStream; +import java.io.Serializable; +import java.io.StringReader; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import static com.feedzai.commons.sql.abstraction.dml.dialect.SqlBuilder.column; import static com.feedzai.commons.sql.abstraction.dml.dialect.SqlBuilder.max; import static com.feedzai.commons.sql.abstraction.dml.dialect.SqlBuilder.select; @@ -463,7 +463,6 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) insertIntoWithAutoInc.add("(" + join(columnsWithAutoInc, ", ") + ")"); insertIntoWithAutoInc.add("VALUES (" + join(valuesWithAutoInc, ", ") + ")"); - final String statementWithMerge = buildMergeStatement(entity, columns, values); final String statement = join(insertInto, " "); // The H2 DB doesn't implement INSERT RETURNING. Therefore, we just create a dummy statement, which will @@ -473,7 +472,7 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) logger.trace(statement); - final PreparedStatement ps, psReturn, psWithAutoInc, psMerge; + final PreparedStatement ps, psReturn, psWithAutoInc; try { // Generate keys when the table has at least 1 column with auto generate value. @@ -481,46 +480,19 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) ps = this.conn.prepareStatement(statement, generateKeys); psReturn = this.conn.prepareStatement(insertReturnStatement, generateKeys); psWithAutoInc = this.conn.prepareStatement(statementWithAutoInt, generateKeys); - psMerge = this.conn.prepareStatement(statementWithMerge, generateKeys); + return new MappedEntity() .setInsert(ps) .setInsertReturning(psReturn) .setInsertWithAutoInc(psWithAutoInc) // The auto incremented column must be set, so when persisting a row, it's possible to retrieve its value // by consulting the column name from this MappedEntity. - .setAutoIncColumn(columnWithAutoIncName) - .setInsertIgnoring(psMerge); + .setAutoIncColumn(columnWithAutoIncName); } catch (final SQLException ex) { throw new DatabaseEngineException("Something went wrong handling statement", ex); } } - /** - * Helper method to create a merge statement for this engine. - * - * @param entity The entity. - * @param columns The columns of this entity. - * @param values The values of the entity. - * - * @return A merge statement. - */ - private String buildMergeStatement(final DbEntity entity, final List columns, final List values) { - final String statementWithMerge; - if (!entity.getPkFields().isEmpty() && !columns.isEmpty() && !values.isEmpty()) { - final List mergeInto = new ArrayList<>(); - mergeInto.add("MERGE INTO"); - mergeInto.add(quotize(entity.getName())); - - mergeInto.add("(" + join(columns, ", ") + ")"); - mergeInto.add("VALUES (" + join(values, ", ") + ")"); - - statementWithMerge = join(mergeInto, " "); - } else { - statementWithMerge = ""; - } - return statementWithMerge; - } - @Override protected void dropSequences(DbEntity entity) { /* diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java index b1b8d667..19902402 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/impl/PostgreSqlEngine.java @@ -351,15 +351,9 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) List insertInto = new ArrayList<>(); insertInto.add("INSERT INTO"); insertInto.add(quotize(entity.getName())); - List insertIntoWithAutoInc = new ArrayList<>(); insertIntoWithAutoInc.add("INSERT INTO"); insertIntoWithAutoInc.add(quotize(entity.getName())); - - List insertIntoIgnoring = new ArrayList<>(); - insertIntoIgnoring.add("INSERT INTO"); - insertIntoIgnoring.add(quotize(entity.getName())); - List columns = new ArrayList<>(); List values = new ArrayList<>(); List columnsWithAutoInc = new ArrayList<>(); @@ -383,10 +377,6 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) insertIntoWithAutoInc.add("(" + join(columnsWithAutoInc, ", ") + ")"); insertIntoWithAutoInc.add("VALUES (" + join(valuesWithAutoInc, ", ") + ")"); - insertIntoIgnoring.add("(" + join(columns, ", ") + ")"); - insertIntoIgnoring.add("VALUES (" + join(values, ", ") + ")"); - insertIntoIgnoring.add("ON CONFLICT DO NOTHING"); - List insertIntoReturn = new ArrayList<>(insertInto); @@ -397,21 +387,18 @@ protected MappedEntity createPreparedStatementForInserts(final DbEntity entity) final String insertStatement = join(insertInto, " "); final String insertReturnStatement = join(insertIntoReturn, " "); final String statementWithAutoInt = join(insertIntoWithAutoInc, " "); - final String insertIgnoring = join(insertIntoIgnoring, " ");; logger.trace(insertStatement); logger.trace(insertReturnStatement); - logger.trace(insertIgnoring); - PreparedStatement ps, psReturn, psWithAutoInc, psWithInsertIgnoring; + PreparedStatement ps, psReturn, psWithAutoInc; try { ps = conn.prepareStatement(insertStatement); psReturn = conn.prepareStatement(insertReturnStatement); psWithAutoInc = conn.prepareStatement(statementWithAutoInt); - psWithInsertIgnoring = conn.prepareStatement(insertIgnoring); - return new MappedEntity().setInsert(ps).setInsertReturning(psReturn).setInsertWithAutoInc(psWithAutoInc).setInsertIgnoring(psWithInsertIgnoring).setAutoIncColumn(returning); + return new MappedEntity().setInsert(ps).setInsertReturning(psReturn).setInsertWithAutoInc(psWithAutoInc).setAutoIncColumn(returning); } catch (final SQLException ex) { throw new DatabaseEngineException("Something went wrong handling statement", ex); } diff --git a/src/main/java/com/feedzai/commons/sql/abstraction/engine/pool/PooledDatabaseEngine.java b/src/main/java/com/feedzai/commons/sql/abstraction/engine/pool/PooledDatabaseEngine.java index 6c0af084..4c9b159d 100644 --- a/src/main/java/com/feedzai/commons/sql/abstraction/engine/pool/PooledDatabaseEngine.java +++ b/src/main/java/com/feedzai/commons/sql/abstraction/engine/pool/PooledDatabaseEngine.java @@ -154,11 +154,6 @@ public void flush() throws DatabaseEngineException { engine.flush(); } - @Override - public void flushIgnore() throws DatabaseEngineException { - engine.flushIgnore(); - } - @Override public void commit() throws DatabaseEngineRuntimeException { engine.commit(); @@ -234,11 +229,6 @@ public void addBatch(final String name, final EntityEntry entry) throws Database engine.addBatch(name, entry); } - @Override - public void addBatchIgnore(final String name, final EntityEntry entry) throws DatabaseEngineException { - engine.addBatchIgnore(name, entry); - } - @Override public List> query(final Expression query) throws DatabaseEngineException { return engine.query(query); diff --git a/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/BatchUpdateTest.java b/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/BatchUpdateTest.java index fbefd0b7..6a94be67 100644 --- a/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/BatchUpdateTest.java +++ b/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/BatchUpdateTest.java @@ -15,12 +15,40 @@ */ package com.feedzai.commons.sql.abstraction.engine.impl.abs; -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.Logger; +import com.feedzai.commons.sql.abstraction.batch.AbstractBatch; +import com.feedzai.commons.sql.abstraction.batch.AbstractBatchConfig; +import com.feedzai.commons.sql.abstraction.batch.BatchEntry; +import com.feedzai.commons.sql.abstraction.batch.PdbBatch; +import com.feedzai.commons.sql.abstraction.batch.impl.DefaultBatch; +import com.feedzai.commons.sql.abstraction.batch.impl.DefaultBatchConfig; +import com.feedzai.commons.sql.abstraction.batch.impl.MultithreadedBatchConfig; +import com.feedzai.commons.sql.abstraction.ddl.DbEntity; +import com.feedzai.commons.sql.abstraction.dml.result.ResultColumn; +import com.feedzai.commons.sql.abstraction.engine.AbstractDatabaseEngine; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineRuntimeException; +import com.feedzai.commons.sql.abstraction.engine.DatabaseFactory; +import com.feedzai.commons.sql.abstraction.engine.DatabaseFactoryException; +import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseConfiguration; +import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseTestUtil; +import com.feedzai.commons.sql.abstraction.entry.EntityEntry; +import com.feedzai.commons.sql.abstraction.listeners.BatchListener; +import com.feedzai.commons.sql.abstraction.listeners.MetricsListener; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import mockit.Invocation; +import mockit.Mock; +import mockit.MockUp; +import org.assertj.core.api.ObjectAssert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + import java.sql.Connection; import java.sql.SQLException; import java.time.Duration; @@ -42,38 +70,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import java.util.stream.Collectors; -import mockit.Invocation; -import mockit.Mock; -import mockit.MockUp; -import org.assertj.core.api.ObjectAssert; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.LoggerFactory; - -import com.feedzai.commons.sql.abstraction.batch.AbstractBatch; -import com.feedzai.commons.sql.abstraction.batch.AbstractBatchConfig; -import com.feedzai.commons.sql.abstraction.batch.BatchEntry; -import com.feedzai.commons.sql.abstraction.batch.PdbBatch; -import com.feedzai.commons.sql.abstraction.batch.impl.DefaultBatch; -import com.feedzai.commons.sql.abstraction.batch.impl.DefaultBatchConfig; -import com.feedzai.commons.sql.abstraction.batch.impl.MultithreadedBatchConfig; -import com.feedzai.commons.sql.abstraction.ddl.DbEntity; -import com.feedzai.commons.sql.abstraction.dml.result.ResultColumn; -import com.feedzai.commons.sql.abstraction.engine.AbstractDatabaseEngine; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineRuntimeException; -import com.feedzai.commons.sql.abstraction.engine.DatabaseFactory; -import com.feedzai.commons.sql.abstraction.engine.DatabaseFactoryException; -import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseConfiguration; -import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseTestUtil; -import com.feedzai.commons.sql.abstraction.entry.EntityEntry; -import com.feedzai.commons.sql.abstraction.listeners.BatchListener; -import com.feedzai.commons.sql.abstraction.listeners.MetricsListener; import static com.feedzai.commons.sql.abstraction.ddl.DbColumnType.BOOLEAN; import static com.feedzai.commons.sql.abstraction.ddl.DbColumnType.DOUBLE; @@ -138,11 +134,6 @@ public static List data() throws Exception { @Parameterized.Parameter(1) public Supplier> batchConfigBuilderSupplier; - @BeforeClass - public static void initStatic() { - ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.TRACE); - } - @Before public void init() throws DatabaseFactoryException { properties = new Properties() { @@ -829,46 +820,6 @@ public void batchInsertDuplicateFlushWithDBErrorTest() throws Exception { checkFailedDuplicateEntries(batchListener, numTestEntries, idx); } - /** - * Tests if a batch with entries having duplicate ignores when there is an error. - * - * @throws Exception if any operations on the batch fail. - */ - @Test - public void batchInsertOnIgnoreDuplicateFlushTest() throws Exception { - final TestBatchListener batchListener = new TestBatchListener(); - final int numTestEntries = 2; - - addTestEntityWithPrimaryKey(); - - final DefaultBatch batch = engine.createBatch(DefaultBatchConfig.builder() - .withName("batchInsertOnIgnoreDuplicateFlushTest") - .withBatchSize(numTestEntries + 1) - .withBatchTimeout(Duration.ofSeconds(100)) - .withMaxAwaitTimeShutdown(Duration.ofSeconds(1000)) - .withBatchListener(batchListener) - .build() - ); - - // Add entries to batch, no flush should take place because numTestEntries < batch size and batch timeout is huge - final int idx = 0; - final EntityEntry testEntry = getTestEntry(idx); - batch.add("TEST", testEntry); - batch.add("TEST", testEntry); - - // Explicit flush, but ignoring the duplicate entries in the batch. - batch.flushIgnore(); - - // Check that entries were not added to onFlushFailure(). - assertTrue("Entries should not be added to failed", batchListener.failed.isEmpty()); - - // Check that all entries succeeded - assertEquals("Entries should have all succeeded to be persisted", numTestEntries, batchListener.succeeded.size()); - - // Considering they are the same entry, only one should be inserted in the database. - checkTestEntriesInDB(1); - } - /** * Create test table. */ diff --git a/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/EngineCloseTest.java b/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/EngineCloseTest.java index e500e56c..19d1428c 100644 --- a/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/EngineCloseTest.java +++ b/src/test/java/com/feedzai/commons/sql/abstraction/engine/impl/abs/EngineCloseTest.java @@ -15,35 +15,31 @@ */ package com.feedzai.commons.sql.abstraction.engine.impl.abs; -import ch.qos.logback.classic.Level; -import ch.qos.logback.classic.Logger; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Properties; +import com.feedzai.commons.sql.abstraction.ddl.DbEntity; +import com.feedzai.commons.sql.abstraction.engine.AbstractDatabaseEngine; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; +import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException; +import com.feedzai.commons.sql.abstraction.engine.DatabaseFactory; +import com.feedzai.commons.sql.abstraction.engine.DatabaseFactoryException; +import com.feedzai.commons.sql.abstraction.engine.NameAlreadyExistsException; +import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseConfiguration; +import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseTestUtil; import mockit.Capturing; import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Verifications; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.slf4j.LoggerFactory; -import com.feedzai.commons.sql.abstraction.ddl.DbEntity; -import com.feedzai.commons.sql.abstraction.engine.AbstractDatabaseEngine; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine; -import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException; -import com.feedzai.commons.sql.abstraction.engine.DatabaseFactory; -import com.feedzai.commons.sql.abstraction.engine.DatabaseFactoryException; -import com.feedzai.commons.sql.abstraction.engine.NameAlreadyExistsException; -import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseConfiguration; -import com.feedzai.commons.sql.abstraction.engine.testconfig.DatabaseTestUtil; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; import static com.feedzai.commons.sql.abstraction.engine.EngineTestUtils.buildEntity; import static com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties.ENGINE; @@ -52,7 +48,6 @@ import static com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties.SCHEMA_POLICY; import static com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties.USERNAME; - /** * Tests closing a {@link DatabaseEngine} to make sure all resources are cleaned up correctly. *

@@ -97,11 +92,6 @@ public EngineCloseTest(final DatabaseConfiguration config, final String schemaPo this.schemaPolicy = schemaPolicy; } - @BeforeClass - public static void initStatic() { - ((Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME)).setLevel(Level.TRACE); - } - @Before public void setUp() throws DatabaseFactoryException { properties = new Properties() {{ @@ -118,7 +108,7 @@ public void setUp() throws DatabaseFactoryException { /** * Test that closing a database engine with multiple entities closes all insert statements associated with each * entity, regardless of the schema policy used. - * + *

* Each entity is associated with 3 prepared statements. This test ensures that 3 PSs per entity are closed. * * @param preparedStatementMock The mock to check number of closed prepared statements. @@ -149,8 +139,8 @@ void dropEntity(final DbEntity entity) { } engine.close(); new Verifications() {{ - preparedStatementMock.close(); times = 2 * 4 + 2; // {2 entities} x {PSs per entity} + {cached PSs} - preparedStatementMock.executeBatch(); times = 2 * 4; // {2 entities} x {PSs per entity} + preparedStatementMock.close(); times = 2 * 3 + 2; // {2 entities} x {PSs per entity} + {cached PSs} + preparedStatementMock.executeBatch(); times = 2 * 3; // {2 entities} x {PSs per entity} }}; }