Skip to content

Commit

Permalink
Revert "Add Support to Upsert/Insert Ignore on PDB"
Browse files Browse the repository at this point in the history
This reverts commit e100a51
  • Loading branch information
victorcmg-fdz committed Aug 19, 2024
1 parent 9911c4d commit 5647ad5
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 425 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,22 @@
*/
package com.feedzai.commons.sql.abstraction.batch;

import com.feedzai.commons.sql.abstraction.FailureListener;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException;
import com.feedzai.commons.sql.abstraction.entry.EntityEntry;
import com.feedzai.commons.sql.abstraction.listeners.BatchListener;
import com.feedzai.commons.sql.abstraction.listeners.MetricsListener;
import com.feedzai.commons.sql.abstraction.listeners.impl.NoopBatchListener;
import com.feedzai.commons.sql.abstraction.listeners.impl.NoopMetricsListener;
import com.google.common.base.Strings;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
Expand All @@ -27,21 +42,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import com.feedzai.commons.sql.abstraction.FailureListener;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException;
import com.feedzai.commons.sql.abstraction.entry.EntityEntry;
import com.feedzai.commons.sql.abstraction.listeners.BatchListener;
import com.feedzai.commons.sql.abstraction.listeners.MetricsListener;
import com.feedzai.commons.sql.abstraction.listeners.impl.NoopBatchListener;
import com.feedzai.commons.sql.abstraction.listeners.impl.NoopMetricsListener;

/**
* A Batch that periodically flushes pending insertions to the database.
Expand Down Expand Up @@ -365,17 +365,6 @@ protected AbstractBatch(final DatabaseEngine de, final int batchSize, final long
this(de, null, batchSize, batchTimeout, maxAwaitTimeShutdown);
}

/**
* A functional interface to represent a {@link java.util.function.BiConsumer} that throws an exception.
*
* @param <T> the type of the first argument to the operation.
* @param <R> the type of the second argument to the operation.
*/
@FunctionalInterface
public interface ThrowingBiConsumer<T, R> {
void accept(T t, R r) throws Exception;
}

/**
* Starts the timer task.
*/
Expand Down Expand Up @@ -463,24 +452,6 @@ public void add(final String entityName, final EntityEntry ee) throws DatabaseEn
* @implSpec Same as {@link #flush(boolean)} with {@code false}.
*/
public void flush() {
logger.trace("Start batch flushing entries.");
flush(this::processBatch);
}

/**
* Flushes the pending batches ignoring duplicate entries.
*/
public void flushIgnore() {
logger.trace("Start batch flushing ignoring duplicated entries.");
flush((this::processBatchIgnoring));
}

/**
* Flushes the pending batches given a processing callback function.
*
* @param processBatch A (throwing) BiConsumer to process the batch entries.
*/
private void flush(final ThrowingBiConsumer<DatabaseEngine, List<BatchEntry>> processBatch) {
this.metricsListener.onFlushTriggered();
final long flushTriggeredMs = System.currentTimeMillis();
List<BatchEntry> temp;
Expand Down Expand Up @@ -514,7 +485,7 @@ private void flush(final ThrowingBiConsumer<DatabaseEngine, List<BatchEntry>> pr
this.metricsListener.onFlushStarted(flushTriggeredMs, temp.size());
start = System.currentTimeMillis();

processBatch.accept(de, temp);
processBatch(de, temp);

onFlushFinished(flushTriggeredMs, temp, Collections.emptyList());
logger.trace("[{}] Batch flushed. Took {} ms, {} rows.", name, System.currentTimeMillis() - start, temp.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package com.feedzai.commons.sql.abstraction.batch;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngineException;
import com.feedzai.commons.sql.abstraction.entry.EntityEntry;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* A abstract {@link PdbBatch} with useful default base methods for concrete implementations.
*
Expand Down Expand Up @@ -72,29 +72,4 @@ protected void processBatch(final DatabaseEngine de, final List<BatchEntry> batc
de.flush();
de.commit();
}

/**
* Processes all batch entries ignoring duplicate entries.
*
* @implSpec Same as {@link #processBatch(DatabaseEngine, List)}}.
*
* @param de The {@link DatabaseEngine} on which to perform the flush.
* @param batchEntries The list of batch entries to be flushed.
* @throws DatabaseEngineException If the operation failed.
*/
protected void processBatchIgnoring(final DatabaseEngine de, final List<BatchEntry> batchEntries) throws DatabaseEngineException {
/*
Begin transaction before the addBatch calls, in order to force the retry of the connection if it was lost during
or since the last batch. Otherwise, the addBatch call that uses a prepared statement will fail.
*/
de.beginTransaction();

for (final BatchEntry entry : batchEntries) {
de.addBatchIgnore(entry.getTableName(), entry.getEntityEntry());
}

de.flushIgnore();
de.commit();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

package com.feedzai.commons.sql.abstraction.batch;

import java.util.concurrent.CompletableFuture;

import com.feedzai.commons.sql.abstraction.entry.EntityEntry;

import java.util.concurrent.CompletableFuture;

/**
* Interface specifying a batch that periodically flushes pending insertions to the database.
*
Expand Down Expand Up @@ -60,12 +60,4 @@ public interface PdbBatch extends AutoCloseable {
* @return A void {@link CompletableFuture} that completes when the flush action finishes.
*/
CompletableFuture<Void> flushAsync() throws Exception;

/**
* Flushes the pending batches ignoring duplicated key violations.
*
* @throws Exception If an error occurs while flushing.
*/
void flushIgnore() throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,21 @@

package com.feedzai.commons.sql.abstraction.batch.impl;

import com.feedzai.commons.sql.abstraction.batch.AbstractPdbBatch;
import com.feedzai.commons.sql.abstraction.batch.BatchEntry;
import com.feedzai.commons.sql.abstraction.batch.PdbBatch;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine;
import com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties;
import com.feedzai.commons.sql.abstraction.listeners.BatchListener;
import com.feedzai.commons.sql.abstraction.listeners.MetricsListener;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import javax.inject.Inject;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
Expand All @@ -38,20 +52,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.inject.Inject;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

import com.feedzai.commons.sql.abstraction.batch.AbstractPdbBatch;
import com.feedzai.commons.sql.abstraction.batch.BatchEntry;
import com.feedzai.commons.sql.abstraction.batch.PdbBatch;
import com.feedzai.commons.sql.abstraction.engine.DatabaseEngine;
import com.feedzai.commons.sql.abstraction.engine.configuration.PdbProperties;
import com.feedzai.commons.sql.abstraction.listeners.BatchListener;
import com.feedzai.commons.sql.abstraction.listeners.MetricsListener;

/**
* A Batch that periodically flushes pending insertions to the database using multiple threads/connections.
Expand Down Expand Up @@ -340,11 +340,6 @@ When done, the future removes itself (if done already, all this can be skipped).
}
}

@Override
public void flushIgnore() {
logger.trace("Flush ignoring not available for MultithreadedBatch. Skipping ...");
}

/**
* Flushes the given list batch entries to {@link DatabaseEngine} immediately.
*
Expand Down
Loading

0 comments on commit 5647ad5

Please sign in to comment.