diff --git a/.github/workflows/gem-push.yml b/.github/workflows/gem-push.yml index 8eb09f1..f7bb584 100644 --- a/.github/workflows/gem-push.yml +++ b/.github/workflows/gem-push.yml @@ -4,7 +4,7 @@ on: workflow_dispatch: push: tags: - - 'v*' + - '*' jobs: build: diff --git a/CHANGELOG.md b/CHANGELOG.md index d557421..a127495 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.6.24 - 2021-10-19 +- Add retryable in case response data invalid json format + +## 0.6.22 - 2021-08-17 +- Support Program Members + ## 0.6.21 - 2021-07-15 - Upgrade `embulk-*` to `v0.10.29`. - Apply new lib `embulk-util-*`. diff --git a/README.md b/README.md index 9b10543..da9f4a3 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ embulk-input-marketo is the gem preparing Embulk input plugins for [Marketo](htt - Lead by program(all_lead_with_program_id) - Campaign(campaign) - Assets Programs (program) +- Program Members (program_members) - List (list) - Activity Type (activity_type) @@ -192,6 +193,14 @@ Configuration: Schema type: dynamic schema Incremental support: no +### Program Members configuration parameter + +Get Members by Program Ids or All Program. + +| name | required | default value | description | +|---------------------|----------|---------------|-----------------------------------------------------------------------------------------------------------------------| +| **program_ids** | false | null | Import Members by specified Program_ID (comma-separated). If not specified will import all Members by all Program IDs | + ### List List extract all list data from Marketo diff --git a/build.gradle b/build.gradle index aededc7..86000a8 100644 --- a/build.gradle +++ b/build.gradle @@ -15,16 +15,17 @@ repositories { group = "com.treasuredata.embulk.plugins" description = "Loads records from Marketo." version = { - def baseVersion = "0.6.23" - def patchVersion = "1" + def baseVersion = "0.6.24" + def troccoVersion = "0.0.1" + def tag = "${baseVersion}-trocco-${troccoVersion}" def vd = versionDetails() - if (vd.lastTag != "${baseVersion}.${patchVersion}") { - logger.warn "lastTag '${vd.lastTag}' is not '${baseVersion}.${patchVersion}'" + if (vd.lastTag != "${tag}") { + logger.warn "lastTag '${vd.lastTag}' is not '${tag}'" } - if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) { - "${vd.lastTag}.trocco" + if (vd.commitDistance == 0 && vd.lastTag ==~ /^[0-9]+\.[0-9]+\.[0-9]+(\[.-][.a-zA-Z0-9-]+)?/) { + vd.lastTag } else { - "${baseVersion}.${vd.gitHash}.pre" + "0.0.0.${vd.gitHash}" } }() @@ -56,6 +57,9 @@ dependencies { compile 'com.fasterxml.jackson.core:jackson-databind:2.6.7' compile 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7' compile 'javax.validation:validation-api:1.1.0.Final' + compile('org.apache.bval:bval-jsr303:0.5'){ + exclude group: 'org.apache.commons', module: 'commons-lang' + } compile 'org.embulk:embulk-base-restclient:0.10.1' compile 'org.embulk:embulk-util-retryhelper-jetty92:0.8.2' diff --git a/gradle/dependency-locks/embulkPluginRuntime.lockfile b/gradle/dependency-locks/embulkPluginRuntime.lockfile index e3beda5..703fc62 100644 --- a/gradle/dependency-locks/embulkPluginRuntime.lockfile +++ b/gradle/dependency-locks/embulkPluginRuntime.lockfile @@ -8,8 +8,11 @@ com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7 com.google.code.findbugs:annotations:3.0.1 com.google.code.findbugs:jsr305:3.0.1 com.google.guava:guava:18.0 +commons-beanutils:commons-beanutils-core:1.8.3 javax.validation:validation-api:1.1.0.Final net.jcip:jcip-annotations:1.0 +org.apache.bval:bval-core:0.5 +org.apache.bval:bval-jsr303:0.5 org.apache.commons:commons-csv:1.8 org.apache.commons:commons-lang3:3.4 org.eclipse.jetty:jetty-client:9.2.14.v20151106 diff --git a/src/main/java/org/embulk/input/marketo/MarketoInputPlugin.java b/src/main/java/org/embulk/input/marketo/MarketoInputPlugin.java index e4a9e0f..bcc5af8 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/MarketoInputPlugin.java @@ -1,15 +1,23 @@ package org.embulk.input.marketo; +import org.apache.bval.jsr303.ApacheValidationProvider; import org.embulk.base.restclient.RestClientInputPluginBase; import org.embulk.util.config.ConfigMapperFactory; +import javax.validation.Validation; + /** * Created by tai.khuu on 8/22/17. */ public class MarketoInputPlugin extends RestClientInputPluginBase { - public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build(); + public static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules() + .withValidator(Validation.byProvider(ApacheValidationProvider.class) + .configure() + .buildValidatorFactory() + .getValidator()) + .build(); public MarketoInputPlugin() { super(CONFIG_MAPPER_FACTORY, MarketoInputPluginDelegate.PluginTask.class, new MarketoInputPluginDelegate()); diff --git a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java index 16f8646..eb67ee4 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java +++ b/src/main/java/org/embulk/input/marketo/MarketoInputPluginDelegate.java @@ -15,6 +15,7 @@ import org.embulk.input.marketo.delegate.LeadWithProgramInputPlugin; import org.embulk.input.marketo.delegate.ListInputPlugin; import org.embulk.input.marketo.delegate.ProgramInputPlugin; +import org.embulk.input.marketo.delegate.ProgramMembersBulkExtractInputPlugin; import org.embulk.input.marketo.rest.MarketoRestClient; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; @@ -34,6 +35,7 @@ public interface PluginTask ProgramInputPlugin.PluginTask, MarketoRestClient.PluginTask, CustomObjectInputPlugin.PluginTask, + ProgramMembersBulkExtractInputPlugin.PluginTask, ListInputPlugin.PluginTask, ActivityTypeInputPlugin.PluginTask { @@ -81,6 +83,7 @@ public enum Target ALL_LEAD_WITH_PROGRAM_ID(new LeadWithProgramInputPlugin()), PROGRAM(new ProgramInputPlugin()), CUSTOM_OBJECT(new CustomObjectInputPlugin()), + PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin()), LIST(new ListInputPlugin()), ACTIVITY_TYPE(new ActivityTypeInputPlugin()); diff --git a/src/main/java/org/embulk/input/marketo/MarketoService.java b/src/main/java/org/embulk/input/marketo/MarketoService.java index a2021a7..1164f9f 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoService.java +++ b/src/main/java/org/embulk/input/marketo/MarketoService.java @@ -44,4 +44,8 @@ public interface MarketoService Iterable getListsByIds(Set ids); Iterable getLists(); + + ObjectNode describeProgramMembers(); + + File extractProgramMembers(String exportID); } diff --git a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java index d5a7a71..0638d25 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java +++ b/src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java @@ -256,4 +256,23 @@ public Iterable getLists() { return marketoRestClient.getLists(); } + + @Override + public ObjectNode describeProgramMembers() + { + return marketoRestClient.describeProgramMembers(); + } + + @Override + public File extractProgramMembers(String exportID) + { + return downloadBulkExtract(new Function() + { + @Override + public InputStream apply(BulkExtractRangeHeader bulkExtractRangeHeader) + { + return marketoRestClient.getProgramMemberBulkExtractResult(exportID, bulkExtractRangeHeader); + } + }); + } } diff --git a/src/main/java/org/embulk/input/marketo/MarketoUtils.java b/src/main/java/org/embulk/input/marketo/MarketoUtils.java index 217a660..1cb04a7 100644 --- a/src/main/java/org/embulk/input/marketo/MarketoUtils.java +++ b/src/main/java/org/embulk/input/marketo/MarketoUtils.java @@ -12,7 +12,14 @@ import org.embulk.base.restclient.jackson.JacksonTopLevelValueLocator; import org.embulk.base.restclient.record.ServiceRecord; import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.TaskReport; import org.embulk.input.marketo.model.MarketoField; +import org.embulk.spi.Column; +import org.embulk.spi.ColumnVisitor; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.embulk.spi.time.Timestamp; +import org.embulk.util.json.JsonParser; import org.embulk.util.retryhelper.RetryExecutor; import org.embulk.util.retryhelper.RetryGiveupException; import org.embulk.util.retryhelper.Retryable; @@ -20,6 +27,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + +import java.time.Instant; import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Iterator; @@ -28,6 +37,8 @@ import java.util.Optional; import java.util.Set; +import static org.embulk.input.marketo.MarketoInputPlugin.CONFIG_MAPPER_FACTORY; + /** * Created by tai.khuu on 9/18/17. */ @@ -139,7 +150,7 @@ public static String getIdentityEndPoint(String accountId, Optional endp if (endpoint.isPresent()) { return endpoint.get() + "/identity"; } - return "https://" + accountId + ".mktorest.com/identity"; + return "https://" + accountId.trim() + ".mktorest.com/identity"; } public static String getEndPoint(String accountID, Optional endpoint) @@ -147,7 +158,7 @@ public static String getEndPoint(String accountID, Optional endpoint) if (endpoint.isPresent()) { return endpoint.get(); } - return "https://" + accountID + ".mktorest.com"; + return "https://" + accountID.trim() + ".mktorest.com"; } public static final class DateRange @@ -248,4 +259,53 @@ public void remove() } }; } + + public static TaskReport importMockPreviewData(final PageBuilder pageBuilder, int numberRecords) + { + final JsonParser jsonParser = new JsonParser(); + Schema schema = pageBuilder.getSchema(); + for (int i = 1; i <= numberRecords; i++) { + final int rowNum = i; + schema.visitColumns(new ColumnVisitor() + { + @Override + public void booleanColumn(Column column) + { + pageBuilder.setBoolean(column, false); + } + + @Override + public void longColumn(Column column) + { + pageBuilder.setLong(column, 12345L); + } + + @Override + public void doubleColumn(Column column) + { + pageBuilder.setDouble(column, 12345.123); + } + + @Override + public void stringColumn(Column column) + { + pageBuilder.setString(column, column.getName().endsWith("Id") || column.getName().equals("id") ? Integer.toString(rowNum) : column.getName() + "_" + rowNum); + } + + @Override + public void timestampColumn(Column column) + { + pageBuilder.setTimestamp(column, Timestamp.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()))); + } + + @Override + public void jsonColumn(Column column) + { + pageBuilder.setJson(column, jsonParser.parse("{\"mockKey\":\"mockValue\"}")); + } + }); + pageBuilder.addRecord(); + } + return CONFIG_MAPPER_FACTORY.newTaskReport(); + } } diff --git a/src/main/java/org/embulk/input/marketo/bulk_extract/AllStringJacksonServiceRecord.java b/src/main/java/org/embulk/input/marketo/bulk_extract/AllStringJacksonServiceRecord.java new file mode 100644 index 0000000..c6938ca --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/bulk_extract/AllStringJacksonServiceRecord.java @@ -0,0 +1,108 @@ +package org.embulk.input.marketo.bulk_extract; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.base.restclient.jackson.JacksonServiceRecord; +import org.embulk.base.restclient.jackson.JacksonServiceValue; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.util.json.JsonParser; +import org.embulk.util.timestamp.TimestampFormatter; +import org.msgpack.value.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Instant; + +public class AllStringJacksonServiceRecord extends JacksonServiceRecord +{ + private static final Logger LOGGER = LoggerFactory.getLogger(AllStringJacksonServiceRecord.class); + + public AllStringJacksonServiceRecord(ObjectNode record) + { + super(record); + } + + @Override + public JacksonServiceValue getValue(ValueLocator locator) + { + // We know that this thing only contain text. + JacksonServiceValue value = super.getValue(locator); + return new StringConverterJacksonServiceRecord(value.stringValue()); + } + + private class StringConverterJacksonServiceRecord extends JacksonServiceValue + { + private final String textValue; + + public StringConverterJacksonServiceRecord(String textValue) + { + super(null); + this.textValue = textValue; + } + + @Override + public boolean isNull() + { + return textValue == null || textValue.equals("null"); + } + + @Override + public boolean booleanValue() + { + return Boolean.parseBoolean(textValue); + } + + @Override + public double doubleValue() + { + try { + return Double.parseDouble(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse Double: " + textValue); + return Double.NaN; + } + } + + @Override + public Value jsonValue(JsonParser jsonParser) + { + try { + return jsonParser.parse(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse JSON: " + textValue); + return jsonParser.parse("{}"); + } + } + + @Override + public long longValue() + { + try { + return Long.parseLong(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse Long: " + textValue); + return Long.MIN_VALUE; + } + } + + @Override + public String stringValue() + { + return textValue; + } + + @Override + public Instant timestampValue(TimestampFormatter timestampFormatter) + { + try { + return timestampFormatter.parse(textValue); + } + catch (Exception e) { + LOGGER.info("skipped to parse Timestamp: " + textValue); + return null; + } + } + } +} diff --git a/src/main/java/org/embulk/input/marketo/bulk_extract/CsvRecordIterator.java b/src/main/java/org/embulk/input/marketo/bulk_extract/CsvRecordIterator.java new file mode 100644 index 0000000..a6407ac --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/bulk_extract/CsvRecordIterator.java @@ -0,0 +1,80 @@ +package org.embulk.input.marketo.bulk_extract; + +import org.embulk.input.marketo.CsvTokenizer; +import org.embulk.spi.DataException; +import org.embulk.util.text.LineDecoder; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +public class CsvRecordIterator implements Iterator> +{ + private final CsvTokenizer tokenizer; + private final List headers; + private Map currentCsvRecord; + + public CsvRecordIterator(LineDecoder lineDecoder, T task) + { + tokenizer = new CsvTokenizer(lineDecoder, task); + if (!tokenizer.nextFile()) { + throw new DataException("Can't read extract input stream"); + } + headers = new ArrayList<>(); + tokenizer.nextRecord(); + while (tokenizer.hasNextColumn()) { + headers.add(tokenizer.nextColumn()); + } + } + + @Override + public boolean hasNext() + { + if (currentCsvRecord == null) { + currentCsvRecord = getNextCSVRecord(); + } + return currentCsvRecord != null; + } + + @Override + public Map next() + { + try { + if (hasNext()) { + return currentCsvRecord; + } + } + finally { + currentCsvRecord = null; + } + throw new NoSuchElementException(); + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + private Map getNextCSVRecord() + { + if (!tokenizer.nextRecord()) { + return null; + } + Map kvMap = new HashMap<>(); + try { + int i = 0; + while (tokenizer.hasNextColumn()) { + kvMap.put(headers.get(i), tokenizer.nextColumnOrNull()); + i++; + } + } + catch (CsvTokenizer.InvalidValueException ex) { + throw new DataException("Encounter exception when parse csv file. Please check to see if you are using the correct" + + "quote or escape character.", ex); + } + return kvMap; + } +} diff --git a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java index 7cea88c..8a514b1 100644 --- a/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java +++ b/src/main/java/org/embulk/input/marketo/delegate/MarketoBaseBulkExtractInputPlugin.java @@ -4,11 +4,8 @@ import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVRecord; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.embulk.base.restclient.jackson.JacksonServiceRecord; -import org.embulk.base.restclient.jackson.JacksonServiceValue; import org.embulk.base.restclient.record.RecordImporter; import org.embulk.base.restclient.record.ServiceRecord; -import org.embulk.base.restclient.record.ValueLocator; import org.embulk.config.ConfigDiff; import org.embulk.config.ConfigException; import org.embulk.config.TaskReport; @@ -16,21 +13,16 @@ import org.embulk.input.marketo.MarketoService; import org.embulk.input.marketo.MarketoServiceImpl; import org.embulk.input.marketo.MarketoUtils; +import org.embulk.input.marketo.bulk_extract.AllStringJacksonServiceRecord; import org.embulk.input.marketo.rest.MarketoRestClient; -import org.embulk.spi.Column; -import org.embulk.spi.ColumnVisitor; import org.embulk.spi.Exec; import org.embulk.spi.PageBuilder; import org.embulk.spi.Schema; -import org.embulk.spi.time.Timestamp; import org.embulk.util.config.Config; import org.embulk.util.config.ConfigDefault; import org.embulk.util.file.FileInputInputStream; import org.embulk.util.file.InputStreamFileInput; -import org.embulk.util.json.JsonParser; import org.embulk.util.text.LineDecoder; -import org.embulk.util.timestamp.TimestampFormatter; -import org.msgpack.value.Value; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +35,6 @@ import java.nio.charset.CodingErrorAction; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; @@ -159,7 +150,7 @@ public TaskReport ingestServiceData(final T task, RecordImporter recordImporter, { TaskReport taskReport = CONFIG_MAPPER_FACTORY.newTaskReport(); if (Exec.isPreview()) { - return importMockPreviewData(pageBuilder); + return MarketoUtils.importMockPreviewData(pageBuilder, PREVIEW_RECORD_LIMIT); } else { try (LineDecoderIterator decoderIterator = getLineDecoderIterator(task)) { @@ -196,63 +187,6 @@ private CSVParser getCsvParser(LineDecoderIterator decoderIterator) throws IOExc return csvTokenizer.csvParse(); } - /** - * This method should be removed when we allow skip preview phase - */ - private TaskReport importMockPreviewData(final PageBuilder pageBuilder) - { - final JsonParser jsonParser = new JsonParser(); - Schema schema = pageBuilder.getSchema(); - for (int i = 1; i <= PREVIEW_RECORD_LIMIT; i++) { - final int rowNum = i; - schema.visitColumns(new ColumnVisitor() - { - @Override - public void booleanColumn(Column column) - { - pageBuilder.setBoolean(column, false); - } - - @Override - public void longColumn(Column column) - { - pageBuilder.setLong(column, 12345L); - } - - @Override - public void doubleColumn(Column column) - { - pageBuilder.setDouble(column, 12345.123); - } - - @Override - public void stringColumn(Column column) - { - if (column.getName().endsWith("Id") || column.getName().equals("id")) { - pageBuilder.setString(column, Integer.toString(rowNum)); - } - else { - pageBuilder.setString(column, column.getName() + "_" + rowNum); - } - } - - @Override - public void timestampColumn(Column column) - { - pageBuilder.setTimestamp(column, Timestamp.ofInstant(Instant.ofEpochMilli(System.currentTimeMillis()))); - } - - @Override - public void jsonColumn(Column column) - { - pageBuilder.setJson(column, jsonParser.parse("{\"mockKey\":\"mockValue\"}")); - } - }); - pageBuilder.addRecord(); - } - return CONFIG_MAPPER_FACTORY.newTaskReport(); - } - private LineDecoderIterator getLineDecoderIterator(T task) { final OffsetDateTime fromDate = OffsetDateTime.ofInstant(task.getFromDate().toInstant(), ZoneOffset.UTC); @@ -272,99 +206,6 @@ protected final Iterator getServiceRecords(MarketoService marketo protected abstract InputStream getExtractedStream(MarketoService service, T task, OffsetDateTime fromDate, OffsetDateTime toDate); - private static class AllStringJacksonServiceRecord extends JacksonServiceRecord - { - public AllStringJacksonServiceRecord(ObjectNode record) - { - super(record); - } - - @Override - public JacksonServiceValue getValue(ValueLocator locator) - { - // We know that this thing only contain text. - JacksonServiceValue value = super.getValue(locator); - return new StringConverterJacksonServiceRecord(value.stringValue()); - } - } - - private static class StringConverterJacksonServiceRecord extends JacksonServiceValue - { - private final String textValue; - - public StringConverterJacksonServiceRecord(String textValue) - { - super(null); - this.textValue = textValue; - } - - @Override - public boolean isNull() - { - return textValue == null || textValue.equals("null"); - } - - @Override - public boolean booleanValue() - { - return Boolean.parseBoolean(textValue); - } - - @Override - public double doubleValue() - { - try { - return Double.parseDouble(textValue); - } - catch (Exception e) { - LOGGER.info("skipped to parse Double: " + textValue); - return Double.NaN; - } - } - - @Override - public Value jsonValue(JsonParser jsonParser) - { - try { - return jsonParser.parse(textValue); - } - catch (Exception e) { - LOGGER.info("skipped to parse JSON: " + textValue); - return jsonParser.parse("{}"); - } - } - - @Override - public long longValue() - { - try { - return Long.parseLong(textValue); - } - catch (Exception e) { - LOGGER.info("skipped to parse Long: " + textValue); - return Long.MIN_VALUE; - } - } - - @Override - public String stringValue() - { - return textValue; - } - - @Override - public Instant timestampValue(TimestampFormatter timestampFormatter) - { - try { - return timestampFormatter.parse(textValue); - } - catch (Exception e) { - LOGGER.info("skipped to parse Timestamp: " + textValue); - return null; - } - } - } - private final class LineDecoderIterator implements Iterator, AutoCloseable { private LineDecoder currentLineDecoder; diff --git a/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java new file mode 100644 index 0000000..75c974d --- /dev/null +++ b/src/main/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPlugin.java @@ -0,0 +1,242 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.lang3.StringUtils; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.RecordImporter; +import org.embulk.base.restclient.record.ServiceRecord; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigDiff; +import org.embulk.config.ConfigException; +import org.embulk.config.TaskReport; +import org.embulk.input.marketo.CsvTokenizer; +import org.embulk.input.marketo.MarketoService; +import org.embulk.input.marketo.MarketoServiceImpl; +import org.embulk.input.marketo.MarketoUtils; +import org.embulk.input.marketo.bulk_extract.AllStringJacksonServiceRecord; +import org.embulk.input.marketo.bulk_extract.CsvRecordIterator; +import org.embulk.input.marketo.model.MarketoField; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.spi.DataException; +import org.embulk.spi.Exec; +import org.embulk.spi.PageBuilder; +import org.embulk.spi.Schema; +import org.embulk.util.config.Config; +import org.embulk.util.config.ConfigDefault; +import org.embulk.util.file.InputStreamFileInput; +import org.embulk.util.text.LineDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.stream.Collectors; + +import static org.embulk.input.marketo.MarketoInputPlugin.CONFIG_MAPPER_FACTORY; + +public class ProgramMembersBulkExtractInputPlugin extends MarketoBaseInputPluginDelegate +{ + private final Logger logger = LoggerFactory.getLogger(getClass()); + + public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask, CsvTokenizer.PluginTask + { + @Config("program_ids") + @ConfigDefault("null") + Optional getProgramIds(); + + @Config("polling_interval_second") + @ConfigDefault("60") + Integer getPollingIntervalSecond(); + + @Config("bulk_job_timeout_second") + @ConfigDefault("3600") + Integer getBulkJobTimeoutSecond(); + + //https://developers.marketo.com/rest-api/bulk-extract/ + @Max(2) + @Min(1) + @Config("number_concurrent_export_job") + @ConfigDefault("2") + Integer getNumberConcurrentExportJob(); + + @Config("program_member_fields") + @ConfigDefault("null") + Optional> getProgramMemberFields(); + void setProgramMemberFields(Optional> programMemberFields); + + @Config("extracted_program_ids") + @ConfigDefault("null") + Optional> getExtractedProgramIds(); + void setExtractedProgramIds(Optional> extractedProgramIds); + } + + @Override + public void validateInputTask(PluginTask task) + { + super.validateInputTask(task); + try (MarketoRestClient marketoRestClient = createMarketoRestClient(task)) { + MarketoService marketoService = new MarketoServiceImpl(marketoRestClient); + Iterable programsToRequest; + List programIds = new ArrayList<>(); + if (task.getProgramIds().isPresent() && StringUtils.isNotBlank(task.getProgramIds().get())) { + final String[] idsStr = StringUtils.split(task.getProgramIds().get(), ID_LIST_SEPARATOR_CHAR); + java.util.function.Function, Iterable> getListIds = marketoService::getProgramsByIds; + programsToRequest = super.getObjectsByIds(idsStr, getListIds); + } + else { + programsToRequest = marketoService.getPrograms(); + } + Iterator iterator = programsToRequest.iterator(); + while (iterator.hasNext()) { + ObjectNode program = iterator.next(); + int id = program.get("id").asInt(); + if (!programIds.contains(id)) { + programIds.add(id); + } + } + if (programIds.size() <= 0) { + throw new DataException("No program belong to this account."); + } + task.setExtractedProgramIds(Optional.of(programIds)); + + ObjectNode result = marketoService.describeProgramMembers(); + JsonNode fields = result.get("fields"); + if (!fields.isArray()) { + throw new DataException("[fields] isn't array node."); + } + Map extractFields = new HashMap<>(); + for (JsonNode field : fields) { + String dataType = field.get("dataType").asText(); + String name = field.get("name").asText(); + if (!extractFields.containsKey(name)) { + extractFields.put(name, dataType); + } + } + task.setProgramMemberFields(Optional.of(extractFields)); + } + } + + @Override + public ConfigDiff buildConfigDiff(PluginTask task, Schema schema, int taskCount, List taskReports) + { + return CONFIG_MAPPER_FACTORY.newConfigDiff(); + } + + @Override + public TaskReport ingestServiceData(final PluginTask task, RecordImporter recordImporter, int taskIndex, PageBuilder pageBuilder) + { + TaskReport taskReport = CONFIG_MAPPER_FACTORY.newTaskReport(); + if (Exec.isPreview()) { + return MarketoUtils.importMockPreviewData(pageBuilder, PREVIEW_RECORD_LIMIT); + } + else { + if (!task.getProgramMemberFields().isPresent() || !task.getExtractedProgramIds().isPresent()) { + throw new ConfigException("program_member_fields or extracted_programs are missing."); + } + ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(task.getNumberConcurrentExportJob()); + final MarketoRestClient restClient = createMarketoRestClient(task); + final List fieldNames = new ArrayList<>(task.getProgramMemberFields().get().keySet()); + + List listFutureExportIDs = task.getExtractedProgramIds().get().stream() + .map(programId -> { + Runnable exportTask = () -> { + String exportJobID = restClient.createProgramMembersBulkExtract(fieldNames, programId); + restClient.startProgramMembersBulkExtract(exportJobID); + int numberRecord; + try { + ObjectNode status = restClient.waitProgramMembersExportJobComplete(exportJobID, task.getPollingIntervalSecond(), task.getBulkJobTimeoutSecond()); + numberRecord = status.get("numberOfRecords").asInt(); + } + catch (InterruptedException e) { + logger.error("Exception when waiting for export program [{}], job id [{}]", programId, exportJobID, e); + throw new DataException(e); + } + if (numberRecord == 0) { + logger.info("Export program [{}], job [{}] have no record.", programId, exportJobID); + return; + } + MarketoService marketoService = new MarketoServiceImpl(restClient); + LineDecoder lineDecoder = null; + try { + InputStream extractedStream = new FileInputStream(marketoService.extractProgramMembers(exportJobID)); + lineDecoder = LineDecoder.of(new InputStreamFileInput(Exec.getBufferAllocator(), extractedStream), StandardCharsets.UTF_8, null); + Iterator> csvRecords = new CsvRecordIterator(lineDecoder, task); + int imported = 0; + while (csvRecords.hasNext()) { + Map csvRecord = csvRecords.next(); + ObjectNode objectNode = MarketoUtils.OBJECT_MAPPER.valueToTree(csvRecord); + recordImporter.importRecord(new AllStringJacksonServiceRecord(objectNode), pageBuilder); + imported = imported + 1; + } + + logger.info("Import data for program [{}], job_id [{}] finish.[{}] records imported/total [{}]", programId, exportJobID, imported, numberRecord); + } + catch (FileNotFoundException e) { + throw new RuntimeException("File export cannot be found", e); + } + finally { + if (lineDecoder != null) { + lineDecoder.close(); + } + } + }; + return executor.submit(exportTask); + }) + .collect(Collectors.toList()); + + for (Future future : listFutureExportIDs) { + try { + future.get(); + } + catch (InterruptedException | ExecutionException ex) { + logger.error("Exception occur. Shutdown execute service.........", ex); + if (executor != null && !executor.isShutdown()) { + executor.shutdownNow(); + } + throw new RuntimeException(ex); + } + } + restClient.close(); + if (executor != null && !executor.isShutdown()) { + executor.shutdownNow(); + } + return taskReport; + } + } + + @Override + protected final Iterator getServiceRecords(MarketoService marketoService, PluginTask task) + { + throw new UnsupportedOperationException(); + } + + @Override + public ServiceResponseMapper buildServiceResponseMapper(ProgramMembersBulkExtractInputPlugin.PluginTask task) + { + if (!task.getProgramMemberFields().isPresent() || task.getProgramMemberFields().get().size() <= 0) { + throw new ConfigException("program_member_fields are missing."); + } + List programMembersColumns = new ArrayList<>(); + for (Map.Entry entry : task.getProgramMemberFields().get().entrySet()) { + programMembersColumns.add(new MarketoField(entry.getKey(), entry.getValue())); + } + return MarketoUtils.buildDynamicResponseMapper(task.getSchemaColumnPrefix(), programMembersColumns); + } +} diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java index eced5a9..fc1514f 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoBaseRestClient.java @@ -35,6 +35,7 @@ import static com.fasterxml.jackson.core.JsonParser.Feature.ALLOW_UNQUOTED_CONTROL_CHARS; import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static org.embulk.input.marketo.rest.MarketoResponseJetty92EntityReader.jsonResponseInvalid; /** * Marketo base rest client @@ -108,8 +109,8 @@ public String getAccessToken() private String requestAccessToken() { final Multimap params = ArrayListMultimap.create(); - params.put("client_id", clientId); - params.put("client_secret", clientSecret); + params.put("client_id", clientId.trim()); + params.put("client_secret", clientSecret.trim()); params.put("grant_type", "client_credentials"); // add partner api key to the request @@ -295,6 +296,10 @@ protected boolean isExceptionToRetry(Exception exception) return false; } } + //retry in case request return data but invalid format + if ((exception instanceof DataException) && exception.getMessage().equals(jsonResponseInvalid)) { + return true; + } return false; } }); diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java index 824bacc..02048e4 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRESTEndpoint.java @@ -27,7 +27,12 @@ public enum MarketoRESTEndpoint GET_PROGRAMS_BY_TAG("/rest/asset/v1/program/byTag.json"), GET_CUSTOM_OBJECT("/rest/v1/customobjects/${api_name}.json"), GET_CUSTOM_OBJECT_DESCRIBE("/rest/v1/customobjects/${api_name}/describe.json"), - GET_ACTIVITY_TYPES("/rest/v1/activities/types.json"); + GET_ACTIVITY_TYPES("/rest/v1/activities/types.json"), + DESCRIBE_PROGRAM_MEMBERS("/rest/v1/programs/members/describe.json"), + CREATE_PROGRAM_MEMBERS_EXPORT_JOB("/bulk/v1/program/members/export/create.json"), + START_PROGRAM_MEMBERS_EXPORT_JOB("/bulk/v1/program/members/export/${export_id}/enqueue.json"), + GET_PROGRAM_MEMBERS_EXPORT_STATUS("/bulk/v1/program/members/export/${export_id}/status.json"), + GET_PROGRAM_MEMBERS_EXPORT_RESULT("/bulk/v1/program/members/export/${export_id}/file.json"); private final String endpoint; MarketoRESTEndpoint(String endpoint) diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoResponseJetty92EntityReader.java b/src/main/java/org/embulk/input/marketo/rest/MarketoResponseJetty92EntityReader.java index 354c563..669dbbc 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoResponseJetty92EntityReader.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoResponseJetty92EntityReader.java @@ -34,6 +34,8 @@ public class MarketoResponseJetty92EntityReader implements Jetty92ResponseRea private final JavaType javaType; + public static String jsonResponseInvalid = "Exception when parse json content"; + public MarketoResponseJetty92EntityReader(long timeout) { this.timeout = timeout; @@ -73,7 +75,7 @@ public MarketoResponse readResponseContent() throws Exception } catch (IOException ex) { LOGGER.error("Can't parse json content", ex); - throw new DataException("Exception when parse json content"); + throw new DataException(jsonResponseInvalid); } } diff --git a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java index 04c8640..092e6bf 100644 --- a/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java +++ b/src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java @@ -294,7 +294,7 @@ public void waitActitvityExportJobComplete(String exportId, int pollingInterval, waitExportJobComplete(MarketoRESTEndpoint.GET_ACTIVITY_EXPORT_STATUS, exportId, pollingInterval, waitTimeout); } - private void waitExportJobComplete(MarketoRESTEndpoint marketoRESTEndpoint, String exportId, int pollingInterval, int waitTimeout) throws InterruptedException + private ObjectNode waitExportJobComplete(MarketoRESTEndpoint marketoRESTEndpoint, String exportId, int pollingInterval, int waitTimeout) throws InterruptedException { long waitTime = 0; long waitTimeoutMs = waitTimeout * 1000L; @@ -313,7 +313,7 @@ private void waitExportJobComplete(MarketoRESTEndpoint marketoRESTEndpoint, Stri case "Completed": logger.info("Total wait time ms is [{}]", waitTime); logger.info("File size is [{}] bytes", objectNode.get("fileSize")); - return; + return objectNode; case "Failed": throw new DataException("Bulk extract job failed exportId: " + exportId + " errorMessage: " + objectNode.get("errorMsg").asText()); case "Cancel": @@ -594,4 +594,38 @@ public Iterable getActivityTypes() { return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_ACTIVITY_TYPES.getEndpoint(), new ImmutableListMultimap.Builder().put(MAX_RETURN, DEFAULT_MAX_RETURN).build(), ObjectNode.class); } + + public ObjectNode describeProgramMembers() + { + MarketoResponse jsonResponse = doGet(endPoint + MarketoRESTEndpoint.DESCRIBE_PROGRAM_MEMBERS.getEndpoint(), null, null, new MarketoResponseJetty92EntityReader<>(this.readTimeoutMillis)); + return jsonResponse.getResult().get(0); + } + + public String createProgramMembersBulkExtract(List extractFields, int programId) + { + MarketoBulkExtractRequest marketoBulkExtractRequest = new MarketoBulkExtractRequest(); + if (extractFields != null) { + marketoBulkExtractRequest.setFields(extractFields); + } + marketoBulkExtractRequest.setFormat("CSV"); + Map filterMap = new HashMap<>(); + filterMap.put("programId", programId); + marketoBulkExtractRequest.setFilter(filterMap); + return sendCreateBulkExtractRequest(marketoBulkExtractRequest, MarketoRESTEndpoint.CREATE_PROGRAM_MEMBERS_EXPORT_JOB); + } + + public void startProgramMembersBulkExtract(String exportId) + { + startBulkExtract(MarketoRESTEndpoint.START_PROGRAM_MEMBERS_EXPORT_JOB, exportId); + } + + public ObjectNode waitProgramMembersExportJobComplete(String exportId, int pollingInterval, int waitTimeout) throws InterruptedException + { + return waitExportJobComplete(MarketoRESTEndpoint.GET_PROGRAM_MEMBERS_EXPORT_STATUS, exportId, pollingInterval, waitTimeout); + } + + public InputStream getProgramMemberBulkExtractResult(String exportId, BulkExtractRangeHeader bulkExtractRangeHeader) + { + return getBulkExtractResult(MarketoRESTEndpoint.GET_PROGRAM_MEMBERS_EXPORT_RESULT, exportId, bulkExtractRangeHeader); + } } diff --git a/src/test/java/org/embulk/input/marketo/delegate/LeadBulkExtractInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/LeadBulkExtractInputPluginTest.java index 3c34a25..ddf63b1 100644 --- a/src/test/java/org/embulk/input/marketo/delegate/LeadBulkExtractInputPluginTest.java +++ b/src/test/java/org/embulk/input/marketo/delegate/LeadBulkExtractInputPluginTest.java @@ -21,7 +21,6 @@ import org.junit.Rule; import org.junit.Test; import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; import java.io.IOException; import java.text.DateFormat; @@ -35,7 +34,12 @@ import static org.embulk.input.marketo.MarketoUtilsTest.CONFIG_MAPPER; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Created by khuutantaitai on 10/3/17. diff --git a/src/test/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPluginTest.java b/src/test/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPluginTest.java new file mode 100644 index 0000000..669432a --- /dev/null +++ b/src/test/java/org/embulk/input/marketo/delegate/ProgramMembersBulkExtractInputPluginTest.java @@ -0,0 +1,93 @@ +package org.embulk.input.marketo.delegate; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.embulk.EmbulkTestRuntime; +import org.embulk.base.restclient.ServiceResponseMapper; +import org.embulk.base.restclient.record.ValueLocator; +import org.embulk.config.ConfigLoader; +import org.embulk.config.ConfigSource; +import org.embulk.input.marketo.model.BulkExtractRangeHeader; +import org.embulk.input.marketo.rest.MarketoRestClient; +import org.embulk.input.marketo.rest.RecordPagingIterable; +import org.embulk.spi.Column; +import org.embulk.spi.PageBuilder; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.embulk.input.marketo.MarketoUtilsTest.CONFIG_MAPPER; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class ProgramMembersBulkExtractInputPluginTest +{ + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Rule + public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime(); + + private ProgramMembersBulkExtractInputPlugin bulkExtractInputPlugin; + + private ConfigSource configSource; + + private MarketoRestClient mockMarketoRestclient; + + @Before + public void prepare() throws IOException + { + bulkExtractInputPlugin = spy(new ProgramMembersBulkExtractInputPlugin()); + ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class); + configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/rest_config.yaml")); + mockMarketoRestclient = mock(MarketoRestClient.class); + doReturn(mockMarketoRestclient).when(bulkExtractInputPlugin).createMarketoRestClient(any(ProgramMembersBulkExtractInputPlugin.PluginTask.class)); + } + + @Test + public void testRun() throws InterruptedException, IOException + { + RecordPagingIterable mockProgramRecords = mock(RecordPagingIterable.class); + ProgramMembersBulkExtractInputPlugin.PluginTask task = CONFIG_MAPPER.map(configSource, ProgramMembersBulkExtractInputPlugin.PluginTask.class); + PageBuilder pageBuilder = mock(PageBuilder.class); + String exportId1 = "exportId1"; + List programs = new ArrayList<>(); + programs.add(OBJECT_MAPPER.createObjectNode().put("id", 100)); + ObjectNode marketoFields = (ObjectNode) OBJECT_MAPPER.readTree(this.getClass().getResourceAsStream("/fixtures/program_members_describe.json")); + when(mockProgramRecords.iterator()).thenReturn(programs.iterator()); + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode().put("numberOfRecords", 3); + when(mockMarketoRestclient.waitProgramMembersExportJobComplete(anyString(), anyInt(), anyInt())).thenReturn(objectNode); + when(mockMarketoRestclient.describeProgramMembers()).thenReturn(marketoFields); + when(mockMarketoRestclient.createProgramMembersBulkExtract(any(List.class), any(Integer.class))).thenReturn(exportId1).thenReturn(null); + when(mockMarketoRestclient.getProgramMemberBulkExtractResult(eq(exportId1), any(BulkExtractRangeHeader.class))).thenReturn(this.getClass().getResourceAsStream("/fixtures/program_members_extract.csv")); + when(mockMarketoRestclient.getPrograms()).thenReturn(mockProgramRecords); + bulkExtractInputPlugin.validateInputTask(task); + ServiceResponseMapper mapper = bulkExtractInputPlugin.buildServiceResponseMapper(task); + bulkExtractInputPlugin.ingestServiceData(task, mapper.createRecordImporter(), 1, pageBuilder); + ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Long.class); + Column idColumn = mapper.getEmbulkSchema().lookupColumn("mk_leadId"); + verify(pageBuilder, times(3)).setLong(eq(idColumn), argumentCaptor.capture()); + verify(mockMarketoRestclient, times(1)).startProgramMembersBulkExtract(eq(exportId1)); + verify(mockMarketoRestclient, times(1)).waitProgramMembersExportJobComplete(eq(exportId1), eq(task.getPollingIntervalSecond()), eq(task.getBulkJobTimeoutSecond())); + verify(mockMarketoRestclient, times(1)).createProgramMembersBulkExtract(anyList(), anyInt()); + List leadIds = argumentCaptor.getAllValues(); + Assert.assertEquals(3, leadIds.size()); + Assert.assertTrue(leadIds.contains(452L)); + Assert.assertTrue(leadIds.contains(453L)); + Assert.assertTrue(leadIds.contains(454L)); + } +} diff --git a/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java b/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java index dde61f7..a2c7aef 100644 --- a/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java +++ b/src/test/java/org/embulk/input/marketo/rest/MarketoBaseRestClientTest.java @@ -16,6 +16,7 @@ import org.embulk.input.marketo.model.MarketoError; import org.embulk.input.marketo.model.MarketoResponse; import org.embulk.spi.DataException; +import org.embulk.util.retryhelper.jetty92.DefaultJetty92ClientCreator; import org.embulk.util.retryhelper.jetty92.Jetty92ClientCreator; import org.embulk.util.retryhelper.jetty92.Jetty92RetryHelper; import org.embulk.util.retryhelper.jetty92.Jetty92SingleRequester; @@ -37,6 +38,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; +import static org.embulk.input.marketo.rest.MarketoResponseJetty92EntityReader.jsonResponseInvalid; + /** * Created by tai.khuu on 9/21/17. */ @@ -323,10 +326,58 @@ public void testDoRequesterRetry() Assert.assertTrue(jetty92SingleRequester.toRetry(new SocketTimeoutException())); Assert.assertTrue(jetty92SingleRequester.toRetry(new TimeoutException())); Assert.assertTrue(jetty92SingleRequester.toRetry(new EOFException())); + + Assert.assertTrue(jetty92SingleRequester.toRetry(new DataException(jsonResponseInvalid))); // Call 3 times First call then 602 error and 601 error Mockito.verify(mockJetty92, Mockito.times(3)).requestWithRetry(Mockito.any(StringJetty92ResponseEntityReader.class), Mockito.any(Jetty92SingleRequester.class)); } + @Test(expected = DataException.class) + public void testResponseInvalidJson() throws Exception + { + MarketoResponseJetty92EntityReader reader = Mockito.spy(new MarketoResponseJetty92EntityReader<>(10000)); + Response.Listener listener = Mockito.mock(Response.Listener.class); + Mockito.doReturn(listener).when(reader).getListener(); + Response response = Mockito.mock(Response.class); + Mockito.doReturn(200).when(response).getStatus(); + Mockito.doReturn(response).when(reader).getResponse(); + String ret = "{\n" + + " \"requestId\": \"d01f#15d672f8560\",\n" + + " \"result\": [\n" + + " {\n" + + " \"batchId\": 3404,\n" + + " \"importId\": \"3404\",\n" + + " \"status\": \"Queued\"\n" + + " }\n" + + " ,\n" + + " \"success\": true\n" + + "}\n"; + Mockito.doReturn(ret).when(reader).readResponseContentInString(); + Jetty92RetryHelper retryHelper = Mockito.spy(new Jetty92RetryHelper(1, + 1000, 12000, + new DefaultJetty92ClientCreator(10000, 10000))); + retryHelper.requestWithRetry(reader, new Jetty92SingleRequester() + { + @Override + public void requestOnce(HttpClient client, Response.Listener responseListener) + { + // do nothing + } + + @Override + protected boolean isResponseStatusToRetry(Response response) + { + return false; + } + + @Override + protected boolean isExceptionToRetry(Exception exception) + { + return false; + } + }); + } + private HttpResponseException createHttpResponseException(int statusCode) { HttpResponseException exception = Mockito.mock(HttpResponseException.class); diff --git a/src/test/resources/fixtures/program_members_describe.json b/src/test/resources/fixtures/program_members_describe.json new file mode 100644 index 0000000..1ad6101 --- /dev/null +++ b/src/test/resources/fixtures/program_members_describe.json @@ -0,0 +1,174 @@ +{ + "name": "API Program Membership", + "description": "Map for API program membership fields", + "createdAt": "2021-03-20T01:24:34Z", + "updatedAt": "2021-03-20T01:24:34Z", + "dedupeFields": [ + "leadId", + "programId" + ], + "searchableFields": [ + [ + "leadId" + ], + [ + "programMemberCustomFieldTest" + ], + [ + "reachedSuccess" + ], + [ + "statusName" + ] + ], + "fields": [ + { + "name": "acquiredBy", + "displayName": "acquiredBy", + "dataType": "boolean", + "updateable": false, + "crmManaged": false + }, + { + "name": "attendanceLikelihood", + "displayName": "attendanceLikelihood", + "dataType": "integer", + "updateable": false, + "crmManaged": false + }, + { + "name": "createdAt", + "displayName": "createdAt", + "dataType": "datetime", + "updateable": false, + "crmManaged": false + }, + { + "name": "isExhausted", + "displayName": "isExhausted", + "dataType": "boolean", + "updateable": false, + "crmManaged": false + }, + { + "name": "leadId", + "displayName": "leadId", + "dataType": "integer", + "updateable": false, + "crmManaged": false + }, + { + "name": "membershipDate", + "displayName": "membershipDate", + "dataType": "datetime", + "updateable": false, + "crmManaged": false + }, + { + "name": "nurtureCadence", + "displayName": "nurtureCadence", + "dataType": "string", + "length": 4, + "updateable": false, + "crmManaged": false + }, + { + "name": "program", + "displayName": "program", + "dataType": "string", + "length": 255, + "updateable": false, + "crmManaged": false + }, + { + "name": "programId", + "displayName": "programId", + "dataType": "integer", + "updateable": false, + "crmManaged": false + }, + { + "name": "reachedSuccess", + "displayName": "reachedSuccess", + "dataType": "boolean", + "updateable": false, + "crmManaged": false + }, + { + "name": "reachedSuccessDate", + "displayName": "reachedSuccessDate", + "dataType": "datetime", + "updateable": false, + "crmManaged": false + }, + { + "name": "registrationLikelihood", + "displayName": "registrationLikelihood", + "dataType": "integer", + "updateable": false, + "crmManaged": false + }, + { + "name": "statusName", + "displayName": "statusName", + "dataType": "string", + "length": 255, + "updateable": false, + "crmManaged": false + }, + { + "name": "statusReason", + "displayName": "statusReason", + "dataType": "string", + "length": 255, + "updateable": false, + "crmManaged": false + }, + { + "name": "trackName", + "displayName": "trackName", + "dataType": "string", + "length": 255, + "updateable": false, + "crmManaged": false + }, + { + "name": "updatedAt", + "displayName": "updatedAt", + "dataType": "datetime", + "updateable": false, + "crmManaged": false + }, + { + "name": "waitlistPriority", + "displayName": "waitlistPriority", + "dataType": "integer", + "updateable": false, + "crmManaged": false + }, + { + "name": "programMemberCustomFieldTest", + "displayName": "ProgramMemberCustomFieldTest", + "dataType": "string", + "length": 255, + "updateable": true, + "crmManaged": false + }, + { + "name": "registrationCode", + "displayName": "registrationCode", + "dataType": "string", + "length": 100, + "updateable": true, + "crmManaged": false + }, + { + "name": "webinarUrl", + "displayName": "webinarUrl", + "dataType": "string", + "length": 2000, + "updateable": true, + "crmManaged": false + } + ] +} \ No newline at end of file diff --git a/src/test/resources/fixtures/program_members_extract.csv b/src/test/resources/fixtures/program_members_extract.csv new file mode 100644 index 0000000..681b7d6 --- /dev/null +++ b/src/test/resources/fixtures/program_members_extract.csv @@ -0,0 +1,4 @@ +isExhausted,webinarUrl,program,reachedSuccessDate,trackName,acquiredBy,membershipDate,createdAt,statusReason,registrationLikelihood,nurtureCadence,programMemberCustomFieldTest,statusName,registrationCode,attendanceLikelihood,waitlistPriority,programId,reachedSuccess,leadId,updatedAt +0,null,DA - Demo Account (DO NOT EDIT),null,null,1,2019-04-12T05:58:10Z,2019-04-12T05:58:10Z,null,null,null,null,02 Opened,null,null,null,1001,0,452,2019-04-12T05:58:10Z +0,null,DA - Demo Account (DO NOT EDIT),null,null,1,2019-04-12T05:58:10Z,2019-04-12T05:58:10Z,null,null,null,null,01 Sent,null,null,null,1001,0,453,2019-04-12T05:58:10Z +0,null,DA - Demo Account (DO NOT EDIT),null,null,1,2019-04-12T05:58:10Z,2019-04-12T05:58:10Z,null,null,null,null,03 Clicked,null,null,null,1001,1,454,2019-04-12T05:58:10Z