Skip to content

Commit

Permalink
Oppdaterer kafka config (#1168)
Browse files Browse the repository at this point in the history
* Fikset deprecated kode i kafka config. Setter all config i kafkalistenerfactory og fjerner alt i application.yml for å ha all kafka config på ett sted

* Add deserializers in config
  • Loading branch information
olekvernberg authored Nov 11, 2024
1 parent e9ea8e9 commit feda1fc
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 23 deletions.
18 changes: 16 additions & 2 deletions src/main/kotlin/no/nav/familie/ef/mottak/config/KafkaConfig.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
package no.nav.familie.ef.mottak.config

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import no.nav.familie.kafka.KafkaErrorHandler
import no.nav.joarkjournalfoeringhendelser.JournalfoeringHendelseRecord
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.beans.factory.ObjectProvider
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.ssl.SslBundles
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
Expand All @@ -18,12 +24,20 @@ class KafkaConfig {
fun kafkaJournalføringHendelseListenerContainerFactory(
properties: KafkaProperties,
kafkaErrorHandler: KafkaErrorHandler,
sslBundles: ObjectProvider<SslBundles>,
): ConcurrentKafkaListenerContainerFactory<Long, JournalfoeringHendelseRecord> {
properties.properties["specific.avro.reader"] = "true"
val consumerProperties = properties.buildConsumerProperties(sslBundles.getIfAvailable())
consumerProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = 1
consumerProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
consumerProperties[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
consumerProperties[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = true
consumerProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
consumerProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java

val factory = ConcurrentKafkaListenerContainerFactory<Long, JournalfoeringHendelseRecord>()
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE
factory.containerProperties.authExceptionRetryInterval = Duration.ofSeconds(2)
factory.consumerFactory = DefaultKafkaConsumerFactory(properties.buildConsumerProperties())
factory.consumerFactory = DefaultKafkaConsumerFactory(consumerProperties)
factory.setCommonErrorHandler(kafkaErrorHandler)
return factory
}
Expand Down
7 changes: 0 additions & 7 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,6 @@ spring:
type: PKCS12
location: ${KAFKA_TRUSTSTORE_PATH}
password: ${KAFKA_CREDSTORE_PASSWORD}
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: srvfamilie-ef-mot
max-poll-records: 1
auto-offset-reset: latest
enable-auto-commit: false
producer:
acks: all
key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
Expand Down
7 changes: 0 additions & 7 deletions src/test/resources/application-integrationtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@ spring:
schema.registry.url: http://localhost:8081
security:
protocol: PLAINTEXT
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: srvc01
max-poll-records: 1
auto-offset-reset: latest
enable-auto-commit: false

familie:
ef:
Expand Down
7 changes: 0 additions & 7 deletions src/test/resources/application-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ spring:
schema.registry.url: http://localhost:8081
security:
protocol: PLAINTEXT
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
group-id: srvc01
max-poll-records: 1
auto-offset-reset: latest
enable-auto-commit: false

familie:
ef:
Expand Down

0 comments on commit feda1fc

Please sign in to comment.