Skip to content

Commit

Permalink
Merge pull request #1153 from datastax/SPARKC-426-b1.6
Browse files Browse the repository at this point in the history
SPARKC-426: Allow POJOs to be set as NULL
  • Loading branch information
RussellSpitzer authored Nov 29, 2017
2 parents 2d6abdd + b2188d5 commit 386063e
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import java.io.IOException
import java.util.Date
import com.datastax.driver.core.ProtocolVersion._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.mapper.DefaultColumnMapper
import com.datastax.spark.connector.cql.{CassandraConnector, CassandraConnectorConf, TableDef}
import com.datastax.spark.connector.japi.CassandraJavaUtil
import com.datastax.spark.connector.mapper.ClassWithUDTBean.AddressBean
import com.datastax.spark.connector.mapper._
import com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory
import com.datastax.spark.connector.rdd.reader.RowReaderFactory
import com.datastax.spark.connector.types.{CassandraOption, TypeConverter}
import com.datastax.spark.connector.writer.{TimestampOption, TTLOption, WriteConf}
import com.datastax.spark.connector.writer.{TTLOption, TimestampOption, WriteConf}
import org.joda.time.{DateTime, LocalDate}

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -42,8 +46,8 @@ class SubKeyValue extends SuperKeyValue {
var group: Long = 0L
}

case class Address(street: String, city: String, zip: Int)
case class ClassWithUDT(key: Int, name: String, addr: Address)
case class Address(street: Option[String], city: Option[String], zip: Option[Int])
case class ClassWithUDT(key: Int, name: String, addr: Option[Address])
case class ClassWithTuple(key: Int, value: (Int, String))
case class ClassWithSmallInt(key: Int, value: Short)

Expand Down Expand Up @@ -119,6 +123,8 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
session.execute( s"""CREATE TYPE $ks.address (street text, city text, zip int)""")
session.execute( s"""CREATE TABLE $ks.udts(key INT PRIMARY KEY, name text, addr frozen<address>)""")
session.execute( s"""INSERT INTO $ks.udts(key, name, addr) VALUES (1, 'name', {street: 'Some Street', city: 'Paris', zip: 11120})""")
session.execute( s"""INSERT INTO $ks.udts(key, name, addr) VALUES (2, 'name', {street: 'Some Street', city: null, zip: 11120})""")
session.execute( s"""INSERT INTO $ks.udts(key, name, addr) VALUES (3, 'name', null)""")
},

Future {
Expand Down Expand Up @@ -499,16 +505,29 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {

it should "allow to fetch columns from a table with user defined Cassandra type (UDT)" in {
val result = sc.cassandraTable(ks, "udts").select("key", "name").collect()
result should have length 1
val row = result.head
result should have length 3
val row = result.map( x => (x.getInt(0), x)).toMap.get(1).get
row.getInt(0) should be(1)
row.getString(1) should be("name")
}

it should "allow fetching columns into a JavaBean with nulls" in {
implicit val rrf = CassandraJavaUtil.mapRowTo(classOf[ClassWithUDTBean])
val rdd = sc.cassandraTable[ClassWithUDTBean](ks, "udts")
val result = rdd.collect
val expected = Array(
new ClassWithUDTBean(1, "name", new AddressBean("Some Street", "Paris", 11120)),
new ClassWithUDTBean(2, "name", new AddressBean("Some Street", null, 11120)),
new ClassWithUDTBean(3, "name", null)
)

result should contain theSameElementsAs expected
}

it should "allow to fetch UDT columns as UDTValue objects" in {
val result = sc.cassandraTable(ks, "udts").select("key", "name", "addr").collect()
result should have length 1
val row = result.head
result should have length 3
val row = result.map( x => (x.getInt(0), x)).toMap.get(1).get
row.getInt(0) should be(1)
row.getString(1) should be("name")

Expand All @@ -521,15 +540,15 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {

it should "allow to fetch UDT columns as objects of case classes" in {
val result = sc.cassandraTable[ClassWithUDT](ks, "udts").select("key", "name", "addr").collect()
result should have length 1
val row = result.head
result should have length 3
val row = result.map( x => (x.key, x)).toMap.get(1).get
row.key should be(1)
row.name should be("name")

val udtValue = row.addr
udtValue.street should be("Some Street")
udtValue.city should be("Paris")
udtValue.zip should be(11120)
val udtValue = row.addr.get
udtValue.street should be(Some("Some Street"))
udtValue.city should be(Some("Paris"))
udtValue.zip should be(Some(11120))
}

it should "allow to fetch tuple columns as TupleValue objects" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,25 @@ private[connector] class GettableDataToMappedTypeConverter[T : TypeTag : ColumnM
val valueConverter = converter(valueColumnType, valueScalaType)
TypeConverter.forType[U](Seq(keyConverter, valueConverter))

case (_, _) =>
TypeConverter.forType[U]
case (_, _) => TypeConverter.forType[U]
}
}

/**
* Avoids getting a "Can not convert Null to X" on allowed nullable types.
*
* This is identical to the trait NullableTypeConverter but
* will end up throwing exceptions on null casting to scala types because of the lack of
* restrictions on T
*
* Since the below "tryConvert Method" will handle NPEs we don't have to worry about the
* fact that T ! <: AnyRef
*/
override def convert(obj: Any): T = {
if (obj != null) {
super.convert(obj)
} else {
null.asInstanceOf[T]
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package com.datastax.spark.connector.mapper;

import java.io.Serializable;
import java.util.Objects;

public class ClassWithUDTBean implements Serializable
{
public static class AddressBean implements Serializable
{
private String street;
private String city;
private Integer zip;

public AddressBean() {}
public AddressBean(String street, String city, Integer zip) {
this.street = street;
this.city = city;
this.zip = zip;
}



public Integer getZip()
{
return zip;
}

public String getCity()
{
return city;
}

public String getStreet()
{
return street;
}

public void setCity(String city)
{
this.city = city;
}

public void setStreet(String street)
{
this.street = street;
}

public void setZip(Integer zip)
{
this.zip = zip;
}

@Override
public boolean equals(Object obj)
{
if (!(obj instanceof AddressBean))
return false;

AddressBean that = (AddressBean) obj;
return Objects.equals(that.city, this.city) &&
Objects.equals(that.street, this.street) &&
Objects.equals(that.zip, this.zip);
}

@Override
public int hashCode()
{
return Objects.hash(city, street, zip);
}
}

private Integer key;
private String name;
private AddressBean addr;

public ClassWithUDTBean() {}
public ClassWithUDTBean(Integer key, String name, AddressBean addr) {
this.key = key;
this.name = name;
this.addr = addr;
}

public AddressBean getAddr()
{
return addr;
}

public Integer getKey()
{
return key;
}

public String getName()
{
return name;
}

public void setAddr(AddressBean addr)
{
this.addr = addr;
}

public void setKey(Integer key)
{
this.key = key;
}

public void setName(String name)
{
this.name = name;
}

@Override
public String toString()
{
return key + " : " + name + " : " + addr;
}

@Override
public boolean equals(Object obj)
{
if (!(obj instanceof ClassWithUDTBean))
return false;

ClassWithUDTBean that = (ClassWithUDTBean) obj;
return Objects.equals(this.addr, that.addr) &&
Objects.equals(this.key, that.key) &&
Objects.equals(this.name, that.name);
}

@Override
public int hashCode()
{
return Objects.hash(addr, key, name);
}
}

0 comments on commit 386063e

Please sign in to comment.