Skip to content

Commit

Permalink
Merge pull request #19 from trocco-io/marge-treasure-data-v0.6.24
Browse files Browse the repository at this point in the history
Marge treasure data v0.6.24
  • Loading branch information
t3t5u authored Jul 13, 2022
2 parents 0a09ba7 + 547063e commit b6a697f
Show file tree
Hide file tree
Showing 23 changed files with 939 additions and 180 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gem-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
workflow_dispatch:
push:
tags:
- 'v*'
- '*'

jobs:
build:
Expand Down
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.6.24 - 2021-10-19
- Add retryable in case response data invalid json format

## 0.6.22 - 2021-08-17
- Support Program Members

## 0.6.21 - 2021-07-15
- Upgrade `embulk-*` to `v0.10.29`.
- Apply new lib `embulk-util-*`.
Expand Down
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ embulk-input-marketo is the gem preparing Embulk input plugins for [Marketo](htt
- Lead by program(all_lead_with_program_id)
- Campaign(campaign)
- Assets Programs (program)
- Program Members (program_members)
- List (list)
- Activity Type (activity_type)

Expand Down Expand Up @@ -192,6 +193,14 @@ Configuration:
Schema type: dynamic schema
Incremental support: no

### Program Members configuration parameter

Get Members by Program Ids or All Program.

| name | required | default value | description |
|---------------------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------|
| **program_ids** | false | null | Import Members by specified Program_ID (comma-separated). If not specified will import all Members by all Program IDs |

### List

List extract all list data from Marketo
Expand Down
18 changes: 11 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ repositories {
group = "com.treasuredata.embulk.plugins"
description = "Loads records from Marketo."
version = {
def baseVersion = "0.6.23"
def patchVersion = "1"
def baseVersion = "0.6.24"
def troccoVersion = "0.0.1"
def tag = "${baseVersion}-trocco-${troccoVersion}"
def vd = versionDetails()
if (vd.lastTag != "${baseVersion}.${patchVersion}") {
logger.warn "lastTag '${vd.lastTag}' is not '${baseVersion}.${patchVersion}'"
if (vd.lastTag != "${tag}") {
logger.warn "lastTag '${vd.lastTag}' is not '${tag}'"
}
if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) {
"${vd.lastTag}.trocco"
if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\[.-][.a-zA-Z0-9-]+)?/) {
vd.lastTag
} else {
"${baseVersion}.${vd.gitHash}.pre"
"0.0.0.${vd.gitHash}"
}
}()

Expand Down Expand Up @@ -56,6 +57,9 @@ dependencies {
compile 'com.fasterxml.jackson.core:jackson-databind:2.6.7'
compile 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7'
compile 'javax.validation:validation-api:1.1.0.Final'
compile('org.apache.bval:bval-jsr303:0.5'){
exclude group: 'org.apache.commons', module: 'commons-lang'
}

compile 'org.embulk:embulk-base-restclient:0.10.1'
compile 'org.embulk:embulk-util-retryhelper-jetty92:0.8.2'
Expand Down
3 changes: 3 additions & 0 deletions gradle/dependency-locks/embulkPluginRuntime.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
com.google.code.findbugs:annotations:3.0.1
com.google.code.findbugs:jsr305:3.0.1
com.google.guava:guava:18.0
commons-beanutils:commons-beanutils-core:1.8.3
javax.validation:validation-api:1.1.0.Final
net.jcip:jcip-annotations:1.0
org.apache.bval:bval-core:0.5
org.apache.bval:bval-jsr303:0.5
org.apache.commons:commons-csv:1.8
org.apache.commons:commons-lang3:3.4
org.eclipse.jetty:jetty-client:9.2.14.v20151106
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/org/embulk/input/marketo/MarketoInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
package org.embulk.input.marketo;

import org.apache.bval.jsr303.ApacheValidationProvider;
import org.embulk.base.restclient.RestClientInputPluginBase;
import org.embulk.util.config.ConfigMapperFactory;

import javax.validation.Validation;

/**
* Created by tai.khuu on 8/22/17.
*/
public class MarketoInputPlugin
extends RestClientInputPluginBase<MarketoInputPluginDelegate.PluginTask>
{
public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build();
public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules()
.withValidator(Validation.byProvider(ApacheValidationProvider.class)
.configure()
.buildValidatorFactory()
.getValidator())
.build();
public MarketoInputPlugin()
{
super(CONFIG_MAPPER_FACTORY, MarketoInputPluginDelegate.PluginTask.class, new MarketoInputPluginDelegate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.embulk.input.marketo.delegate.LeadWithProgramInputPlugin;
import org.embulk.input.marketo.delegate.ListInputPlugin;
import org.embulk.input.marketo.delegate.ProgramInputPlugin;
import org.embulk.input.marketo.delegate.ProgramMembersBulkExtractInputPlugin;
import org.embulk.input.marketo.rest.MarketoRestClient;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
Expand All @@ -34,6 +35,7 @@ public interface PluginTask
ProgramInputPlugin.PluginTask,
MarketoRestClient.PluginTask,
CustomObjectInputPlugin.PluginTask,
ProgramMembersBulkExtractInputPlugin.PluginTask,
ListInputPlugin.PluginTask,
ActivityTypeInputPlugin.PluginTask
{
Expand Down Expand Up @@ -81,6 +83,7 @@ public enum Target
ALL_LEAD_WITH_PROGRAM_ID(new LeadWithProgramInputPlugin()),
PROGRAM(new ProgramInputPlugin()),
CUSTOM_OBJECT(new CustomObjectInputPlugin()),
PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin()),
LIST(new ListInputPlugin()),
ACTIVITY_TYPE(new ActivityTypeInputPlugin());

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/embulk/input/marketo/MarketoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ public interface MarketoService
Iterable<ObjectNode> getListsByIds(Set<String> ids);

Iterable<ObjectNode> getLists();

ObjectNode describeProgramMembers();

File extractProgramMembers(String exportID);
}
19 changes: 19 additions & 0 deletions src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -256,4 +256,23 @@ public Iterable<ObjectNode> getLists()
{
return marketoRestClient.getLists();
}

@Override
public ObjectNode describeProgramMembers()
{
return marketoRestClient.describeProgramMembers();
}

@Override
public File extractProgramMembers(String exportID)
{
return downloadBulkExtract(new Function<BulkExtractRangeHeader, InputStream>()
{
@Override
public InputStream apply(BulkExtractRangeHeader bulkExtractRangeHeader)
{
return marketoRestClient.getProgramMemberBulkExtractResult(exportID, bulkExtractRangeHeader);
}
});
}
}
64 changes: 62 additions & 2 deletions src/main/java/org/embulk/input/marketo/MarketoUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,23 @@
import org.embulk.base.restclient.jackson.JacksonTopLevelValueLocator;
import org.embulk.base.restclient.record.ServiceRecord;
import org.embulk.base.restclient.record.ValueLocator;
import org.embulk.config.TaskReport;
import org.embulk.input.marketo.model.MarketoField;
import org.embulk.spi.Column;
import org.embulk.spi.ColumnVisitor;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.Schema;
import org.embulk.spi.time.Timestamp;
import org.embulk.util.json.JsonParser;
import org.embulk.util.retryhelper.RetryExecutor;
import org.embulk.util.retryhelper.RetryGiveupException;
import org.embulk.util.retryhelper.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -28,6 +37,8 @@
import java.util.Optional;
import java.util.Set;

import static org.embulk.input.marketo.MarketoInputPlugin.CONFIG_MAPPER_FACTORY;

/**
* Created by tai.khuu on 9/18/17.
*/
Expand Down Expand Up @@ -139,15 +150,15 @@ public static String getIdentityEndPoint(String accountId, Optional<String> endp
if (endpoint.isPresent()) {
return endpoint.get() + "/identity";
}
return "https://" + accountId + ".mktorest.com/identity";
return "https://" + accountId.trim() + ".mktorest.com/identity";
}

public static String getEndPoint(String accountID, Optional<String> endpoint)
{
if (endpoint.isPresent()) {
return endpoint.get();
}
return "https://" + accountID + ".mktorest.com";
return "https://" + accountID.trim() + ".mktorest.com";
}

public static final class DateRange
Expand Down Expand Up @@ -248,4 +259,53 @@ public void remove()
}
};
}

