Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[2.6.x] Concurrent artifact creation test & fixes #5169

Merged
merged 5 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void importContent(ContentEntity entity) {

// When we do not want to preserve contentId, the best solution to import content is create new one with the contentBytes
// It makes sure there won't be any conflicts
long newContentId = getRegistryStorage().createOrUpdateContent(getHandle(), ContentHandle.create(entity.contentBytes), entity.contentHash, entity.canonicalHash, references, entity.serializedReferences);
getRegistryStorage().ensureContent(ContentHandle.create(entity.contentBytes), entity.contentHash, entity.canonicalHash, references, entity.serializedReferences);
long newContentId = getRegistryStorage().getContentIdByHash(entity.contentHash);

getContentIdMapping().put(entity.contentId, newContentId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class H2SqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public H2SqlStatements() {
}
Expand Down Expand Up @@ -55,21 +54,13 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
return "SELECT COUNT(*) AS count FROM information_schema.tables WHERE table_name = 'APICURIO'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return "INSERT INTO content (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences) VALUES (?, ?, ?, ?, ?, ?)";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand All @@ -83,38 +74,43 @@ public String upsertLogConfiguration() {
*/
@Override
public String getNextSequenceValue() {
return "UPDATE sequences sa SET seq_value = (SELECT sb.seq_value + 1 FROM sequences sb WHERE sb.tenantId = sa.tenantId AND sb.name = sa.name) WHERE sa.tenantId = ? AND sa.name = ?";
throw new RuntimeException("Not supported for H2: getNextSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#selectCurrentSequenceValue()
*/
@Override
public String selectCurrentSequenceValue() {
return "SELECT seq_value FROM sequences WHERE name = ? AND tenantId = ? ";
throw new RuntimeException("Not supported for H2: selectCurrentSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#resetSequenceValue()
*/
@Override
public String resetSequenceValue() {
return "MERGE INTO sequences (tenantId, name, seq_value) KEY (tenantId, name) VALUES(?, ?, ?)";
throw new RuntimeException("Not supported for H2: resetSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#insertSequenceValue()
*/
@Override
public String insertSequenceValue() {
return "INSERT INTO sequences (tenantId, name, seq_value) VALUES (?, ?, ?)";
throw new RuntimeException("Not supported for H2: insertSequenceValue");
}

/**
* @see SqlStatements#upsertReference()
*/
@Override
public String upsertReference() {
return "INSERT INTO artifactreferences (tenantId, contentId, groupId, artifactId, version, name) VALUES (?, ?, ?, ?, ?, ?)";
return "MERGE INTO artifactreferences AS target" +
" USING (VALUES (?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, groupId, artifactId, version, name)" +
" ON (target.tenantId = source.tenantId AND target.contentId = source.contentId AND target.name = source.name)" +
" WHEN NOT MATCHED THEN" +
" INSERT (tenantId, contentId, groupId, artifactId, version, name)" +
" VALUES (source.tenantId, source.contentId, source.groupId, source.artifactId, source.version, source.name)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,6 @@ public String isDatabaseInitialized() {
);
}

/**
* @see SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return String.join(" ",
"INSERT IGNORE INTO content",
"(tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"VALUES (?, ?, ?, ?, ?, ?);"
);
}

/**
* @see SqlStatements#upsertReference()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class PostgreSQLSqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public PostgreSQLSqlStatements() {
}
Expand Down Expand Up @@ -55,21 +54,13 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
return "SELECT count(*) AS count FROM information_schema.tables WHERE table_name = 'artifacts'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return "INSERT INTO content (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT (tenantId, contentHash) DO NOTHING";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class SQLServerSqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public SQLServerSqlStatements() {
}
Expand Down Expand Up @@ -55,27 +54,13 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
return "SELECT count(*) AS count FROM information_schema.tables WHERE table_name = 'artifacts'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return String.join(" ",
"MERGE INTO content AS target",
"USING (VALUES (?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"ON (target.tenantId = source.tenantId AND target.contentHash = source.contentHash)",
"WHEN NOT MATCHED THEN",
"INSERT (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"VALUES (source.tenantId, source.contentId, source.canonicalHash, source.contentHash, source.content, source.artifactreferences);");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,11 @@ public interface SqlStatements {

/**
* Returns true if the given exception represents a primary key violation.
*
* @param error
*/
public boolean isPrimaryKeyViolation(Exception error);

/**
* Returns true if the given exception represents a foreign key violation.
*
* @param error
*/
public boolean isForeignKeyViolation(Exception error);

Expand All @@ -57,9 +53,6 @@ public interface SqlStatements {

/**
* A sequence of statements needed to upgrade the DB from one version to another.
*
* @param fromVersion
* @param toVersion
*/
public List<String> databaseUpgrade(int fromVersion, int toVersion);

Expand Down Expand Up @@ -189,11 +182,6 @@ public interface SqlStatements {
*/
public String selectArtifactContentIds();

/**
* A statement to "upsert" a row in the "content" table.
*/
public String upsertContent();

/**
* A statement to update canonicalHash value in a row in the "content" table
*/
Expand Down Expand Up @@ -652,5 +640,4 @@ public interface SqlStatements {
* A statement used to select all version #s for a given artifactId.
*/
public String selectArtifactVersionsSkipDisabled();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package io.apicurio.registry.noprofile.rest.v2;

import io.apicurio.registry.AbstractResourceTestBase;
import io.apicurio.registry.rest.v2.beans.ArtifactMetaData;
import io.apicurio.registry.rest.v2.beans.ArtifactSearchResults;
import io.apicurio.registry.rest.v2.beans.SortBy;
import io.apicurio.registry.rest.v2.beans.SortOrder;
import io.apicurio.registry.types.ArtifactType;
import io.apicurio.registry.utils.tests.TestUtils;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

@QuarkusTest
public class ConcurrentCreateTest extends AbstractResourceTestBase {

@Test
public void testMultipleArtifacts() throws Exception {
String oaiArtifactContent = resourceToString("openapi-empty.json");
String groupId = TestUtils.generateGroupId();

Set<String> created = new HashSet<>();
Set<String> failed = new HashSet<>();
CountDownLatch latch = new CountDownLatch(5);

// Create artifacts
for (int i = 0; i < 5; i++) {
final int forkId = i;
TestUtils.fork(() -> {
String artifactId = "artifact-" + forkId;
System.out.println("[Fork-" + forkId + "] Starting");
System.out.println("[Fork-" + forkId + "] Artifact ID: " + artifactId);
try {
InputStream data = new ByteArrayInputStream(oaiArtifactContent.getBytes());

// Create the artifact
ArtifactMetaData amd = clientV2.createArtifact(groupId, artifactId, ArtifactType.OPENAPI, data);
System.out.println("[Fork-" + forkId + "] Artifact created.");
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

// Fetch the artifact and make sure it really got created.
amd = clientV2.getArtifactMetaData(groupId, artifactId);
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

System.out.println("[Fork-" + forkId + "] Completed successfully.");
created.add(artifactId);
} catch (Exception e) {
System.out.println("[Fork-" + forkId + "] FAILED: " + e.getMessage());
failed.add(artifactId);
}
latch.countDown();
});
}

latch.await();

Assertions.assertEquals(0, failed.size());
Assertions.assertEquals(5, created.size());

ArtifactSearchResults results = clientV2.searchArtifacts(groupId, null, null, null, null, SortBy.createdOn, SortOrder.asc, 0, 100);
Assertions.assertNotNull(results);
Assertions.assertEquals(5, results.getCount());

results = clientV2.listArtifactsInGroup(groupId);
Assertions.assertNotNull(results);
Assertions.assertEquals(5, results.getCount());

}

@Test
public void testSameArtifact() throws Exception {
String oaiArtifactContent = resourceToString("openapi-empty.json");
String groupId = TestUtils.generateGroupId();

Set<String> created = new HashSet<>();
Set<String> failed = new HashSet<>();
CountDownLatch latch = new CountDownLatch(5);

// Try to create the SAME artifact 5 times.
for (int i = 0; i < 5; i++) {
final int forkId = i;
TestUtils.fork(() -> {
String artifactId = "test-artifact";
System.out.println("[Fork-" + forkId + "] Starting");
try {
InputStream data = new ByteArrayInputStream(oaiArtifactContent.getBytes());

// Create the artifact
ArtifactMetaData amd = clientV2.createArtifact(groupId, artifactId, ArtifactType.OPENAPI, data);
System.out.println("[Fork-" + forkId + "] Artifact created.");
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

// Fetch the artifact and make sure it really got created.
amd = clientV2.getArtifactMetaData(groupId, artifactId);
Assertions.assertNotNull(amd);
Assertions.assertEquals(groupId, amd.getGroupId());
Assertions.assertEquals(artifactId, amd.getId());

System.out.println("[Fork-" + forkId + "] Completed successfully.");
created.add("" + forkId);
} catch (Exception e) {
System.out.println("[Fork-" + forkId + "] FAILED: " + e.getMessage());
failed.add("" + forkId);
}
latch.countDown();
});
}

latch.await();

Assertions.assertEquals(4, failed.size());
Assertions.assertEquals(1, created.size());

ArtifactSearchResults results = clientV2.searchArtifacts(groupId, null, null, null, null, SortBy.createdOn, SortOrder.asc, 0, 100);
Assertions.assertNotNull(results);
Assertions.assertEquals(1, results.getCount());

results = clientV2.listArtifactsInGroup(groupId);
Assertions.assertNotNull(results);
Assertions.assertEquals(1, results.getCount());

}

}
Loading
Loading