Skip to content

Commit

Permalink
Merge pull request #45 from trocco-io/support_change_event
Browse files Browse the repository at this point in the history
Supoort resource type of change_event
  • Loading branch information
d-hrs authored Dec 14, 2023
2 parents 333b1e3 + ababe29 commit f172dc8
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 14 deletions.
85 changes: 71 additions & 14 deletions src/main/java/org/embulk/input/google_ads/GoogleAdsReporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.ads.googleads.lib.GoogleAdsClient;
import com.google.ads.googleads.v13.services.GoogleAdsRow;
import com.google.ads.googleads.v13.services.GoogleAdsServiceClient;
import com.google.ads.googleads.v13.services.SearchGoogleAdsRequest;
import com.google.auth.oauth2.UserCredentials;
Expand Down Expand Up @@ -54,12 +55,38 @@ private UserCredentials buildCredential(PluginTask task)

public Iterable<GoogleAdsServiceClient.SearchPage> getReportPage()
{
String query = buildQuery(task);
logger.info(query);
SearchGoogleAdsRequest request = buildRequest(task, query);
GoogleAdsServiceClient googleAdsService = client.getVersion13().createGoogleAdsServiceClient();
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);
return response.iteratePages();
List<GoogleAdsServiceClient.SearchPage> pages = new ArrayList<GoogleAdsServiceClient.SearchPage>();

String startDateTime = null;
do {
String query = buildQuery(task, startDateTime);
logger.info(query);
SearchGoogleAdsRequest request = buildRequest(task, query);
GoogleAdsServiceClient googleAdsService = client.getVersion13().createGoogleAdsServiceClient();
GoogleAdsServiceClient.SearchPagedResponse response = googleAdsService.search(request);

if (response.getPage().getResponse().getResultsCount() == 0) {
return pages;
}

response.iteratePages().iterator().forEachRemaining(pages::add);

if (task.getResourceType().equals("change_event")) {
GoogleAdsServiceClient.SearchPage lastPage = pages.get(pages.size() - 1);
GoogleAdsRow lastRow = null;
for(GoogleAdsRow row : lastPage.getValues()) {
lastRow = row;
}

if (lastRow == null) {
break;
} else {
startDateTime = lastRow.getChangeEvent().getChangeDateTime();
}
}
} while (startDateTime != null && !startDateTime.isEmpty());

return pages;
}

public void flattenResource(String resourceName, Map<Descriptors.FieldDescriptor, Object> fields, Map<String, String> result)
Expand Down Expand Up @@ -203,7 +230,7 @@ public SearchGoogleAdsRequest buildRequest(PluginTask task, String query)
.build();
}

public String buildQuery(PluginTask task)
public String buildQuery(PluginTask task, String startDateTime)
{
StringBuilder sb = new StringBuilder();

Expand All @@ -213,29 +240,38 @@ public String buildQuery(PluginTask task)
sb.append(" FROM ");
sb.append(task.getResourceType());

List<String> whereClause = buildWhereClauseConditions(task);
List<String> whereClause = buildWhereClauseConditions(task, startDateTime);
if (!whereClause.isEmpty()) {
sb.append(" WHERE ");
sb.append(String.join(" AND ", whereClause));
}

if (task.getLimit().isPresent()) {
sb.append(" LIMIT ");
sb.append(task.getLimit().get());
}

return sb.toString();
}

@VisibleForTesting
public List<String> buildWhereClauseConditions(PluginTask task)
public List<String> buildWhereClauseConditions(PluginTask task, String startDateTime)
{
List<String> whereConditions = new ArrayList<String>()
{
};

if (task.getDateRange().isPresent()) {
StringBuilder dateSb = new StringBuilder();
dateSb.append("segments.date BETWEEN '");
dateSb.append(task.getDateRange().get().getStartDate());
dateSb.append("' AND '");
dateSb.append(task.getDateRange().get().getEndDate());
dateSb.append("'");
if (task.getResourceType().equals("change_event")) {
dateSb.append(buildWhereClauseConditionsForChangeEvent(startDateTime));
} else {
dateSb.append("segments.date BETWEEN '");
dateSb.append(task.getDateRange().get().getStartDate());
dateSb.append("' AND '");
dateSb.append(task.getDateRange().get().getEndDate());
dateSb.append("'");
}
whereConditions.add(dateSb.toString());
}

Expand All @@ -257,4 +293,25 @@ public void connect()
}
this.client = builder.build();
}

private String buildWhereClauseConditionsForChangeEvent(String startDateTime)
{
StringBuilder dateSb = new StringBuilder();
dateSb.append("change_event.change_date_time ");
if (startDateTime == null) {
dateSb.append(" >= '");
dateSb.append(task.getDateRange().get().getStartDate());
} else {
dateSb.append(" > '");
dateSb.append(startDateTime);
}
dateSb.append("' AND ");
dateSb.append("change_event.change_date_time ");
dateSb.append(" <= '");
dateSb.append(task.getDateRange().get().getEndDate());
dateSb.append("'");
dateSb.append(" ORDER BY change_event.change_date_time ASC");

return dateSb.toString();
}
}
4 changes: 4 additions & 0 deletions src/main/java/org/embulk/input/google_ads/PluginTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public interface PluginTask extends Task
@ConfigDefault("null")
Optional<GoogleAdsDateRange> getDateRange();

@Config("limit")
@ConfigDefault("null")
Optional<String> getLimit();

@Config("_use_micro")
@ConfigDefault("true")
boolean getUseMicro();
Expand Down

0 comments on commit f172dc8

Please sign in to comment.