diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index bc22e22d6..6bfe7dfbe 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -1 +1,2 @@ +- [cygnus-ngsi][cygnus-common] Fix way to handle CygnusPersistenceException to allow batch retries in arcgis-sink if `.batch_ttl` configured - [cygnus-ngsi][cygnus-common] Remove new line chars from Arcgis logs diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java index 1709a8031..151c278d9 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIArcgisFeatureTableSink.java @@ -167,7 +167,7 @@ protected int featuresBatched() { * @return The persistence backend * @throws CygnusRuntimeError */ - protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) throws CygnusRuntimeError { + protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) throws CygnusPersistenceError, CygnusRuntimeError { LOGGER.debug("Current persistenceBackend has " + arcgisPersistenceBackend.size() +" tables "); @@ -187,7 +187,7 @@ protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) thr if (newTable.hasError() || !newTable.connected()) { LOGGER.error("Error creating new persistence Backend. " + newTable.getErrorDesc()); - throw new CygnusRuntimeError("[" + this.getName() + "Error creating Persistence backend: " + throw new CygnusPersistenceError("[" + this.getName() + "Error creating Persistence backend: " + newTable.getErrorCode() + " - " + newTable.getErrorDesc()); } else { arcgisPersistenceBackend.put(featureServiceUrl, newTable); @@ -199,7 +199,7 @@ protected ArcgisFeatureTable getPersistenceBackend(String featureServiceUrl) thr LOGGER.error("Error creating new persistence Backend. " +e.getClass().getSimpleName()); LOGGER.debug(stackTrace); - throw new CygnusRuntimeError("Error creating new persistence Backend. ", e.getClass().getName(), + throw new CygnusPersistenceError("Error creating new persistence Backend. ", e.getClass().getName(), e.getMessage()); } } @@ -306,34 +306,29 @@ void persistBatch(NGSIBatch batch) return; } // if - try { - // Iterate on the destinations - batch.startIterator(); + // Iterate on the destinations + batch.startIterator(); - while (batch.hasNext()) { - String destination = batch.getNextDestination(); - LOGGER.debug( - "[" + this.getName() + "] Processing sub-batch regarding the " + destination + " destination"); + while (batch.hasNext()) { + String destination = batch.getNextDestination(); + LOGGER.debug( + "[" + this.getName() + "] Processing sub-batch regarding the " + destination + " destination"); - // Get the events within the current sub-batch - ArrayList events = batch.getNextEvents(); + // Get the events within the current sub-batch + ArrayList events = batch.getNextEvents(); - // Get an aggregator for this destination and initialize it - NGSIArcgisAggregator aggregator = new NGSIArcgisAggregator(getrAcgisServicesUrl(), enableNameMappings); - - for (NGSIEvent event : events) { - aggregator.aggregate(event); - } // for + // Get an aggregator for this destination and initialize it + NGSIArcgisAggregator aggregator = new NGSIArcgisAggregator(getrAcgisServicesUrl(), enableNameMappings); - // Persist the aggregation - persistAggregation(aggregator); - batch.setNextPersisted(true); + for (NGSIEvent event : events) { + aggregator.aggregate(event); + } // for - } // while - } catch (Exception e) { - LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + "." + e.getMessage()); - throw new CygnusRuntimeError(e.getMessage()); - } + // Persist the aggregation + persistAggregation(aggregator); + batch.setNextPersisted(true); + + } // while } // persistBatch /* @@ -396,7 +391,7 @@ public void expirateRecords(long expirationTime) throws CygnusExpiratingError { * @param aggregator * @throws CygnusRuntimeError */ - public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusRuntimeError { + public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusPersistenceError, CygnusRuntimeError, CygnusBadContextData { try { List aggregationList = aggregator.getListArcgisAggregatorDomain(); LOGGER.debug("[" + this.getName() + "] persisting aggregation, " @@ -422,8 +417,16 @@ public void persistAggregation(NGSIArcgisAggregator aggregator) throws CygnusRun } } catch (CygnusRuntimeError e) { String stackTrace = ExceptionUtils.getFullStackTrace(e); - LOGGER.debug(" PersistAggregation Error: " + stackTrace); + LOGGER.debug(" PersistAggregation CygnusRuntimeError: " + stackTrace); + throw (e); + } catch (CygnusPersistenceError e) { + String stackTrace = ExceptionUtils.getFullStackTrace(e); + LOGGER.debug(" PersistAggregation CygnusPersistenceError: " + stackTrace); throw (e); + } catch (ArcgisException e) { + LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + " - " + + e.getMessage()); + throw new CygnusPersistenceError(e.getMessage()); } catch (Exception e) { LOGGER.error("[" + this.getName() + "] Error persisting batch, " + e.getClass().getSimpleName() + " - " + e.getMessage());