public static TaskReport importMockPreviewData(final PageBuilder pageBuilder, int numberRecords)
{
final JsonParser jsonParser = new JsonParser();
Schema schema = pageBuilder.getSchema();
for (int i = 1; i <= numberRecords; i++) {
final int rowNum = i;
schema.visitColumns(new ColumnVisitor()
{
@Override
public void booleanColumn(Column column)
{
pageBuilder.setBoolean(column, false);
}

@Override
public void longColumn(Column column)
{
pageBuilder.setLong(column, 12345L);
}

@Override
public void doubleColumn(Column column)
{
pageBuilder.setDouble(column, 12345.123);
}

@Override
public void stringColumn(Column column)
{
pageBuilder.setString(column, column.getName().endsWith("Id") || column.getName().equals("id") ? Integer.toString(rowNum) : column.getName() + "_" + rowNum);
}

@Override
public void timestampColumn(Column column)
{
pageBuilder.setTimestamp(column, Timestamp.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis())));
}

@Override
public void jsonColumn(Column column)
{
pageBuilder.setJson(column, jsonParser.parse("{\"mockKey\":\"mockValue\"}"));
}
});
pageBuilder.addRecord();
}
return CONFIG_MAPPER_FACTORY.newTaskReport();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package org.embulk.input.marketo.bulk_extract;

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.embulk.base.restclient.jackson.JacksonServiceRecord;
import org.embulk.base.restclient.jackson.JacksonServiceValue;
import org.embulk.base.restclient.record.ValueLocator;
import org.embulk.util.json.JsonParser;
import org.embulk.util.timestamp.TimestampFormatter;
import org.msgpack.value.Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;

