Skip to content

Commit

Permalink
Changes to artifact creation to better support concurrent creates
Browse files Browse the repository at this point in the history
  • Loading branch information
EricWittmann committed Sep 13, 2024
1 parent 9bafeda commit 9552546
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.apicurio.registry.storage.impl.sql.jdb.Handle;
import io.apicurio.registry.storage.impl.sql.jdb.Query;
import io.apicurio.registry.storage.impl.sql.jdb.RowMapper;
import io.apicurio.registry.storage.impl.sql.jdb.RuntimeSqlException;
import io.apicurio.registry.storage.impl.sql.jdb.Update;
import io.apicurio.registry.storage.impl.sql.mappers.*;
import io.apicurio.registry.types.ArtifactState;
Expand Down Expand Up @@ -460,7 +461,6 @@ public void updateArtifactState(String groupId, String artifactId, ArtifactState
updateArtifactVersionStateRaw(metadata.getGlobalId(), metadata.getState(), state);
}


@Override
@Transactional
public void updateArtifactState(String groupId, String artifactId, String version, ArtifactState state)
Expand All @@ -470,7 +470,6 @@ public void updateArtifactState(String groupId, String artifactId, String versio
updateArtifactVersionStateRaw(metadata.getGlobalId(), metadata.getState(), state);
}


/**
* IMPORTANT: Private methods can't be @Transactional. Callers MUST have started a transaction.
*/
Expand Down Expand Up @@ -627,59 +626,62 @@ public Long generate(Handle handle) {
}

/**
* Store the content in the database and return the ID of the new row. If the content already exists,
* just return the content ID of the existing row.
*
* @param handle
* @param artifactType
* @param content
* Make sure the content exists in the database (try to insert it). Regardless of whether it
* already existed or not, return the contentId of the content in the DB.
*/
protected Long createOrUpdateContent(Handle handle, String artifactType, ContentHandle content, List<ArtifactReferenceDto> references) throws IOException {
protected Long ensureContentAndGetId(String artifactType, ContentHandle content, List<ArtifactReferenceDto> references) {
var car = ContentAndReferencesDto.builder().content(content).references(references).build();
var contentHash = RegistryContentUtils.contentHash(car);
var canonicalContentHash = RegistryContentUtils.canonicalContentHash(artifactType, car, this::getContentByReference);
return createOrUpdateContent(handle, content, contentHash, canonicalContentHash, references, RegistryContentUtils.serializeReferences(references));
var serializedReferences = RegistryContentUtils.serializeReferences(references);

ensureContent(content, contentHash, canonicalContentHash, references, serializedReferences);
var contentId = getContentIdByHash(contentHash);
return contentId;
}

/**
* Store the content in the database and return the ID of the new row. If the content already exists,
* just return the content ID of the existing row.
*
* @param handle
* @param content
* @param contentHash
* @param canonicalContentHash
*/
protected Long createOrUpdateContent(Handle handle, ContentHandle content, String contentHash, String canonicalContentHash, List<ArtifactReferenceDto> references, String referencesSerialized) {
byte[] contentBytes = content.bytes();

// Upsert a row in the "content" table. This will insert a row for the content
// if a row doesn't already exist. We use the content hash to determine whether
// a row for this content already exists. If we find a row we return its globalId.
// If we don't find a row, we insert one and then return its globalId.
Long contentId;
boolean insertReferences = true;
String sql = sqlStatements.upsertContent();
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, nextContentId(handle))
.bind(2, canonicalContentHash)
.bind(3, contentHash)
.bind(4, contentBytes)
.bind(5, referencesSerialized)
.execute();
sql = sqlStatements.selectContentIdByHash();
contentId = handle.createQuery(sql)
.bind(0, contentHash)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.one();
@Transactional(Transactional.TxType.REQUIRES_NEW)
protected void ensureContent(ContentHandle content, String contentHash, String canonicalContentHash,
List<ArtifactReferenceDto> references, String referencesSerialized) {
this.handles.withHandle(handle -> {
byte[] contentBytes = content.bytes();

// Insert the content into the content table.
String sql = sqlStatements.insertContent();
long contentId = nextContentId(handle);

try {
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, contentId)
.bind(2, canonicalContentHash)
.bind(3, contentHash)
.bind(4, contentBytes)
.bind(5, referencesSerialized)
.execute();
} catch (RuntimeSqlException e) {
// Assume this is a unique key violation: content already exists. If so,
// the content already exists and we can just return.
return null;
}

if (insertReferences) {
//Finally, insert references into the "artifactreferences" table if the content wasn't present yet.
// If we get here, then the content was inserted and we need to insert the references.
createOrUpdateReferences(handle, contentId, references);
}
return contentId;
return null;
});
}

