Skip to content

Commit

Permalink
Merge pull request #7 from venkateshragi/cas-support-cql
Browse files Browse the repository at this point in the history
Refactored code and added how-to
  • Loading branch information
Rohit Rai committed Sep 5, 2013
2 parents 7c4dbe1 + 8583dce commit 584add4
Show file tree
Hide file tree
Showing 28 changed files with 1,077 additions and 1,320 deletions.
62 changes: 62 additions & 0 deletions cassandra-handler/how-to
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
This document describes how to use cassandra-handler

Check out the project from https://github.com/milliondreams/hive.git.

> git clone https://github.com/milliondreams/hive.git

Go to branch cas-support-cql

> git checkout cas-support-cql

Now run the maven package

> mvn package

This generates a hive-cassandra-x.x.x.jar file in the target folder, and all other project dependencies are downloaded to target/dependency.

Copy the target/hive-cassandra-x.x.x.jar to the hive lib directory

Copy target/dependency/cassandra-all-x.x.x.jar and target/dependency/cassandra-thrift-x.x.x.jar to hive lib directory.

Start hive

> bin/hive

Create a database in hive (or use an existing one)

To create a cql3 table in cassandra from hive, execute the following command

hive> CREATE EXTERNAL TABLE test.messages(message_id string, author string, body string)
STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
WITH SERDEPROPERTIES ("cql.primarykey" = "message_id, author", "comment"="check", "read_repair_chance" = "0.2",
"dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
"compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all")

where 'test' is the keyspace in cassandra. The above query also creates a column family in cassandra if it does not exist.

To create a keyspace that does not exist in cassandra, execute the following query

hive> CREATE EXTERNAL TABLE test.messages(row_key string, col1 string, col2 string)
STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler' WITH SERDEPROPERTIES("cql.primarykey" = "row_key")
TBLPROPERTIES ("cassandra.ks.name" = "mycqlks", "cassandra.ks.stratOptions"="'DC':1, 'DC2':1",
"cassandra.ks.strategy"="NetworkTopologyStrategy");

Note: For brevity, only minimal SERDEPROPERTIES are given in the above query.

If 'test' keyspace does not exist in cassandra it will be created.

Inserting values into CQL3 table through hive:

hive> insert into table messages select * from tweets;

The values from tweets table are appended to messages table.

Note: With Cassandra, INSERT OVERWRITE is the same as INSERT INTO, as Cassandra merges changes when the keys are the same.

Retrieving values from a CQL3 table using hive:

hive> select * from messages;

While CqlStorageHandler is used to create/access cql3 tables in cassandra, CassandraStorageHandler can be used to create/access
thrift tables in cassandra.

Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
package org.apache.hadoop.hive.cassandra;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.HashMap;

