Skip to content

Commit

Permalink
Merge pull request #49 from YarikRevich/feature/api-server-starter
Browse files Browse the repository at this point in the history
Feature: implement Kafka starter
  • Loading branch information
YarikRevich authored Jan 20, 2024
2 parents e90f517 + f64ed75 commit 179185f
Show file tree
Hide file tree
Showing 42 changed files with 940 additions and 277 deletions.
20 changes: 15 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ All setup related operations are processed via **Makefile** placed in the root d

### CLI

In order to build CLI it's required to execute the following command. Initially it cleans the environment and build Java project using **Maven**
In order to build CLI it's required to execute the following command. Initially it cleans the environment and builds Java project using **Maven**
```shell
make build-cli
```

After the execution of command given above the executable will be generated and placed into **bin** folder in the root directory of the project

**CLI** build automatically places default **user.yaml** configuration file into ~/.resourcetracker/config directory.

### GUI

In order to build GUI it's required to execute the following command. Initially it cleans the environment and builds Java project using **Maven**
Expand All @@ -38,15 +40,23 @@ make build-gui

After the execution of command given above the executable will be generated and placed into **bin** folder in the root directory of the project

GUI build automatically compiles API Server and places both executable JAR and other dependencies into **~/.resourcetracker/bin/api-server** directory
**GUI** build automatically compiles **API Server** and places both executable JAR and other dependencies into **~/.resourcetracker/bin/api-server** directory

It's highly recommended not to move API Server files from the default local directory
It's highly recommended not to move **API Server** files from the default local directory

### API Server

In order to build API Server it's required to execute the following command. Initially it cleans the environment and build Java project using **Maven**
In order to build **API Server** it's required to execute the following command. Initially it cleans the environment and builds Java project using **Maven**
```shell
make build-api-server
```

After the execution of command given above the executable will be generated and placed into **bin** folder in the root directory of the project
After the execution of command given above the executable will be generated and placed into **bin** folder in the root directory of the project

## Use cases

### CLI



