Skip to content

Commit

Permalink
Rename CassandraServer to TestingCassandraServer
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jul 28, 2023
1 parent bb43a9d commit 8e0d65a
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 198 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
protected QueryRunner createQueryRunner()
throws Exception
{
server = closeAfterClass(new CassandraServer());
server = closeAfterClass(new TestingCassandraServer());
session = server.getSession();
session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH REPLICATION = {'class':'SimpleStrategy', 'replication_factor': 1}");
return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public static void main(String[] args)
throws Exception
{
DistributedQueryRunner queryRunner = createCassandraQueryRunner(
new CassandraServer(),
new TestingCassandraServer(),
ImmutableMap.of("http-server.http.port", "8080"),
ImmutableMap.of(),
TpchTable.getTables());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,197 +13,17 @@
*/
package io.trino.plugin.cassandra;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.testing.ResourcePresence;
import org.testcontainers.containers.GenericContainer;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.PROTOCOL_VERSION;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TIMEOUT;
import static com.google.common.io.Resources.getResource;
import static io.trino.plugin.cassandra.CassandraTestingUtils.CASSANDRA_TYPE_MANAGER;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.Files.createDirectory;
import static java.nio.file.Files.createTempDirectory;
import static java.nio.file.Files.writeString;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.testcontainers.utility.MountableFile.forHostPath;
import static org.testng.Assert.assertEquals;

public class CassandraServer
implements Closeable
public interface CassandraServer
extends Closeable
{
private static final Logger log = Logger.get(CassandraServer.class);

private static final int PORT = 9142;

private static final Duration REFRESH_SIZE_ESTIMATES_TIMEOUT = new Duration(1, MINUTES);

private final GenericContainer<?> dockerContainer;
private final CassandraSession session;

public CassandraServer()
throws Exception
{
this("cassandra:3.0", "cu-cassandra.yaml");
}

public CassandraServer(String imageName, String configFileName)
throws Exception
{
this(imageName, ImmutableMap.of(), "/etc/cassandra/cassandra.yaml", configFileName);
}

public CassandraServer(String imageName, Map<String, String> environmentVariables, String configPath, String configFileName)
throws Exception
{
log.info("Starting cassandra...");

this.dockerContainer = new GenericContainer<>(imageName)
.withExposedPorts(PORT)
.withCopyFileToContainer(forHostPath(prepareCassandraYaml(configFileName)), configPath)
.withEnv(environmentVariables)
.withStartupTimeout(java.time.Duration.ofMinutes(10));
this.dockerContainer.start();

ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder = DriverConfigLoader.programmaticBuilder();
driverConfigLoaderBuilder.withDuration(REQUEST_TIMEOUT, java.time.Duration.ofSeconds(30));
driverConfigLoaderBuilder.withString(PROTOCOL_VERSION, ProtocolVersion.V3.name());
driverConfigLoaderBuilder.withDuration(CONTROL_CONNECTION_AGREEMENT_TIMEOUT, java.time.Duration.ofSeconds(30));
// allow the retrieval of metadata for the system keyspaces
driverConfigLoaderBuilder.withStringList(METADATA_SCHEMA_REFRESHED_KEYSPACES, List.of());

CqlSessionBuilder cqlSessionBuilder = CqlSession.builder()
.withApplicationName("TestCluster")
.addContactPoint(new InetSocketAddress(this.dockerContainer.getHost(), this.dockerContainer.getMappedPort(PORT)))
.withLocalDatacenter("datacenter1")
.withConfigLoader(driverConfigLoaderBuilder.build());

CassandraSession session = new CassandraSession(
CASSANDRA_TYPE_MANAGER,
JsonCodec.listJsonCodec(ExtraColumnMetadata.class),
cqlSessionBuilder::build,
new Duration(1, MINUTES));

try {
checkConnectivity(session);
}
catch (RuntimeException e) {
session.close();
this.dockerContainer.stop();
throw e;
}

this.session = session;
}

private static String prepareCassandraYaml(String fileName)
throws IOException
{
String original = Resources.toString(getResource(fileName), UTF_8);

Path tmpDirPath = createTempDirectory(null);
Path dataDir = tmpDirPath.resolve("data");
createDirectory(dataDir);

String modified = original.replaceAll("\\$\\{data_directory\\}", dataDir.toAbsolutePath().toString());

File yamlFile = tmpDirPath.resolve(fileName).toFile();
yamlFile.deleteOnExit();
writeString(yamlFile.toPath(), modified, UTF_8);

return yamlFile.getAbsolutePath();
}

public CassandraSession getSession()
{
return requireNonNull(session, "session is null");
}

public String getHost()
{
return dockerContainer.getHost();
}

public int getPort()
{
return dockerContainer.getMappedPort(PORT);
}

private static void checkConnectivity(CassandraSession session)
{
ResultSet result = session.execute("SELECT release_version FROM system.local");
List<Row> rows = result.all();
assertEquals(rows.size(), 1);
String version = rows.get(0).getString(0);
log.info("Cassandra version: %s", version);
}

public void refreshSizeEstimates(String keyspace, String table)
throws Exception
{
long deadline = System.nanoTime() + REFRESH_SIZE_ESTIMATES_TIMEOUT.roundTo(NANOSECONDS);
while (System.nanoTime() - deadline < 0) {
flushTable(keyspace, table);
refreshSizeEstimates();
List<SizeEstimate> sizeEstimates = getSession().getSizeEstimates(keyspace, table);
if (!sizeEstimates.isEmpty()) {
log.info("Size estimates for the table %s.%s have been refreshed successfully: %s", keyspace, table, sizeEstimates);
return;
}
log.info("Size estimates haven't been refreshed as expected. Retrying ...");
SECONDS.sleep(1);
}
throw new TimeoutException(format("Attempting to refresh size estimates for table %s.%s has timed out after %s", keyspace, table, REFRESH_SIZE_ESTIMATES_TIMEOUT));
}

private void flushTable(String keyspace, String table)
throws Exception
{
dockerContainer.execInContainer("nodetool", "flush", keyspace, table);
}
CassandraSession getSession();

private void refreshSizeEstimates()
throws Exception
{
dockerContainer.execInContainer("nodetool", "refreshsizeestimates");
}
String getHost();

@Override
public void close()
{
session.close();
dockerContainer.close();
}
int getPort();

@ResourcePresence
public boolean isRunning()
{
return dockerContainer.getContainerId() != null;
}
void refreshSizeEstimates(String keyspace, String table)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class TestCassandraConnector
private static final ConnectorSession SESSION = TestingConnectorSession.builder()
.setPropertyMetadata(new CassandraSessionProperties(new CassandraClientConfig()).getSessionProperties())
.build();
private CassandraServer server;
private TestingCassandraServer server;
protected String database;
protected SchemaTableName table;
protected SchemaTableName tableForDelete;
Expand All @@ -113,7 +113,7 @@ public class TestCassandraConnector
public void setup()
throws Exception
{
this.server = new CassandraServer();
this.server = new TestingCassandraServer();

String keyspace = "test_connector";
createTestTables(server.getSession(), keyspace, DATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class TestCassandraConnectorTest
protected QueryRunner createQueryRunner()
throws Exception
{
server = closeAfterClass(new CassandraServer());
server = closeAfterClass(new TestingCassandraServer());
session = server.getSession();
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class TestCassandraLatestConnectorSmokeTest
protected QueryRunner createQueryRunner()
throws Exception
{
CassandraServer server = closeAfterClass(new CassandraServer("cassandra:4.1", "cu-cassandra-latest.yaml"));
TestingCassandraServer server = closeAfterClass(new TestingCassandraServer("cassandra:4.1", "cu-cassandra-latest.yaml"));
CassandraSession session = server.getSession();
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of(), REQUIRED_TPCH_TABLES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class TestCassandraProtocolVersionV3ConnectorSmokeTest
protected QueryRunner createQueryRunner()
throws Exception
{
CassandraServer server = closeAfterClass(new CassandraServer());
CassandraServer server = closeAfterClass(new TestingCassandraServer());
CassandraSession session = server.getSession();
createTestTables(session, KEYSPACE, Timestamp.from(TIMESTAMP_VALUE.toInstant()));
return createCassandraQueryRunner(server, ImmutableMap.of(), ImmutableMap.of("cassandra.protocol-version", "V3"), REQUIRED_TPCH_TABLES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ public class TestCassandraTokenSplitManager
private static final String KEYSPACE = "test_cassandra_token_split_manager_keyspace";
private static final int PARTITION_COUNT = 1000;

private CassandraServer server;
private TestingCassandraServer server;
private CassandraSession session;
private CassandraTokenSplitManager splitManager;

@BeforeClass
public void setUp()
throws Exception
{
server = new CassandraServer();
server = new TestingCassandraServer();
session = server.getSession();
createKeyspace(session, KEYSPACE);
splitManager = new CassandraTokenSplitManager(session, SPLIT_SIZE, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private static void checkIsDoubled(ZoneId zone, LocalDateTime dateTime)
protected QueryRunner createQueryRunner()
throws Exception
{
server = closeAfterClass(new CassandraServer());
server = closeAfterClass(new TestingCassandraServer());
session = server.getSession();
return createCassandraQueryRunner(
server,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TestDatastaxConnectorSmokeTest
protected QueryRunner createQueryRunner()
throws Exception
{
CassandraServer server = closeAfterClass(new CassandraServer(
TestingCassandraServer server = closeAfterClass(new TestingCassandraServer(
"datastax/dse-server:6.8.25",
Map.of(
"DS_LICENSE", "accept",
Expand Down
Loading

0 comments on commit 8e0d65a

Please sign in to comment.