Skip to content

Commit

Permalink
Created a basic test for creating artifacts concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
EricWittmann committed Sep 12, 2024
1 parent 47c7e7a commit 802dbfa
Show file tree
Hide file tree
Showing 8 changed files with 268 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -79,7 +81,6 @@ public abstract class AbstractSqlRegistryStorage implements RegistryStorage {

private static int DB_VERSION = Integer.valueOf(
IoUtil.toString(AbstractSqlRegistryStorage.class.getResourceAsStream("db-version"))).intValue();
private static final Object inmemorySequencesMutex = new Object();

private static final ObjectMapper mapper = new ObjectMapper();

Expand All @@ -92,6 +93,14 @@ public abstract class AbstractSqlRegistryStorage implements RegistryStorage {
private static final String CONTENT_ID_SEQUENCE = "contentId";
private static final String COMMENT_ID_SEQUENCE = "commentId";

// Sequence counters ** Note: only used for H2 in-memory **
private static final Map<String, AtomicLong> sequenceCounters = new HashMap<>();
static {
sequenceCounters.put(GLOBAL_ID_SEQUENCE, new AtomicLong(0));
sequenceCounters.put(CONTENT_ID_SEQUENCE, new AtomicLong(0));
sequenceCounters.put(COMMENT_ID_SEQUENCE, new AtomicLong(0));
}

@Inject
Logger log;

Expand Down Expand Up @@ -190,6 +199,17 @@ void initialize() {
return null;
});

// If using H2, we need to initialize the sequence counters by querying for
// the current max value of each in the DB.
if (isH2()) {
handles.withHandleNoException((handle) -> {
sequenceCounters.get(GLOBAL_ID_SEQUENCE).set(getMaxGlobalId(handle));
sequenceCounters.get(CONTENT_ID_SEQUENCE).set(getMaxContentId(handle));
sequenceCounters.get(COMMENT_ID_SEQUENCE).set(getMaxCommentId(handle));
return null;
});
}

isReady = true;
SqlStorageEvent initializeEvent = new SqlStorageEvent();
initializeEvent.setType(SqlStorageEventType.READY);
Expand Down Expand Up @@ -624,18 +644,6 @@ protected Long createOrUpdateContent(Handle handle, String artifactType, Content
return createOrUpdateContent(handle, content, contentHash, canonicalContentHash, references, RegistryContentUtils.serializeReferences(references));
}

private byte[] concatContentAndReferences(byte[] contentBytes, String references) throws IOException {
if (references != null) {
final byte[] referencesBytes = references.getBytes(StandardCharsets.UTF_8);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(contentBytes.length + referencesBytes.length);
outputStream.write(contentBytes);
outputStream.write(referencesBytes);
return outputStream.toByteArray();
} else {
return contentBytes;
}
}

/**
* Store the content in the database and return the ID of the new row. If the content already exists,
* just return the content ID of the existing row.
Expand All @@ -652,56 +660,23 @@ protected Long createOrUpdateContent(Handle handle, ContentHandle content, Strin
// if a row doesn't already exist. We use the content hash to determine whether
// a row for this content already exists. If we find a row we return its globalId.
// If we don't find a row, we insert one and then return its globalId.
String sql;
Long contentId;
boolean insertReferences = true;
if (Set.of("mysql", "mssql", "postgresql").contains(sqlStatements.dbType())) {
sql = sqlStatements.upsertContent();
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, nextContentId(handle))
.bind(2, canonicalContentHash)
.bind(3, contentHash)
.bind(4, contentBytes)
.bind(5, referencesSerialized)
.execute();
sql = sqlStatements.selectContentIdByHash();
contentId = handle.createQuery(sql)
.bind(0, contentHash)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.one();
} else if ("h2".equals(sqlStatements.dbType())) {
sql = sqlStatements.selectContentIdByHash();
Optional<Long> contentIdOptional = handle.createQuery(sql)
.bind(0, contentHash)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.findOne();
if (contentIdOptional.isPresent()) {
contentId = contentIdOptional.get();
//If the content is already present there's no need to create the references.
insertReferences = false;
} else {
sql = sqlStatements.upsertContent();
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, nextContentId(handle))
.bind(2, canonicalContentHash)
.bind(3, contentHash)
.bind(4, contentBytes)
.bind(5, referencesSerialized)
.execute();
sql = sqlStatements.selectContentIdByHash();
contentId = handle.createQuery(sql)
.bind(0, contentHash)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.one();
}
} else {
throw new UnsupportedOperationException("Unsupported database type: " + sqlStatements.dbType());
}
String sql = sqlStatements.upsertContent();
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, nextContentId(handle))
.bind(2, canonicalContentHash)
.bind(3, contentHash)
.bind(4, contentBytes)
.bind(5, referencesSerialized)
.execute();
sql = sqlStatements.selectContentIdByHash();
contentId = handle.createQuery(sql)
.bind(0, contentHash)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.one();

if (insertReferences) {
//Finally, insert references into the "artifactreferences" table if the content wasn't present yet.
Expand Down Expand Up @@ -752,9 +727,9 @@ protected ArtifactMetaDataDto createArtifactWithMetadata(String groupId, String
String createdBy = securityIdentity.getPrincipal().getName();
Date createdOn = new Date();

//Only create group metadata for non-default groups.
if (groupId != null && !isGroupExists(groupId)) {
//Only create group metadata for non-default groups.
createGroup(GroupMetaDataDto.builder()
upsertGroup(GroupMetaDataDto.builder()
.groupId(groupId)
.createdOn(createdOn.getTime())
.modifiedOn(createdOn.getTime())
Expand Down Expand Up @@ -2694,6 +2669,26 @@ public void createGroup(GroupMetaDataDto group) throws GroupAlreadyExistsExcepti
}
}

private void upsertGroup(GroupMetaDataDto group) throws RegistryStorageException {
this.handles.withHandle(handle -> {
String sql = sqlStatements.upsertGroup();
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, group.getGroupId())
.bind(2, group.getDescription())
.bind(3, group.getArtifactsType())
.bind(4, group.getCreatedBy())
// TODO io.apicurio.registry.storage.dto.GroupMetaDataDto should not use raw numeric timestamps
.bind(5, group.getCreatedOn() == 0 ? new Date() : new Date(group.getCreatedOn()))
.bind(6, group.getModifiedBy())
.bind(7, group.getModifiedOn() == 0 ? null : new Date(group.getModifiedOn()))
.bind(8, RegistryContentUtils.serializeProperties(group.getProperties()))
.execute();
return null;
});
}


/**
* @see RegistryStorage#updateGroupMetaData(io.apicurio.registry.storage.dto.GroupMetaDataDto)
*/
Expand Down Expand Up @@ -3563,18 +3558,46 @@ protected void resetCommentId(Handle handle) {
resetSequence(handle, COMMENT_ID_SEQUENCE, sqlStatements.selectMaxCommentId());
}

private void resetSequence(Handle handle, String sequenceName, String sqlMaxIdFromTable) {
Optional<Long> maxIdTable = handle.createQuery(sqlMaxIdFromTable)
protected long getMaxGlobalId(Handle handle) {
return getMaxId(handle, sqlStatements.selectMaxGlobalId());
}

protected long getMaxContentId(Handle handle) {
return getMaxId(handle, sqlStatements.selectMaxContentId());
}

protected long getMaxCommentId(Handle handle) {
return getMaxId(handle, sqlStatements.selectMaxCommentId());
}

protected long getMaxId(Handle handle, String sql) {
Optional<Long> maxIdTable = handle.createQuery(sql)
.bind(0, tenantContext.tenantId())
.mapTo(Long.class)
.findOne();
return maxIdTable.orElseGet(() -> {
return 1L;
});
}

Optional<Long> currentIdSeq = handle.createQuery(sqlStatements.selectCurrentSequenceValue())
.bind(0, sequenceName)
.bind(1, tenantContext.tenantId())
private void resetSequence(Handle handle, String sequenceName, String sqlMaxIdFromTable) {
Optional<Long> maxIdTable = handle.createQuery(sqlMaxIdFromTable)
.bind(0, tenantContext.tenantId())
.mapTo(Long.class)
.findOne();

Optional<Long> current;
if (isH2()) {
current = Optional.of(sequenceCounters.get(sequenceName).get());
} else {
current = handle.createQuery(sqlStatements.selectCurrentSequenceValue())
.bind(0, sequenceName)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.findOne();
}
final Optional<Long> currentIdSeq = current;

//TODO maybe do this in one query
Optional<Long> maxId = maxIdTable
.map(maxIdTableValue -> {
Expand All @@ -3588,7 +3611,6 @@ private void resetSequence(Handle handle, String sequenceName, String sqlMaxIdFr
return maxIdTableValue;
});


if (maxId.isPresent()) {
log.info("Resetting {} sequence", sequenceName);
long id = maxId.get();
Expand All @@ -3600,6 +3622,9 @@ private void resetSequence(Handle handle, String sequenceName, String sqlMaxIdFr
.bind(2, id)
.bind(3, id)
.execute();
} else if (isH2()) {
// H2 uses atomic counters instead of the DB for sequences.
sequenceCounters.get(sequenceName).set(id);
} else {
handle.createUpdate(sqlStatements.resetSequenceValue())
.bind(0, tenantContext.tenantId())
Expand Down Expand Up @@ -3885,37 +3910,7 @@ private long nextSequenceValue(Handle handle, String sequenceName) {
.mapTo(Long.class)
.one();
} else {
// no way to automatically increment the sequence in h2 with just one query
// we are incresing the sequence value in a way that it's not safe for concurrent executions
// for kafkasql storage this method is not supposed to be executed concurrently
// but for inmemory storage that's not guaranteed
// that forces us to use an inmemory lock, should not cause any harm
// caveat emptor , consider yourself as warned
synchronized (inmemorySequencesMutex) {
Optional<Long> seqExists = handle.createQuery(sqlStatements.selectCurrentSequenceValue())
.bind(0, sequenceName)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.findOne();

if (seqExists.isPresent()) {
//
Long newValue = seqExists.get() + 1;
handle.createUpdate(sqlStatements.resetSequenceValue())
.bind(0, tenantContext.tenantId())
.bind(1, sequenceName)
.bind(2, newValue)
.execute();
return newValue;
} else {
handle.createUpdate(sqlStatements.insertSequenceValue())
.bind(0, tenantContext.tenantId())
.bind(1, sequenceName)
.bind(2, 1)
.execute();
return 1;
}
}
return sequenceCounters.get(sequenceName).incrementAndGet();
}
}

Expand Down Expand Up @@ -3953,4 +3948,7 @@ protected String resolveVersion(String groupId, String artifactId, String versio
return version;
}

}
private boolean isH2() {
return sqlStatements.dbType().equals("h2");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class H2SqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public H2SqlStatements() {
}
Expand Down Expand Up @@ -55,7 +54,7 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
Expand All @@ -67,7 +66,19 @@ public String isDatabaseInitialized() {
*/
@Override
public String upsertContent() {
return "INSERT INTO content (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences) VALUES (?, ?, ?, ?, ?, ?)";
return "MERGE INTO content AS target" +
" USING (VALUES(?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)" +
" ON (target.tenantId = source.tenantId AND target.contentHash = source.contentHash)" +
" WHEN NOT MATCHED THEN" +
" INSERT (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)" +
" VALUES (source.tenantId, source.contentId, source.canonicalHash, source.contentHash, source.content, source.artifactreferences)";
}

@Override
public String upsertGroup() {
return "MERGE INTO groups (tenantId, groupId, description, artifactsType, createdBy, createdOn, modifiedBy, modifiedOn, properties)" +
" KEY (tenantId, groupId)" +
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
}

/**
Expand All @@ -83,38 +94,43 @@ public String upsertLogConfiguration() {
*/
@Override
public String getNextSequenceValue() {
return "UPDATE sequences sa SET seq_value = (SELECT sb.seq_value + 1 FROM sequences sb WHERE sb.tenantId = sa.tenantId AND sb.name = sa.name) WHERE sa.tenantId = ? AND sa.name = ?";
throw new RuntimeException("Not supported for H2: getNextSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#selectCurrentSequenceValue()
*/
@Override
public String selectCurrentSequenceValue() {
return "SELECT seq_value FROM sequences WHERE name = ? AND tenantId = ? ";
throw new RuntimeException("Not supported for H2: selectCurrentSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#resetSequenceValue()
*/
@Override
public String resetSequenceValue() {
return "MERGE INTO sequences (tenantId, name, seq_value) KEY (tenantId, name) VALUES(?, ?, ?)";
throw new RuntimeException("Not supported for H2: resetSequenceValue");
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#insertSequenceValue()
*/
@Override
public String insertSequenceValue() {
return "INSERT INTO sequences (tenantId, name, seq_value) VALUES (?, ?, ?)";
throw new RuntimeException("Not supported for H2: insertSequenceValue");
}

/**
* @see SqlStatements#upsertReference()
*/
@Override
public String upsertReference() {
return "INSERT INTO artifactreferences (tenantId, contentId, groupId, artifactId, version, name) VALUES (?, ?, ?, ?, ?, ?)";
return "MERGE INTO artifactreferences AS target" +
" USING (VALUES (?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, groupId, artifactId, version, name)" +
" ON (target.tenantId = source.tenantId AND target.contentId = source.contentId AND target.name = source.name)" +
" WHEN NOT MATCHED THEN" +
" INSERT (tenantId, contentId, groupId, artifactId, version, name)" +
" VALUES (source.tenantId, source.contentId, source.groupId, source.artifactId, source.version, source.name)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,12 @@ public String insertGroup() {
);
}

@Override
public String upsertGroup() {
return "INSERT IGNORE INTO artifactgroups (tenantId, groupId, description, artifactsType, createdBy, createdOn, modifiedBy, modifiedOn, properties)" +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#updateGroup()
* In MySQL, 'groups' is a reserved keyword. We've changed it to artifactgroups,
Expand Down
Loading

0 comments on commit 802dbfa

Please sign in to comment.