### GUI
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class PropertiesEntity {
@ConfigProperty(name = "kafka.partitions")
String kafkaPartitions;

@ConfigProperty(name = "kafka.readiness.period")
Integer kafkaReadinessPeriod;

@ConfigProperty(name = "terraform.directory")
String terraformDirectory;

Expand Down Expand Up @@ -57,9 +60,6 @@ public class PropertiesEntity {
@ConfigProperty(name = "resourcetracker-kafka.image")
String resourceTrackerKafkaImage;

@ConfigProperty(name = "resourcetracker-kafka.image.version")
String resourceTrackerKafkaImageVersion;

@ConfigProperty(name = "resourcetracker-kafka.host.alias")
String resourceTrackerKafkaHostAlias;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import com.resourcetracker.model.HealthCheckResult;
import com.resourcetracker.model.ReadinessCheckApplication;
import com.resourcetracker.model.ReadinessCheckResult;
import com.resourcetracker.service.client.SmallRyeHealthCheckClientService;
import com.resourcetracker.service.client.smallrye.ISmallRyeHealthCheckClientService;
import com.resourcetracker.service.healthcheck.readiness.ReadinessCheckService;
import com.resourcetracker.service.terraform.workspace.WorkspaceService;
import com.resourcetracker.service.terraform.workspace.facade.WorkspaceFacade;
Expand All @@ -27,7 +27,7 @@ public class HealthResource implements HealthResourceApi {

@Inject WorkspaceService workspaceService;

@Inject @RestClient SmallRyeHealthCheckClientService smallRyeHealthCheckClientService;
@Inject @RestClient ISmallRyeHealthCheckClientService smallRyeHealthCheckClientService;

@Override
public HealthCheckResult v1HealthGet() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.resourcetracker.service.client.kafkastarter;

import com.resourcetracker.service.client.kafkastarter.common.IKafkaStarterClientService;
import java.net.URI;
import org.eclipse.microprofile.rest.client.RestClientBuilder;

/** Facade over the REST client used to communicate with the Kafka starter service. */
public class KafkaStarterClientServiceFacade {
  private final IKafkaStarterClientService kafkaStarterClientService;

  public KafkaStarterClientServiceFacade(String host, Integer port) {
    // Base address of the Kafka starter service; plain HTTP is expected by the starter endpoint.
    URI baseUri = URI.create(String.format("http://%s:%d", host, port));

    this.kafkaStarterClientService =
        RestClientBuilder.newBuilder().baseUri(baseUri).build(IKafkaStarterClientService.class);
  }

  /**
   * Requests the Kafka starter service to deploy Kafka, advertising the given external host.
   *
   * @param host external host to be used as advertised listener.
   */
  public void deploy(String host) {
    kafkaStarterClientService.qDeployPost(host);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.resourcetracker.service.client.kafkastarter.common;

import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;
import java.io.Closeable;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;

/** Represents client for Kafka starter endpoints. */
@RegisterRestClient
public interface IKafkaStarterClientService extends Closeable {
  /**
   * Sends a POST request to the "/deploy" endpoint of the Kafka starter service.
   *
   * @param host external host to be used as advertised listener, passed as the {@code host} query
   *     parameter.
   */
  @POST
  @Path("/deploy")
  void qDeployPost(@QueryParam("host") String host);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.resourcetracker.service.client;
package com.resourcetracker.service.client.smallrye;

import com.resourcetracker.model.HealthCheckResult;
import jakarta.ws.rs.GET;
Expand All @@ -10,7 +10,7 @@
/** Represents client for SmallRye health check endpoints. */
@Path("/q")
@RegisterRestClient(configKey = "small-rye-health-check")
public interface SmallRyeHealthCheckClientService {
public interface ISmallRyeHealthCheckClientService {
@GET
@Path("/health")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.springframework.kafka.support.serializer.JsonDeserializer;

/** Represents a service used to check availability of a Kafka cluster and consume logs from the configured topic. */
public class KafkaService {
private AdminClient kafkaAdminClient;

Expand All @@ -25,7 +28,11 @@ public class KafkaService {

private final KafkaConsumer<String, KafkaLogsTopicDto> kafkaConsumer;

public KafkaService(String kafkaBootstrapServer, PropertiesEntity properties) {
public KafkaService(String kafkaBootstrapServerHost, PropertiesEntity properties) {
String kafkaBootstrapServer =
String.format(
"%s:%d", kafkaBootstrapServerHost, properties.getResourceTrackerKafkaMainPort());

Properties kafkaAdminClientProps = new Properties();

kafkaAdminClientProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
Expand All @@ -37,7 +44,15 @@ public KafkaService(String kafkaBootstrapServer, PropertiesEntity properties) {

Properties kafkaConsumerProps = new Properties();

kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
String groupId = UUID.randomUUID().toString();

kafkaConsumerProps.put(
ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
String.format("scheduler_coordinator_%s", groupId));
kafkaConsumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
kafkaConsumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
kafkaConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
kafkaConsumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
kafkaConsumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer);
kafkaConsumerProps.put(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Expand All @@ -52,6 +67,11 @@ public KafkaService(String kafkaBootstrapServer, PropertiesEntity properties) {
this.kafkaConsumer.subscribe(Collections.singletonList(properties.getKafkaTopic()));
}

/**
* Checks if Kafka service is connected to Kafka cluster at the given address.
*
* @return result of the check.
*/
public boolean isConnected() {
if (isAvailable()) {
if (Objects.isNull(kafkaAdminClient)) {
Expand All @@ -74,10 +94,16 @@ public boolean isConnected() {
return false;
}

private boolean isAvailable() {
/**
* Checks if Kafka service is available at the given address.
*
* @return result of the check.
*/
public boolean isAvailable() {
URL url;

try {
url = URI.create(kafkaBootstrapServer).toURL();
url = URI.create(String.format("http://%s", kafkaBootstrapServer)).toURL();
} catch (MalformedURLException e) {
return false;
}
Expand All @@ -91,22 +117,21 @@ private boolean isAvailable() {
}
}

/**
* Retrieves messages from "logs" topic.
*
* @return retrieved messages.
*/
public List<KafkaLogsTopicDto> consumeLogs() {
List<KafkaLogsTopicDto> kafkaLogsTopicEntities = new ArrayList<>();

ConsumerRecords<String, KafkaLogsTopicDto> records = kafkaConsumer.poll(Duration.ofSeconds(5));
List<KafkaLogsTopicDto> result = new ArrayList<>();

System.out.println(records.count());
ConsumerRecords<String, KafkaLogsTopicDto> records = kafkaConsumer.poll(Duration.ofSeconds(30));

// ListIterator<ConsumerRecord<String, KafkaLogsTopicEntity>> iter =
// (ListIterator<ConsumerRecord<String, KafkaLogsTopicEntity>>) records.iterator();
//
// while (iter.hasNext()) {
// ConsumerRecord<String, KafkaLogsTopicEntity> record = iter.next();
// kafkaLogsTopicEntities.add(record.value());
// }
for (ConsumerRecord<String, KafkaLogsTopicDto> record : records) {
result.add(record.value());
}

return kafkaLogsTopicEntities;
return result;
}

@PreDestroy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ public DestroyCommandService(
TerraformConfigurationHelper.getContentEnvironmentVariables(
agentContext, gitCommitId));
};

System.out.println(command);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public InitCommandService(
workspaceUnitDirectory, credentials),
AWSProviderConfigurationHelper.getBackendConfig(credentials));
};

System.out.println(command);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
import com.resourcetracker.converter.SecretsConverter;
import com.resourcetracker.dto.*;
import com.resourcetracker.entity.PropertiesEntity;
import com.resourcetracker.exception.ContainerStartupFailureException;
import com.resourcetracker.exception.SecretsConversionException;
import com.resourcetracker.model.TerraformDeploymentApplication;
import com.resourcetracker.model.TerraformDestructionApplication;
import com.resourcetracker.model.ValidationSecretsApplication;
import com.resourcetracker.model.ValidationSecretsApplicationResult;
import com.resourcetracker.model.ValidationSecretsApplicationResultSecrets;
import com.resourcetracker.service.client.kafkastarter.KafkaStarterClientServiceFacade;
import com.resourcetracker.service.kafka.KafkaService;
import com.resourcetracker.service.vendor.aws.AWSVendorService;
import com.resourcetracker.service.vendor.common.VendorWaiter;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

Expand Down Expand Up @@ -129,10 +131,10 @@ public String startContainerExecution(
properties.getGitCommitId()),
AWSKafkaTaskDefinitionRegistrationDto.of(
properties.getResourceTrackerKafkaImage(),
properties.getResourceTrackerKafkaImageVersion(),
properties.getGitCommitId(),
properties.getAwsResourceTrackerKafkaName(),
properties.getResourceTrackerKafkaMainPort(),
properties.getResourceTrackerKafkaStarterPort(),
properties.getResourceTrackerKafkaStarterPort(),
serviceMachineAddress,
properties.getResourceTrackerKafkaHostAlias(),
properties.getKafkaTopic(),
Expand Down Expand Up @@ -165,12 +167,25 @@ public String startContainerExecution(
terraformDeploymentApplication.getCredentials().getRegion(),
properties.getAwsReadinessPeriod());

// yield awsVendorService.getMachineAddress(
// properties.getAwsResourceTrackerKafkaName(),
// awsDeploymentResult.getEcsCluster().getValue(),
// awsCredentialsProvider,
// terraformDeploymentApplication.getCredentials().getRegion());
yield "0.0.0.0";
serviceMachineAddress =
awsVendorService.getMachineAddress(
properties.getAwsResourceTrackerCommonFamily(),
awsDeploymentResult.getEcsCluster().getValue(),
ecsTaskDefinitionsArn,
awsCredentialsProvider,
terraformDeploymentApplication.getCredentials().getRegion());

KafkaStarterClientServiceFacade kafkaStarterClientServiceFacade =
new KafkaStarterClientServiceFacade(
serviceMachineAddress, properties.getResourceTrackerKafkaStarterPort());

kafkaStarterClientServiceFacade.deploy(serviceMachineAddress);

KafkaService kafkaService = new KafkaService(serviceMachineAddress, properties);

VendorWaiter.awaitFor(kafkaService::isAvailable, properties.getKafkaReadinessPeriod());

yield serviceMachineAddress;
}
};
}
Expand Down
Loading

0 comments on commit 179185f

Please sign in to comment.