Skip to content

Commit

Permalink
Merge pull request #12 from trocco-io/improve_error_handling
Browse files Browse the repository at this point in the history
Improve error handling
  • Loading branch information
d-hrs authored Apr 21, 2023
2 parents 59732e4 + 6c7f72b commit cc59ce3
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 61 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ on:
jobs:
main:
runs-on: ubuntu-latest
permissions:
packages: write
contents: read

steps:
- uses: actions/checkout@v1
Expand Down
30 changes: 19 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,40 @@
# Sf Bulk Api file output plugin for Embulk
# embulk-output-sf_bulk_api

TODO: Write short description here and build.gradle file.
Embulk output plugin for Salesforce Bulk API.

## Overview

* **Plugin type**: file output
* **Plugin type**: output
* **Load all or nothing**: no
* **Resume supported**: no
* **Cleanup supported**: yes

## Configuration

- **option1**: description (integer, required)
- **option2**: description (string, default: `"myvalue"`)
- **option3**: description (string, default: `null`)
- **username**: Login username (string, required)
- **password**: Login password (string, required)
- **security_token**: User’s security token (string, required)
- **auth_end_point**: SOAP API authentication endpoint (string, default: `https://login.salesforce.com/services/Soap/u/`)
- **api_version**: SOAP API version (string, default: `46.0`)
- **object**: Salesforce object (sObject) type (string, required)
- **action_type**: Action type (`insert`, `update`, or `upsert`, required)
- **upsert_key**: Name of the external ID field (string, required when `upsert` action, default: `key`)
- **throw_if_failed**: Whether to throw exception at the end of transaction if there are one or more failures (boolean, default: `true`)

## Example

