Merge branch 'datahub-project:master' into master
hsheth2 authored Dec 16, 2024
2 parents 92827c4 + 35a054f commit 8da922b
Showing 81 changed files with 3,292 additions and 1,125 deletions.
17 changes: 12 additions & 5 deletions .github/workflows/check-datahub-jars.yml
@@ -5,12 +5,12 @@ on:
branches:
- master
paths:
- "metadata-integration"
- "metadata-integration/**"
pull_request:
branches:
- "**"
paths:
- "metadata-integration"
- "metadata-integration/**"
release:
types: [published]

@@ -28,15 +28,22 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: acryldata/sane-checkout-action@v3
+ - uses: actions/setup-python@v5
+ with:
+ python-version: "3.10"
+ - uses: actions/cache@v4
+ with:
+ path: |
+ ~/.cache/uv
+ key: ${{ runner.os }}-uv-${{ hashFiles('**/requirements.txt') }}
+ - name: Install dependencies
+ run: ./metadata-ingestion/scripts/install_deps.sh
- name: Set up JDK 17
uses: actions/setup-java@v4
with:
distribution: "zulu"
java-version: 17
- uses: gradle/actions/setup-gradle@v3
- - uses: actions/setup-python@v5
- with:
- python-version: "3.10"
- name: check ${{ matrix.command }} jar
run: |
./gradlew :metadata-integration:java:${{ matrix.command }}:build --info
4 changes: 2 additions & 2 deletions .github/workflows/dagster-plugin.yml
@@ -31,9 +31,9 @@ jobs:
DATAHUB_TELEMETRY_ENABLED: false
strategy:
matrix:
python-version: ["3.8", "3.10"]
python-version: ["3.9", "3.10"]
include:
- python-version: "3.8"
- python-version: "3.9"
extraPythonRequirement: "dagster>=1.3.3"
- python-version: "3.10"
extraPythonRequirement: "dagster>=1.3.3"
9 changes: 8 additions & 1 deletion build.gradle
@@ -34,6 +34,7 @@ buildscript {
// Releases: https://github.com/linkedin/rest.li/blob/master/CHANGELOG.md
ext.pegasusVersion = '29.57.0'
ext.mavenVersion = '3.6.3'
+ ext.versionGradle = '8.11.1'
ext.springVersion = '6.1.13'
ext.springBootVersion = '3.2.9'
ext.springKafkaVersion = '3.1.6'
@@ -78,7 +79,7 @@ buildscript {

plugins {
id 'com.gorylenko.gradle-git-properties' version '2.4.1'
- id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
+ id 'com.gradleup.shadow' version '8.3.5' apply false
id 'com.palantir.docker' version '0.35.0' apply false
id 'com.avast.gradle.docker-compose' version '0.17.6'
id "com.diffplug.spotless" version "6.23.3"
@@ -372,6 +373,7 @@ configure(subprojects.findAll {! it.name.startsWith('spark-lineage')}) {
exclude group: "org.slf4j", module: "slf4j-log4j12"
exclude group: "org.slf4j", module: "slf4j-nop"
exclude group: "org.slf4j", module: "slf4j-ext"
exclude group: "org.codehaus.jackson", module: "jackson-mapper-asl"

resolutionStrategy.force externalDependency.antlr4Runtime
resolutionStrategy.force externalDependency.antlr4
@@ -499,3 +501,8 @@ subprojects {
}
}
}

+ wrapper {
+ gradleVersion = project.versionGradle
+ distributionType = Wrapper.DistributionType.ALL
+ }
(changed file, path not shown in this view)
@@ -1318,7 +1318,8 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("updateQuery", new UpdateQueryResolver(this.queryService))
.dataFetcher("deleteQuery", new DeleteQueryResolver(this.queryService))
.dataFetcher(
"createDataProduct", new CreateDataProductResolver(this.dataProductService))
"createDataProduct",
new CreateDataProductResolver(this.dataProductService, this.entityService))
.dataFetcher(
"updateDataProduct", new UpdateDataProductResolver(this.dataProductService))
.dataFetcher(
CreateDataProductResolver.java (full path not shown in this view)
@@ -10,8 +10,11 @@
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.CreateDataProductInput;
import com.linkedin.datahub.graphql.generated.DataProduct;
+ import com.linkedin.datahub.graphql.generated.OwnerEntityType;
+ import com.linkedin.datahub.graphql.resolvers.mutate.util.OwnerUtils;
import com.linkedin.datahub.graphql.types.dataproduct.mappers.DataProductMapper;
import com.linkedin.entity.EntityResponse;
+ import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.service.DataProductService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
@@ -24,6 +27,7 @@
public class CreateDataProductResolver implements DataFetcher<CompletableFuture<DataProduct>> {

private final DataProductService _dataProductService;
+ private final EntityService _entityService;

@Override
public CompletableFuture<DataProduct> get(final DataFetchingEnvironment environment)
@@ -56,6 +60,8 @@ public CompletableFuture<DataProduct> get(final DataFetchingEnvironm
context.getOperationContext(),
dataProductUrn,
UrnUtils.getUrn(input.getDomainUrn()));
+ OwnerUtils.addCreatorAsOwner(
+ context, dataProductUrn.toString(), OwnerEntityType.CORP_USER, _entityService);
EntityResponse response =
_dataProductService.getDataProductEntityResponse(
context.getOperationContext(), dataProductUrn);
2 changes: 1 addition & 1 deletion datahub-upgrade/build.gradle
@@ -60,7 +60,7 @@ dependencies {
// mock internal schema registry
implementation externalDependency.kafkaAvroSerde
implementation externalDependency.kafkaAvroSerializer
implementation "org.apache.kafka:kafka_2.12:3.7.1"
implementation "org.apache.kafka:kafka_2.13:3.7.2"

implementation externalDependency.slf4jApi
compileOnly externalDependency.lombok
(changed file, path not shown in this view)
@@ -2,6 +2,8 @@

import static com.linkedin.metadata.Constants.*;

+ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Throwables;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
@@ -23,8 +25,6 @@
import java.util.Set;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
- import org.codehaus.jackson.node.JsonNodeFactory;
- import org.codehaus.jackson.node.ObjectNode;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
2 changes: 1 addition & 1 deletion docker/kafka-setup/Dockerfile
@@ -22,7 +22,7 @@ ARG ALPINE_REPO_URL
ARG APACHE_DOWNLOAD_URL
ARG GITHUB_REPO_URL

- ENV KAFKA_VERSION=3.7.1
+ ENV KAFKA_VERSION=3.7.2
ENV SCALA_VERSION=2.13

LABEL name="kafka" version=${KAFKA_VERSION}
2 changes: 0 additions & 2 deletions docker/profiles/docker-compose.gms.yml
@@ -99,7 +99,6 @@ x-datahub-gms-service: &datahub-gms-service
- ${DATAHUB_LOCAL_GMS_ENV:-empty2.env}
environment: &datahub-gms-env
<<: [*primary-datastore-mysql-env, *graph-datastore-search-env, *search-datastore-env, *datahub-quickstart-telemetry-env, *kafka-env]
- ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE: ${ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE:-search_config.yaml}
ALTERNATE_MCP_VALIDATION: ${ALTERNATE_MCP_VALIDATION:-true}
STRICT_URN_VALIDATION_ENABLED: ${STRICT_URN_VALIDATION_ENABLED:-true}
healthcheck:
@@ -126,7 +125,6 @@ x-datahub-gms-service-dev: &datahub-gms-service-dev
- ${DATAHUB_LOCAL_GMS_ENV:-empty2.env}
environment: &datahub-gms-dev-env
<<: [*datahub-dev-telemetry-env, *datahub-gms-env]
- ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE: ${ELASTICSEARCH_QUERY_CUSTOM_CONFIG_FILE:-search_config.yaml}
SKIP_ELASTICSEARCH_CHECK: false
JAVA_TOOL_OPTIONS: '-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5001'
BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE: false
4 changes: 2 additions & 2 deletions docs-website/vercel-setup.sh
@@ -5,8 +5,8 @@ set -euxo pipefail
./metadata-ingestion/scripts/install_deps.sh

# Set up java version for gradle
- yum install java-17-amazon-corretto -y
- java --version
+ yum install java-17-amazon-corretto-devel -y
+ javac --version

# Build python from source.
# Amazon Linux 2 has Python 3.8, but it's version of OpenSSL is super old and hence it
3 changes: 3 additions & 0 deletions docs/advanced/mcp-mcl.md
@@ -218,3 +218,6 @@ Another form of conditional writes which considers the existence of an aspect or

`CREATE_ENTITY` - Create the aspect if no aspects exist for the entity.

+ By default, a validation exception is thrown if the `CREATE`/`CREATE_ENTITY` constraint is violated. If the write operation
+ should be dropped without considering it an exception, then add the following header: `If-None-Match: *` to the MCP.
+
8 changes: 4 additions & 4 deletions docs/plugins.md
@@ -65,14 +65,14 @@ The sample authenticator implementation can be found at [Authenticator Sample](.
3. Use `getResourceAsStream` to read files: If your plugin read any configuration file like properties or YAML or JSON or xml then use `this.getClass().getClassLoader().getResourceAsStream("<file-name>")` to read that file from DataHub GMS plugin's class-path. For DataHub GMS resource look-up behavior please refer [Plugin Installation](#plugin-installation) section. Sample code of `getResourceAsStream` is available in sample Authenticator plugin [TestAuthenticator.java](../metadata-service/plugin/src/test/sample-test-plugins/src/main/java/com/datahub/plugins/test/TestAuthenticator.java).


- 4. Bundle your Jar: Use `com.github.johnrengelman.shadow` gradle plugin to create an uber jar.
+ 4. Bundle your Jar: Use `com.gradleup.shadow` gradle plugin to create an uber jar.

To see an example of building an uber jar, check out the `build.gradle` file for the apache-ranger-plugin file of [Apache Ranger Plugin](https://github.com/acryldata/datahub-ranger-auth-plugin/tree/main/apache-ranger-plugin) for reference.

Exclude signature files as shown in below `shadowJar` task.

```groovy
- apply plugin: 'com.github.johnrengelman.shadow';
+ apply plugin: 'com.gradleup.shadow';
shadowJar {
// Exclude com.datahub.plugins package and files related to jar signature
exclude "META-INF/*.RSA", "META-INF/*.SF","META-INF/*.DSA"
@@ -152,14 +152,14 @@ The sample authorizer implementation can be found at [Authorizer Sample](https:/

3. Use `getResourceAsStream` to read files: If your plugin read any configuration file like properties or YAML or JSON or xml then use `this.getClass().getClassLoader().getResourceAsStream("<file-name>")` to read that file from DataHub GMS plugin's class-path. For DataHub GMS resource look-up behavior please refer [Plugin Installation](#plugin-installation) section. Sample code of `getResourceAsStream` is available in sample Authenticator plugin [TestAuthenticator.java](../metadata-service/plugin/src/test/sample-test-plugins/src/main/java/com/datahub/plugins/test/TestAuthenticator.java).

- 4. Bundle your Jar: Use `com.github.johnrengelman.shadow` gradle plugin to create an uber jar.
+ 4. Bundle your Jar: Use `com.gradleup.shadow` gradle plugin to create an uber jar.

To see an example of building an uber jar, check out the `build.gradle` file for the apache-ranger-plugin file of [Apache Ranger Plugin](https://github.com/acryldata/datahub-ranger-auth-plugin/tree/main/apache-ranger-plugin) for reference.

Exclude signature files as shown in below `shadowJar` task.

```groovy
- apply plugin: 'com.github.johnrengelman.shadow';
+ apply plugin: 'com.gradleup.shadow';
shadowJar {
// Exclude com.datahub.plugins package and files related to jar signature
exclude "META-INF/*.RSA", "META-INF/*.SF","META-INF/*.DSA"
AspectValidationException.java (full path not shown in this view)
@@ -18,45 +18,39 @@ public static AspectValidationException forItem(BatchItem item, String msg) {
}

public static AspectValidationException forItem(BatchItem item, String msg, Exception e) {
- return new AspectValidationException(
- item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.VALIDATION, e);
+ return new AspectValidationException(item, msg, SubType.VALIDATION, e);
}

public static AspectValidationException forPrecondition(BatchItem item, String msg) {
return forPrecondition(item, msg, null);
}

+ public static AspectValidationException forFilter(BatchItem item, String msg) {
+ return new AspectValidationException(item, msg, SubType.FILTER);
+ }
+
public static AspectValidationException forPrecondition(BatchItem item, String msg, Exception e) {
- return new AspectValidationException(
- item.getChangeType(), item.getUrn(), item.getAspectName(), msg, SubType.PRECONDITION, e);
+ return new AspectValidationException(item, msg, SubType.PRECONDITION, e);
}

+ @Nonnull BatchItem item;
@Nonnull ChangeType changeType;
@Nonnull Urn entityUrn;
@Nonnull String aspectName;
@Nonnull SubType subType;
@Nullable String msg;

- public AspectValidationException(
- @Nonnull ChangeType changeType,
- @Nonnull Urn entityUrn,
- @Nonnull String aspectName,
- String msg,
- SubType subType) {
- this(changeType, entityUrn, aspectName, msg, subType, null);
+ public AspectValidationException(@Nonnull BatchItem item, String msg, SubType subType) {
+ this(item, msg, subType, null);
}

public AspectValidationException(
- @Nonnull ChangeType changeType,
- @Nonnull Urn entityUrn,
- @Nonnull String aspectName,
- @Nonnull String msg,
- @Nullable SubType subType,
- Exception e) {
+ @Nonnull BatchItem item, @Nonnull String msg, @Nullable SubType subType, Exception e) {
super(msg, e);
- this.changeType = changeType;
- this.entityUrn = entityUrn;
- this.aspectName = aspectName;
+ this.item = item;
+ this.changeType = item.getChangeType();
+ this.entityUrn = item.getUrn();
+ this.aspectName = item.getAspectName();
this.msg = msg;
this.subType = subType != null ? subType : SubType.VALIDATION;
}
@@ -65,8 +59,12 @@ public Pair<Urn, String> getAspectGroup() {
return Pair.of(entityUrn, aspectName);
}

- public static enum SubType {
+ public enum SubType {
+ // A validation exception is thrown
VALIDATION,
- PRECONDITION
+ // A failed precondition is thrown if the header constraints are not met
+ PRECONDITION,
+ // Exclude from processing further
+ FILTER
}
}
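Taking the new API at face value, the `FILTER` sub-type gives validators a way to drop a single item from a batch without marking the batch as failed. A minimal sketch, assuming a validator hook that receives a `BatchItem` plus a `ValidationExceptionCollection` (the hook shape and the import paths are assumptions; `forFilter` and `SubType.FILTER` come from the diff above):

```java
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;

/** Hypothetical validator that silently skips writes to a deprecated aspect. */
public class DropDeprecatedAspectValidator {
  // Assumed aspect name, purely for illustration.
  private static final String DEPRECATED_ASPECT = "legacyOwnership";

  public void validate(BatchItem item, ValidationExceptionCollection exceptions) {
    if (DEPRECATED_ASPECT.equals(item.getAspectName())) {
      // SubType.FILTER excludes the item from further processing without
      // counting as a fatal failure (see hasFatalExceptions() in the next file).
      exceptions.addException(
          AspectValidationException.forFilter(item, "deprecated aspect, dropping write"));
    }
  }
}
```
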
ValidationExceptionCollection.java (full path not shown in this view)
@@ -15,21 +15,38 @@
public class ValidationExceptionCollection
extends HashMap<Pair<Urn, String>, Set<AspectValidationException>> {

+ private final Set<Integer> failedHashCodes;
+ private final Set<Integer> filteredHashCodes;
+
+ public ValidationExceptionCollection() {
+ super();
+ this.failedHashCodes = new HashSet<>();
+ this.filteredHashCodes = new HashSet<>();
+ }
+
+ public boolean hasFatalExceptions() {
+ return !failedHashCodes.isEmpty();
+ }
+
public static ValidationExceptionCollection newCollection() {
return new ValidationExceptionCollection();
}

public void addException(AspectValidationException exception) {
super.computeIfAbsent(exception.getAspectGroup(), key -> new HashSet<>()).add(exception);
+ if (!AspectValidationException.SubType.FILTER.equals(exception.getSubType())) {
+ failedHashCodes.add(exception.getItem().hashCode());
+ } else {
+ filteredHashCodes.add(exception.getItem().hashCode());
+ }
}

public void addException(BatchItem item, String message) {
addException(item, message, null);
}

public void addException(BatchItem item, String message, Exception ex) {
- super.computeIfAbsent(Pair.of(item.getUrn(), item.getAspectName()), key -> new HashSet<>())
- .add(AspectValidationException.forItem(item, message, ex));
+ addException(AspectValidationException.forItem(item, message, ex));
}

public Stream<AspectValidationException> streamAllExceptions() {
@@ -41,15 +58,16 @@ public <T extends BatchItem> Collection<T> successful(Collection<T> items) {
}

public <T extends BatchItem> Stream<T> streamSuccessful(Stream<T> items) {
- return items.filter(i -> !this.containsKey(Pair.of(i.getUrn(), i.getAspectName())));
+ return items.filter(
+ i -> !failedHashCodes.contains(i.hashCode()) && !filteredHashCodes.contains(i.hashCode()));
}

public <T extends BatchItem> Collection<T> exceptions(Collection<T> items) {
return streamExceptions(items.stream()).collect(Collectors.toList());
}

public <T extends BatchItem> Stream<T> streamExceptions(Stream<T> items) {
- return items.filter(i -> this.containsKey(Pair.of(i.getUrn(), i.getAspectName())));
+ return items.filter(i -> failedHashCodes.contains(i.hashCode()));
}

@Override
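To make the hash-code bookkeeping concrete, here is a sketch of how a caller might partition a batch. The collection methods (`addException`, `forFilter`, `hasFatalExceptions`, `streamSuccessful`) come from the diff above; the batch construction, the two predicates, and the import paths are stand-ins.

```java
import com.linkedin.metadata.aspect.batch.BatchItem;
import com.linkedin.metadata.aspect.plugins.validation.AspectValidationException;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import java.util.List;
import java.util.stream.Collectors;

public class BatchPartitionSketch {
  static List<BatchItem> keepSuccessful(List<BatchItem> items) {
    ValidationExceptionCollection exceptions = ValidationExceptionCollection.newCollection();

    for (BatchItem item : items) {
      if (violatesSomeRule(item)) {
        // Lands in failedHashCodes: a real validation failure.
        exceptions.addException(item, "failed validation");
      } else if (shouldSilentlyDrop(item)) {
        // Lands in filteredHashCodes: dropped, but not fatal.
        exceptions.addException(
            AspectValidationException.forFilter(item, "skipped by policy"));
      }
    }

    // Only hard failures trip the fatal check used to abort a batch...
    if (exceptions.hasFatalExceptions()) {
      throw new IllegalStateException("batch contains fatal validation errors");
    }
    // ...while failed and filtered items are both excluded from the successful stream.
    return exceptions.streamSuccessful(items.stream()).collect(Collectors.toList());
  }

  // Stand-in predicates; real checks would live in validator plugins.
  private static boolean violatesSomeRule(BatchItem item) {
    return false;
  }

  private static boolean shouldSilentlyDrop(BatchItem item) {
    return false;
  }
}
```
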
(Diffs for the remaining changed files are not shown in this view.)