Skip to content

Commit

Permalink
Merge branch 'b1.6' of github.com:datastax/spark-cassandra-connector …
Browse files Browse the repository at this point in the history
…into b1.6
  • Loading branch information
RussellSpitzer committed Nov 9, 2017
2 parents ed318d4 + fac4647 commit 2d6abdd
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.mapper.DefaultColumnMapper
import com.datastax.spark.connector.types.{CassandraOption, TypeConverter}
import com.datastax.spark.connector.writer.{TimestampOption, TTLOption, WriteConf}
import org.joda.time.{DateTime, LocalDate}

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -1106,6 +1107,85 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
(10, 10, "1010"),
(10, 11, "1011"),
(10, 12, "1012"))
}

it should "not delete rows older than year 2000" in {

conn.withSessionDo { session =>
session.execute(s"""DROP TABLE IF EXISTS $ks.delete_old_rows""")
session.execute(s"""CREATE TABLE $ks.delete_old_rows(key INT, group INT, value VARCHAR, PRIMARY KEY (key, group))""")
session.execute(s"""INSERT INTO $ks.delete_old_rows(key, group, value) VALUES (10, 10, '1010')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows(key, group, value) VALUES (10, 11, '1011')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows(key, group, value) VALUES (10, 12, '1012')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows(key, group, value) VALUES (20, 20, '2020')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows(key, group, value) VALUES (20, 21, '2021')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows(key, group, value) VALUES (20, 22, '2022')""")
}

// Try to delete rows older than year 2000.
sc.cassandraTable(ks, "delete_old_rows").where("key = 10")
.deleteFromCassandra(ks, "delete_old_rows",
writeConf = WriteConf(ttl = TTLOption.constant(1), timestamp = TimestampOption.constant(new DateTime(2000, 1, 1, 7, 8, 8, 10))))

val results1 = sc
.cassandraTable[(Int, Int, String)](ks, "delete_old_rows")
.select("key", "group", "value")
.collect()

results1 should have size 6

}

it should "delete rows older than year 2100" in {

conn.withSessionDo { session =>
session.execute(s"""DROP TABLE IF EXISTS $ks.delete_old_rows1""")
session.execute(s"""CREATE TABLE $ks.delete_old_rows1(key INT, group INT, value VARCHAR, PRIMARY KEY (key, group))""")
session.execute(s"""INSERT INTO $ks.delete_old_rows1(key, group, value) VALUES (10, 10, '1010')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows1(key, group, value) VALUES (10, 11, '1011')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows1(key, group, value) VALUES (10, 12, '1012')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows1(key, group, value) VALUES (20, 20, '2020')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows1(key, group, value) VALUES (20, 21, '2021')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows1(key, group, value) VALUES (20, 22, '2022')""")
}

// Try to delete rows older than year 2100.
sc.cassandraTable(ks, "delete_old_rows1").where("key = 10")
.deleteFromCassandra(ks, "delete_old_rows1",
writeConf = WriteConf(ttl = TTLOption.constant(1), timestamp = TimestampOption.constant(new DateTime(2100, 1, 1, 7, 8, 8, 10))))

val results1 = sc
.cassandraTable[(Int, Int, String)](ks, "delete_old_rows1")
.select("key", "group", "value")
.collect()

results1 should have size 3

}