```yaml
out:
type: sf_bulk_api
option1: example1
option2: example2
username: username
password: password
security_token: security_token
object: ExampleCustomObject__c
action_type: upsert
upsert_key: Name
```
## Build
```
$ ./gradlew gem # -t to watch change of files and rebuild continuously
$ ./gradlew gem
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.embulk.output.sf_bulk_api;

public class AbortException extends RuntimeException {
public AbortException(final Throwable cause) {
super(cause);
}
}
175 changes: 175 additions & 0 deletions src/main/java/org/embulk/output/sf_bulk_api/ErrorHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package org.embulk.output.sf_bulk_api;

import com.google.gson.Gson;
import com.sforce.soap.partner.IError;
import com.sforce.soap.partner.SaveResult;
import com.sforce.soap.partner.UpsertResult;
import com.sforce.soap.partner.fault.ApiFault;
import com.sforce.soap.partner.fault.ExceptionCode;
import com.sforce.soap.partner.sobject.SObject;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.embulk.spi.Column;
import org.embulk.spi.Schema;
import org.embulk.spi.time.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ErrorHandler {
private static final List<ExceptionCode> ABORT_EXCEPTION_CODES =
Collections.unmodifiableList(
Arrays.asList(
ExceptionCode.INVALID_SESSION_ID,
ExceptionCode.INVALID_OPERATION_WITH_EXPIRED_PASSWORD));
private static final Gson GSON = new Gson();

private final Logger logger = LoggerFactory.getLogger(getClass());
private final Schema schema;

public ErrorHandler(final Schema schema) {
this.schema = schema;
}

public long handleFault(final List<SObject> sObjects, final ApiFault fault) {
if (ABORT_EXCEPTION_CODES.contains(fault.getExceptionCode())) {
throw new AbortException(fault); // Abort immediately
}
sObjects.forEach(sObject -> log(sObject, fault));
return sObjects.size();
}

private void log(final SObject sObject, final ApiFault fault) {
logger.error(String.format("[output sf_bulk_api failure] %s", getFailure(sObject, fault)));
}

private String getFailure(final SObject sObject, final ApiFault fault) {
final Map<String, Object> map = new LinkedHashMap<>();
map.put("object", getObject(sObject));
map.put("errors", getErrors(fault));
return GSON.toJson(map);
}

private List<Map<String, Object>> getErrors(final ApiFault fault) {
return Arrays.stream(new ApiFault[] {fault}).map(this::getError).collect(Collectors.toList());
}

private Map<String, Object> getError(final ApiFault fault) {
final Map<String, Object> map = new LinkedHashMap<>();
map.put("code", fault.getExceptionCode());
map.put("message", fault.getExceptionMessage());
return map;
}

public long handleErrors(final List<SObject> sObjects, final SaveResult[] results) {
return handleErrors(
sObjects,
Arrays.stream(results)
.map(
result ->
new Result() {
@Override
public boolean isFailure() {
return !result.isSuccess();
}

@Override
public IError[] getErrors() {
return result.getErrors();
}
})
.collect(Collectors.toList()));
}

public long handleErrors(final List<SObject> sObjects, final UpsertResult[] results) {
return handleErrors(
sObjects,
Arrays.stream(results)
.map(
result ->
new Result() {
@Override
public boolean isFailure() {
return !result.isSuccess();
}

@Override
public IError[] getErrors() {
return result.getErrors();
}
})
.collect(Collectors.toList()));
}

private long handleErrors(final List<SObject> sObjects, final List<Result> results) {
if (sObjects.size() != results.size()) {
throw new IllegalArgumentException(
String.format("%d != %d", sObjects.size(), results.size()));
}
IntStream.range(0, sObjects.size())
.forEach(index -> log(sObjects.get(index), results.get(index)));
return results.stream().filter(Result::isFailure).count();
}

private void log(final SObject sObject, final Result result) {
if (!result.isFailure()) {
return;
}
logger.error(String.format("[output sf_bulk_api failure] %s", getFailure(sObject, result)));
}

private String getFailure(final SObject sObject, final Result result) {
final Map<String, Object> map = new LinkedHashMap<>();
map.put("object", getObject(sObject));
map.put("errors", getErrors(result));
return GSON.toJson(map);
}

private Map<String, Object> getObject(final SObject sObject) {
final Map<String, Object> map = new LinkedHashMap<>();
schema.getColumns().forEach(column -> map.put(column.getName(), getField(sObject, column)));
return map;
}

private Object getField(final SObject sObject, final Column column) {
final Object field = sObject.getField(column.getName());
if (field == null) {
return null;
}
final String type = column.getType().getName();
if ("timestamp".equals(type)) {
return Timestamp.ofInstant(((Calendar) field).toInstant()).toString();
} else if ("boolean".equals(type)) {
return Boolean.valueOf(field.toString());
} else if ("double".equals(type)) {
return Double.valueOf(field.toString());
} else if ("long".equals(type)) {
return Long.valueOf(field.toString());
} else {
return field.toString();
}
}

private List<Map<String, Object>> getErrors(final Result result) {
return Arrays.stream(result.getErrors()).map(this::getError).collect(Collectors.toList());
}

private Map<String, Object> getError(final IError error) {
final Map<String, Object> map = new LinkedHashMap<>();
map.put("code", error.getStatusCode());
map.put("message", error.getMessage());
map.put("fields", error.getFields());
return map;
}

private interface Result {
boolean isFailure();

IError[] getErrors();
}
}
55 changes: 15 additions & 40 deletions src/main/java/org/embulk/output/sf_bulk_api/ForceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
import com.sforce.soap.partner.sobject.SObject;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -19,27 +17,28 @@ public class ForceClient {
private final Logger logger = LoggerFactory.getLogger(ForceClient.class);
private final ActionType actionType;
private final String upsertKey;
private final ErrorHandler errorHandler;

public ForceClient(final PluginTask pluginTask) throws ConnectionException {
public ForceClient(final PluginTask pluginTask, final ErrorHandler errorHandler)
throws ConnectionException {
final ConnectorConfig connectorConfig = createConnectorConfig(pluginTask);
this.partnerConnection = Connector.newConnection(connectorConfig);
this.actionType = ActionType.convertActionType(pluginTask.getActionType());
this.upsertKey = pluginTask.getUpsertKey();
this.errorHandler = errorHandler;
}

public void action(final List<SObject> sObjects) throws ConnectionException {
public long action(final List<SObject> sObjects) throws ConnectionException {
logger.info("sObjects size:" + sObjects.size());
switch (this.actionType) {
case INSERT:
insert(sObjects);
return;
return insert(sObjects);
case UPSERT:
upsert(this.upsertKey, sObjects);
return;
return upsert(this.upsertKey, sObjects);
case UPDATE:
update(sObjects);
return;
return update(sObjects);
default:
throw new AssertionError("Invalid actionType: " + actionType);
}
}

Expand All @@ -55,46 +54,22 @@ private ConnectorConfig createConnectorConfig(final PluginTask pluginTask) {
return config;
}

private void insert(final List<SObject> sObjects) throws ConnectionException {
private long insert(final List<SObject> sObjects) throws ConnectionException {
final SaveResult[] saveResultArray =
partnerConnection.create(sObjects.toArray(new SObject[sObjects.size()]));
loggingSaveErrorMessage(saveResultArray);
return errorHandler.handleErrors(sObjects, saveResultArray);
}

private void upsert(final String key, final List<SObject> sObjects) throws ConnectionException {
private long upsert(final String key, final List<SObject> sObjects) throws ConnectionException {
final UpsertResult[] upsertResultArray =
partnerConnection.upsert(key, sObjects.toArray(new SObject[sObjects.size()]));
final List<UpsertResult> upsertResults = Arrays.asList(upsertResultArray);
upsertResults.forEach(
result -> {
if (!result.isSuccess()) {
final List<String> errors =
Arrays.asList(result.getErrors()).stream()
.map(e -> e.getStatusCode() + ":" + e.getMessage())
.collect(Collectors.toList());
logger.warn(String.join(",", errors));
}
});
return errorHandler.handleErrors(sObjects, upsertResultArray);
}

private void update(final List<SObject> sObjects) throws ConnectionException {
private long update(final List<SObject> sObjects) throws ConnectionException {
final SaveResult[] saveResultArray =
partnerConnection.update(sObjects.toArray(new SObject[sObjects.size()]));
loggingSaveErrorMessage(saveResultArray);
}

private void loggingSaveErrorMessage(final SaveResult[] saveResultArray) {
final List<SaveResult> saveResults = Arrays.asList(saveResultArray);
saveResults.forEach(
result -> {
if (!result.isSuccess()) {
final List<String> errors =
Arrays.asList(result.getErrors()).stream()
.map(e -> e.getStatusCode() + ":" + e.getMessage())
.collect(Collectors.toList());
logger.warn(String.join(",", errors));
}
});
return errorHandler.handleErrors(sObjects, saveResultArray);
}

private enum ActionType {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/embulk/output/sf_bulk_api/PluginTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ interface PluginTask extends Task {
@Config("upsert_key")
@ConfigDefault("\"key\"")
String getUpsertKey();

@Config("throw_if_failed")
@ConfigDefault("\"true\"")
boolean getThrowIfFailed();
}
Loading

0 comments on commit cc59ce3

Please sign in to comment.