@Transactional(Transactional.TxType.REQUIRES_NEW)
protected Long getContentIdByHash(String contentHash) {
return this.handles.withHandle(handle -> {
String sql = sqlStatements.selectContentIdByHash();
long contentId = handle.createQuery(sql)
.bind(0, contentHash)
.bind(1, tenantContext.tenantId())
.mapTo(Long.class)
.one();
return contentId;
});
}

protected void createOrUpdateReferences(Handle handle, Long contentId, List<ArtifactReferenceDto> references) {
Expand Down Expand Up @@ -710,7 +712,7 @@ protected void createOrUpdateReferences(Handle handle, Long contentId, List<Arti
* @see RegistryStorage#createArtifactWithMetadata (java.lang.String, java.lang.String, java.lang.String, io.apicurio.registry.types.ArtifactType, io.apicurio.registry.content.ContentHandle, io.apicurio.registry.storage.dto.EditableArtifactMetaDataDto, java.util.List)
*/
@Override
@Transactional
@Transactional()
public ArtifactMetaDataDto createArtifactWithMetadata(String groupId, String artifactId, String version,
String artifactType, ContentHandle content, EditableArtifactMetaDataDto metaData, List<ArtifactReferenceDto> references)
throws ArtifactNotFoundException, ArtifactAlreadyExistsException, RegistryStorageException {
Expand All @@ -724,9 +726,9 @@ protected ArtifactMetaDataDto createArtifactWithMetadata(String groupId, String
String createdBy = securityIdentity.getPrincipal().getName();
Date createdOn = new Date();

//Only create group metadata for non-default groups.
// Only create group metadata for non-default groups.
if (groupId != null && !isGroupExists(groupId)) {
upsertGroup(GroupMetaDataDto.builder()
ensureGroup(GroupMetaDataDto.builder()
.groupId(groupId)
.createdOn(createdOn.getTime())
.modifiedOn(createdOn.getTime())
Expand All @@ -736,9 +738,7 @@ protected ArtifactMetaDataDto createArtifactWithMetadata(String groupId, String
}

// Put the content in the DB and get the unique content ID back.
long contentId = handles.withHandleNoException(handle -> {
return createOrUpdateContent(handle, artifactType, content, references);
});
long contentId = ensureContentAndGetId(artifactType, content, references);

// If the metaData provided is null, try to figure it out from the content.
EditableArtifactMetaDataDto md = metaData;
Expand Down Expand Up @@ -1020,9 +1020,7 @@ protected ArtifactMetaDataDto updateArtifactWithMetadata(String groupId, String
Date createdOn = new Date();

// Put the content in the DB and get the unique content ID back.
long contentId = handles.withHandleNoException(handle -> {
return createOrUpdateContent(handle, artifactType, content, references);
});
long contentId = ensureContentAndGetId(artifactType, content, references);

// Extract meta-data from the content if no metadata is provided
if (metaData == null) {
Expand Down Expand Up @@ -2666,21 +2664,29 @@ public void createGroup(GroupMetaDataDto group) throws GroupAlreadyExistsExcepti
}
}

private void upsertGroup(GroupMetaDataDto group) throws RegistryStorageException {
@Transactional(Transactional.TxType.REQUIRES_NEW)
protected void ensureGroup(GroupMetaDataDto group) throws RegistryStorageException {
this.handles.withHandle(handle -> {
String sql = sqlStatements.upsertGroup();
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, group.getGroupId())
.bind(2, group.getDescription())
.bind(3, group.getArtifactsType())
.bind(4, group.getCreatedBy())
// TODO io.apicurio.registry.storage.dto.GroupMetaDataDto should not use raw numeric timestamps
.bind(5, group.getCreatedOn() == 0 ? new Date() : new Date(group.getCreatedOn()))
.bind(6, group.getModifiedBy())
.bind(7, group.getModifiedOn() == 0 ? null : new Date(group.getModifiedOn()))
.bind(8, RegistryContentUtils.serializeProperties(group.getProperties()))
.execute();
try {
String sql = sqlStatements.insertGroup();
handle.createUpdate(sql)
.bind(0, tenantContext.tenantId())
.bind(1, group.getGroupId())
.bind(2, group.getDescription())
.bind(3, group.getArtifactsType())
.bind(4, group.getCreatedBy())
// TODO io.apicurio.registry.storage.dto.GroupMetaDataDto should not use raw numeric timestamps
.bind(5, group.getCreatedOn() == 0 ? new Date() : new Date(group.getCreatedOn()))
.bind(6, group.getModifiedBy())
.bind(7, group.getModifiedOn() == 0 ? null : new Date(group.getModifiedOn()))
.bind(8, RegistryContentUtils.serializeProperties(group.getProperties()))
.execute();
} catch (Exception e) {
// Primary key violation is OK - that just means it already exists.
if (!sqlStatements.isPrimaryKeyViolation(e)) {
throw new RegistryStorageException(e);
}
}
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public void importContent(ContentEntity entity) {

// When we do not want to preserve contentId, the best solution to import content is create new one with the contentBytes
// It makes sure there won't be any conflicts
long newContentId = getRegistryStorage().createOrUpdateContent(getHandle(), ContentHandle.create(entity.contentBytes), entity.contentHash, entity.canonicalHash, references, entity.serializedReferences);
getRegistryStorage().ensureContent(ContentHandle.create(entity.contentBytes), entity.contentHash, entity.canonicalHash, references, entity.serializedReferences);
long newContentId = getRegistryStorage().getContentIdByHash(entity.contentHash);

getContentIdMapping().put(entity.contentId, newContentId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,26 +61,6 @@ public String isDatabaseInitialized() {
return "SELECT COUNT(*) AS count FROM information_schema.tables WHERE table_name = 'APICURIO'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return "MERGE INTO content AS target" +
" USING (VALUES(?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)" +
" ON (target.tenantId = source.tenantId AND target.contentHash = source.contentHash)" +
" WHEN NOT MATCHED THEN" +
" INSERT (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)" +
" VALUES (source.tenantId, source.contentId, source.canonicalHash, source.contentHash, source.content, source.artifactreferences)";
}

@Override
public String upsertGroup() {
return "MERGE INTO groups (tenantId, groupId, description, artifactsType, createdBy, createdOn, modifiedBy, modifiedOn, properties)" +
" KEY (tenantId, groupId)" +
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,6 @@ public String isDatabaseInitialized() {
);
}

/**
* @see SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return String.join(" ",
"INSERT IGNORE INTO content",
"(tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"VALUES (?, ?, ?, ?, ?, ?);"
);
}

/**
* @see SqlStatements#upsertReference()
*/
Expand Down Expand Up @@ -191,12 +179,6 @@ public String insertGroup() {
);
}

@Override
public String upsertGroup() {
return "INSERT IGNORE INTO artifactgroups (tenantId, groupId, description, artifactsType, createdBy, createdOn, modifiedBy, modifiedOn, properties)" +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#updateGroup()
* In MySQL, 'groups' is a reserved keyword. We've changed it to artifactgroups,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,6 @@ public String isDatabaseInitialized() {
return "SELECT count(*) AS count FROM information_schema.tables WHERE table_name = 'artifacts'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return "INSERT INTO content (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)" +
" VALUES (?, ?, ?, ?, ?, ?)" +
" ON CONFLICT (tenantId, contentHash) DO NOTHING";
}

@Override
public String upsertGroup() {
return "INSERT INTO groups (tenantId, groupId, description, artifactsType, createdBy, createdOn, modifiedBy, modifiedOn, properties)" +
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
" ON CONFLICT (tenantId, groupId) DO NOTHING";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ public class SQLServerSqlStatements extends CommonSqlStatements {

/**
* Constructor.
* @param config
*/
public SQLServerSqlStatements() {
}
Expand Down Expand Up @@ -55,37 +54,13 @@ public boolean isForeignKeyViolation(Exception error) {
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements.core.storage.jdbc.ISqlStatements#isDatabaseInitialized()
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#isDatabaseInitialized()
*/
@Override
public String isDatabaseInitialized() {
return "SELECT count(*) AS count FROM information_schema.tables WHERE table_name = 'artifacts'";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertContent()
*/
@Override
public String upsertContent() {
return String.join(" ",
"MERGE INTO content AS target",
"USING (VALUES (?, ?, ?, ?, ?, ?)) AS source (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"ON (target.tenantId = source.tenantId AND target.contentHash = source.contentHash)",
"WHEN NOT MATCHED THEN",
"INSERT (tenantId, contentId, canonicalHash, contentHash, content, artifactreferences)",
"VALUES (source.tenantId, source.contentId, source.canonicalHash, source.contentHash, source.content, source.artifactreferences);");
}

@Override
public String upsertGroup() {
return "MERGE INTO groups AS target" +
" USING (VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)) AS source (tenantId, groupId, description, artifactsType, createdBy, createdOn, modifiedBy, modifiedOn, properties)" +
" ON (target.tenantId = source.tenantId AND target.groupId = source.groupId)" +
" WHEN NOT MATCHED THEN" +
" INSERT (tenantId, groupId, description, artifactsType, createdBy, createdOn, modifiedBy, modifiedOn, properties)" +
" VALUES (source.tenantId, source.groupId, source.description, source.artifactsType, source.createdBy, source.createdOn, source.modifiedBy, source.modifiedOn, source.properties)";
}

/**
* @see io.apicurio.registry.storage.impl.sql.SqlStatements#upsertLogConfiguration()
*/
Expand Down
Loading

0 comments on commit 9552546

Please sign in to comment.