From 1edcd4e9aba94db99ff5c9ce48ccf9b9fe15d7e5 Mon Sep 17 00:00:00 2001 From: Nithin-Kasam Date: Tue, 6 Aug 2024 22:12:41 +0530 Subject: [PATCH] Kafka beats implementation (#20067) * sys logs from kafka through file beats implementation * updated change log --- changelog/unreleased/issue-5254.toml | 5 ++ .../inputs/beats/kafka/BeatsKafkaInput.java | 72 +++++++++++++++++++ .../graylog2/inputs/codecs/CodecsModule.java | 3 + .../shared/bindings/MessageInputBindings.java | 2 + 4 files changed, 82 insertions(+) create mode 100644 changelog/unreleased/issue-5254.toml create mode 100644 graylog2-server/src/main/java/org/graylog2/inputs/beats/kafka/BeatsKafkaInput.java diff --git a/changelog/unreleased/issue-5254.toml b/changelog/unreleased/issue-5254.toml new file mode 100644 index 000000000000..994852361a3a --- /dev/null +++ b/changelog/unreleased/issue-5254.toml @@ -0,0 +1,5 @@ +type="a" +message="Added new Beats Kafka input to ingest system logs from Kafka topic using Filebeat." + +issues=["5254"] +pulls=["20067"] diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/beats/kafka/BeatsKafkaInput.java b/graylog2-server/src/main/java/org/graylog2/inputs/beats/kafka/BeatsKafkaInput.java new file mode 100644 index 000000000000..737683d397e1 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/beats/kafka/BeatsKafkaInput.java @@ -0,0 +1,72 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.inputs.beats.kafka; + +import com.codahale.metrics.MetricRegistry; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; +import jakarta.inject.Inject; +import org.graylog.plugins.beats.Beats2Codec; +import org.graylog2.inputs.transports.KafkaTransport; +import org.graylog2.plugin.LocalMetricRegistry; +import org.graylog2.plugin.ServerStatus; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.inputs.MessageInput; +import org.graylog2.plugin.inputs.annotations.ConfigClass; +import org.graylog2.plugin.inputs.annotations.FactoryClass; + +public class BeatsKafkaInput extends MessageInput { + private static final String NAME = "Beats Kafka"; + + @AssistedInject + public BeatsKafkaInput(@Assisted Configuration configuration, + MetricRegistry metricRegistry, + KafkaTransport.Factory transport, + Beats2Codec.Factory codec, + LocalMetricRegistry localRegistry, + Config config, + Descriptor descriptor, ServerStatus serverStatus) { + super(metricRegistry, configuration, transport.create(configuration), localRegistry, codec.create(configuration), config, descriptor, serverStatus); + } + + @FactoryClass + public interface Factory extends MessageInput.Factory { + @Override + BeatsKafkaInput create(Configuration configuration); + + @Override + Config getConfig(); + + @Override + Descriptor getDescriptor(); + } + + public static class Descriptor extends MessageInput.Descriptor { + @Inject + public Descriptor() { + super(NAME, false, ""); + } + } + + @ConfigClass + public static class Config extends MessageInput.Config { + @Inject + public Config(KafkaTransport.Factory transport, Beats2Codec.Factory codec) { + super(transport.getConfig(), codec.getConfig()); + } + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/codecs/CodecsModule.java b/graylog2-server/src/main/java/org/graylog2/inputs/codecs/CodecsModule.java index 41a88ece3b4b..db67ea2e98eb 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/codecs/CodecsModule.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/codecs/CodecsModule.java @@ -24,6 +24,7 @@ import com.google.inject.Scopes; import com.google.inject.multibindings.MapBinder; +import org.graylog.plugins.beats.Beats2Codec; import org.graylog2.plugin.inject.Graylog2Module; import org.graylog2.plugin.inputs.codecs.Codec; @@ -40,5 +41,7 @@ protected void configure() { installCodec(mapBinder, RandomHttpMessageCodec.class); installCodec(mapBinder, GelfCodec.class); installCodec(mapBinder, JsonPathCodec.class); + installCodec(mapBinder, Beats2Codec.class); + } } diff --git a/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java b/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java index 73ce201e6bbc..b91c638d6698 100644 --- a/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java +++ b/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java @@ -18,6 +18,7 @@ import com.google.inject.multibindings.MapBinder; import org.graylog.plugins.beats.BeatsInputPluginModule; +import org.graylog2.inputs.beats.kafka.BeatsKafkaInput; import org.graylog2.inputs.codecs.CodecsModule; import org.graylog2.inputs.gelf.amqp.GELFAMQPInput; import org.graylog2.inputs.gelf.http.GELFHttpInput; @@ -61,6 +62,7 @@ protected void configure() { installInput(inputMapBinder, GELFAMQPInput.class, GELFAMQPInput.Factory.class); installInput(inputMapBinder, GELFKafkaInput.class, GELFKafkaInput.Factory.class); installInput(inputMapBinder, JsonPathInput.class, JsonPathInput.Factory.class); + installInput(inputMapBinder, BeatsKafkaInput.class, BeatsKafkaInput.Factory.class); install(new BeatsInputPluginModule()); }