it should "delete rows and ignore ttl setting" in {

conn.withSessionDo { session =>
session.execute(s"""DROP TABLE IF EXISTS $ks.delete_old_rows2""")
session.execute(s"""CREATE TABLE $ks.delete_old_rows2(key INT, group INT, value VARCHAR, PRIMARY KEY (key, group))""")
session.execute(s"""INSERT INTO $ks.delete_old_rows2(key, group, value) VALUES (10, 10, '1010')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows2(key, group, value) VALUES (10, 11, '1011')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows2(key, group, value) VALUES (10, 12, '1012')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows2(key, group, value) VALUES (20, 20, '2020')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows2(key, group, value) VALUES (20, 21, '2021')""")
session.execute(s"""INSERT INTO $ks.delete_old_rows2(key, group, value) VALUES (20, 22, '2022')""")
}

sc.cassandraTable(ks, "delete_old_rows2").where("key = 10")
.deleteFromCassandra(ks, "delete_old_rows2",
writeConf = WriteConf(ttl = TTLOption.constant(13456)))

val results1 = sc
.cassandraTable[(Int, Int, String)](ks, "delete_old_rows1")
.select("key", "group", "value")
.collect()

results1 should have size 3

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.mapper.DefaultColumnMapper
import com.datastax.spark.connector.types._

import org.joda.time.DateTime

case class Address(street: String, city: String, zip: Int)
case class KV(key: Int, value: String)
case class KeyValue(key: Int, group: Long, value: String)
Expand Down Expand Up @@ -873,6 +875,27 @@ class TableWriterSpec extends SparkCassandraITFlatSpecBase {
e.getMessage should include("scol")
}


it should "allow to write data with specific timestamp" in {

val setElements = sc.parallelize(Seq(
(6, Set("Four")),
(6, Set("Five")),
(6, Set("Six"))))
//Update data to year 1999
setElements.saveToCassandra(ks, "collections_mod", SomeColumns("key", "scol" append),
writeConf = WriteConf(timestamp = TimestampOption.constant(new DateTime(1999, 1, 1, 7, 8, 8, 10))))

// Try to delete rows older than year 2000.
sc.cassandraTable(ks, "collections_mod").where("key = 6")
.deleteFromCassandra(ks, "collections_mod",
writeConf = WriteConf(ttl = TTLOption.constant(1), timestamp = TimestampOption.constant(new DateTime(2000, 1, 1, 7, 8, 8, 10))))

val result = sc.cassandraTable(ks, "collections_mod").where("key = 6").collect()

result should have size 0
}

it should "insert and not overwrite existing keys when ifNotExists is true" in {
conn.withSessionDo { session =>
session.execute(s"""TRUNCATE $ks.write_if_not_exists_test""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,6 @@ class TableWriter[T] private (

val ifNotExistsSpec = if (writeConf.ifNotExists) "IF NOT EXISTS " else ""

val ttlSpec = writeConf.ttl match {
case TTLOption(PerRowWriteOptionValue(placeholder)) => Some(s"TTL :$placeholder")
case TTLOption(StaticWriteOptionValue(value)) => Some(s"TTL $value")
case _ => None
}

val timestampSpec = writeConf.timestamp match {
case TimestampOption(PerRowWriteOptionValue(placeholder)) => Some(s"TIMESTAMP :$placeholder")
case TimestampOption(StaticWriteOptionValue(value)) => Some(s"TIMESTAMP $value")
case _ => None
}

val options = List(ttlSpec, timestampSpec).flatten
val optionsSpec = if (options.nonEmpty) s"USING ${options.mkString(" AND ")}" else ""

s"INSERT INTO ${quote(keyspaceName)}.${quote(tableName)} ($columnSpec) VALUES ($valueSpec) $ifNotExistsSpec$optionsSpec".trim
}

Expand All @@ -72,9 +57,15 @@ class TableWriter[T] private (
val deleteColumnsClause = deleteColumnNames.map(quote).mkString(", ")
val whereClause = quotedColumnNames(primaryKey).map(c => s"$c = :$c").mkString(" AND ")

s"DELETE ${deleteColumnsClause} FROM ${quote(keyspaceName)}.${quote(tableName)} WHERE $whereClause"
val usingTimestampClause = if (timestampSpec.nonEmpty) s"USING ${timestampSpec.get}" else ""

if (ttlEnabled)
logWarning(s"${writeConf.ttl} is ignored for DELETE query")

s"DELETE ${deleteColumnsClause} FROM ${quote(keyspaceName)}.${quote(tableName)} $usingTimestampClause WHERE $whereClause"
}
private lazy val queryTemplateUsingUpdate: String = {

private[connector] lazy val queryTemplateUsingUpdate: String = {
val (primaryKey, regularColumns) = columns.partition(_.isPrimaryKeyColumn)
val (counterColumns, nonCounterColumns) = regularColumns.partition(_.isCounterColumn)

Expand All @@ -99,7 +90,28 @@ class TableWriter[T] private (
val setClause = (setNonCounterColumnsClause ++ setCounterColumnsClause).mkString(", ")
val whereClause = quotedColumnNames(primaryKey).map(c => s"$c = :$c").mkString(" AND ")

s"UPDATE ${quote(keyspaceName)}.${quote(tableName)} SET $setClause WHERE $whereClause"
s"UPDATE ${quote(keyspaceName)}.${quote(tableName)} $optionsSpec SET $setClause WHERE $whereClause"
}

private lazy val timestampSpec: Option[String] = {
writeConf.timestamp match {
case TimestampOption(PerRowWriteOptionValue(placeholder)) => Some(s"TIMESTAMP :$placeholder")
case TimestampOption(StaticWriteOptionValue(value)) => Some(s"TIMESTAMP $value")
case _ => None
}
}

private lazy val ttlEnabled: Boolean = writeConf.ttl != TTLOption.defaultValue

private lazy val optionsSpec: String = {
val ttlSpec = writeConf.ttl match {
case TTLOption(PerRowWriteOptionValue(placeholder)) => Some(s"TTL :$placeholder")
case TTLOption(StaticWriteOptionValue(value)) => Some(s"TTL $value")
case _ => None
}

val options = List(ttlSpec, timestampSpec).flatten
if (options.nonEmpty) s"USING ${options.mkString(" AND ")}" else ""
}

private val isCounterUpdate =
Expand Down Expand Up @@ -178,6 +190,7 @@ class TableWriter[T] private (
*/
def update(taskContext: TaskContext, data: Iterator[T]): Unit =
writeInternal(queryTemplateUsingUpdate, taskContext, data)

/**
* Write data with Cql INSERT statement
*/
Expand Down

0 comments on commit 2d6abdd

Please sign in to comment.