Skip to content

Commit

Permalink
[Flag Database] Fix performance issue (osmlab#207)
Browse files Browse the repository at this point in the history
* back features take 1

* clean up batch
arrangement check

* remove whitespace

* more whitespace

* add additional tests

* checkstyle
  • Loading branch information
danielduhh authored Oct 23, 2019
1 parent e46c1ea commit ce249b1
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.io.LineNumberReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
Expand Down Expand Up @@ -56,7 +57,7 @@ public class FlagDatabaseSubCommand extends AbstractAtlasShellToolsCommand
private static final String OSM_ID_LEGACY = "osmid";
private static final String CREATE_FLAG_SQL = "INSERT INTO flag(flag_id, check_name, instructions, date_created) VALUES (?,?,?,?);";
private static final String CREATE_FEATURE_SQL = String.format(
"INSERT INTO feature (flag_id, geom, osm_id, atlas_id, iso_country_code, tags, item_type, date_created) VALUES ((SELECT id FROM flag WHERE flag_id = ? LIMIT 1),%s,?,?,?,?);",
"INSERT INTO feature (flag_id, geom, osm_id, atlas_id, iso_country_code, tags, item_type, date_created) VALUES (?,%s,?,?,?,?);",
"ST_GeomFromGeoJSON(?), ?, ?");
private static final int THREE = 3;
private static final int FOUR = 4;
Expand Down Expand Up @@ -102,18 +103,20 @@ public FlagDatabaseSubCommand()
* PreparedStatement to add parameterized values to
* @param flag
* CheckFlag to associate with feature
* @param flagIdentifier
* Flag table record unique identifier
* @param feature
* - Flagged AtlasItem
*/
public void batchFlagFeatureStatement(final PreparedStatement sql, final CheckFlag flag,
final JsonObject feature)
final int flagIdentifier, final JsonObject feature)
{

final JsonObject properties = feature.get(PROPERTIES).getAsJsonObject();

try
{
sql.setString(1, flag.getIdentifier());
sql.setInt(1, flagIdentifier);
sql.setString(2, feature.get("geometry").toString());
sql.setLong(THREE, this.getOsmIdentifier(properties));
sql.setLong(FOUR, properties.get("identifier").getAsLong());
Expand All @@ -134,31 +137,6 @@ public void batchFlagFeatureStatement(final PreparedStatement sql, final CheckFl

}

/**
* Add CheckFlag values to parameterized sql INSERT statement
*
* @param sql
* - PreparedStatement to add parameterized values to
* @param flag
* - CheckFlag to insert into flag table
*/
public void batchFlagStatement(final PreparedStatement sql, final CheckFlag flag)
{
try
{
sql.setString(1, flag.getIdentifier());
sql.setString(2, flag.getChallengeName().orElse(""));
sql.setString(THREE, flag.getInstructions().replace("\n", " ").replace("'", "''"));
sql.setObject(FOUR, this.timestamp);

sql.addBatch();
}
catch (final SQLException error)
{
logger.error("Unable to create Flag {} SQL statement", flag.getIdentifier());
}
}

/***
* Create database schema from schema.sql resource file.
*
Expand Down Expand Up @@ -214,42 +192,13 @@ public int execute()

try (BufferedReader reader = FileUtility.getReader(file, logOutputFileType);
PreparedStatement flagSqlStatement = databaseConnection
.prepareStatement(CREATE_FLAG_SQL);
.prepareStatement(CREATE_FLAG_SQL,
Statement.RETURN_GENERATED_KEYS);
PreparedStatement featureSqlStatement = databaseConnection
.prepareStatement(CREATE_FEATURE_SQL))
{

final List<String> lines = reader.lines().collect(Collectors.toList());
int startIndex = 0;
int endIndex;

do
{
endIndex = Math.min(startIndex + BATCH_SIZE, lines.size());
final List<String> batchLines = lines.subList(startIndex, endIndex);

batchLines.forEach(line ->
{
final JsonObject parsedFlag = new JsonParser().parse(line)
.getAsJsonObject();
final JsonArray features = this.filterOutPointsFromGeojson(
parsedFlag.get(FEATURES).getAsJsonArray());
final CheckFlag flag = gson.fromJson(line, CheckFlag.class);

this.batchFlagStatement(flagSqlStatement, flag);

StreamSupport.stream(features.spliterator(), false)
.forEach(feature -> this.batchFlagFeatureStatement(
featureSqlStatement, flag,
feature.getAsJsonObject()));
});

flagSqlStatement.executeBatch();
featureSqlStatement.executeBatch();

startIndex = startIndex + BATCH_SIZE;
}
while (endIndex != lines.size() - 1 && startIndex < lines.size());
this.processCheckFlags(lines, flagSqlStatement, featureSqlStatement);
}
catch (final IOException error)
{
Expand All @@ -273,6 +222,31 @@ public int execute()
return 0;
}

/**
* Add CheckFlag values to parameterized sql INSERT statement
*
* @param sql
* - PreparedStatement to add parameterized values to
* @param flag
* - CheckFlag to insert into flag table
*/
public void executeFlagStatement(final PreparedStatement sql, final CheckFlag flag)
{
try
{
sql.setString(1, flag.getIdentifier());
sql.setString(2, flag.getChallengeName().orElse(""));
sql.setString(THREE, flag.getInstructions().replace("\n", " ").replace("'", "''"));
sql.setObject(FOUR, this.timestamp);

sql.executeUpdate();
}
catch (final SQLException error)
{
logger.error("Unable to create Flag {} SQL statement", flag.getIdentifier(), error);
}
}

@Override
public String getCommandName()
{
Expand All @@ -282,7 +256,7 @@ public String getCommandName()
/**
* Returns the OSM identifier for a given JsonObject. Atlas Checks OSM identifier changed from
* "osmid" to "osmIdentifier"
*
*
* @see <a href="https://github.com/osmlab/atlas-checks/pull/116/files">here</a>
* @param properties
* CheckFlag properties
Expand Down Expand Up @@ -319,6 +293,67 @@ public Map<String, String> getTags(final JsonObject properties)
return hstore;
}

/**
* This function handles parsing each CheckFlag, and batching flag features into the database
*
* @param lines
* a List of stringified CheckFlags read in from line-delimited json
* @param flagSqlStatement
* Flag PreparedStatement
* @param featureSqlStatement
* Feature PreparedStatement
*/
public void processCheckFlags(final List<String> lines,
final PreparedStatement flagSqlStatement, final PreparedStatement featureSqlStatement)
{
int counter = 0;
try
{
for (final String line : lines)
{
final JsonObject parsedFlag = new JsonParser().parse(line).getAsJsonObject();
final JsonArray features = this
.filterOutPointsFromGeojson(parsedFlag.get(FEATURES).getAsJsonArray());
final CheckFlag flag = gson.fromJson(line, CheckFlag.class);
final int flagRecordId;

// First check if the number of features in our batch is less than the maximum
if (counter + features.size() > BATCH_SIZE)
{
featureSqlStatement.executeBatch();
logger.debug("Batching {} features.", counter);
counter = 0;
}

// Add flag record to database
this.executeFlagStatement(flagSqlStatement, flag);

try (ResultSet resultSet = flagSqlStatement.getGeneratedKeys())
{
if (resultSet != null && resultSet.next())
{
// Save flag record unique id to use for feature record
flagRecordId = resultSet.getInt(1);

StreamSupport.stream(features.spliterator(), false)
.forEach(feature -> this.batchFlagFeatureStatement(
featureSqlStatement, flag, flagRecordId,
feature.getAsJsonObject()));

counter += features.size();
}
}
}
// Execute the remaining features
featureSqlStatement.executeBatch();
logger.debug("Batching the remaining {} features.", counter);
}
catch (final SQLException failure)
{
logger.error("Error creating Flag record.", failure);
}
}

@Override
public void registerManualPageSections()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class FlagDatabaseSubCommandTest
private Statement statement = Mockito.mock(Statement.class);
@Mock
private PreparedStatement preparedStatement = Mockito.mock(PreparedStatement.class);
@Mock
private PreparedStatement preparedStatement2 = Mockito.mock(PreparedStatement.class);

@Test
public void batchFeatureStatementTest() throws IOException, SQLException
Expand All @@ -58,24 +60,12 @@ public void batchFeatureStatementTest() throws IOException, SQLException
final CheckFlag checkFlag = gson.fromJson(flag, CheckFlag.class);
final JsonElement feature = new JsonParser().parse(flag).getAsJsonObject().get("features");

command.batchFlagFeatureStatement(this.preparedStatement, checkFlag,
command.batchFlagFeatureStatement(this.preparedStatement, checkFlag, 1,
feature.getAsJsonArray().get(0).getAsJsonObject());

Mockito.verify(this.preparedStatement).addBatch();
}

@Test
public void batchFlagStatementTest() throws IOException, SQLException
{
final FlagDatabaseSubCommand command = new FlagDatabaseSubCommand();
final String flag = this.getResource("checkflags1.log").get(0);
final CheckFlag checkFlag = gson.fromJson(flag, CheckFlag.class);

command.batchFlagStatement(this.preparedStatement, checkFlag);

Mockito.verify(this.preparedStatement).addBatch();
}

@Test
public void commandFailedExitCodeTest()
{
Expand Down Expand Up @@ -104,6 +94,18 @@ public void createDatabaseSchemaTest() throws IOException, SQLException
Mockito.verify(this.statement).execute(Mockito.anyString());
}

@Test
public void executeFlagStatementTest() throws IOException, SQLException
{
final FlagDatabaseSubCommand command = new FlagDatabaseSubCommand();
final String flag = this.getResource("checkflags1.log").get(0);
final CheckFlag checkFlag = gson.fromJson(flag, CheckFlag.class);

command.executeFlagStatement(this.preparedStatement, checkFlag);

Mockito.verify(this.preparedStatement).executeUpdate();
}

@Test
public void getOsmIdentifierTest() throws IOException
{
Expand All @@ -127,6 +129,17 @@ public void getOsmIdentifierTest() throws IOException
Assert.assertEquals(167709671, osmId2);
}

@Test
public void processCheckFlagsTest() throws IOException, SQLException
{
final FlagDatabaseSubCommand command = new FlagDatabaseSubCommand();
final List<String> flags = this.getResource("checkflags1.log");
command.processCheckFlags(flags, this.preparedStatement, this.preparedStatement2);

Mockito.verify(this.preparedStatement, Mockito.times(2)).executeUpdate();
Mockito.verify(this.preparedStatement2).executeBatch();
}

@Test
public void tagBlacklistTest() throws IOException
{
Expand Down

0 comments on commit ce249b1

Please sign in to comment.