public class AllStringJacksonServiceRecord extends JacksonServiceRecord
{
private static final Logger LOGGER = LoggerFactory.getLogger(AllStringJacksonServiceRecord.class);

public AllStringJacksonServiceRecord(ObjectNode record)
{
super(record);
}

@Override
public JacksonServiceValue getValue(ValueLocator locator)
{
// We know that this thing only contain text.
JacksonServiceValue value = super.getValue(locator);
return new StringConverterJacksonServiceRecord(value.stringValue());
}

private class StringConverterJacksonServiceRecord extends JacksonServiceValue
{
private final String textValue;

public StringConverterJacksonServiceRecord(String textValue)
{
super(null);
this.textValue = textValue;
}

@Override
public boolean isNull()
{
return textValue == null || textValue.equals("null");
}

@Override
public boolean booleanValue()
{
return Boolean.parseBoolean(textValue);
}

@Override
public double doubleValue()
{
try {
return Double.parseDouble(textValue);
}
catch (Exception e) {
LOGGER.info("skipped to parse Double: " + textValue);
return Double.NaN;
}
}

@Override
public Value jsonValue(JsonParser jsonParser)
{
try {
return jsonParser.parse(textValue);
}
catch (Exception e) {
LOGGER.info("skipped to parse JSON: " + textValue);
return jsonParser.parse("{}");
}
}

@Override
public long longValue()
{
try {
return Long.parseLong(textValue);
}
catch (Exception e) {
LOGGER.info("skipped to parse Long: " + textValue);
return Long.MIN_VALUE;
}
}

@Override
public String stringValue()
{
return textValue;
}

@Override
public Instant timestampValue(TimestampFormatter timestampFormatter)
{
try {
return timestampFormatter.parse(textValue);
}
catch (Exception e) {
LOGGER.info("skipped to parse Timestamp: " + textValue);
return null;
}
}
}
}
Loading

0 comments on commit b6a697f

Please sign in to comment.