Skip to content

Commit

Permalink
Add javadocs and unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kawamuray committed Oct 21, 2020
1 parent 305f1e9 commit 3240264
Show file tree
Hide file tree
Showing 16 changed files with 360 additions and 18 deletions.
28 changes: 28 additions & 0 deletions src/main/java/kmql/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import lombok.AllArgsConstructor;
import lombok.NonNull;

/**
* A core runtime of kmql.
*/
@AllArgsConstructor
public class Engine implements AutoCloseable {
private final AdminClient adminClient;
Expand All @@ -18,13 +21,27 @@ public class Engine implements AutoCloseable {
@NonNull
private OutputFormat outputFormat;

/**
* Creates a new {@link Engine} from the given {@link AdminClient} and the name of the output format.
* Default instances are used for both of {@link OutputFormatRegistry} and {@link TableRegistry}.
* @param adminClient an {@link AdminClient} to access Kafka cluster's metadata.
* @param outputFormatName the name of output format to use.
* @return an {@link Engine}.
*/
public static Engine from(AdminClient adminClient, String outputFormatName) {
OutputFormatRegistry outputFormatRegistry = OutputFormatRegistry.DEFAULT;
OutputFormat outputFormat = lookupOutputFormat(outputFormatRegistry, outputFormatName);
Database db = Database.from(TableRegistry.DEFAULT);
return new Engine(adminClient, db, outputFormatRegistry, outputFormat);
}

/**
* Execute the given command.
* Command should be a valid SQL for now.
* @param command command to execute.
* @param output the output stream to write the formatted query result.
* @throws SQLException when SQL fails.
*/
public void execute(String command, BufferedOutputStream output) throws SQLException {
prepareRequiredTables(command);
db.executeQuery(command, results -> {
Expand All @@ -50,6 +67,9 @@ private void prepareRequiredTables(String sql) {
}
}

/**
* Initialize all tables that this engine supports.
*/
public void initAllTables() {
try {
db.prepareAllTables(adminClient);
Expand All @@ -58,10 +78,18 @@ public void initAllTables() {
}
}

/**
* Set the output format to the given instance of {@link OutputFormat}.
* @param outputFormat a new output format to use.
*/
public void setOutputFormat(@NonNull OutputFormat outputFormat) {
this.outputFormat = outputFormat;
}

/**
* Set the output format to the specified instance by the given name.
* @param name name of the output format.
*/
public void setOutputFormat(String name) {
OutputFormat newFormat = lookupOutputFormat(outputFormatRegistry, name);
setOutputFormat(newFormat);
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/kmql/OutputFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@
import java.sql.ResultSet;
import java.sql.SQLException;

/**
* An interface of the kmql output format implementation.
*/
public interface OutputFormat {
/**
* Format the given {@link ResultSet} and write it into {@link BufferedOutputStream}.
* @param results the SQL query result.
* @param out the output stream to write the output.
* @throws SQLException if {@link ResultSet} throws.
* @throws IOException if {@link BufferedOutputStream} throws.
*/
void formatTo(ResultSet results, BufferedOutputStream out) throws SQLException, IOException;
}
18 changes: 18 additions & 0 deletions src/main/java/kmql/OutputFormatRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import kmql.format.SsvFormat;
import kmql.format.TableFormat;

/**
* Registry of outputs formats.
*/
public class OutputFormatRegistry {
public static final OutputFormatRegistry DEFAULT = new OutputFormatRegistry();

Expand All @@ -23,16 +26,31 @@ public OutputFormatRegistry() {
formats = new ConcurrentHashMap<>();
}

/**
* Register the given format under the given name to the default registry.
* @param name the name of the format.
* @param format the format instance.
*/
public static void registerDefault(String name, OutputFormat format) {
DEFAULT.register(name, format);
}

/**
* Register the given format under the given name.
* @param name the name of the format.
* @param format the format instance.
*/
public void register(String name, OutputFormat format) {
if (formats.putIfAbsent(name, format) != null) {
throw new IllegalArgumentException("conflicting format name: " + name);
}
}

/**
* Lookup a format by the name.
* @param name the name of the format.
* @return an {@link OutputFormat} if presents.
*/
public Optional<OutputFormat> lookup(String name) {
return Optional.ofNullable(formats.get(name));
}
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/kmql/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,33 @@

import org.apache.kafka.clients.admin.AdminClient;

/**
* An interface of a kmql table implementation.
*/
public interface Table {
/**
* Name of the table.
* @return name of the table.
*/
String name();

/**
* Optionally declared list of table names that this table is depending on to construct the table.
* The returned tables are prepared before the {@link #prepare(Connection, AdminClient)} method of this
* table is called.
* @return list of table names to depend on.
*/
default Collection<String> dependencyTables() {
return emptyList();
}

/**
* Prepare this table on the given {@link Connection}. This process includes both of creating a table and
* feeding in its data by obtaining it from the given {@link AdminClient}.
* At the time of this method call, absence of the target table is guaranteed.
* @param connection a JDBC {@link Connection}.
* @param adminClient a Kafka {@link AdminClient}.
* @throws Exception at any errors.
*/
void prepare(Connection connection, AdminClient adminClient) throws Exception;
}
26 changes: 26 additions & 0 deletions src/main/java/kmql/TableRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import kmql.table.LogdirsTable;
import kmql.table.ReplicasTable;

/**
* Registry of kmql tables.
*/
public class TableRegistry implements Iterable<Map.Entry<String, Table>> {
public static final TableRegistry DEFAULT = new TableRegistry();

Expand All @@ -24,10 +27,19 @@ public class TableRegistry implements Iterable<Map.Entry<String, Table>> {

private final ConcurrentMap<String, Table> tables;

/**
* Register the given table under the {@link Table#name()} to the default registry.
* @param table the table instance.
*/
public static void registerDefault(Table table) {
DEFAULT.register(table.name(), table);
}

/**
* Register the given format under the given name to the default registry.
* @param name the name of the table.
* @param table the table instance.
*/
public static void registerDefault(String name, Table table) {
DEFAULT.register(name, table);
}
Expand All @@ -36,16 +48,30 @@ public TableRegistry() {
tables = new ConcurrentHashMap<>();
}

/**
* Register the given table under the given name.
* @param name the name of the table.
* @param table the table instance.
*/
public void register(String name, Table table) {
if (tables.putIfAbsent(name, table) != null) {
throw new IllegalArgumentException("conflicting table name: " + name);
}
}

/**
* Lookup a table by the name.
* @param name the name of the table.
* @return an {@link Table} if presents.
*/
public Optional<Table> lookup(String name) {
return Optional.ofNullable(tables.get(name));
}

/**
* Returns the iterator over table entries registered in this registry.
* @return an iterator for table name and table instances.
*/
@Override
public Iterator<Entry<String, Table>> iterator() {
return tables.entrySet().iterator();
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/kmql/format/JsonFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import kmql.OutputFormat;

/**
* JSON format.
*/
public class JsonFormat implements OutputFormat {
private final ObjectMapper mapper = new ObjectMapper()
.disable(Feature.AUTO_CLOSE_TARGET);
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/kmql/format/SsvFormat.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
package kmql.format;

public class SsvFormat extends AbstractCsvFormat {
/**
* Space-Separated Values format.
* Example:
* {@code
* # HEADER1, HEADER2, HEADER3
* foo 1234 true
* bar 5678 false
* }
*/
public class SsvFormat extends AbstractCsvFormat {
public SsvFormat() {
super(" ");
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/kmql/format/TableFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

import kmql.OutputFormat;

/**
* Pretty-printed table format.
*/
public class TableFormat implements OutputFormat {
public static final String[][] ARRAY_PROTO = new String[0][];

Expand All @@ -36,7 +39,6 @@ public void formatTo(ResultSet results, BufferedOutputStream out) throws SQLExce

PrintWriter pw = new PrintWriter(out);
pw.write(FlipTable.of(headers, rows.toArray(ARRAY_PROTO)));
pw.println();
pw.flush();
}
}
18 changes: 2 additions & 16 deletions src/test/java/kmql/DatabaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import static org.mockito.Mockito.verify;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -60,24 +59,11 @@ public void tearDown() throws Exception {
db.close();
}

private boolean tableExists(String name) throws SQLException {
try (Statement stmt = connection.createStatement();
ResultSet results = stmt.executeQuery("SHOW TABLES")) {
while (results.next()) {
String table = results.getString(1);
if (table.toLowerCase().equals(name.toLowerCase())) {
return true;
}
}
}
return false;
}

@Test
public void prepareTable() throws Exception {
db.prepareTable("xyz", adminClient);
verify(xyzTable, times(1)).prepare(connection, adminClient);
assertTrue(tableExists("xyz"));
assertTrue(SqlUtils.tableExists(connection, "xyz"));
// This should be no-op because it's already initialized
db.prepareTable("xyz", adminClient);
verify(xyzTable, times(1)).prepare(connection, adminClient);
Expand All @@ -99,7 +85,7 @@ public void prepareAllTables() throws Exception {
public void dropTable() throws Exception {
db.prepareTable("xyz", adminClient);
db.dropTable("xyz");
assertFalse(tableExists("xyz"));
assertFalse(SqlUtils.tableExists(connection, "xyz"));
}

@Test(expected = IllegalStateException.class)
Expand Down
79 changes: 79 additions & 0 deletions src/test/java/kmql/EngineTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package kmql;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.io.BufferedOutputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.admin.AdminClient;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;

public class EngineTest {
private Engine engine;

@Mock
private AdminClient adminClient;
private Connection connection;
private final List<String> outputs = new ArrayList<>();

@Before
public void setUp() {
TableRegistry tableRegistry = new TableRegistry();
tableRegistry.register("xyz", new Table() {
@Override
public String name() {
return "xyz";
}

@Override
public void prepare(Connection connection, AdminClient adminClient) throws Exception {
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE TABLE xyz (id VARCHAR(255) NOT NULL)");
stmt.executeUpdate("INSERT INTO xyz VALUES ('foo'), ('bar'), ('baz')");
}
}
});
connection = SqlUtils.connection();
Database db = new Database(connection, tableRegistry);
OutputFormatRegistry outputFormatRegistry = new OutputFormatRegistry();
OutputFormat rawFormat = (results, out) -> {
while (results.next()) {
String id = results.getString(1);
outputs.add(id);
}
};
outputFormatRegistry.register("raw", rawFormat);
engine = new Engine(adminClient, db, outputFormatRegistry, rawFormat);
}

@After
public void tearDown() throws Exception {
engine.close();
}

@Test
public void execute() throws SQLException {
// This call should initialize the table internally
engine.execute("SELECT id FROM xyz", mock(BufferedOutputStream.class));
assertEquals(Arrays.asList("foo", "bar", "baz"), outputs);
outputs.clear();
// This call should use existing table
engine.execute("SELECT id FROM xyz", mock(BufferedOutputStream.class));
assertEquals(Arrays.asList("foo", "bar", "baz"), outputs);
}

@Test
public void initAllTables() throws SQLException {
engine.initAllTables();
SqlUtils.tableExists(connection, "xyz");
}
}
Loading

0 comments on commit 3240264

Please sign in to comment.