diff --git a/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java b/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java index e86dfa4..7e202df 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaProducerConfig.java @@ -22,6 +22,9 @@ @RefreshScope public class KafkaProducerConfig extends BaseKafkaConfig { + private static final String PRODUCER_ALL_ACKS = "all"; + private static final int RETRIES_CONFIG_FOR_AT_MOST_ONCE = 0; + @Autowired private KafkaProperties kafkaProperties; @Autowired private MeterRegistry meterRegistry; @@ -39,6 +42,15 @@ public ProducerFactory eventProducerFactory( props.put( ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerProperties.getEnableIdempotence()); + if (kafkaProducerProperties.getEnableAtMostOnceSemantics()) { + // at-most once requires retries to be set as zero since the client wouldn't re-attempt a + // publish in case of Broker failure. + props.put(ProducerConfig.RETRIES_CONFIG, RETRIES_CONFIG_FOR_AT_MOST_ONCE); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false); + + log.info("Kafka producer will be initialised with at-most once behaviour"); + } + // Azure event-hub config configureSaslProperties(props, kafkaProducerProperties.getSaslJaasConfig()); diff --git a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java index f5df694..d909834 100644 --- a/src/main/java/hlf/java/rest/client/config/KafkaProperties.java +++ b/src/main/java/hlf/java/rest/client/config/KafkaProperties.java @@ -30,6 +30,7 @@ public static class Producer extends SSLProperties { private String topic; private String saslJaasConfig; private Boolean enableIdempotence; + private Boolean enableAtMostOnceSemantics; @Override public String toString() { diff --git a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java index 5598531..3639069 100644 --- a/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java +++ b/src/main/java/hlf/java/rest/client/service/impl/EventPublishServiceImpl.java @@ -130,16 +130,18 @@ public boolean publishChaincodeEvents( @Override public void onSuccess(SendResult result) { log.info( - "Sent message=[" - + payload - + "] with offset=[" - + result.getRecordMetadata().offset() - + "]"); + "Sent message '{}' to partition {} for offset {}", + payload, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { - log.error("Unable to send message=[" + payload + "] due to : " + ex.getMessage()); + log.error( + "Failed to send message event for Transaction ID {} due to {}", + fabricTxId, + ex.getMessage()); } }); @@ -214,16 +216,18 @@ public boolean publishBlockEvents( @Override public void onSuccess(SendResult result) { log.info( - "Sent message=[" - + payload - + "] with offset=[" - + result.getRecordMetadata().offset() - + "]"); + "Sent message '{}' to partition {} for offset {}", + payload, + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { - log.error("Unable to send message=[" + payload + "] due to : " + ex.getMessage()); + log.error( + "Failed to send message event for Transaction ID {} due to {}", + fabricTxId, + ex.getMessage()); } });