Pattern acceleration support (#356)
* squash merge pattern-acceleration-support

* apply spotless

* use pth-06 version 3.2.1

* add CreateTableSQL class, make variables final, make TokenizerUDF use a switch and a single return statement, clean up code

* refactor CreateTableSQL and TokenizerUDF

* Rename CreateTableSQL to TableSQL
elliVM authored Oct 1, 2024
1 parent c974952 commit f4cf08a
Showing 22 changed files with 1,780 additions and 458 deletions.
pom.xml — 4 changes: 2 additions & 2 deletions

@@ -49,11 +49,11 @@
     <revision>0.0.1</revision>
     <sha1></sha1>
     <teragrep.dpf_02.version>3.0.0</teragrep.dpf_02.version>
-    <teragrep.dpf_03.version>10.0.1</teragrep.dpf_03.version>
+    <teragrep.dpf_03.version>11.0.1</teragrep.dpf_03.version>
     <teragrep.jpr_01.version>3.1.1</teragrep.jpr_01.version>
     <teragrep.jue_01.version>0.4.3</teragrep.jue_01.version>
     <teragrep.pth_03.version>8.1.0</teragrep.pth_03.version>
-    <teragrep.pth_06.version>3.1.2</teragrep.pth_06.version>
+    <teragrep.pth_06.version>3.2.1</teragrep.pth_06.version>
     <teragrep.rlp_01.version>4.0.1</teragrep.rlp_01.version>
     <teragrep.rlp_03.version>1.7.6</teragrep.rlp_03.version>
 </properties>
@@ -543,7 +543,7 @@ else if (format.equalsIgnoreCase("bytes")) {
             outputCol = new UnquotedText(new TextString(ctx.t_outputParameter().fieldType().getText())).read();
         }

-        return new StepNode(new TokenizerStep(tokenizerFormat, inputCol, outputCol));
+        return new StepNode(new TokenizerStep(zplnConfig, tokenizerFormat, inputCol, outputCol));
     }

     @Override
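The visitor now threads the Zeppelin Config into TokenizerStep alongside the tokenizer format and column names. A hedged sketch of the constructor this call site implies; the parameter types and names here are assumptions read off the arguments, not the committed signature:

    // Hypothetical signature inferred from the call site above; TokenizerFormat
    // stands in for whatever type `tokenizerFormat` actually has in pth_10.
    public TokenizerStep(Config zplnConfig, TokenizerFormat tokenizerFormat, String inputCol, String outputCol) {
        // ...
    }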
TeragrepBloomStep.java

@@ -48,7 +48,8 @@
 import com.teragrep.functions.dpf_03.BloomFilterAggregator;
 import com.teragrep.pth10.steps.AbstractStep;
 import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterForeachPartitionFunction;
-import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterSizes;
+import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterTable;
+import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterTypes;
 import com.teragrep.pth10.steps.teragrep.bloomfilter.LazyConnection;
 import com.typesafe.config.Config;
 import org.apache.spark.sql.Dataset;
@@ -66,7 +67,7 @@
 /**
  * teragrep exec bloom
  */
-public class TeragrepBloomStep extends AbstractStep {
+public final class TeragrepBloomStep extends AbstractStep {

     public enum BloomMode {
         UPDATE, CREATE, ESTIMATE, AGGREGATE, DEFAULT
@@ -76,16 +77,9 @@ public enum BloomMode {

     private final Config zeppelinConfig;
     public final BloomMode mode;
-    private String inputCol;
-    private String outputCol;
-    private String estimateCol;
-
-    // Bloom filter consts
-    public final static String BLOOMDB_USERNAME_CONFIG_ITEM = "dpl.pth_10.bloom.db.username";
-    public final static String BLOOMDB_PASSWORD_CONFIG_ITEM = "dpl.pth_10.bloom.db.password";
-    public final static String BLOOMDB_URL_CONFIG_ITEM = "dpl.pth_06.bloom.db.url";
-    public final static String BLOOM_NUMBER_OF_FIELDS_CONFIG_ITEM = "dpl.pth_06.bloom.db.fields";
-    public final static Double MAX_FPP = 0.01;
+    private final String inputCol;
+    private final String outputCol;
+    private final String estimateCol;

     public TeragrepBloomStep(
             Config zeppelinConfig,
@@ -100,7 +94,7 @@ public TeragrepBloomStep(
         this.outputCol = outputCol;
         this.estimateCol = estimateCol;

-        if (mode == BloomMode.ESTIMATE) {
+        if (mode == BloomMode.ESTIMATE || mode == BloomMode.AGGREGATE) {
             // estimate is run as an aggregation
             this.properties.add(CommandProperty.AGGREGATE);
         }
@@ -134,31 +128,29 @@ public Dataset<Row> get(Dataset<Row> dataset) {

     /**
      * Create and store a bloom filter byte generated from Datasets rows _raw column (Ignores duplicates)
-     * 
+     *
      * @param dataset Dataset that is used to update database
      * @return Dataset unmodified
     */
     private Dataset<Row> createBloomFilter(Dataset<Row> dataset) {
-
-        writeFilterSizesToDatabase(this.zeppelinConfig);
-
+        writeFilterTypes(this.zeppelinConfig);
+        final BloomFilterTable table = new BloomFilterTable(zeppelinConfig);
+        table.create();
         dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig));
-
         return dataset;
     }

     /**
      * Create and store a bloom filter byte arrays generated from Datasets rows _raw column (Replaces duplicates)
-     * 
+     *
      * @param dataset Dataset that is used to update database
      * @return Dataset unmodified
     */
     private Dataset<Row> updateBloomFilter(Dataset<Row> dataset) {
-
-        writeFilterSizesToDatabase(this.zeppelinConfig);
-
+        writeFilterTypes(this.zeppelinConfig);
+        final BloomFilterTable table = new BloomFilterTable(zeppelinConfig);
+        table.create();
         dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig, true));
-
         return dataset;
     }

@@ -171,45 +163,44 @@ private Dataset<Row> estimateSize(Dataset<Row> dataset) {

     public Dataset<Row> aggregate(Dataset<Row> dataset) {

-        FilterSizes filterSizes = new FilterSizes(this.zeppelinConfig);
+        FilterTypes filterTypes = new FilterTypes(this.zeppelinConfig);

-        BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, filterSizes.asSortedMap());
+        BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, filterTypes.sortedMap());

         return dataset.groupBy("partition").agg(agg.toColumn().as("bloomfilter"));

     }

-    private void writeFilterSizesToDatabase(Config config) {
-
-        FilterSizes filterSizes = new FilterSizes(config);
-        Connection connection = new LazyConnection(config).get();
-        SortedMap<Long, Double> filterSizeMap = filterSizes.asSortedMap();
-
-        for (Map.Entry<Long, Double> entry : filterSizeMap.entrySet()) {
-            LOGGER
-                    .info(
-                            "Writing filtertype[expected: <{}>, fpp: <{}>] to bloomdb.filtertype", entry.getKey(),
-                            entry.getValue()
-                    );
-
-            String sql = "INSERT IGNORE INTO `filtertype` (`expectedElements`, `targetFpp`) VALUES (?, ?)";
-
-            try (PreparedStatement stmt = connection.prepareStatement(sql)) {
-
+    private void writeFilterTypes(final Config config) {
+        final FilterTypes filterTypes = new FilterTypes(config);
+        final Connection connection = new LazyConnection(config).get();
+        final SortedMap<Long, Double> filterSizeMap = filterTypes.sortedMap();
+        final String pattern = filterTypes.pattern();
+        for (final Map.Entry<Long, Double> entry : filterSizeMap.entrySet()) {
+            if (LOGGER.isInfoEnabled()) {
+                LOGGER
+                        .info(
+                                "Writing filtertype (expected <[{}]>, fpp: <[{}]>, pattern: <[{}]>)", entry.getKey(),
+                                entry.getValue(), pattern
+                        );
+            }
+            final String sql = "INSERT IGNORE INTO `filtertype` (`expectedElements`, `targetFpp`, `pattern`) VALUES (?, ?, ?)";
+            try (final PreparedStatement stmt = connection.prepareStatement(sql)) {
                 stmt.setInt(1, entry.getKey().intValue()); // filtertype.expectedElements
                 stmt.setDouble(2, entry.getValue()); // filtertype.targetFpp
+                stmt.setString(3, pattern); // filtertype.pattern
                 stmt.executeUpdate();
                 stmt.clearParameters();
-
                 connection.commit();
-
             }
             catch (SQLException e) {
-                LOGGER
-                        .error(
-                                "Error writing filter[expected: <{}>, fpp: <{}>] into database", entry.getKey(),
-                                entry.getValue()
-                        );
+                if (LOGGER.isErrorEnabled()) {
+                    LOGGER
+                            .error(
+                                    "Error writing filter[expected: <{}>, fpp: <{}>, pattern: <{}>] into database",
+                                    entry.getKey(), entry.getValue(), pattern
+                            );
+                }
                 throw new RuntimeException(e);
             }
         }
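The INSERT above now writes a third column, `pattern`, so each (expectedElements, targetFpp) pair is tied to the tokenizer pattern the filters were built against. A minimal sketch of the `bloomdb.filtertype` DDL this implies: the column names come from the statement, while the id column, the types, and the UNIQUE key (which `INSERT IGNORE` relies on to skip duplicate rows) are assumptions:

    // Hypothetical schema for bloomdb.filtertype; column names from the INSERT
    // above, types and constraints assumed.
    final String createFiltertypeSql = "CREATE TABLE IF NOT EXISTS `filtertype` ("
            + "`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,"
            + " `expectedElements` BIGINT UNSIGNED NOT NULL,"
            + " `targetFpp` DOUBLE NOT NULL,"
            + " `pattern` VARCHAR(2048) NOT NULL,"
            + " UNIQUE KEY (`expectedElements`, `targetFpp`, `pattern`));";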
BloomFilterForeachPartitionFunction.java

@@ -52,39 +52,42 @@
 import java.sql.Connection;
 import java.util.Iterator;

-public class BloomFilterForeachPartitionFunction implements ForeachPartitionFunction<Row> {
+public final class BloomFilterForeachPartitionFunction implements ForeachPartitionFunction<Row> {

-    private final FilterSizes filterSizes;
+    private final FilterTypes filterTypes;
     private final LazyConnection lazyConnection;
     private final boolean overwrite;

+    public BloomFilterForeachPartitionFunction(Config config) {
+        this(new FilterTypes(config), new LazyConnection(config), false);
+    }
+
     public BloomFilterForeachPartitionFunction(Config config, boolean overwrite) {
-        this.filterSizes = new FilterSizes(config);
-        this.lazyConnection = new LazyConnection(config);
-        this.overwrite = overwrite;
+        this(new FilterTypes(config), new LazyConnection(config), overwrite);
     }

-    public BloomFilterForeachPartitionFunction(Config config) {
-        this.filterSizes = new FilterSizes(config);
-        this.lazyConnection = new LazyConnection(config);
-        this.overwrite = false;
+    public BloomFilterForeachPartitionFunction(
+            FilterTypes filterTypes,
+            LazyConnection lazyConnection,
+            boolean overwrite
+    ) {
+        this.filterTypes = filterTypes;
+        this.lazyConnection = lazyConnection;
+        this.overwrite = overwrite;
     }

     @Override
-    public void call(Iterator<Row> iter) throws Exception {
-
-        Connection conn = lazyConnection.get();
-
+    public void call(final Iterator<Row> iter) throws Exception {
+        final Connection conn = lazyConnection.get();
         while (iter.hasNext()) {
-            Row row = iter.next(); // Row[partitionID, filterBytes]
-
-            String partition = row.getString(0);
-            byte[] filterBytes = (byte[]) row.get(1);
-
-            TeragrepBloomFilter filter = new TeragrepBloomFilter(partition, filterBytes, conn, filterSizes);
-
-            filter.saveFilter(overwrite);
+            final Row row = iter.next(); // Row[partitionID, filterBytes]
+            final String partition = row.getString(0);
+            final byte[] filterBytes = (byte[]) row.get(1);
+            final TeragrepBloomFilter tgFilter = new TeragrepBloomFilter(partition, filterBytes, conn, filterTypes);
+            tgFilter.saveFilter(overwrite);
             conn.commit();
-
         }
     }
 }
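Both Config convenience constructors now delegate to a single canonical constructor, so FilterTypes and LazyConnection can be injected directly in tests instead of being rebuilt from a Config. The production call sites in TeragrepBloomStep stay one-liners, as the diff above shows:

    // From createBloomFilter: insert filters, keep existing rows.
    dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig));
    // From updateBloomFilter: overwrite == true replaces previously saved filters.
    dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig, true));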
BloomFilterTable.java (new file)

@@ -0,0 +1,100 @@
+/*
+ * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
+ * Copyright (C) 2019-2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.pth10.steps.teragrep.bloomfilter;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+public final class BloomFilterTable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BloomFilterTable.class);
+    private final TableSQL tableSQL;
+    private final LazyConnection conn;
+
+    public BloomFilterTable(Config config) {
+        this(new TableSQL(new FilterTypes(config).tableName(), false), new LazyConnection(config));
+    }
+
+    // used in testing
+    public BloomFilterTable(Config config, boolean ignoreConstraints) {
+        this(new TableSQL(new FilterTypes(config).tableName(), ignoreConstraints), new LazyConnection(config));
+    }
+
+    public BloomFilterTable(TableSQL tableSQL, LazyConnection conn) {
+        this.tableSQL = tableSQL;
+        this.conn = conn;
+    }
+
+    public void create() {
+        final String sql = tableSQL.createTableSQL();
+        final Connection connection = conn.get();
+        try (final PreparedStatement stmt = connection.prepareStatement(sql)) {
+            stmt.execute();
+            connection.commit();
+            LOGGER.debug("Create table SQL <{}>", sql);
+        }
+        catch (SQLException e) {
+            throw new RuntimeException("Error creating bloom filter table: " + e);
+        }
+    }
+
+    @Override
+    public boolean equals(final Object object) {
+        if (this == object)
+            return true;
+        if (object == null)
+            return false;
+        if (object.getClass() != this.getClass())
+            return false;
+        final BloomFilterTable cast = (BloomFilterTable) object;
+        return this.tableSQL.equals(cast.tableSQL) && this.conn.equals(cast.conn);
+    }
+}
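As wired in TeragrepBloomStep above, the table is created on the driver before any partition writes a filter row. A short usage sketch; loading the Config via ConfigFactory here is an assumption, since in production the Zeppelin Config is passed in:

    // Hypothetical driver-side wiring; in pth_10 the Config comes from Zeppelin.
    final Config config = ConfigFactory.load();
    final BloomFilterTable table = new BloomFilterTable(config);
    table.create(); // runs tableSQL.createTableSQL() and commits; wraps SQLException in RuntimeException
    dataset.foreachPartition(new BloomFilterForeachPartitionFunction(config));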