import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SchemaDisagreementException;
import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe;
import org.apache.cassandra.thrift.*;
import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe;
import org.apache.hadoop.hive.cassandra.serde.CassandraColumnSerDe;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* A class to handle the transaction to cassandra backend database.
*
Expand Down Expand Up @@ -51,22 +48,22 @@ public class CassandraManager {
public CassandraManager(Table tbl) throws MetaException {
Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();

String cassandraHost = serdeParam.get(AbstractColumnSerDe.CASSANDRA_HOST);
String cassandraHost = serdeParam.get(AbstractCassandraSerDe.CASSANDRA_HOST);
if (cassandraHost == null) {
cassandraHost = AbstractColumnSerDe.DEFAULT_CASSANDRA_HOST;
cassandraHost = AbstractCassandraSerDe.DEFAULT_CASSANDRA_HOST;
}

this.host = cassandraHost;

String cassandraPortStr = serdeParam.get(AbstractColumnSerDe.CASSANDRA_PORT);
String cassandraPortStr = serdeParam.get(AbstractCassandraSerDe.CASSANDRA_PORT);
if (cassandraPortStr == null) {
cassandraPortStr = AbstractColumnSerDe.DEFAULT_CASSANDRA_PORT;
cassandraPortStr = AbstractCassandraSerDe.DEFAULT_CASSANDRA_PORT;
}

try {
port = Integer.parseInt(cassandraPortStr);
} catch (NumberFormatException e) {
throw new MetaException(AbstractColumnSerDe.CASSANDRA_PORT + " must be a number");
throw new MetaException(AbstractCassandraSerDe.CASSANDRA_PORT + " must be a number");
}

this.tbl = tbl;
Expand Down Expand Up @@ -217,10 +214,10 @@ public CfDef createColumnFamily() throws MetaException {
}

private String getColumnType() throws MetaException {
String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_COL_MAPPING);
String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING);
List<String> mapping;
if (prop != null) {
mapping = AbstractColumnSerDe.parseColumnMapping(prop);
mapping = CassandraColumnSerDe.parseColumnMapping(prop);
} else {
List<FieldSchema> schema = tbl.getSd().getCols();
if (schema.size() ==0) {
Expand All @@ -232,7 +229,7 @@ private String getColumnType() throws MetaException {
colNames[i] = schema.get(i).getName();
}

String mappingStr = AbstractColumnSerDe.createColumnMappingString(colNames);
String mappingStr = CassandraColumnSerDe.createColumnMappingString(colNames);
mapping = Arrays.asList(mappingStr.split(","));
}

Expand All @@ -242,13 +239,13 @@ private String getColumnType() throws MetaException {
boolean hasSubColumn = false;

for (String column : mapping) {
if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_KEY_COLUMN)) {
if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_KEY_COLUMN)) {
hasKey = true;
} else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_COLUMN_COLUMN)) {
} else if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_COLUMN_COLUMN)) {
hasColumn = true;
} else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_SUBCOLUMN_COLUMN)) {
} else if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_SUBCOLUMN_COLUMN)) {
hasSubColumn = true;
} else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_VALUE_COLUMN)) {
} else if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_VALUE_COLUMN)) {
hasValue = true;
} else {
return "Standard";
Expand All @@ -272,14 +269,14 @@ private String getColumnType() throws MetaException {
* @throws MetaException error
*/
private int getReplicationFactor() throws MetaException {
  // Replication factor comes from the table's serde/table properties; the
  // post-refactor constant holder is AbstractCassandraSerDe (the stale
  // duplicate line referencing AbstractColumnSerDe is removed).
  String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_REPFACTOR);
  if (prop == null) {
    return DEFAULT_REPLICATION_FACTOR;
  }
  try {
    return Integer.parseInt(prop);
  } catch (NumberFormatException e) {
    // Preserve the original user-facing message verbatim.
    throw new MetaException(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_REPFACTOR + " must be a number");
  }
}
Expand All @@ -290,7 +287,7 @@ private int getReplicationFactor() throws MetaException {
* @return strategy
*/
private String getStrategy() {
String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_KEYSPACE_STRATEGY);
String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_STRATEGY);
if (prop == null) {
return DEFAULT_STRATEGY;
} else {
Expand All @@ -304,13 +301,13 @@ private String getStrategy() {
* @return keyspace name
*/
private String getCassandraKeyspace() {
  // Resolve the target keyspace from the table property (stale duplicate
  // line referencing the removed AbstractColumnSerDe is dropped).
  String tableName = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME);

  // Fall back to the Hive database name when no keyspace is configured.
  if (tableName == null) {
    tableName = tbl.getDbName();
  }

  // Record the resolved keyspace back on the table so later lookups see it.
  tbl.getParameters().put(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME, tableName);

  return tableName;
}
Expand All @@ -321,13 +318,13 @@ private String getCassandraKeyspace() {
* @return cassandra column family name
*/
private String getCassandraColumnFamily() {
  // Resolve the target column family from the table property (stale
  // duplicate line referencing the removed AbstractColumnSerDe is dropped).
  String tableName = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_CF_NAME);

  // Fall back to the Hive table name when no column family is configured.
  if (tableName == null) {
    tableName = tbl.getTableName();
  }

  // Record the resolved column family back on the table for later lookups.
  tbl.getParameters().put(AbstractCassandraSerDe.CASSANDRA_CF_NAME, tableName);

  return tableName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ public boolean getValue() {

}

;

private static final Logger logger = Logger.getLogger(CassandraProxyClient.class);

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,21 @@
package org.apache.hadoop.hive.cassandra;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.cassandra.exceptions.ConfigurationException;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.KsDef;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.Hex;
import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe;
import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe;
import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator;
import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer;
import org.apache.hadoop.hive.ql.index.IndexSearchCondition;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
import org.apache.hadoop.hive.ql.udf.generic.*;
import org.apache.hadoop.hive.serde2.ByteStream;
import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
import org.apache.hadoop.hive.serde2.lazy.LazyCassandraUtils;
Expand All @@ -43,8 +27,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CassandraPushdownPredicate {

Expand Down Expand Up @@ -114,7 +102,7 @@ public static String serializeIndexedColumns(Set<ColumnDef> columns)
logger.info("Encoded column def: " + encoded);
hexStrings.add(encoded);
}
return Joiner.on(AbstractColumnSerDe.DELIMITER).join(hexStrings);
return Joiner.on(AbstractCassandraSerDe.DELIMITER).join(hexStrings);
}
catch (TException e)
{
Expand All @@ -136,7 +124,7 @@ public static Set<ColumnDef> deserializeIndexedColumns(String serialized)
return columns;
}

Iterable<String> strings = Splitter.on(AbstractColumnSerDe.DELIMITER).omitEmptyStrings().trimResults().split(serialized);
Iterable<String> strings = Splitter.on(AbstractCassandraSerDe.DELIMITER).omitEmptyStrings().trimResults().split(serialized);
TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
for (String encoded : strings)
{
Expand Down
Loading

0 comments on commit 584add4

Please sign in to comment.