Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/origin/master' into feature/upd…
Browse files Browse the repository at this point in the history
…ate_embulk_v10
  • Loading branch information
d-hrs committed Sep 2, 2024
2 parents ff4219d + b1cce88 commit d11313d
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 59 deletions.
12 changes: 11 additions & 1 deletion shadow-google-ads-helper/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ configurations {
}

dependencies {
compile("com.google.api-ads:google-ads:25.0.0") {
compile("com.google.api-ads:google-ads:30.0.0") {
exclude group: "commons-logging", module: "commons-logging"
}

Expand All @@ -47,3 +47,13 @@ shadowJar {
relocate "com.google.thirdparty.publicsuffix", "embulk.google-ads.com.google.thirdparty.publicsuffix"
relocate "com.fasterxml.jackson", "embulk.google-ads.com.fasterxml.jackson"
}

// https://github.com/google/guava/issues/6612#issuecomment-1614992368
sourceSets.all {
configurations.getByName(runtimeClasspathConfigurationName) {
attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm")
}
configurations.getByName(compileClasspathConfigurationName) {
attributes.attribute(Attribute.of("org.gradle.jvm.environment", String), "standard-jvm")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
com.google.android:annotations:4.1.1.4
com.google.api-ads:google-ads-codegen:30.0.0
com.google.api-ads:google-ads-stubs-lib:30.0.0
com.google.api-ads:google-ads-stubs-v14:30.0.0
com.google.api-ads:google-ads-stubs-v15:30.0.0
com.google.api-ads:google-ads-stubs-v16:30.0.0
com.google.api-ads:google-ads:30.0.0
com.google.api.grpc:proto-google-common-protos:2.34.0
com.google.api:api-common:2.26.0
com.google.api:gax-grpc:2.43.0
com.google.api:gax:2.43.0
com.google.auth:google-auth-library-credentials:1.23.0
com.google.auth:google-auth-library-oauth2-http:1.23.0
com.google.auto.service:auto-service-annotations:1.0.1
com.google.auto.service:auto-service:1.0.1
com.google.auto.value:auto-value-annotations:1.10.4
com.google.auto:auto-common:1.2
com.google.cloud:google-cloud-shared-dependencies:3.25.0
com.google.code.findbugs:jsr305:3.0.2
com.google.code.gson:gson:2.10.1
com.google.errorprone:error_prone_annotations:2.24.1
com.google.guava:failureaccess:1.0.1
com.google.guava:guava:32.1.3-jre
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
com.google.http-client:google-http-client-gson:1.44.1
com.google.http-client:google-http-client:1.44.1
com.google.j2objc:j2objc-annotations:2.8
com.google.protobuf:protobuf-java-util:3.25.2
com.google.protobuf:protobuf-java:3.25.2
com.google.re2j:re2j:1.7
com.squareup:javapoet:1.11.1
commons-codec:commons-codec:1.16.1
io.grpc:grpc-alts:1.61.1
io.grpc:grpc-api:1.61.1
io.grpc:grpc-auth:1.61.1
io.grpc:grpc-context:1.61.1
io.grpc:grpc-core:1.61.1
io.grpc:grpc-googleapis:1.61.1
io.grpc:grpc-grpclb:1.61.1
io.grpc:grpc-inprocess:1.61.1
io.grpc:grpc-netty-shaded:1.61.1
io.grpc:grpc-protobuf-lite:1.61.1
io.grpc:grpc-protobuf:1.61.1
io.grpc:grpc-services:1.61.1
io.grpc:grpc-stub:1.61.1
io.grpc:grpc-util:1.61.1
io.grpc:grpc-xds:1.61.1
io.opencensus:opencensus-api:0.31.1
io.opencensus:opencensus-contrib-http-util:0.31.1
io.opencensus:opencensus-proto:0.2.0
io.perfmark:perfmark-api:0.27.0
javax.annotation:javax.annotation-api:1.3.2
org.apache.httpcomponents:httpclient:4.5.14
org.apache.httpcomponents:httpcore:4.4.16
org.checkerframework:checker-qual:3.42.0
org.codehaus.mojo:animal-sniffer-annotations:1.23
org.conscrypt:conscrypt-openjdk-uber:2.5.2
org.slf4j:jcl-over-slf4j:1.7.12
org.slf4j:slf4j-api:1.7.25
org.threeten:threetenbp:1.6.8
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
29 changes: 15 additions & 14 deletions src/main/java/org/embulk/input/google_ads/GoogleAdsInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.embulk.input.google_ads;

import com.google.ads.googleads.v13.services.GoogleAdsRow;
import com.google.ads.googleads.v13.services.GoogleAdsServiceClient;
import com.google.ads.googleads.v16.services.GoogleAdsRow;
import com.google.ads.googleads.v16.services.GoogleAdsServiceClient;
import com.google.common.collect.ImmutableList;

import org.embulk.config.ConfigDiff;
Expand Down Expand Up @@ -68,23 +68,24 @@ public TaskReport run(TaskSource taskSource,
{
final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper();
final PluginTask task = taskMapper.map(taskSource, PluginTask.class);
Map<String, String> result;

GoogleAdsReporter reporter = new GoogleAdsReporter(task);
reporter.connect();
try {
try (PageBuilder pageBuilder = getPageBuilder(schema, output)) {
for (GoogleAdsServiceClient.SearchPage page : reporter.getReportPage()) {
for (GoogleAdsRow row : page.getValues()) {
result = new HashMap<String, String>()
{
};
reporter.flattenResource(null, row.getAllFields(), result);
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
pageBuilder.addRecord();
}
pageBuilder.flush();
}
Map<String, String> params = new HashMap<>();
reporter.search(
searchPage -> {
for (GoogleAdsRow row : searchPage.getValues()) {
Map<String, String> result = new HashMap<>();
reporter.flattenResource(null, row.getAllFields(), result);
schema.visitColumns(new GoogleAdsColumnVisitor(new GoogleAdsAccessor(task, result), pageBuilder, task));
pageBuilder.addRecord();
}
pageBuilder.flush();
},
params
);
pageBuilder.finish();
}
} catch (Exception e) {
Expand Down
149 changes: 106 additions & 43 deletions src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.ads.googleads.lib.GoogleAdsClient;
import com.google.ads.googleads.v13.services.GoogleAdsRow;
import com.google.ads.googleads.v13.services.GoogleAdsServiceClient;
import com.google.ads.googleads.v13.services.SearchGoogleAdsRequest;
import com.google.ads.googleads.v16.resources.CustomerName;
import com.google.ads.googleads.v16.services.CustomerServiceClient;
import com.google.ads.googleads.v16.services.GoogleAdsRow;
import com.google.ads.googleads.v16.services.GoogleAdsServiceClient;
import com.google.ads.googleads.v16.services.ListAccessibleCustomersRequest;
import com.google.ads.googleads.v16.services.SearchGoogleAdsRequest;
import com.google.ads.googleads.v16.services.SearchGoogleAdsStreamRequest;
import com.google.ads.googleads.v16.services.SearchGoogleAdsStreamResponse;
import com.google.auth.oauth2.UserCredentials;
import com.google.common.base.CaseFormat;
import com.google.protobuf.Descriptors;
Expand All @@ -23,20 +28,22 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GoogleAdsReporter
{
private static final int PAGE_SIZE = 1000;
private final Logger logger = LoggerFactory.getLogger(GoogleAdsReporter.class);
private final PluginTask task;
private final UserCredentials credentials;
private final ObjectMapper mapper = new ObjectMapper();
private GoogleAdsClient client;
private ObjectMapper mapper = new ObjectMapper();

public GoogleAdsReporter(PluginTask task)
{
Expand All @@ -53,40 +60,34 @@ private UserCredentials buildCredential(PluginTask task)
.build();
}

public Iterable<GoogleAdsServiceClient.SearchPage> getReportPage()
{
List<GoogleAdsServiceClient.SearchPage> pages = new ArrayList<GoogleAdsServiceClient.SearchPage>();

String startDateTime = null;
do {
String query = buildQuery(task, startDateTime);
logger.info(query);
SearchGoogleAdsRequest request = buildRequest(task, query);
GoogleAdsServiceClient googleAdsService = client.getVersion13().createGoogleAdsServiceClient();
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);

if (response.getPage().getResponse().getResultsCount() == 0) {
return pages;
}

response.iteratePages().iterator().forEachRemaining(pages::add);
private Iterable<GoogleAdsServiceClient.SearchPage> search(Map<String, String> params) {
String query = buildQuery(task, params);
logger.info(query);
SearchGoogleAdsRequest request = buildRequest(task, query);
GoogleAdsServiceClient googleAdsService = client.getLatestVersion().createGoogleAdsServiceClient();
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);
return response.iteratePages();
}

if (task.getResourceType().equals("change_event")) {
GoogleAdsServiceClient.SearchPage lastPage = pages.get(pages.size() - 1);
GoogleAdsRow lastRow = null;
for(GoogleAdsRow row : lastPage.getValues()) {
lastRow = row;
}
public void search(Consumer<GoogleAdsServiceClient.SearchPage> consumer, Map<String, String> params) {
GoogleAdsServiceClient.SearchPage lastPage = null;
for(GoogleAdsServiceClient.SearchPage page: search(params)) {
consumer.accept(page);
lastPage = page;
}

if (lastRow == null) {
break;
} else {
startDateTime = lastRow.getChangeEvent().getChangeDateTime();
}
if (task.getResourceType().equals("change_event")) {
if (lastPage == null) return ;
GoogleAdsRow lastRow = null;
for (GoogleAdsRow row: lastPage.getValues()) {
lastRow = row;
}
} while (startDateTime != null && !startDateTime.isEmpty());
if (lastRow == null) return ;

return pages;
Map<String, String> nextParams = new HashMap<>();
nextParams.put("start_datetime", lastRow.getChangeEvent().getChangeDateTime());
search(consumer, nextParams);
}
}

public void flattenResource(String resourceName, Map<Descriptors.FieldDescriptor, Object> fields, Map<String, String> result)
Expand Down Expand Up @@ -225,12 +226,11 @@ public SearchGoogleAdsRequest buildRequest(PluginTask task, String query)
{
return SearchGoogleAdsRequest.newBuilder()
.setCustomerId(task.getCustomerId())
.setPageSize(PAGE_SIZE)
.setQuery(query)
.build();
}

public String buildQuery(PluginTask task, String startDateTime)
public String buildQuery(PluginTask task, Map<String, String> params)
{
StringBuilder sb = new StringBuilder();

Expand All @@ -240,7 +240,7 @@ public String buildQuery(PluginTask task, String startDateTime)
sb.append(" FROM ");
sb.append(task.getResourceType());

List<String> whereClause = buildWhereClauseConditions(task, startDateTime);
List<String> whereClause = buildWhereClauseConditions(task, params);
if (!whereClause.isEmpty()) {
sb.append(" WHERE ");
sb.append(String.join(" AND ", whereClause));
Expand All @@ -255,7 +255,7 @@ public String buildQuery(PluginTask task, String startDateTime)
}

@VisibleForTesting
public List<String> buildWhereClauseConditions(PluginTask task, String startDateTime)
public List<String> buildWhereClauseConditions(PluginTask task, Map<String, String> params)
{
List<String> whereConditions = new ArrayList<String>()
{
Expand All @@ -264,7 +264,7 @@ public List<String> buildWhereClauseConditions(PluginTask task, String startDate
if (task.getDateRange().isPresent()) {
StringBuilder dateSb = new StringBuilder();
if (task.getResourceType().equals("change_event")) {
dateSb.append(buildWhereClauseConditionsForChangeEvent(startDateTime));
dateSb.append(buildWhereClauseConditionsForChangeEvent(params.get("start_datetime")));
} else {
dateSb.append("segments.date BETWEEN '");
dateSb.append(task.getDateRange().get().getStartDate());
Expand All @@ -284,14 +284,77 @@ public List<String> buildWhereClauseConditions(PluginTask task, String startDate
}

public void connect()
{
this.client = buildClient(task.getLoginCustomerId().orElseGet(() -> getLoginCustomerId(task.getCustomerId())));
}

private Long getLoginCustomerId(String customerId)
{
List<Long> loginCustomerIds = getLoginCustomerIds(customerId);
if (loginCustomerIds.isEmpty()) {
throw new RuntimeException("login customer not found [customer id: " + customerId + "]");
}
if (loginCustomerIds.size() > 1) {
logger.info("multiple login customers found [login customer ids: {}]", loginCustomerIds.stream().map(Object::toString).collect(Collectors.joining(", ")));
}
Long loginCustomerId = loginCustomerIds.get(0);
logger.info("use this customer [customer id: {}, login customer id: {}] to login", customerId, loginCustomerId);
return loginCustomerId;
}

private List<Long> getLoginCustomerIds(String customerId)
{
try (CustomerServiceClient client = buildClient(null).getLatestVersion().createCustomerServiceClient()) {
return client.listAccessibleCustomers(ListAccessibleCustomersRequest.newBuilder().build())
.getResourceNamesList()
.stream()
.map(CustomerName::parse)
.map(CustomerName::getCustomerId)
.map(this::getLoginCustomerClients)
.flatMap(Collection::stream)
.filter(loginCustomerClient -> loginCustomerClient.customerClientId.equals(customerId))
.map(loginCustomerClient -> Long.valueOf(loginCustomerClient.loginCustomerId))
.collect(Collectors.toList());
}
}

private List<LoginCustomerClient> getLoginCustomerClients(String customerId)
{
try (GoogleAdsServiceClient client = buildClient(Long.valueOf(customerId)).getLatestVersion().createGoogleAdsServiceClient()) {
return client.searchStreamCallable().call(SearchGoogleAdsStreamRequest.newBuilder()
.setCustomerId(customerId)
.setQuery("SELECT customer_client.id FROM customer_client")
.build())
.stream()
.map(SearchGoogleAdsStreamResponse::getResultsList)
.flatMap(Collection::stream)
.map(GoogleAdsRow::getCustomerClient)
.map(customerClient -> new LoginCustomerClient(customerId, customerClient.getId()))
.collect(Collectors.toList());
}
}

private static class LoginCustomerClient
{
LoginCustomerClient(String loginCustomerId, Long customerClientId)
{
this.loginCustomerId = loginCustomerId;
this.customerClientId = String.valueOf(customerClientId);
}

final String loginCustomerId;
final String customerClientId;
}

private GoogleAdsClient buildClient(Long loginCustomerId)
{
GoogleAdsClient.Builder builder = GoogleAdsClient.newBuilder()
.setDeveloperToken(task.getDeveloperToken())
.setCredentials(credentials);
if (task.getLoginCustomerId().isPresent()) {
builder.setLoginCustomerId(Long.parseLong(task.getLoginCustomerId().get()));
if (loginCustomerId != null) {
builder.setLoginCustomerId(loginCustomerId);
}
this.client = builder.build();
return builder.build();
}

private String buildWhereClauseConditionsForChangeEvent(String startDateTime)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/embulk/input/google_ads/PluginTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface PluginTask extends Task

@Config("login_customer_id")
@ConfigDefault("null")
Optional<String> getLoginCustomerId();
Optional<Long> getLoginCustomerId();

@Config("client_id")
String getClientId();
Expand Down

0 comments on commit d11313d

Please sign in to comment.