diff --git a/cassandra-handler/how-to b/cassandra-handler/how-to
new file mode 100644
index 00000000..9661efbc
--- /dev/null
+++ b/cassandra-handler/how-to
@@ -0,0 +1,62 @@
+This document describes how to use cassandra-handler.
+
+Check out the project from https://github.com/milliondreams/hive.git:
+
+ > git clone https://github.com/milliondreams/hive.git
+
+Switch to the cas-support-cql branch:
+
+ > git checkout cas-support-cql
+
+Now run the Maven package build:
+
+ > mvn package
+
+This generates a hive-cassandra-x.x.x.jar file in the target folder; all other project dependencies are downloaded to target/dependency.
+
+Copy target/hive-cassandra-x.x.x.jar to the hive lib directory.
+
+Copy target/dependency/cassandra-all-x.x.x.jar and target/dependency/cassandra-thrift-x.x.x.jar to the hive lib directory. (Sample copy commands are sketched at the end of this document.)
+
+Start hive:
+
+ > bin/hive
+
+Create a database in hive (or use an existing one).
+
+To create a CQL3 table in cassandra from hive, execute the following command:
+
+ hive> CREATE EXTERNAL TABLE test.messages(message_id string, author string, body string)
+ STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
+ WITH SERDEPROPERTIES ("cql.primarykey" = "message_id, author", "comment"="check", "read_repair_chance" = "0.2",
+ "dclocal_read_repair_chance" = "0.14", "gc_grace_seconds" = "989898", "bloom_filter_fp_chance" = "0.2",
+ "compaction" = "{'class' : 'LeveledCompactionStrategy'}", "replicate_on_write" = "false", "caching" = "all")
+
+ where 'test' is the keyspace in cassandra. The above query also creates the column family in cassandra if it does not exist.
+
+To create a keyspace that does not exist in cassandra, execute the following query:
+
+ hive> CREATE EXTERNAL TABLE test.messages(row_key string, col1 string, col2 string)
+ STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler' WITH SERDEPROPERTIES("cql.primarykey" = "row_key")
+ TBLPROPERTIES ("cassandra.ks.name" = "mycqlks", "cassandra.ks.stratOptions"="'DC':1, 'DC2':1",
+ "cassandra.ks.strategy"="NetworkTopologyStrategy");
+
+ Note: For brevity, only minimal SERDEPROPERTIES are given in the above query.
+
+If the 'mycqlks' keyspace does not exist in cassandra, it will be created.
+
+Inserting values into a CQL3 table through hive:
+
+ hive> insert into table messages select * from tweets;
+
+The values from the tweets table are appended to the messages table.
+
+ Note: With Cassandra, INSERT OVERWRITE behaves the same as INSERT INTO, because Cassandra merges changes when keys are the same.
+
+Retrieving values from a CQL3 table using hive:
+
+ hive> select * from messages;
+
+While CqlStorageHandler is used to create/access CQL3 tables in cassandra, CassandraStorageHandler can be used to create/access
+thrift tables in cassandra.
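+
+As a minimal sketch (the table and column names below are placeholders, and ":key" is assumed to be
+the handler's row-key token in "cassandra.columns.mapping"), a thrift table could be created with:
+
+ -- placeholder names; the ":key" row-key token is an assumption, verify it against your handler version
+ hive> CREATE EXTERNAL TABLE test.thrift_messages(row_key string, col1 string, col2 string)
+ STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler'
+ WITH SERDEPROPERTIES ("cassandra.columns.mapping" = ":key,col1,col2");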
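+
+For reference, the jar copy steps above might look like this (a sketch assuming HIVE_HOME points at
+your hive installation and the wildcards match the built versions; adjust the paths to your layout):
+
+ # assumes HIVE_HOME is set; jar version wildcards are illustrative
+ > cp target/hive-cassandra-*.jar $HIVE_HOME/lib/
+ > cp target/dependency/cassandra-all-*.jar target/dependency/cassandra-thrift-*.jar $HIVE_HOME/lib/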
+ diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraManager.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraManager.java index 546842df..0d16be16 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraManager.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraManager.java @@ -1,21 +1,18 @@ package org.apache.hadoop.hive.cassandra; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.HashMap; - -import org.apache.cassandra.thrift.CfDef; -import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.KsDef; -import org.apache.cassandra.thrift.NotFoundException; -import org.apache.cassandra.thrift.SchemaDisagreementException; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; +import org.apache.cassandra.thrift.*; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; +import org.apache.hadoop.hive.cassandra.serde.CassandraColumnSerDe; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.thrift.TException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + /** * A class to handle the transaction to cassandra backend database. * @@ -51,22 +48,22 @@ public class CassandraManager { public CassandraManager(Table tbl) throws MetaException { Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); - String cassandraHost = serdeParam.get(AbstractColumnSerDe.CASSANDRA_HOST); + String cassandraHost = serdeParam.get(AbstractCassandraSerDe.CASSANDRA_HOST); if (cassandraHost == null) { - cassandraHost = AbstractColumnSerDe.DEFAULT_CASSANDRA_HOST; + cassandraHost = AbstractCassandraSerDe.DEFAULT_CASSANDRA_HOST; } this.host = cassandraHost; - String cassandraPortStr = serdeParam.get(AbstractColumnSerDe.CASSANDRA_PORT); + String cassandraPortStr = serdeParam.get(AbstractCassandraSerDe.CASSANDRA_PORT); if (cassandraPortStr == null) { - cassandraPortStr = AbstractColumnSerDe.DEFAULT_CASSANDRA_PORT; + cassandraPortStr = AbstractCassandraSerDe.DEFAULT_CASSANDRA_PORT; } try { port = Integer.parseInt(cassandraPortStr); } catch (NumberFormatException e) { - throw new MetaException(AbstractColumnSerDe.CASSANDRA_PORT + " must be a number"); + throw new MetaException(AbstractCassandraSerDe.CASSANDRA_PORT + " must be a number"); } this.tbl = tbl; @@ -217,10 +214,10 @@ public CfDef createColumnFamily() throws MetaException { } private String getColumnType() throws MetaException { - String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_COL_MAPPING); + String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING); List mapping; if (prop != null) { - mapping = AbstractColumnSerDe.parseColumnMapping(prop); + mapping = CassandraColumnSerDe.parseColumnMapping(prop); } else { List schema = tbl.getSd().getCols(); if (schema.size() ==0) { @@ -232,7 +229,7 @@ private String getColumnType() throws MetaException { colNames[i] = schema.get(i).getName(); } - String mappingStr = AbstractColumnSerDe.createColumnMappingString(colNames); + String mappingStr = CassandraColumnSerDe.createColumnMappingString(colNames); mapping = Arrays.asList(mappingStr.split(",")); } @@ -242,13 +239,13 @@ private String getColumnType() throws MetaException { boolean hasSubColumn = false; for (String column 
: mapping) { - if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_KEY_COLUMN)) { + if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_KEY_COLUMN)) { hasKey = true; - } else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_COLUMN_COLUMN)) { + } else if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_COLUMN_COLUMN)) { hasColumn = true; - } else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_SUBCOLUMN_COLUMN)) { + } else if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_SUBCOLUMN_COLUMN)) { hasSubColumn = true; - } else if (column.equalsIgnoreCase(AbstractColumnSerDe.CASSANDRA_VALUE_COLUMN)) { + } else if (column.equalsIgnoreCase(CassandraColumnSerDe.CASSANDRA_VALUE_COLUMN)) { hasValue = true; } else { return "Standard"; @@ -272,14 +269,14 @@ private String getColumnType() throws MetaException { * @throws MetaException error */ private int getReplicationFactor() throws MetaException { - String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_KEYSPACE_REPFACTOR); + String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_REPFACTOR); if (prop == null) { return DEFAULT_REPLICATION_FACTOR; } else { try { return Integer.parseInt(prop); } catch (NumberFormatException e) { - throw new MetaException(AbstractColumnSerDe.CASSANDRA_KEYSPACE_REPFACTOR + " must be a number"); + throw new MetaException(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_REPFACTOR + " must be a number"); } } } @@ -290,7 +287,7 @@ private int getReplicationFactor() throws MetaException { * @return strategy */ private String getStrategy() { - String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_KEYSPACE_STRATEGY); + String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_STRATEGY); if (prop == null) { return DEFAULT_STRATEGY; } else { @@ -304,13 +301,13 @@ private String getStrategy() { * @return keyspace name */ private String getCassandraKeyspace() { - String tableName = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME); + String tableName = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); if (tableName == null) { tableName = tbl.getDbName(); } - tbl.getParameters().put(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME, tableName); + tbl.getParameters().put(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME, tableName); return tableName; } @@ -321,13 +318,13 @@ private String getCassandraKeyspace() { * @return cassandra column family name */ private String getCassandraColumnFamily() { - String tableName = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_CF_NAME); + String tableName = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_CF_NAME); if (tableName == null) { tableName = tbl.getTableName(); } - tbl.getParameters().put(AbstractColumnSerDe.CASSANDRA_CF_NAME, tableName); + tbl.getParameters().put(AbstractCassandraSerDe.CASSANDRA_CF_NAME, tableName); return tableName; } diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraProxyClient.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraProxyClient.java index bc6c110f..98e26c6e 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraProxyClient.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraProxyClient.java @@ -48,8 +48,6 @@ public boolean getValue() { } - ; - private static final Logger logger = Logger.getLogger(CassandraProxyClient.class); /** diff --git 
a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraPushdownPredicate.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraPushdownPredicate.java index 3b569c55..860510f2 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraPushdownPredicate.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraPushdownPredicate.java @@ -1,37 +1,21 @@ package org.apache.hadoop.hive.cassandra; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.cassandra.exceptions.ConfigurationException; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.thrift.CfDef; -import org.apache.cassandra.thrift.ColumnDef; -import org.apache.cassandra.thrift.IndexExpression; -import org.apache.cassandra.thrift.IndexOperator; -import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.KsDef; -import org.apache.cassandra.thrift.NotFoundException; -import org.apache.cassandra.thrift.TBinaryProtocol; +import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Hex; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.*; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyCassandraUtils; @@ -43,8 +27,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; public class CassandraPushdownPredicate { @@ -114,7 +102,7 @@ public static String serializeIndexedColumns(Set columns) logger.info("Encoded column def: " + encoded); hexStrings.add(encoded); } - return Joiner.on(AbstractColumnSerDe.DELIMITER).join(hexStrings); + return Joiner.on(AbstractCassandraSerDe.DELIMITER).join(hexStrings); } catch (TException e) { @@ -136,7 +124,7 @@ public static Set deserializeIndexedColumns(String serialized) return columns; } - Iterable strings = Splitter.on(AbstractColumnSerDe.DELIMITER).omitEmptyStrings().trimResults().split(serialized); + Iterable strings = 
Splitter.on(AbstractCassandraSerDe.DELIMITER).omitEmptyStrings().trimResults().split(serialized); TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); for (String encoded : strings) { diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java index 400a06b1..cc59cc34 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/CassandraStorageHandler.java @@ -1,18 +1,12 @@ package org.apache.hadoop.hive.cassandra; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - import org.apache.cassandra.thrift.ColumnDef; import org.apache.cassandra.thrift.KsDef; import org.apache.cassandra.thrift.NotFoundException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardColumnInputFormat; import org.apache.hadoop.hive.cassandra.output.HiveCassandraOutputFormat; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.cassandra.serde.CassandraColumnSerDe; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -35,6 +29,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; + public class CassandraStorageHandler implements HiveStorageHandler, HiveMetaHook, HiveStoragePredicateHandler { @@ -47,163 +43,163 @@ public void configureTableJobProperties(TableDesc tableDesc, Map Properties tableProperties = tableDesc.getProperties(); //Identify Keyspace - String keyspace = tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME); + String keyspace = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); if (keyspace == null) { keyspace = tableProperties.getProperty(Constants.META_TABLE_DB); } - jobProperties.put(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME, keyspace); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME, keyspace); //Identify ColumnFamily - String columnFamily = tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_CF_NAME); + String columnFamily = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_CF_NAME); if (columnFamily == null) { columnFamily = tableProperties.getProperty(Constants.META_TABLE_NAME); } - jobProperties.put(AbstractColumnSerDe.CASSANDRA_CF_NAME, columnFamily); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_CF_NAME, columnFamily); //If no column mapping has been configured, we should create the default column mapping. 
- String columnInfo = tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_COL_MAPPING); + String columnInfo = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING); if(columnInfo == null) { - columnInfo = AbstractColumnSerDe.createColumnMappingString( + columnInfo = CassandraColumnSerDe.createColumnMappingString( tableProperties.getProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS)); } - jobProperties.put(AbstractColumnSerDe.CASSANDRA_COL_MAPPING, columnInfo); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING, columnInfo); - String host = configuration.get(AbstractColumnSerDe.CASSANDRA_HOST); + String host = configuration.get(AbstractCassandraSerDe.CASSANDRA_HOST); if (host == null) { - host = tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_HOST, AbstractColumnSerDe.DEFAULT_CASSANDRA_HOST); + host = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_HOST, AbstractCassandraSerDe.DEFAULT_CASSANDRA_HOST); } - jobProperties.put(AbstractColumnSerDe.CASSANDRA_HOST, host); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_HOST, host); - String port = configuration.get(AbstractColumnSerDe.CASSANDRA_PORT); + String port = configuration.get(AbstractCassandraSerDe.CASSANDRA_PORT); if (port== null) { - port = tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_PORT, AbstractColumnSerDe.DEFAULT_CASSANDRA_PORT); + port = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_PORT, AbstractCassandraSerDe.DEFAULT_CASSANDRA_PORT); } - jobProperties.put(AbstractColumnSerDe.CASSANDRA_PORT, port); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_PORT, port); - if (configuration.get(AbstractColumnSerDe.CASSANDRA_PARTITIONER) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_PARTITIONER) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_PARTITIONER, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_PARTITIONER, + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_PARTITIONER, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_PARTITIONER, "org.apache.cassandra.dht.Murmur3Partitioner")); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_PARTITIONER,configuration.get(AbstractColumnSerDe.CASSANDRA_PARTITIONER)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_PARTITIONER,configuration.get(AbstractCassandraSerDe.CASSANDRA_PARTITIONER)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, - AbstractColumnSerDe.DEFAULT_CONSISTENCY_LEVEL)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL, + AbstractCassandraSerDe.DEFAULT_CONSISTENCY_LEVEL)); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL,configuration.get(AbstractColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL,configuration.get(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, - 
tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, - Integer.toString(AbstractColumnSerDe.DEFAULT_RANGE_BATCH_SIZE))); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_RANGE_BATCH_SIZE))); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, configuration.get(AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, - Integer.toString(AbstractColumnSerDe.DEFAULT_SLICE_PREDICATE_SIZE))); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_SLICE_PREDICATE_SIZE))); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE, - Integer.toString(AbstractColumnSerDe.DEFAULT_SPLIT_SIZE))); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_SPLIT_SIZE))); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE, configuration.get(AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_BATCH_MUTATION_SIZE) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_BATCH_MUTATION_SIZE, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_BATCH_MUTATION_SIZE, - Integer.toString(AbstractColumnSerDe.DEFAULT_BATCH_MUTATION_SIZE))); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_BATCH_MUTATION_SIZE))); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_BATCH_MUTATION_SIZE, configuration.get(AbstractColumnSerDe.CASSANDRA_BATCH_MUTATION_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START) == null) { - 
jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, "")); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, "")); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, "")); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, "")); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, "")); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, "")); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, - configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, + configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR)); } - if (configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED) == null) + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED) == null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, - tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, "false")); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, "false")); } else { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, - configuration.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, + configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED)); } //Set the indexed column names - leave unset if we have problems determining them - String indexedColumns = 
tableProperties.getProperty(AbstractColumnSerDe.CASSANDRA_INDEXED_COLUMNS); + String indexedColumns = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS); if (indexedColumns != null) { - jobProperties.put(AbstractColumnSerDe.CASSANDRA_INDEXED_COLUMNS, indexedColumns); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS, indexedColumns); } else { try { Set columns = CassandraPushdownPredicate.getIndexedColumns(host, Integer.parseInt(port), keyspace, columnFamily); - jobProperties.put(AbstractColumnSerDe.CASSANDRA_INDEXED_COLUMNS, CassandraPushdownPredicate.serializeIndexedColumns(columns)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS, CassandraPushdownPredicate.serializeIndexedColumns(columns)); } catch (CassandraException e) { // this results in the property remaining unset on the Jobconf, so indexes will not be used on the C* side logger.info("Error determining cassandra indexed columns, will not include in JobConf", e); @@ -343,8 +339,8 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map indexedColumns = CassandraPushdownPredicate.getIndexedColumns(host, port, ksName, cfName); diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java index 7a1d9076..232b5c75 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java @@ -5,11 +5,12 @@ import org.apache.hadoop.hive.cassandra.CassandraClientHolder; import org.apache.hadoop.hive.cassandra.CassandraException; import org.apache.hadoop.hive.cassandra.CassandraProxyClient; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; -import org.apache.hadoop.hive.cassandra.serde.cql.AbstractCqlSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; +import org.apache.hadoop.hive.cassandra.serde.cql.CqlSerDe; import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.api.*; import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,22 +62,22 @@ public class CqlManager { public CqlManager(Table tbl) throws MetaException { Map serdeParam = tbl.getSd().getSerdeInfo().getParameters(); - String cassandraHost = serdeParam.get(AbstractColumnSerDe.CASSANDRA_HOST); + String cassandraHost = serdeParam.get(AbstractCassandraSerDe.CASSANDRA_HOST); if (cassandraHost == null) { - cassandraHost = AbstractColumnSerDe.DEFAULT_CASSANDRA_HOST; + cassandraHost = AbstractCassandraSerDe.DEFAULT_CASSANDRA_HOST; } this.host = cassandraHost; - String cassandraPortStr = serdeParam.get(AbstractColumnSerDe.CASSANDRA_PORT); + String cassandraPortStr = serdeParam.get(AbstractCassandraSerDe.CASSANDRA_PORT); if (cassandraPortStr == null) { - cassandraPortStr = AbstractColumnSerDe.DEFAULT_CASSANDRA_PORT; + cassandraPortStr = AbstractCassandraSerDe.DEFAULT_CASSANDRA_PORT; } try { port = Integer.parseInt(cassandraPortStr); } catch (NumberFormatException e) { - throw new MetaException(AbstractColumnSerDe.CASSANDRA_PORT + " must be a number"); + throw new MetaException(AbstractCassandraSerDe.CASSANDRA_PORT + " must be a number"); } this.tbl = tbl; @@ -128,7 +129,7 @@ public 
boolean doesKeyspaceExist() throws MetaException { public void createKeyspace() throws MetaException { String createKeyspaceQuery = "create keyspace %s WITH replication = { 'class' : %s, %s } AND durable_writes = %s"; - String durableWrites = getPropertyFromTable(AbstractCqlSerDe.DURABLE_WRITES); + String durableWrites = getPropertyFromTable(AbstractCassandraSerDe.DURABLE_WRITES); if(durableWrites == null){ durableWrites = "true"; } @@ -144,8 +145,8 @@ public void createKeyspace() throws MetaException { } public String getStrategyOptions() throws MetaException { - String replicationFactor = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_KEYSPACE_REPFACTOR); - String strategyOptions = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_KEYSPACE_STRATEGY_OPTIONS); + String replicationFactor = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_REPFACTOR); + String strategyOptions = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_STRATEGY_OPTIONS); if(replicationFactor != null) { if(strategyOptions != null){ throw new MetaException("Unable to create keyspace '" + keyspace + "' Specify only one of 'cassandra.ks.repfactor' or 'cassandra.ks.stratOptions'"); @@ -238,7 +239,7 @@ public void createColumnFamily() throws MetaException { queryBuilder.append(hiveTypeToCqlType.get(columnTypes[i])); queryBuilder.append(","); } - String keyStr = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_COLUMN_FAMILY_PRIMARY_KEY); + String keyStr = getPropertyFromTable(CqlSerDe.CASSANDRA_COLUMN_FAMILY_PRIMARY_KEY); if(keyStr == null || keyStr.isEmpty()) { keyStr = columnNames[0]; } @@ -289,15 +290,15 @@ public void createColumnFamily() throws MetaException { private Map constructTableOptions(){ Map options = new HashMap(); - addIfNotEmpty(AbstractCqlSerDe.COLUMN_FAMILY_COMMENT, options, true); - addIfNotEmpty(AbstractCqlSerDe.READ_REPAIR_CHANCE, options, false); - addIfNotEmpty(AbstractCqlSerDe.DCLOCAL_READ_REPAIR_CHANCE, options, false); - addIfNotEmpty(AbstractCqlSerDe.GC_GRACE_SECONDS, options, false); - addIfNotEmpty(AbstractCqlSerDe.BLOOM_FILTER_FP_CHANCE, options, false); - addIfNotEmpty(AbstractCqlSerDe.COMPACTION, options, false); - addIfNotEmpty(AbstractCqlSerDe.COMPRESSION, options, false); - addIfNotEmpty(AbstractCqlSerDe.REPLICATE_ON_WRITE, options, false); - addIfNotEmpty(AbstractCqlSerDe.CACHING, options, false); + addIfNotEmpty(AbstractCassandraSerDe.COLUMN_FAMILY_COMMENT, options, true); + addIfNotEmpty(AbstractCassandraSerDe.READ_REPAIR_CHANCE, options, false); + addIfNotEmpty(AbstractCassandraSerDe.DCLOCAL_READ_REPAIR_CHANCE, options, false); + addIfNotEmpty(AbstractCassandraSerDe.GC_GRACE_SECONDS, options, false); + addIfNotEmpty(AbstractCassandraSerDe.BLOOM_FILTER_FP_CHANCE, options, false); + addIfNotEmpty(AbstractCassandraSerDe.COMPACTION, options, false); + addIfNotEmpty(AbstractCassandraSerDe.COMPRESSION, options, false); + addIfNotEmpty(AbstractCassandraSerDe.REPLICATE_ON_WRITE, options, false); + addIfNotEmpty(AbstractCassandraSerDe.CACHING, options, false); return options; } @@ -319,14 +320,14 @@ private void addIfNotEmpty(String property, Map options, boolean * @throws MetaException error */ private int getReplicationFactor() throws MetaException { - String prop = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_KEYSPACE_REPFACTOR); + String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_REPFACTOR); if (prop == null) { return DEFAULT_REPLICATION_FACTOR; } else { try { return Integer.parseInt(prop); } catch (NumberFormatException e) { - throw 
new MetaException(AbstractColumnSerDe.CASSANDRA_KEYSPACE_REPFACTOR + " must be a number"); + throw new MetaException(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_REPFACTOR + " must be a number"); } } } @@ -337,7 +338,7 @@ private int getReplicationFactor() throws MetaException { * @return strategy */ private String getStrategy() { - String prop = getPropertyFromTable(AbstractCqlSerDe.CASSANDRA_KEYSPACE_STRATEGY); + String prop = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_STRATEGY); if (prop == null) { return DEFAULT_STRATEGY; } else { @@ -351,13 +352,13 @@ private String getStrategy() { * @return keyspace name */ private String getCassandraKeyspace() { - String tableName = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME); + String tableName = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); if (tableName == null) { tableName = tbl.getDbName(); } - tbl.getParameters().put(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME, tableName); + tbl.getParameters().put(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME, tableName); return tableName; } @@ -368,13 +369,13 @@ private String getCassandraKeyspace() { * @return cassandra column family name */ private String getCassandraColumnFamily() { - String tableName = getPropertyFromTable(AbstractColumnSerDe.CASSANDRA_CF_NAME); + String tableName = getPropertyFromTable(AbstractCassandraSerDe.CASSANDRA_CF_NAME); if (tableName == null) { tableName = tbl.getTableName(); } - tbl.getParameters().put(AbstractColumnSerDe.CASSANDRA_CF_NAME, tableName); + tbl.getParameters().put(AbstractCassandraSerDe.CASSANDRA_CF_NAME, tableName); return tableName; } diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlPushdownPredicate.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlPushdownPredicate.java index ad3185cf..36079b13 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlPushdownPredicate.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlPushdownPredicate.java @@ -1,16 +1,10 @@ package org.apache.hadoop.hive.cassandra.cql; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.CharacterCodingException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.cassandra.exceptions.ConfigurationException; +import com.google.common.base.Joiner; +import com.google.common.base.Splitter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; @@ -18,17 +12,13 @@ import org.apache.hadoop.hive.cassandra.CassandraClientHolder; import org.apache.hadoop.hive.cassandra.CassandraException; import org.apache.hadoop.hive.cassandra.CassandraProxyClient; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual; -import 
org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan; +import org.apache.hadoop.hive.ql.udf.generic.*; import org.apache.hadoop.hive.serde2.ByteStream; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyCassandraUtils; @@ -40,8 +30,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.base.Splitter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.CharacterCodingException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; public class CqlPushdownPredicate { @@ -114,7 +109,7 @@ public static String serializeIndexedColumns(Set columns) { logger.info("Encoded column def: " + encoded); hexStrings.add(encoded); } - return Joiner.on(AbstractColumnSerDe.DELIMITER).join(hexStrings); + return Joiner.on(AbstractCassandraSerDe.DELIMITER).join(hexStrings); } catch (TException e) { throw new RuntimeException(e); } @@ -133,7 +128,7 @@ public static Set deserializeIndexedColumns(String serialized) { return columns; } - Iterable strings = Splitter.on(AbstractColumnSerDe.DELIMITER).omitEmptyStrings().trimResults().split(serialized); + Iterable strings = Splitter.on(AbstractCassandraSerDe.DELIMITER).omitEmptyStrings().trimResults().split(serialized); TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory()); for (String encoded : strings) { ColumnDef column = new ColumnDef(); diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlStorageHandler.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlStorageHandler.java index 274d10fe..f74b425c 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlStorageHandler.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlStorageHandler.java @@ -6,7 +6,7 @@ import org.apache.hadoop.hive.cassandra.CassandraManager; import org.apache.hadoop.hive.cassandra.input.cql.HiveCqlInputFormat; import org.apache.hadoop.hive.cassandra.output.cql.HiveCqlOutputFormat; -import org.apache.hadoop.hive.cassandra.serde.cql.AbstractCqlSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.cassandra.serde.cql.CqlSerDe; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -43,129 +43,129 @@ public void configureTableJobProperties(TableDesc tableDesc, Map Properties tableProperties = tableDesc.getProperties(); //Identify Keyspace - String keyspace = tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_KEYSPACE_NAME); + String keyspace = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); if (keyspace == null) { keyspace = tableProperties.getProperty(Constants.META_TABLE_DB); } - jobProperties.put(AbstractCqlSerDe.CASSANDRA_KEYSPACE_NAME, keyspace); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME, keyspace); //Identify ColumnFamily - String columnFamily = tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_CF_NAME); + String columnFamily = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_CF_NAME); if (columnFamily == null) { columnFamily = 
tableProperties.getProperty(Constants.META_TABLE_NAME); } - jobProperties.put(AbstractCqlSerDe.CASSANDRA_CF_NAME, columnFamily); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_CF_NAME, columnFamily); //If no column mapping has been configured, we should create the default column mapping. - String columnInfo = tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_COL_MAPPING); + String columnInfo = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING); if (columnInfo == null) { - columnInfo = AbstractCqlSerDe.createColumnMappingString( + columnInfo = CqlSerDe.createColumnMappingString( tableProperties.getProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS)); } - jobProperties.put(AbstractCqlSerDe.CASSANDRA_COL_MAPPING, columnInfo); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING, columnInfo); - String host = configuration.get(AbstractCqlSerDe.CASSANDRA_HOST); + String host = configuration.get(AbstractCassandraSerDe.CASSANDRA_HOST); if (host == null) { - host = tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_HOST, AbstractCqlSerDe.DEFAULT_CASSANDRA_HOST); + host = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_HOST, AbstractCassandraSerDe.DEFAULT_CASSANDRA_HOST); } - jobProperties.put(AbstractCqlSerDe.CASSANDRA_HOST, host); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_HOST, host); - String port = configuration.get(AbstractCqlSerDe.CASSANDRA_PORT); + String port = configuration.get(AbstractCassandraSerDe.CASSANDRA_PORT); if (port == null) { - port = tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_PORT, AbstractCqlSerDe.DEFAULT_CASSANDRA_PORT); + port = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_PORT, AbstractCassandraSerDe.DEFAULT_CASSANDRA_PORT); } - jobProperties.put(AbstractCqlSerDe.CASSANDRA_PORT, port); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_PORT, port); - if (configuration.get(AbstractCqlSerDe.CASSANDRA_PARTITIONER) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_PARTITIONER, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_PARTITIONER, + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_PARTITIONER) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_PARTITIONER, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_PARTITIONER, "org.apache.cassandra.dht.Murmur3Partitioner")); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_PARTITIONER, configuration.get(AbstractCqlSerDe.CASSANDRA_PARTITIONER)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_PARTITIONER, configuration.get(AbstractCassandraSerDe.CASSANDRA_PARTITIONER)); } - if (configuration.get(AbstractCqlSerDe.CASSANDRA_CONSISTENCY_LEVEL) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_CONSISTENCY_LEVEL, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_CONSISTENCY_LEVEL, - AbstractCqlSerDe.DEFAULT_CONSISTENCY_LEVEL)); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL, + AbstractCassandraSerDe.DEFAULT_CONSISTENCY_LEVEL)); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_CONSISTENCY_LEVEL, configuration.get(AbstractCqlSerDe.CASSANDRA_CONSISTENCY_LEVEL)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL, configuration.get(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL)); } - if 
(configuration.get(AbstractCqlSerDe.CASSANDRA_RANGE_BATCH_SIZE) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_RANGE_BATCH_SIZE, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_RANGE_BATCH_SIZE, - Integer.toString(AbstractCqlSerDe.DEFAULT_RANGE_BATCH_SIZE))); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_RANGE_BATCH_SIZE))); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_RANGE_BATCH_SIZE, configuration.get(AbstractCqlSerDe.CASSANDRA_RANGE_BATCH_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE)); } - if (configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_SIZE) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, - Integer.toString(AbstractCqlSerDe.DEFAULT_SLICE_PREDICATE_SIZE))); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_SLICE_PREDICATE_SIZE))); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE)); } - if (configuration.get(AbstractCqlSerDe.CASSANDRA_SPLIT_SIZE) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SPLIT_SIZE, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_SPLIT_SIZE, - Integer.toString(AbstractCqlSerDe.DEFAULT_SPLIT_SIZE))); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_SPLIT_SIZE))); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SPLIT_SIZE, configuration.get(AbstractCqlSerDe.CASSANDRA_SPLIT_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE)); } - if (configuration.get(AbstractCqlSerDe.CASSANDRA_BATCH_MUTATION_SIZE) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_BATCH_MUTATION_SIZE, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_BATCH_MUTATION_SIZE, - Integer.toString(AbstractCqlSerDe.DEFAULT_BATCH_MUTATION_SIZE))); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE, + Integer.toString(AbstractCassandraSerDe.DEFAULT_BATCH_MUTATION_SIZE))); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_BATCH_MUTATION_SIZE, configuration.get(AbstractCqlSerDe.CASSANDRA_BATCH_MUTATION_SIZE)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE, configuration.get(AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE)); } - if 
(configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, "")); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, "")); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START, configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START)); } - if (configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, "")); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, "")); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH, configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH)); } - if (configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, "")); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, "")); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, - configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR, + configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR)); } - if (configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED) == null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, - tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, "false")); + if (configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED) == null) { + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, + tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, "false")); } else { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, - configuration.get(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED, + configuration.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED)); } 
//Set the indexed column names - leave unset if we have problems determining them - String indexedColumns = tableProperties.getProperty(AbstractCqlSerDe.CASSANDRA_INDEXED_COLUMNS); + String indexedColumns = tableProperties.getProperty(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS); if (indexedColumns != null) { - jobProperties.put(AbstractCqlSerDe.CASSANDRA_INDEXED_COLUMNS, indexedColumns); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS, indexedColumns); } else { try { Set columns = CqlPushdownPredicate.getIndexedColumns(host, Integer.parseInt(port), keyspace, columnFamily); - jobProperties.put(AbstractCqlSerDe.CASSANDRA_INDEXED_COLUMNS, CqlPushdownPredicate.serializeIndexedColumns(columns)); + jobProperties.put(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS, CqlPushdownPredicate.serializeIndexedColumns(columns)); } catch (CassandraException e) { // this results in the property remaining unset on the Jobconf, so indexes will not be used on the C* side logger.info("Error determining cassandra indexed columns, will not include in JobConf", e); @@ -305,8 +305,8 @@ public void configureOutputJobProperties(TableDesc tableDesc, Map indexedColumns = CqlPushdownPredicate.getIndexedColumns(host, port, ksName, cfName); diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/CassandraHiveRecordReader.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/CassandraHiveRecordReader.java index d939e254..47cd51fd 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/CassandraHiveRecordReader.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/CassandraHiveRecordReader.java @@ -1,27 +1,27 @@ package org.apache.hadoop.hive.cassandra.input; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.Map; -import java.util.SortedMap; - import org.apache.cassandra.db.IColumn; import org.apache.cassandra.db.SuperColumn; import org.apache.cassandra.hadoop.ColumnFamilyRecordReader; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; +import org.apache.hadoop.hive.cassandra.serde.CassandraColumnSerDe; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; public class CassandraHiveRecordReader extends RecordReader implements org.apache.hadoop.mapred.RecordReader { - static final Log LOG = LogFactory.getLog(CassandraHiveRecordReader.class); + static final Logger LOG = LoggerFactory.getLogger(CassandraHiveRecordReader.class); private final boolean isTransposed; private final ColumnFamilyRecordReader cfrr; @@ -31,10 +31,10 @@ public class CassandraHiveRecordReader extends RecordReader implements org.apache.hadoop.mapred.InputFormat { - static final Log LOG = LogFactory.getLog(HiveCassandraStandardColumnInputFormat.class); + static final Logger LOG = LoggerFactory.getLogger(HiveCassandraStandardColumnInputFormat.class); private boolean isTransposed; private final ColumnFamilyInputFormat cfif = new 
ColumnFamilyInputFormat(); @@ -59,8 +55,8 @@ public RecordReader getRecordReader(InputSplit split JobConf jobConf, final Reporter reporter) throws IOException { HiveCassandraStandardSplit cassandraSplit = (HiveCassandraStandardSplit) split; - List columns = AbstractColumnSerDe.parseColumnMapping(cassandraSplit.getColumnMapping()); - isTransposed = AbstractColumnSerDe.isTransposed(columns); + List columns = CassandraColumnSerDe.parseColumnMapping(cassandraSplit.getColumnMapping()); + isTransposed = CassandraColumnSerDe.isTransposed(columns); List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); @@ -85,7 +81,7 @@ public void progress() { SliceRange range = new SliceRange(); AbstractType comparator = BytesType.instance; - String comparatorType = jobConf.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR); + String comparatorType = jobConf.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR); if (comparatorType != null && !comparatorType.equals("")) { try { comparator = TypeParser.parse(comparatorType); @@ -96,9 +92,9 @@ public void progress() { } } - String sliceStart = jobConf.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START); - String sliceEnd = jobConf.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH); - String reversed = jobConf.get(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED); + String sliceStart = jobConf.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_START); + String sliceEnd = jobConf.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_FINISH); + String reversed = jobConf.get(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED); range.setStart(comparator.fromString(sliceStart == null ? "" : sliceStart)); range.setFinish(comparator.fromString(sliceEnd == null ? 
"" : sliceEnd)); @@ -106,7 +102,7 @@ public void progress() { range.setCount(cassandraSplit.getSlicePredicateSize()); predicate.setSlice_range(range); } else { - int iKey = columns.indexOf(AbstractColumnSerDe.CASSANDRA_KEY_COLUMN); + int iKey = columns.indexOf(CassandraColumnSerDe.CASSANDRA_KEY_COLUMN); predicate.setColumn_names(getColumnNames(iKey, columns, readColIDs)); } @@ -114,7 +110,7 @@ public void progress() { try { boolean wideRows = false; - if (isTransposed && tac.getConfiguration().getBoolean(AbstractColumnSerDe.CASSANDRA_ENABLE_WIDEROW_ITERATOR, true)) { + if (isTransposed && tac.getConfiguration().getBoolean(CassandraColumnSerDe.CASSANDRA_ENABLE_WIDEROW_ITERATOR, true)) { wideRows = true; } @@ -150,20 +146,20 @@ public void progress() { @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { - String ks = jobConf.get(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME); - String cf = jobConf.get(AbstractColumnSerDe.CASSANDRA_CF_NAME); - int slicePredicateSize = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, - AbstractColumnSerDe.DEFAULT_SLICE_PREDICATE_SIZE); + String ks = jobConf.get(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); + String cf = jobConf.get(AbstractCassandraSerDe.CASSANDRA_CF_NAME); + int slicePredicateSize = jobConf.getInt(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + AbstractCassandraSerDe.DEFAULT_SLICE_PREDICATE_SIZE); int sliceRangeSize = jobConf.getInt( - AbstractColumnSerDe.CASSANDRA_RANGE_BATCH_SIZE, - AbstractColumnSerDe.DEFAULT_RANGE_BATCH_SIZE); + AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, + AbstractCassandraSerDe.DEFAULT_RANGE_BATCH_SIZE); int splitSize = jobConf.getInt( - AbstractColumnSerDe.CASSANDRA_SPLIT_SIZE, - AbstractColumnSerDe.DEFAULT_SPLIT_SIZE); - String cassandraColumnMapping = jobConf.get(AbstractColumnSerDe.CASSANDRA_COL_MAPPING); - int rpcPort = jobConf.getInt(AbstractColumnSerDe.CASSANDRA_PORT, 9160); - String host = jobConf.get(AbstractColumnSerDe.CASSANDRA_HOST); - String partitioner = jobConf.get(AbstractColumnSerDe.CASSANDRA_PARTITIONER); + AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, + AbstractCassandraSerDe.DEFAULT_SPLIT_SIZE); + String cassandraColumnMapping = jobConf.get(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING); + int rpcPort = jobConf.getInt(AbstractCassandraSerDe.CASSANDRA_PORT, 9160); + String host = jobConf.get(AbstractCassandraSerDe.CASSANDRA_HOST); + String partitioner = jobConf.get(AbstractCassandraSerDe.CASSANDRA_PARTITIONER); if (cassandraColumnMapping == null) { throw new IOException("cassandra.columns.mapping required for Cassandra Table."); @@ -269,7 +265,7 @@ private List parseFilterPredicate(JobConf jobConf) throws IOExc } ExprNodeDesc filterExpr = Utilities.deserializeExpression(filterExprSerialized, jobConf); - String encodedIndexedColumns = jobConf.get(AbstractColumnSerDe.CASSANDRA_INDEXED_COLUMNS); + String encodedIndexedColumns = jobConf.get(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS); Set indexedColumns = CassandraPushdownPredicate.deserializeIndexedColumns(encodedIndexedColumns); if (indexedColumns.isEmpty()) { return null; diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java index 939bac29..fa4b1aaf 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java +++ 
b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/LazyCassandraRow.java @@ -1,11 +1,5 @@ package org.apache.hadoop.hive.cassandra.input; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.cassandra.serde.CassandraLazyFactory; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyObject; @@ -16,9 +10,15 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.MapWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; public class LazyCassandraRow extends LazyStruct { - static final Log LOG = LogFactory.getLog(LazyCassandraRow.class); + static final Logger LOG = LoggerFactory.getLogger(LazyCassandraRow.class); private List cassandraColumns; private List cassandraColumnsBB; diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/CqlHiveRecordReader.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/CqlHiveRecordReader.java index 0423488e..a114bdb5 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/CqlHiveRecordReader.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/CqlHiveRecordReader.java @@ -2,15 +2,14 @@ import org.apache.cassandra.hadoop.cql3.CqlPagingRecordReader; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hive.cassandra.serde.cql.AbstractCqlSerDe; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -20,7 +19,7 @@ public class CqlHiveRecordReader extends RecordReader implements org.apache.hadoop.mapred.RecordReader { - static final Log LOG = LogFactory.getLog(CqlHiveRecordReader.class); + static final Logger LOG = LoggerFactory.getLogger(CqlHiveRecordReader.class); //private final boolean isTransposed; private final CqlPagingRecordReader cfrr; @@ -30,12 +29,6 @@ public class CqlHiveRecordReader extends RecordReader private MapWritable currentKey = null; private final MapWritable currentValue = new MapWritable(); - public static final BytesWritable keyColumn = new BytesWritable(AbstractCqlSerDe.CASSANDRA_KEY_COLUMN.getBytes()); - public static final BytesWritable columnColumn = new BytesWritable(AbstractCqlSerDe.CASSANDRA_COLUMN_COLUMN.getBytes()); - public static final BytesWritable subColumnColumn = new BytesWritable(AbstractCqlSerDe.CASSANDRA_SUBCOLUMN_COLUMN.getBytes()); - public static final BytesWritable valueColumn = new BytesWritable(AbstractCqlSerDe.CASSANDRA_VALUE_COLUMN.getBytes()); - - public CqlHiveRecordReader(CqlPagingRecordReader cprr) { //, boolean isTransposed) { this.cfrr = cprr; //this.isTransposed = isTransposed; diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/HiveCqlInputFormat.java 
b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/HiveCqlInputFormat.java index d35ba2c1..ff3979c6 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/HiveCqlInputFormat.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/HiveCqlInputFormat.java @@ -1,10 +1,5 @@ package org.apache.hadoop.hive.cassandra.input.cql; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.BytesType; -import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.hadoop.ColumnFamilySplit; import org.apache.cassandra.hadoop.ConfigHelper; import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat; @@ -14,13 +9,12 @@ import org.apache.cassandra.thrift.SlicePredicate; import org.apache.cassandra.thrift.SliceRange; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cassandra.CassandraPushdownPredicate; import org.apache.hadoop.hive.cassandra.input.HiveCassandraStandardSplit; -import org.apache.hadoop.hive.cassandra.serde.cql.AbstractCqlSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.cassandra.serde.CassandraColumnSerDe; +import org.apache.hadoop.hive.cassandra.serde.cql.CqlSerDe; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; @@ -34,6 +28,8 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; @@ -45,7 +41,7 @@ public class HiveCqlInputFormat extends InputFormat implements org.apache.hadoop.mapred.InputFormat { - static final Log LOG = LogFactory.getLog(HiveCqlInputFormat.class); + static final Logger LOG = LoggerFactory.getLogger(HiveCqlInputFormat.class); private final CqlPagingInputFormat cfif = new CqlPagingInputFormat(); @@ -54,7 +50,7 @@ public RecordReader getRecordReader(InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException { HiveCassandraStandardSplit cassandraSplit = (HiveCassandraStandardSplit) split; - List columns = AbstractCqlSerDe.parseColumnMapping(cassandraSplit.getColumnMapping()); + List columns = CqlSerDe.parseColumnMapping(cassandraSplit.getColumnMapping()); List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); @@ -75,10 +71,7 @@ public void progress() { SlicePredicate predicate = new SlicePredicate(); - - int iKey = columns.indexOf(AbstractCqlSerDe.CASSANDRA_KEY_COLUMN); - predicate.setColumn_names(getColumnNames(iKey, columns, readColIDs)); - + predicate.setColumn_names(getColumnNames(columns, readColIDs)); try { @@ -116,20 +109,20 @@ public void progress() { @Override public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException { - String ks = jobConf.get(AbstractCqlSerDe.CASSANDRA_KEYSPACE_NAME); - String cf = jobConf.get(AbstractCqlSerDe.CASSANDRA_CF_NAME); - int slicePredicateSize = jobConf.getInt(AbstractCqlSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, - AbstractCqlSerDe.DEFAULT_SLICE_PREDICATE_SIZE); + String ks = 
jobConf.get(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); + String cf = jobConf.get(AbstractCassandraSerDe.CASSANDRA_CF_NAME); + int slicePredicateSize = jobConf.getInt(AbstractCassandraSerDe.CASSANDRA_SLICE_PREDICATE_SIZE, + AbstractCassandraSerDe.DEFAULT_SLICE_PREDICATE_SIZE); int sliceRangeSize = jobConf.getInt( - AbstractCqlSerDe.CASSANDRA_RANGE_BATCH_SIZE, - AbstractCqlSerDe.DEFAULT_RANGE_BATCH_SIZE); + AbstractCassandraSerDe.CASSANDRA_RANGE_BATCH_SIZE, + AbstractCassandraSerDe.DEFAULT_RANGE_BATCH_SIZE); int splitSize = jobConf.getInt( - AbstractCqlSerDe.CASSANDRA_SPLIT_SIZE, - AbstractCqlSerDe.DEFAULT_SPLIT_SIZE); - String cassandraColumnMapping = jobConf.get(AbstractCqlSerDe.CASSANDRA_COL_MAPPING); - int rpcPort = jobConf.getInt(AbstractCqlSerDe.CASSANDRA_PORT, 9160); - String host = jobConf.get(AbstractCqlSerDe.CASSANDRA_HOST); - String partitioner = jobConf.get(AbstractCqlSerDe.CASSANDRA_PARTITIONER); + AbstractCassandraSerDe.CASSANDRA_SPLIT_SIZE, + AbstractCassandraSerDe.DEFAULT_SPLIT_SIZE); + String cassandraColumnMapping = jobConf.get(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING); + int rpcPort = jobConf.getInt(AbstractCassandraSerDe.CASSANDRA_PORT, 9160); + String host = jobConf.get(AbstractCassandraSerDe.CASSANDRA_HOST); + String partitioner = jobConf.get(AbstractCassandraSerDe.CASSANDRA_PARTITIONER); if (cassandraColumnMapping == null) { throw new IOException("cassandra.columns.mapping required for Cassandra Table."); @@ -180,20 +173,17 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException * column mapping * should be skipped. * - * @param iKey the index of the key defined in the column mappping * @param columns column mapping * @param readColIDs column names to read from cassandra */ - private List getColumnNames(int iKey, List columns, List readColIDs) { + private List getColumnNames(List columns, List readColIDs) { List results = new ArrayList(); int maxSize = columns.size(); for (Integer i : readColIDs) { assert (i < maxSize); - if (i != iKey) { results.add(ByteBufferUtil.bytes(columns.get(i.intValue()))); - } } return results; @@ -235,7 +225,7 @@ private List parseFilterPredicate(JobConf jobConf) throws IOExc } ExprNodeDesc filterExpr = Utilities.deserializeExpression(filterExprSerialized, jobConf); - String encodedIndexedColumns = jobConf.get(AbstractCqlSerDe.CASSANDRA_INDEXED_COLUMNS); + String encodedIndexedColumns = jobConf.get(AbstractCassandraSerDe.CASSANDRA_INDEXED_COLUMNS); Set indexedColumns = CassandraPushdownPredicate.deserializeIndexedColumns(encodedIndexedColumns); if (indexedColumns.isEmpty()) { return null; diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlCellMap.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlCellMap.java index 0e808302..9d9996d2 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlCellMap.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlCellMap.java @@ -1,14 +1,14 @@ package org.apache.hadoop.hive.cassandra.input.cql; -import java.util.LinkedHashMap; -import java.util.Map; - import org.apache.cassandra.hadoop.ColumnFamilyRecordReader; import org.apache.hadoop.hive.serde2.lazy.LazyMap; import org.apache.hadoop.hive.serde2.lazy.LazyObject; import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import java.util.LinkedHashMap; +import java.util.Map; + 
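The getSplits() hunks above read every connection and batching setting from the JobConf using the shared AbstractCassandraSerDe property names. As a minimal sketch of the keys involved (only the property names come from this patch; every value below is invented for illustration):

    import org.apache.hadoop.mapred.JobConf;

    public class CqlSplitConfSketch {
        public static void main(String[] args) {
            JobConf jobConf = new JobConf();
            // Property names as declared on AbstractCassandraSerDe later in
            // this patch; the values here are purely illustrative.
            jobConf.set("cassandra.ks.name", "test");        // keyspace
            jobConf.set("cassandra.cf.name", "messages");    // column family
            jobConf.set("cassandra.host", "localhost");      // initial contact host
            jobConf.setInt("cassandra.port", 9160);          // thrift rpc port; 9160 is also the fallback used above
            jobConf.setInt("cassandra.input.split.size", 64 * 1024);
            // cassandra.columns.mapping is mandatory: getSplits() throws an
            // IOException when it is absent.
            jobConf.set("cassandra.columns.mapping", ":key,author,body");
        }
    }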
public class LazyCqlCellMap extends LazyMap { private ColumnFamilyRecordReader rowResult; diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlRow.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlRow.java index 34107b01..45a5a10e 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlRow.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/input/cql/LazyCqlRow.java @@ -1,7 +1,5 @@ package org.apache.hadoop.hive.cassandra.input.cql; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.cassandra.serde.CassandraLazyFactory; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyObject; @@ -13,17 +11,15 @@ import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class LazyCqlRow extends LazyStruct { - static final Log LOG = LogFactory.getLog(LazyCqlRow.class); + static final Logger LOG = LoggerFactory.getLogger(LazyCqlRow.class); private List cassandraColumns; private List cassandraColumnsBB; diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/CassandraAbstractPut.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/CassandraAbstractPut.java index 4b2a1e7d..6feba6d5 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/CassandraAbstractPut.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/CassandraAbstractPut.java @@ -1,20 +1,16 @@ package org.apache.hadoop.hive.cassandra.output; +import org.apache.cassandra.thrift.*; +import org.apache.hadoop.hive.cassandra.CassandraProxyClient; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; +import org.apache.hadoop.mapred.JobConf; +import org.apache.thrift.TException; + import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import org.apache.cassandra.thrift.ConsistencyLevel; -import org.apache.cassandra.thrift.InvalidRequestException; -import org.apache.cassandra.thrift.Mutation; -import org.apache.cassandra.thrift.TimedOutException; -import org.apache.cassandra.thrift.UnavailableException; -import org.apache.hadoop.hive.cassandra.CassandraProxyClient; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; -import org.apache.hadoop.mapred.JobConf; -import org.apache.thrift.TException; - public abstract class CassandraAbstractPut implements Put { /** @@ -25,8 +21,8 @@ public abstract class CassandraAbstractPut implements Put { */ protected int getBatchMutationSize(JobConf jc) { return jc.getInt( - AbstractColumnSerDe.CASSANDRA_BATCH_MUTATION_SIZE, - AbstractColumnSerDe.DEFAULT_BATCH_MUTATION_SIZE); + AbstractCassandraSerDe.CASSANDRA_BATCH_MUTATION_SIZE, + AbstractCassandraSerDe.DEFAULT_BATCH_MUTATION_SIZE); } /** @@ -37,8 +33,8 @@ protected int getBatchMutationSize(JobConf jc) { * @return cassandra consistency level */ protected static ConsistencyLevel getConsistencyLevel(JobConf jc) { - String consistencyLevel = 
jc.get(AbstractColumnSerDe.CASSANDRA_CONSISTENCY_LEVEL, - AbstractColumnSerDe.DEFAULT_CONSISTENCY_LEVEL); + String consistencyLevel = jc.get(AbstractCassandraSerDe.CASSANDRA_CONSISTENCY_LEVEL, + AbstractCassandraSerDe.DEFAULT_CONSISTENCY_LEVEL); ConsistencyLevel level = null; try { level = ConsistencyLevel.valueOf(consistencyLevel); diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java index 51b19766..625ffcac 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/HiveCassandraOutputFormat.java @@ -1,15 +1,10 @@ package org.apache.hadoop.hive.cassandra.output; -import java.io.IOException; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cassandra.CassandraException; import org.apache.hadoop.hive.cassandra.CassandraProxyClient; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.io.Text; @@ -17,21 +12,26 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; @SuppressWarnings("deprecation") public class HiveCassandraOutputFormat implements HiveOutputFormat, OutputFormat { - static final Log LOG = LogFactory.getLog(HiveCassandraOutputFormat.class); + static final Logger LOG = LoggerFactory.getLogger(HiveCassandraOutputFormat.class); @Override public RecordWriter getHiveRecordWriter(final JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { - final String cassandraKeySpace = jc.get(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME); - final String cassandraHost = jc.get(AbstractColumnSerDe.CASSANDRA_HOST); - final int cassandraPort = Integer.parseInt(jc.get(AbstractColumnSerDe.CASSANDRA_PORT)); + final String cassandraKeySpace = jc.get(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); + final String cassandraHost = jc.get(AbstractCassandraSerDe.CASSANDRA_HOST); + final int cassandraPort = Integer.parseInt(jc.get(AbstractCassandraSerDe.CASSANDRA_PORT)); final CassandraProxyClient client; try { diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/CqlPut.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/CqlPut.java index 14baec3e..894fd7b8 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/CqlPut.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/CqlPut.java @@ -1,20 +1,22 @@ package org.apache.hadoop.hive.cassandra.output.cql; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.ByteBufferUtil; import 
org.apache.hadoop.hive.cassandra.CassandraProxyClient; import org.apache.hadoop.hive.cassandra.output.CassandraAbstractPut; -import org.apache.hadoop.hive.cassandra.serde.cql.AbstractCqlSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobConf; import org.apache.thrift.TException; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + /** * This represents a standard column family. It implements hadoop Writable interface. */ @@ -95,7 +97,7 @@ public void write(String keySpace, CassandraProxyClient client, JobConf jc) thro StringBuilder valuesBuilder = new StringBuilder(" VALUES ("); StringBuilder queryBuilder = new StringBuilder("INSERT INTO "); - queryBuilder.append(jc.get(AbstractCqlSerDe.CASSANDRA_CF_NAME)); + queryBuilder.append(jc.get(AbstractCassandraSerDe.CASSANDRA_CF_NAME)); queryBuilder.append("("); Iterator iter = columns.iterator(); while (iter.hasNext()){ @@ -116,6 +118,7 @@ public void write(String keySpace, CassandraProxyClient client, JobConf jc) thro try { //tODO check compression + client.getProxyConnection().set_keyspace(keySpace); CqlPreparedResult result = client.getProxyConnection().prepare_cql3_query(ByteBufferUtil.bytes(queryBuilder.toString()), Compression.NONE); client.getProxyConnection().execute_prepared_cql3_query(result.itemId, values, flevel); } catch (InvalidRequestException e) { diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/HiveCqlOutputFormat.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/HiveCqlOutputFormat.java index 8b0e3c94..006ec2d5 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/HiveCqlOutputFormat.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/output/cql/HiveCqlOutputFormat.java @@ -1,16 +1,11 @@ package org.apache.hadoop.hive.cassandra.output.cql; -import java.io.IOException; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cassandra.CassandraException; import org.apache.hadoop.hive.cassandra.CassandraProxyClient; import org.apache.hadoop.hive.cassandra.output.Put; -import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.io.Text; @@ -18,21 +13,26 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; @SuppressWarnings("deprecation") public class HiveCqlOutputFormat implements HiveOutputFormat, OutputFormat { - static final Log LOG = LogFactory.getLog(HiveCqlOutputFormat.class); + static final Logger LOG = LoggerFactory.getLogger(HiveCqlOutputFormat.class); @Override public RecordWriter getHiveRecordWriter(final JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException { - final String 
cassandraKeySpace = jc.get(AbstractColumnSerDe.CASSANDRA_KEYSPACE_NAME); - final String cassandraHost = jc.get(AbstractColumnSerDe.CASSANDRA_HOST); - final int cassandraPort = Integer.parseInt(jc.get(AbstractColumnSerDe.CASSANDRA_PORT)); + final String cassandraKeySpace = jc.get(AbstractCassandraSerDe.CASSANDRA_KEYSPACE_NAME); + final String cassandraHost = jc.get(AbstractCassandraSerDe.CASSANDRA_HOST); + final int cassandraPort = Integer.parseInt(jc.get(AbstractCassandraSerDe.CASSANDRA_PORT)); + final CassandraProxyClient client; try { diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/AbstractCassandraSerDe.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/AbstractCassandraSerDe.java new file mode 100644 index 00000000..ac3b037a --- /dev/null +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/AbstractCassandraSerDe.java @@ -0,0 +1,227 @@ +package org.apache.hadoop.hive.cassandra.serde; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +public abstract class AbstractCassandraSerDe implements SerDe { + + public static final Logger LOG = LoggerFactory.getLogger(AbstractCassandraSerDe.class); + + public static final String CASSANDRA_KEYSPACE_NAME = "cassandra.ks.name"; // keyspace + public static final String CASSANDRA_KEYSPACE_REPFACTOR = "cassandra.ks.repfactor"; //keyspace replication factor + public static final String CASSANDRA_KEYSPACE_STRATEGY = "cassandra.ks.strategy"; //keyspace replica placement strategy + public static final String CASSANDRA_KEYSPACE_STRATEGY_OPTIONS = "cassandra.ks.stratOptions"; + public static final String DURABLE_WRITES = "durable.writes"; + + public static final String CASSANDRA_CF_NAME = "cassandra.cf.name"; // column family + public static final String CASSANDRA_RANGE_BATCH_SIZE = "cassandra.range.size"; + public static final String CASSANDRA_SLICE_PREDICATE_SIZE = "cassandra.slice.predicate.size"; + public static final String CASSANDRA_SPLIT_SIZE = "cassandra.input.split.size"; + public static final String CASSANDRA_HOST = "cassandra.host"; // initialHost + public static final String CASSANDRA_PORT = "cassandra.port"; // rpcPort + public static final String CASSANDRA_PARTITIONER = "cassandra.partitioner"; // partitioner + public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; + public static final String CASSANDRA_INDEXED_COLUMNS = "cassandra.indexed.columns"; + + public static final String CASSANDRA_BATCH_MUTATION_SIZE = "cassandra.batchmutate.size"; + public static final String CASSANDRA_SLICE_PREDICATE_COLUMN_NAMES = "cassandra.slice.predicate.column_names"; + public static final String CASSANDRA_SLICE_PREDICATE_RANGE_START = "cassandra.slice.predicate.range.start"; + public static final String CASSANDRA_SLICE_PREDICATE_RANGE_FINISH =
"cassandra.slice.predicate.range.finish"; + public static final String CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR = "cassandra.slice.predicate.range.comparator"; + public static final String CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED = "cassandra.slice.predicate.range.reversed"; + public static final String CASSANDRA_SLICE_PREDICATE_RANGE_COUNT = "cassandra.slice.predicate.range.count"; + + public static final String COLUMN_FAMILY_COMMENT = "comment"; + public static final String READ_REPAIR_CHANCE = "read_repair_chance"; + public static final String DCLOCAL_READ_REPAIR_CHANCE = "dclocal_read_repair_chance"; + public static final String GC_GRACE_SECONDS = "gc_grace_seconds"; + public static final String BLOOM_FILTER_FP_CHANCE = "bloom_filter_fp_chance"; + public static final String COMPACTION = "compaction"; + public static final String COMPRESSION = "compression"; + public static final String REPLICATE_ON_WRITE = "replicate_on_write"; + public static final String CACHING = "caching"; + + public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.consistency.level"; + public static final String CASSANDRA_THRIFT_MODE = "cassandra.thrift.mode"; + + public static final int DEFAULT_SPLIT_SIZE = 64 * 1024; + public static final int DEFAULT_RANGE_BATCH_SIZE = 1000; + public static final int DEFAULT_SLICE_PREDICATE_SIZE = 1000; + public static final String DEFAULT_CASSANDRA_HOST = "localhost"; + public static final String DEFAULT_CASSANDRA_PORT = "9160"; + public static final String DEFAULT_CONSISTENCY_LEVEL = "ONE"; + public static final int DEFAULT_BATCH_MUTATION_SIZE = 500; + public static final String DELIMITER = ","; + + /* names of columns from SerdeParameters */ + protected List cassandraColumnNames; + + protected TableMapping mapping; + + protected ObjectInspector cachedObjectInspector; + protected LazySimpleSerDe.SerDeParameters serdeParams; + protected String cassandraKeyspace; + protected String cassandraColumnFamily; + protected List cassandraColumnNamesText; + + protected abstract void initCassandraSerDeParameters(Configuration job, Properties tbl, String serdeName) + throws SerDeException; + + /** + * Create the object inspector. + * + * @return object inspector + */ + public abstract ObjectInspector createObjectInspector(); + + /* + * Turns obj (a Hive Row) into a cassandra data format. + */ + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { + if (objInspector.getCategory() != ObjectInspector.Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + List declaredFields = + (serdeParams.getRowTypeInfo() != null && + ((StructTypeInfo) serdeParams.getRowTypeInfo()) + .getAllStructFieldNames().size() > 0) ? + ((StructObjectInspector) getObjectInspector()).getAllStructFieldRefs() + : null; + try { + return mapping.getWritable(fields, list, declaredFields); + } catch (IOException e) { + throw new SerDeException("Unable to serialize this object! " + e); + } + } + + /** + * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable) + * Turns a Cassandra row into a Hive row. 
+ */ + public abstract Object deserialize(Writable w) throws SerDeException; + + /** + * Parse cassandra keyspace from table properties. + * + * @param tbl table properties + * @return cassandra keyspace + * @throws org.apache.hadoop.hive.serde2.SerDeException + * error parsing keyspace + */ + protected String parseCassandraKeyspace(Properties tbl) throws SerDeException { + String result = tbl.getProperty(CASSANDRA_KEYSPACE_NAME); + + if (result == null) { + + result = tbl + .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME); + + if (result == null) { + throw new SerDeException("CassandraKeyspace not defined: " + tbl.toString()); + } + + if (result.indexOf(".") != -1) { + result = result.substring(0, result.indexOf(".")); + } + } + + return result; + } + + /** + * Parse cassandra column family name from table properties. + * + * @param tbl table properties + * @return cassandra column family name + * @throws org.apache.hadoop.hive.serde2.SerDeException + * error parsing column family name + */ + protected String parseCassandraColumnFamily(Properties tbl) throws SerDeException { + String result = tbl.getProperty(CASSANDRA_CF_NAME); + + if (result == null) { + + result = tbl + .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME); + + if (result == null) { + throw new SerDeException("CassandraColumnFamily not defined: " + tbl.toString()); + } + + if (result.indexOf(".") != -1) { + result = result.substring(result.indexOf(".") + 1); + } + } + + return result; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return cachedObjectInspector; + } + + /** + * Set the table mapping. We only support transposed mapping and regular table mapping for now. + * + * @throws org.apache.hadoop.hive.serde2.SerDeException + * + */ + protected abstract void setTableMapping() throws SerDeException; + + /** + * Trim whitespace and newlines from the entries of the input array.
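The parseCassandraKeyspace()/parseCassandraColumnFamily() methods above fall back to splitting the metastore table name on its first dot when the cassandra.ks.name / cassandra.cf.name properties are absent. A standalone illustration of that rule (hypothetical helpers that mirror the protected methods; they are not part of the patch):

    public class KeyspaceFallbackSketch {
        // Mirrors the fallback in parseCassandraKeyspace()/parseCassandraColumnFamily().
        static String keyspaceOf(String metaTableName) {
            int dot = metaTableName.indexOf('.');
            return dot == -1 ? metaTableName : metaTableName.substring(0, dot);
        }

        static String columnFamilyOf(String metaTableName) {
            int dot = metaTableName.indexOf('.');
            return dot == -1 ? metaTableName : metaTableName.substring(dot + 1);
        }

        public static void main(String[] args) {
            // A metastore name of "test.messages" yields keyspace "test"
            // and column family "messages".
            System.out.println(keyspaceOf("test.messages"));     // test
            System.out.println(columnFamilyOf("test.messages")); // messages
        }
    }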
+ * + * @param input a input string array + * @return a trimmed string array + */ + protected static String[] trim(String[] input) { + String[] trimmed = new String[input.length]; + for (int i = 0; i < input.length; i++) { + trimmed[i] = input[i].trim(); + } + + return trimmed; + } + + @Override + public SerDeStats getSerDeStats() { + return null; + } + + /** + * @return the name of the cassandra keyspace as parsed from table properties + */ + public String getCassandraKeyspace(){ + return cassandraKeyspace; + } + + /** + * @return the name of the cassandra columnfamily as parsed from table properties + */ + public String getCassandraColumnFamily(){ + return cassandraColumnFamily; + } +} diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/AbstractColumnSerDe.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/AbstractColumnSerDe.java deleted file mode 100644 index e9ebb190..00000000 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/AbstractColumnSerDe.java +++ /dev/null @@ -1,469 +0,0 @@ -package org.apache.hadoop.hive.cassandra.serde; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.cassandra.input.LazyCassandraRow; -import org.apache.hadoop.hive.cassandra.output.CassandraPut; -import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; -import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Writable; - -public abstract class AbstractColumnSerDe implements SerDe { - - public static final Log LOG = LogFactory.getLog(AbstractColumnSerDe.class.getName()); - - public static final String CASSANDRA_KEYSPACE_NAME = "cassandra.ks.name"; // keyspace - public static final String CASSANDRA_KEYSPACE_REPFACTOR = "cassandra.ks.repfactor"; //keyspace replication factor - public static final String CASSANDRA_KEYSPACE_STRATEGY = "cassandra.ks.strategy"; //keyspace replica placement strategy - - public static final String CASSANDRA_CF_NAME = "cassandra.cf.name"; // column family - public static final String CASSANDRA_CF_COUNTERS = "cassandra.cf.counters"; // flag this as a counter CF - public static final String CASSANDRA_RANGE_BATCH_SIZE = "cassandra.range.size"; - public static final String CASSANDRA_SLICE_PREDICATE_SIZE = "cassandra.slice.predicate.size"; - public static final String CASSANDRA_SPLIT_SIZE = "cassandra.input.split.size"; - public static final String CASSANDRA_HOST = "cassandra.host"; // initialHost - public static final String CASSANDRA_PORT = "cassandra.port"; // rcpPort - public static final String 
CASSANDRA_PARTITIONER = "cassandra.partitioner"; // partitioner - public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; - public static final String CASSANDRA_INDEXED_COLUMNS = "cassandra.indexed.columns"; - public static final String CASSANDRA_BATCH_MUTATION_SIZE = "cassandra.batchmutate.size"; - public static final String CASSANDRA_SLICE_PREDICATE_COLUMN_NAMES = "cassandra.slice.predicate.column_names"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_START = "cassandra.slice.predicate.range.start"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_FINISH = "cassandra.slice.predicate.range.finish"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR = "cassandra.slice.predicate.range.comparator"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED = "cassandra.slice.predicate.range.reversed"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_COUNT = "cassandra.slice.predicate.range.count"; - public static final String CASSANDRA_ENABLE_WIDEROW_ITERATOR = "cassandra.enable.widerow.iterator"; - - public static final String CASSANDRA_SPECIAL_COLUMN_KEY = "row_key"; - public static final String CASSANDRA_SPECIAL_COLUMN_COL = "column_name"; - public static final String CASSANDRA_SPECIAL_COLUMN_SCOL= "sub_column_name"; - public static final String CASSANDRA_SPECIAL_COLUMN_VAL = "value"; - - public static final String CASSANDRA_KEY_COLUMN = ":key"; - public static final String CASSANDRA_COLUMN_COLUMN = ":column"; - public static final String CASSANDRA_SUBCOLUMN_COLUMN = ":subcolumn"; - public static final String CASSANDRA_VALUE_COLUMN = ":value"; - - public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.consistency.level"; - public static final String CASSANDRA_THRIFT_MODE = "cassandra.thrift.mode"; - - public static final int DEFAULT_SPLIT_SIZE = 64 * 1024; - public static final int DEFAULT_RANGE_BATCH_SIZE = 1000; - public static final int DEFAULT_SLICE_PREDICATE_SIZE = 1000; - public static final String DEFAULT_CASSANDRA_HOST = "localhost"; - public static final String DEFAULT_CASSANDRA_PORT = "9160"; - public static final String DEFAULT_CONSISTENCY_LEVEL = "ONE"; - public static final int DEFAULT_BATCH_MUTATION_SIZE = 500; - public static final String DELIMITER = ","; - - /* names of columns from SerdeParameters */ - protected List cassandraColumnNames; - /* index of key column in results */ - protected int iKey; - protected TableMapping mapping; - - protected ObjectInspector cachedObjectInspector; - protected SerDeParameters serdeParams; - protected LazyCassandraRow cachedCassandraRow; - protected String cassandraKeyspace; - protected String cassandraColumnFamily; - protected List cassandraColumnNamesBytes; - - @Override - public void initialize(Configuration conf, Properties tbl) throws SerDeException { - initCassandraSerDeParameters(conf, tbl, getClass().getName()); - cachedObjectInspector = createObjectInspector(); - - cachedCassandraRow = new LazyCassandraRow( - (LazySimpleStructObjectInspector) cachedObjectInspector); - - if (LOG.isDebugEnabled()) { - LOG.debug("CassandraSerDe initialized with : columnNames = " - + StringUtils.join(serdeParams.getColumnNames(), ",") - + " columnTypes = " - + StringUtils.join(serdeParams.getColumnTypes(), ",") - + " cassandraColumnMapping = " - + cassandraColumnNames); - } - - } - - /** - * Create the object inspector. 
- * - * @return object inspector - */ - protected abstract ObjectInspector createObjectInspector(); - - /* - * - * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable) - * Turns a Cassandra row into a Hive row. - */ - @Override - public Object deserialize(Writable w) throws SerDeException { - if (!(w instanceof MapWritable)) { - throw new SerDeException(getClass().getName() + ": expects MapWritable not "+w.getClass().getName()); - } - - MapWritable columnMap = (MapWritable) w; - cachedCassandraRow.init(columnMap, cassandraColumnNames, cassandraColumnNamesBytes); - return cachedCassandraRow; - } - - @Override - public ObjectInspector getObjectInspector() throws SerDeException { - return cachedObjectInspector; - } - - @Override - public Class getSerializedClass() { - return CassandraPut.class; - } - - /* - * Turns obj (a Hive Row) into a cassandra data format. - */ - @Override - public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - if (objInspector.getCategory() != Category.STRUCT) { - throw new SerDeException(getClass().toString() - + " can only serialize struct types, but we got: " - + objInspector.getTypeName()); - } - // Prepare the field ObjectInspectors - StructObjectInspector soi = (StructObjectInspector) objInspector; - List fields = soi.getAllStructFieldRefs(); - List list = soi.getStructFieldsDataAsList(obj); - List declaredFields = - (serdeParams.getRowTypeInfo() != null && - ((StructTypeInfo) serdeParams.getRowTypeInfo()) - .getAllStructFieldNames().size() > 0) ? - ((StructObjectInspector) getObjectInspector()).getAllStructFieldRefs() - : null; - try { - assert iKey >= 0; - return mapping.getWritable(fields, list, declaredFields); - } catch (IOException e) { - throw new SerDeException("Unable to serialize this object! " + e); - } - } - - - protected abstract void initCassandraSerDeParameters(Configuration job, Properties tbl, String serdeName) - throws SerDeException; - - /** - * Parses the cassandra columns mapping to identify the column name. - * One of the Hive table columns maps to the cassandra row key, by default the - * first column. - * - * @param columnMapping - the column mapping specification to be parsed - * @return a list of cassandra column names - */ - public static List parseColumnMapping(String columnMapping) - { - assert StringUtils.isNotBlank(columnMapping); - String[] columnArray = columnMapping.split(","); - String[] trimmedColumnArray = trim(columnArray); - - List columnList = Arrays.asList(trimmedColumnArray); - - int iKey = columnList.indexOf(CASSANDRA_KEY_COLUMN); - - if (iKey == -1) { - columnList = new ArrayList(columnList); - columnList.add(0, CASSANDRA_KEY_COLUMN); - } - - return columnList; - } - - /** - * Return the column mapping created from column names. 
- * - * @param colNames column names in array format - * @return column mapping string - */ - public static String createColumnMappingString(String[] colNames) { - - //First check of this is a "transposed_table" by seeing if all - //values match our special column names - boolean isTransposedTable = true; - boolean hasKey = false; - boolean hasVal = false; - boolean hasCol = false; - boolean hasSubCol = false; - String transposedMapping = ""; - for(String column : colNames) { - if (column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_KEY)){ - transposedMapping += ","+CASSANDRA_KEY_COLUMN; - hasKey = true; - } else if(column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_COL)){ - transposedMapping += ","+CASSANDRA_COLUMN_COLUMN; - hasCol = true; - } else if(column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_SCOL)){ - transposedMapping += ","+CASSANDRA_SUBCOLUMN_COLUMN; - hasSubCol = true; - } else if(column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_VAL)){ - transposedMapping += ","+CASSANDRA_VALUE_COLUMN; - hasVal = true; - } else { - isTransposedTable = false; - break; - } - } - - if(isTransposedTable && !(colNames.length == 1 && hasKey)){ - - if(!hasKey || !hasVal || !hasCol ) { - throw new IllegalArgumentException("Transposed table definition missing required fields!"); - } - - return transposedMapping.substring(1);//skip leading , - } - - //Regular non-transposed logic. The first column maps to the key automatically. - StringBuilder mappingStr = new StringBuilder(CASSANDRA_KEY_COLUMN); - for (int i = 1; i < colNames.length; i++) { - mappingStr.append(","); - mappingStr.append(colNames[i]); - } - - return mappingStr.toString(); - } - - /* - * Creates the cassandra column mappings from the hive column names. - * This would be triggered when no cassandra.columns.mapping has been defined - * in the user query. - * - * row_key is a special column name, it maps to the key of a row in cassandra; - * column_name maps to the name of a column/supercolumn; - * value maps to the value of a column; - * sub_column_name maps to the name of a column (This can only be used for a super column family.) - * - * @param tblColumnStr hive table column names - */ - public static String createColumnMappingString(String tblColumnStr) { - if(StringUtils.isBlank(tblColumnStr)) { - throw new IllegalArgumentException("table must have columns"); - } - - String[] colNames = tblColumnStr.split(","); - - return createColumnMappingString(colNames); - } - - /** - * Parse cassandra keyspace from table properties. - * - * @param tbl table properties - * @return cassandra keyspace - * @throws SerDeException error parsing keyspace - */ - protected String parseCassandraKeyspace(Properties tbl) throws SerDeException { - String result = tbl.getProperty(CASSANDRA_KEYSPACE_NAME); - - if (result == null) { - - result = tbl - .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME); - - if (result == null) { - throw new SerDeException("CassandraKeyspace not defined" + tbl.toString()); - } - - if (result.indexOf(".") != -1) { - result = result.substring(0, result.indexOf(".")); - } - } - - return result; - } - - /** - * Parse cassandra column family name from table properties. 
- * - * @param tbl table properties - * @return cassandra column family name - * @throws SerDeException error parsing column family name - */ - protected String parseCassandraColumnFamily(Properties tbl) throws SerDeException { - String result = tbl.getProperty(CASSANDRA_CF_NAME); - - if (result == null) { - - result = tbl - .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME); - - if (result == null) { - throw new SerDeException("CassandraColumnFamily not defined" + tbl.toString()); - } - - if (result.indexOf(".") != -1) { - result = result.substring(result.indexOf(".") + 1); - } - } - - return result; - } - - /** - * Parse the column mappping from table properties. If cassandra.columns.mapping - * is defined in the property, use it to create the mapping. Otherwise, create the mapping from table - * columns using the default mapping. - * - * @param tbl table properties - * @return A list of column names - * @throws SerDeException - */ - protected List parseOrCreateColumnMapping(Properties tbl) throws SerDeException { - String prop = tbl.getProperty(CASSANDRA_COL_MAPPING); - - if (prop != null) { - return parseColumnMapping(prop); - } else { - String tblColumnStr = tbl.getProperty(Constants.LIST_COLUMNS); - - if (tblColumnStr != null) { - //auto-create - String mappingStr = createColumnMappingString(tblColumnStr); - - if (LOG.isDebugEnabled()) { - LOG.debug("table column string: " + tblColumnStr); - LOG.debug("Auto-created mapping string: " + mappingStr); - } - - return Arrays.asList(mappingStr.split(",")); - - } else { - throw new SerDeException("Can't find table column definitions"); - } - } - } - - /** - * Set the table mapping. We only support transposed mapping and regular table mapping for now. - * - * @throws SerDeException - */ - protected void setTableMapping() throws SerDeException { - if (isTransposed(cassandraColumnNames)) { - mapping = new TransposedMapping(cassandraColumnFamily, cassandraColumnNames, serdeParams); - } else { - mapping = new RegularTableMapping(cassandraColumnFamily, cassandraColumnNames, serdeParams); - } - } - - /** - * Trim the white spaces, new lines from the input array. - * - * @param input a input string array - * @return a trimmed string array - */ - protected static String[] trim(String[] input) { - String[] trimmed = new String[input.length]; - for (int i = 0; i < input.length; i++) { - trimmed[i] = input[i].trim(); - } - - return trimmed; - } - - /** - * Return if a table is a transposed. A table is transposed when the column mapping is like - * (:key, :column, :value) or (:key, :column, :subcolumn, :value). 
- * - * @param column mapping - * @return true if a table is transposed, otherwise false - */ - public static boolean isTransposed(List columnNames) - { - if(columnNames == null || columnNames.size() == 0) { - throw new IllegalArgumentException("no cassandra column information found"); - } - - boolean hasKey = false; - boolean hasColumn = false; - boolean hasValue = false; - boolean hasSubColumn = false; - - for (String column : columnNames) { - if (column.equalsIgnoreCase(CASSANDRA_KEY_COLUMN)) { - hasKey = true; - } else if (column.equalsIgnoreCase(CASSANDRA_COLUMN_COLUMN)) { - hasColumn = true; - } else if (column.equalsIgnoreCase(CASSANDRA_SUBCOLUMN_COLUMN)) { - hasSubColumn = true; - } else if (column.equalsIgnoreCase(CASSANDRA_VALUE_COLUMN)) { - hasValue = true; - } else { - return false; - } - } - - //only requested row key - if(columnNames.size() == 1 && hasKey) { - return false; - } - - if(!hasKey || !hasValue || !hasColumn) { - return false; - } - - return true; - } - - - /** - * @return 0-based offset of the key column within the table - */ - public int getKeyColumnOffset() { - return iKey; - } - - protected class ColumnData { - - } - - @Override - public SerDeStats getSerDeStats() { - // TODO Auto-generated method stub - return null; - } - - /** - * @return the name of the cassandra keyspace as parsed from table properties - */ - public String getCassandraKeyspace(){ - return cassandraKeyspace; - } - - /** - * @return the name of the cassandra columnfamily as parsed from table properties - */ - public String getCassandraColumnFamily(){ - return cassandraColumnFamily; - } -} diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/CassandraColumnSerDe.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/CassandraColumnSerDe.java index 32feb4a6..f1466aea 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/CassandraColumnSerDe.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/CassandraColumnSerDe.java @@ -12,16 +12,25 @@ import org.apache.cassandra.exceptions.SyntaxException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.cassandra.input.LazyCassandraRow; +import org.apache.hadoop.hive.cassandra.output.CassandraPut; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class CassandraColumnSerDe extends AbstractColumnSerDe { +public class CassandraColumnSerDe extends AbstractCassandraSerDe { + + public static final Logger LOG = LoggerFactory.getLogger(CassandraColumnSerDe.class); public static final String CASSANDRA_VALIDATOR_TYPE = "cassandra.cf.validatorType"; // validator type @@ -29,6 +38,64 @@ public class CassandraColumnSerDe extends AbstractColumnSerDe { private List validatorType; + public static final String CASSANDRA_ENABLE_WIDEROW_ITERATOR = 
"cassandra.enable.widerow.iterator"; + + public static final String CASSANDRA_SPECIAL_COLUMN_KEY = "row_key"; + public static final String CASSANDRA_SPECIAL_COLUMN_COL = "column_name"; + public static final String CASSANDRA_SPECIAL_COLUMN_SCOL= "sub_column_name"; + public static final String CASSANDRA_SPECIAL_COLUMN_VAL = "value"; + + public static final String CASSANDRA_KEY_COLUMN = ":key"; + public static final String CASSANDRA_COLUMN_COLUMN = ":column"; + public static final String CASSANDRA_SUBCOLUMN_COLUMN = ":subcolumn"; + public static final String CASSANDRA_VALUE_COLUMN = ":value"; + + /* names of columns from SerdeParameters */ + protected List cassandraColumnNames; + /* index of key column in results */ + protected int iKey; + protected LazyCassandraRow cachedCassandraRow; + protected List cassandraColumnNamesBytes; + + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + initCassandraSerDeParameters(conf, tbl, getClass().getName()); + cachedObjectInspector = createObjectInspector(); + + cachedCassandraRow = new LazyCassandraRow( + (LazySimpleStructObjectInspector) cachedObjectInspector); + + if (LOG.isDebugEnabled()) { + LOG.debug("AbstractCassandraSerDe initialized with : columnNames = " + + StringUtils.join(serdeParams.getColumnNames(), ",") + + " columnTypes = " + + StringUtils.join(serdeParams.getColumnTypes(), ",") + + " cassandraColumnMapping = " + + cassandraColumnNames); + } + } + + /* + * + * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable) + * Turns a Cassandra row into a Hive row. + */ + @Override + public Object deserialize(Writable w) throws SerDeException { + if (!(w instanceof MapWritable)) { + throw new SerDeException(getClass().getName() + ": expects MapWritable not "+w.getClass().getName()); + } + + MapWritable columnMap = (MapWritable) w; + cachedCassandraRow.init(columnMap, cassandraColumnNames, cassandraColumnNamesBytes); + return cachedCassandraRow; + } + + @Override + public Class getSerializedClass() { + return CassandraPut.class; + } + /** * Initialize the cassandra serialization and deserialization parameters from table properties and configuration. * @@ -49,7 +116,7 @@ protected void initCassandraSerDeParameters(Configuration job, Properties tbl, S cassandraColumnNamesBytes.add(new BytesWritable(columnName.getBytes())); } - iKey = cassandraColumnNames.indexOf(AbstractColumnSerDe.CASSANDRA_KEY_COLUMN); + iKey = cassandraColumnNames.indexOf(CassandraColumnSerDe.CASSANDRA_KEY_COLUMN); serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); @@ -85,7 +152,7 @@ protected void initCassandraSerDeParameters(Configuration job, Properties tbl, S } @Override - protected ObjectInspector createObjectInspector() { + public ObjectInspector createObjectInspector() { return CassandraLazyFactory.createLazyStructInspector( serdeParams.getColumnNames(), serdeParams.getColumnTypes(), @@ -157,4 +224,196 @@ private List parseValidatorType(List columnList) return types; } + /** + * Set the table mapping. We only support transposed mapping and regular table mapping for now. 
+ * + * @throws SerDeException + */ + protected void setTableMapping() throws SerDeException { + if (isTransposed(cassandraColumnNames)) { + mapping = new TransposedMapping(cassandraColumnFamily, cassandraColumnNames, serdeParams); + } else { + mapping = new RegularTableMapping(cassandraColumnFamily, cassandraColumnNames, serdeParams); + } + } + + /** + * Parses the cassandra columns mapping to identify the cassandra column names. + * One of the Hive table columns maps to the cassandra row key, by default the + * first column. + * + * @param columnMapping - the column mapping specification to be parsed + * @return a list of cassandra column names + */ + public static List parseColumnMapping(String columnMapping) + { + assert StringUtils.isNotBlank(columnMapping); + String[] columnArray = columnMapping.split(","); + String[] trimmedColumnArray = trim(columnArray); + + List columnList = Arrays.asList(trimmedColumnArray); + + int iKey = columnList.indexOf(CASSANDRA_KEY_COLUMN); + + if (iKey == -1) { + columnList = new ArrayList(columnList); + columnList.add(0, CASSANDRA_KEY_COLUMN); + } + + return columnList; + } + + /** + * Return the column mapping created from column names. + * + * @param colNames column names in array format + * @return column mapping string + */ + public static String createColumnMappingString(String[] colNames) { + + //First check if this is a "transposed_table" by seeing if all + //values match our special column names + boolean isTransposedTable = true; + boolean hasKey = false; + boolean hasVal = false; + boolean hasCol = false; + boolean hasSubCol = false; + String transposedMapping = ""; + for(String column : colNames) { + if (column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_KEY)){ + transposedMapping += ","+CASSANDRA_KEY_COLUMN; + hasKey = true; + } else if(column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_COL)){ + transposedMapping += ","+CASSANDRA_COLUMN_COLUMN; + hasCol = true; + } else if(column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_SCOL)){ + transposedMapping += ","+CASSANDRA_SUBCOLUMN_COLUMN; + hasSubCol = true; + } else if(column.equalsIgnoreCase(CASSANDRA_SPECIAL_COLUMN_VAL)){ + transposedMapping += ","+CASSANDRA_VALUE_COLUMN; + hasVal = true; + } else { + isTransposedTable = false; + break; + } + } + + if(isTransposedTable && !(colNames.length == 1 && hasKey)){ + + if(!hasKey || !hasVal || !hasCol ) { + throw new IllegalArgumentException("Transposed table definition missing required fields!"); + } + + return transposedMapping.substring(1);//skip leading , + } + + //Regular non-transposed logic. The first column maps to the key automatically. + StringBuilder mappingStr = new StringBuilder(CASSANDRA_KEY_COLUMN); + for (int i = 1; i < colNames.length; i++) { + mappingStr.append(","); + mappingStr.append(colNames[i]); + } + + return mappingStr.toString(); + } + + /* + * Creates the cassandra column mappings from the hive column names. + * This would be triggered when no cassandra.columns.mapping has been defined + * in the user query. + * + * row_key is a special column name, it maps to the key of a row in cassandra; + * column_name maps to the name of a column/supercolumn; + * value maps to the value of a column; + * sub_column_name maps to the name of a column (This can only be used for a super column family.)
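A short illustration of that auto-created mapping (assuming the createColumnMappingString overloads defined in this class; the column names are examples only):

    import org.apache.hadoop.hive.cassandra.serde.CassandraColumnSerDe;

    public class AutoMappingSketch {
        public static void main(String[] args) {
            // All-special hive column names collapse to the transposed mapping.
            System.out.println(
                CassandraColumnSerDe.createColumnMappingString("row_key,column_name,value"));
            // -> :key,:column,:value

            // Otherwise the first hive column becomes the row key automatically.
            System.out.println(
                CassandraColumnSerDe.createColumnMappingString("message_id,author,body"));
            // -> :key,author,body
        }
    }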
+ * + * @param tblColumnStr hive table column names + */ + public static String createColumnMappingString(String tblColumnStr) { + if(StringUtils.isBlank(tblColumnStr)) { + throw new IllegalArgumentException("table must have columns"); + } + + String[] colNames = tblColumnStr.split(","); + + return createColumnMappingString(colNames); + } + + /** + * Parse the column mapping from table properties. If cassandra.columns.mapping + * is defined in the property, use it to create the mapping. Otherwise, create the mapping from table + * columns using the default mapping. + * + * @param tbl table properties + * @return A list of column names + * @throws SerDeException + */ + protected List parseOrCreateColumnMapping(Properties tbl) throws SerDeException { + String prop = tbl.getProperty(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING); + + if (prop != null) { + return parseColumnMapping(prop); + } else { + String tblColumnStr = tbl.getProperty(Constants.LIST_COLUMNS); + + if (tblColumnStr != null) { + //auto-create + String mappingStr = createColumnMappingString(tblColumnStr); + + if (LOG.isDebugEnabled()) { + LOG.debug("table column string: " + tblColumnStr); + LOG.debug("Auto-created mapping string: " + mappingStr); + } + + return Arrays.asList(mappingStr.split(",")); + + } else { + throw new SerDeException("Can't find table column definitions"); + } + } + } + + /** + * Return whether a table is transposed. A table is transposed when the column mapping is like + * (:key, :column, :value) or (:key, :column, :subcolumn, :value). + * + * @return true if a table is transposed, otherwise false + */ + public static boolean isTransposed(List columnNames) + { + if(columnNames == null || columnNames.size() == 0) { + throw new IllegalArgumentException("no cassandra column information found"); + } + + boolean hasKey = false; + boolean hasColumn = false; + boolean hasValue = false; + boolean hasSubColumn = false; + + for (String column : columnNames) { + if (column.equalsIgnoreCase(CASSANDRA_KEY_COLUMN)) { + hasKey = true; + } else if (column.equalsIgnoreCase(CASSANDRA_COLUMN_COLUMN)) { + hasColumn = true; + } else if (column.equalsIgnoreCase(CASSANDRA_SUBCOLUMN_COLUMN)) { + hasSubColumn = true; + } else if (column.equalsIgnoreCase(CASSANDRA_VALUE_COLUMN)) { + hasValue = true; + } else { + return false; + } + } + + //only requested row key + if(columnNames.size() == 1 && hasKey) { + return false; + } + + if(!hasKey || !hasValue || !hasColumn) { + return false; + } + + return true; + } + + } diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/RegularTableMapping.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/RegularTableMapping.java index aff98ca3..b3280b43 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/RegularTableMapping.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/RegularTableMapping.java @@ -14,21 +14,29 @@ import org.apache.hadoop.io.Writable; public class RegularTableMapping extends TableMapping { + + // index of key column in results + protected final int iKey; + public RegularTableMapping( String colFamily, List columnNames, SerDeParameters serdeParams) { super(colFamily, columnNames, serdeParams); + this.iKey = cassandraColumnNames.indexOf(CassandraColumnSerDe.CASSANDRA_KEY_COLUMN); } - public RegularTableMapping( - String colFamily, - List columnNames, - SerDeParameters serdeParams, int iKey) { - super(colFamily, columnNames, serdeParams, iKey); + public Writable
getWritable( + List fields, + List list, + List declaredFields) throws IOException { + assert iKey >= 0; + //First get the cassandra row key + byte[] keyBytes = serializeToBytes(iKey, fields, list, declaredFields); + + return write(keyBytes, fields, list, declaredFields); } - @Override public Writable write( byte[] keyBytes, List fields, diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TableMapping.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TableMapping.java index 31375ade..31cdb416 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TableMapping.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TableMapping.java @@ -29,8 +29,7 @@ public abstract class TableMapping { /* names of columns from SerdeParameters */ protected final List cassandraColumnNames; - /* index of key column in results */ - protected final int iKey; + protected final String cassandraColumnFamily; private boolean useJSONSerialize; @@ -45,7 +44,6 @@ public abstract class TableMapping { protected TableMapping(String colFamily, List columnNames, SerDeParameters serdeParams) { this.cassandraColumnFamily = colFamily; this.cassandraColumnNames = columnNames; - this.iKey = cassandraColumnNames.indexOf(AbstractColumnSerDe.CASSANDRA_KEY_COLUMN); separators = serdeParams.getSeparators(); escaped = serdeParams.isEscaped(); @@ -53,30 +51,7 @@ protected TableMapping(String colFamily, List columnNames, SerDeParamete needsEscape = serdeParams.getNeedsEscape(); } - protected TableMapping(String colFamily, List columnNames, SerDeParameters serdeParams, int iKey) { - this.cassandraColumnFamily = colFamily; - this.cassandraColumnNames = columnNames; - this.iKey = iKey; - - separators = serdeParams.getSeparators(); - escaped = serdeParams.isEscaped(); - escapeChar = serdeParams.getEscapeChar(); - needsEscape = serdeParams.getNeedsEscape(); - } - - public Writable getWritable( - List fields, - List list, - List declaredFields) throws IOException { - assert iKey >= 0; - //First get the cassandra row key - byte[] keyBytes = serializeToBytes(iKey, fields, list, declaredFields); - - return write(keyBytes, fields, list, declaredFields); - } - - public abstract Writable write( - byte[] keyBytes, + public abstract Writable getWritable( List fields, List list, List declaredFields) throws IOException; @@ -118,7 +93,6 @@ protected boolean useJsonSerialize(int index, List declar /** * Serialize a object into bytes. 
* @param foi object inspector - * @param decalred output object inspector * @param obj object to be serialized * @param useJsonSerialize true to use json serialization * @return object in serialized bytes diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TransposedMapping.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TransposedMapping.java index 7ca4dfaa..28636f6f 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TransposedMapping.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/TransposedMapping.java @@ -18,11 +18,15 @@ public class TransposedMapping extends TableMapping { private int columnValue = -1; private int subColumnName = -1; + // index of key column in results + protected final int iKey; + public TransposedMapping( String colFamily, List columnNames, SerDeParameters serdeParams) throws SerDeException { super(colFamily, columnNames, serdeParams); + this.iKey = cassandraColumnNames.indexOf(CassandraColumnSerDe.CASSANDRA_KEY_COLUMN); init(); } @@ -30,7 +34,17 @@ private void init() throws SerDeException { setTransposedTableIndex(); } - @Override + public Writable getWritable( + List fields, + List list, + List declaredFields) throws IOException { + assert iKey >= 0; + //First get the cassandra row key + byte[] keyBytes = serializeToBytes(iKey, fields, list, declaredFields); + + return write(keyBytes, fields, list, declaredFields); + } + public Writable write( byte[] keyBytes, List fields, @@ -83,13 +97,13 @@ private void setTransposedTableIndex() throws SerDeException { int subColumnName = -1; for (int i = 0; i < cassandraColumnNames.size(); i++) { String str = cassandraColumnNames.get(i); - if (str.equals(AbstractColumnSerDe.CASSANDRA_KEY_COLUMN)) { + if (str.equals(CassandraColumnSerDe.CASSANDRA_KEY_COLUMN)) { key = i; - } else if (str.equals(AbstractColumnSerDe.CASSANDRA_COLUMN_COLUMN)) { + } else if (str.equals(CassandraColumnSerDe.CASSANDRA_COLUMN_COLUMN)) { columnName = i; - } else if (str.equals(AbstractColumnSerDe.CASSANDRA_VALUE_COLUMN)) { + } else if (str.equals(CassandraColumnSerDe.CASSANDRA_VALUE_COLUMN)) { columnValue = i; - } else if (str.equals(AbstractColumnSerDe.CASSANDRA_SUBCOLUMN_COLUMN)) { + } else if (str.equals(CassandraColumnSerDe.CASSANDRA_SUBCOLUMN_COLUMN)) { subColumnName = i; } else { throw new SerDeException("An expected mapping appears in the column mapping " + str); diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/AbstractCqlSerDe.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/AbstractCqlSerDe.java deleted file mode 100644 index b6d7ef9a..00000000 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/AbstractCqlSerDe.java +++ /dev/null @@ -1,408 +0,0 @@ -package org.apache.hadoop.hive.cassandra.serde.cql; - -import org.apache.commons.lang.StringUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.cassandra.input.cql.LazyCqlRow; -import org.apache.hadoop.hive.cassandra.output.CassandraPut; -import org.apache.hadoop.hive.cassandra.serde.TableMapping; -import org.apache.hadoop.hive.serde.Constants; -import org.apache.hadoop.hive.serde2.SerDe; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeStats; -import 
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; -import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.io.MapWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; - - -public abstract class AbstractCqlSerDe implements SerDe { - - public static final Log LOG = LogFactory.getLog(AbstractCqlSerDe.class.getName()); - - public static final String CASSANDRA_KEYSPACE_NAME = "cassandra.ks.name"; // keyspace - public static final String CASSANDRA_KEYSPACE_REPFACTOR = "cassandra.ks.repfactor"; //keyspace replication factor - public static final String CASSANDRA_KEYSPACE_STRATEGY = "cassandra.ks.strategy"; //keyspace replica placement strategy - public static final String CASSANDRA_KEYSPACE_STRATEGY_OPTIONS = "cassandra.ks.stratOptions"; - public static final String DURABLE_WRITES = "durable.writes"; - - public static final String CASSANDRA_CF_NAME = "cassandra.cf.name"; // column family - public static final String CASSANDRA_CF_COUNTERS = "cassandra.cf.counters"; // flag this as a counter CF - public static final String CASSANDRA_RANGE_BATCH_SIZE = "cassandra.range.size"; - public static final String CASSANDRA_SLICE_PREDICATE_SIZE = "cassandra.slice.predicate.size"; - public static final String CASSANDRA_SPLIT_SIZE = "cassandra.input.split.size"; - public static final String CASSANDRA_HOST = "cassandra.host"; // initialHost - public static final String CASSANDRA_PORT = "cassandra.port"; // rcpPort - public static final String CASSANDRA_PARTITIONER = "cassandra.partitioner"; // partitioner - public static final String CASSANDRA_COL_MAPPING = "cassandra.columns.mapping"; - public static final String CASSANDRA_INDEXED_COLUMNS = "cassandra.indexed.columns"; - public static final String CASSANDRA_BATCH_MUTATION_SIZE = "cassandra.batchmutate.size"; - public static final String CASSANDRA_SLICE_PREDICATE_COLUMN_NAMES = "cassandra.slice.predicate.column_names"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_START = "cassandra.slice.predicate.range.start"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_FINISH = "cassandra.slice.predicate.range.finish"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_COMPARATOR = "cassandra.slice.predicate.range.comparator"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_REVERSED = "cassandra.slice.predicate.range.reversed"; - public static final String CASSANDRA_SLICE_PREDICATE_RANGE_COUNT = "cassandra.slice.predicate.range.count"; - public static final String CASSANDRA_ENABLE_WIDEROW_ITERATOR = "cassandra.enable.widerow.iterator"; - - public static final String CASSANDRA_COLUMN_FAMILY_PRIMARY_KEY = "cql.primarykey"; - public static final String COLUMN_FAMILY_COMMENT = "comment"; - public static final String READ_REPAIR_CHANCE = "read_repair_chance"; - public static final String DCLOCAL_READ_REPAIR_CHANCE = "dclocal_read_repair_chance"; - public static final String GC_GRACE_SECONDS = "gc_grace_seconds"; - public static 
final String BLOOM_FILTER_FP_CHANCE = "bloom_filter_fp_chance"; - public static final String COMPACTION = "compaction"; - public static final String COMPRESSION = "compression"; - public static final String REPLICATE_ON_WRITE = "replicate_on_write"; - public static final String CACHING = "caching"; - - public static final String CASSANDRA_KEY_COLUMN = ":key"; - public static final String CASSANDRA_COLUMN_COLUMN = ":column"; - public static final String CASSANDRA_SUBCOLUMN_COLUMN = ":subcolumn"; - public static final String CASSANDRA_VALUE_COLUMN = ":value"; - - public static final String CASSANDRA_CONSISTENCY_LEVEL = "cassandra.consistency.level"; - public static final String CASSANDRA_THRIFT_MODE = "cassandra.thrift.mode"; - - public static final int DEFAULT_SPLIT_SIZE = 64 * 1024; - public static final int DEFAULT_RANGE_BATCH_SIZE = 1000; - public static final int DEFAULT_SLICE_PREDICATE_SIZE = 1000; - public static final String DEFAULT_CASSANDRA_HOST = "localhost"; - public static final String DEFAULT_CASSANDRA_PORT = "9160"; - public static final String DEFAULT_CONSISTENCY_LEVEL = "ONE"; - public static final int DEFAULT_BATCH_MUTATION_SIZE = 500; - public static final String DELIMITER = ","; - - /* names of columns from SerdeParameters */ - protected List cassandraColumnNames; - /* index of key column in results */ - protected int iKey; - protected TableMapping mapping; - - protected ObjectInspector cachedObjectInspector; - protected SerDeParameters serdeParams; - protected LazyCqlRow lazyCqlRow; - protected String cassandraKeyspace; - protected String cassandraColumnFamily; - protected List cassandraColumnNamesText; - - @Override - public void initialize(Configuration conf, Properties tbl) throws SerDeException { - initCassandraSerDeParameters(conf, tbl, getClass().getName()); - cachedObjectInspector = createObjectInspector(); - - lazyCqlRow = new LazyCqlRow( - (LazySimpleStructObjectInspector) cachedObjectInspector); - - if (LOG.isDebugEnabled()) { - LOG.debug("CassandraSerDe initialized with : columnNames = " - + StringUtils.join(serdeParams.getColumnNames(), ",") - + " columnTypes = " - + StringUtils.join(serdeParams.getColumnTypes(), ",") - + " cassandraColumnMapping = " - + cassandraColumnNames); - } - - } - - /** - * Create the object inspector. - * - * @return object inspector - */ - protected abstract ObjectInspector createObjectInspector(); - - /* - * - * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable) - * Turns a Cassandra row into a Hive row. - */ - @Override - public Object deserialize(Writable w) throws SerDeException { - if (!(w instanceof MapWritable)) { - throw new SerDeException(getClass().getName() + ": expects MapWritable not " + w.getClass().getName()); - } - - MapWritable columnMap = (MapWritable) w; - lazyCqlRow.init(columnMap, cassandraColumnNames, cassandraColumnNamesText); - return lazyCqlRow; - } - - @Override - public ObjectInspector getObjectInspector() throws SerDeException { - return cachedObjectInspector; - } - - @Override - public Class getSerializedClass() { - return CassandraPut.class; - } - - /* - * Turns obj (a Hive Row) into a cassandra data format. 
- */ - @Override - public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException { - if (objInspector.getCategory() != Category.STRUCT) { - throw new SerDeException(getClass().toString() - + " can only serialize struct types, but we got: " - + objInspector.getTypeName()); - } - // Prepare the field ObjectInspectors - StructObjectInspector soi = (StructObjectInspector) objInspector; - List fields = soi.getAllStructFieldRefs(); - List list = soi.getStructFieldsDataAsList(obj); - List declaredFields = - (serdeParams.getRowTypeInfo() != null && - ((StructTypeInfo) serdeParams.getRowTypeInfo()) - .getAllStructFieldNames().size() > 0) ? - ((StructObjectInspector) getObjectInspector()).getAllStructFieldRefs() - : null; - try { - assert iKey >= 0; - return mapping.getWritable(fields, list, declaredFields); - } catch (IOException e) { - throw new SerDeException("Unable to serialize this object! " + e); - } - } - - - protected abstract void initCassandraSerDeParameters(Configuration job, Properties tbl, String serdeName) - throws SerDeException; - - /** - * Parses the cassandra columns mapping to identify the column name. - * One of the Hive table columns maps to the cassandra row key, by default the - * first column. - * - * @param columnMapping - the column mapping specification to be parsed - * @return a list of cassandra column names - */ - public static List parseColumnMapping(String columnMapping) { - assert StringUtils.isNotBlank(columnMapping); - String[] columnArray = columnMapping.split(","); - String[] trimmedColumnArray = trim(columnArray); - - List columnList = Arrays.asList(trimmedColumnArray); - - int iKey = columnList.indexOf(CASSANDRA_KEY_COLUMN); - - if (iKey == -1) { - columnList = new ArrayList(columnList); - columnList.add(0, CASSANDRA_KEY_COLUMN); - } - - return columnList; - } - - /** - * Return the column mapping created from column names. - * - * @param colNames column names in array format - * @return column mapping string - */ - public static String createColumnMappingString(String[] colNames) { - - //Regular non-transposed logic. The first column maps to the key automatically. - StringBuilder mappingStr = new StringBuilder(CASSANDRA_KEY_COLUMN); - //StringBuilder mappingStr = new StringBuilder(); //Since CQL doesn't have the special :key column - for (int i = 1; i < colNames.length; i++) { - mappingStr.append(","); - mappingStr.append(colNames[i]); - } - - return mappingStr.toString(); - } - - /* - * Creates the cassandra column mappings from the hive column names. - * This would be triggered when no cassandra.columns.mapping has been defined - * in the user query. - * - * row_key is a special column name, it maps to the key of a row in cassandra; - * column_name maps to the name of a column/supercolumn; - * value maps to the value of a column; - * sub_column_name maps to the name of a column (This can only be used for a super column family.) - * - * @param tblColumnStr hive table column names - */ - public static String createColumnMappingString(String tblColumnStr) { - if (StringUtils.isBlank(tblColumnStr)) { - throw new IllegalArgumentException("table must have columns"); - } - - //String[] colNames = tblColumnStr.split(","); - - //return createColumnMappingString(colNames); - return tblColumnStr; - } - - /** - * Parse cassandra keyspace from table properties. 
- * - * @param tbl table properties - * @return cassandra keyspace - * @throws org.apache.hadoop.hive.serde2.SerDeException - * error parsing keyspace - */ - protected String parseCassandraKeyspace(Properties tbl) throws SerDeException { - String result = tbl.getProperty(CASSANDRA_KEYSPACE_NAME); - - if (result == null) { - - result = tbl - .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME); - - if (result == null) { - throw new SerDeException("CassandraKeyspace not defined" + tbl.toString()); - } - - if (result.indexOf(".") != -1) { - result = result.substring(0, result.indexOf(".")); - } - } - - return result; - } - - /** - * Parse cassandra column family name from table properties. - * - * @param tbl table properties - * @return cassandra column family name - * @throws org.apache.hadoop.hive.serde2.SerDeException - * error parsing column family name - */ - protected String parseCassandraColumnFamily(Properties tbl) throws SerDeException { - String result = tbl.getProperty(CASSANDRA_CF_NAME); - - if (result == null) { - - result = tbl - .getProperty(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_NAME); - - if (result == null) { - throw new SerDeException("CassandraColumnFamily not defined" + tbl.toString()); - } - - if (result.indexOf(".") != -1) { - result = result.substring(result.indexOf(".") + 1); - } - } - - return result; - } - - protected int parseIndexOfKeyColumn(Properties tbl){ - String prop = tbl.getProperty(CASSANDRA_COL_MAPPING); - //Default first column is taken as key column. - int colIndex = 0; - if (prop != null) { - assert StringUtils.isNotBlank(prop); - String[] columnArray = prop.split(","); - String[] trimmedColumnArray = trim(columnArray); - - List columnList = Arrays.asList(trimmedColumnArray); - - colIndex = columnList.indexOf(CASSANDRA_KEY_COLUMN); - if (colIndex == -1) { - //Default first column is taken as key column. - colIndex = 0; - } - } - return colIndex; - } - - /** - * Parse the column mappping from table properties. If cassandra.columns.mapping - * is defined in the property, use it to create the mapping. Otherwise, create the mapping from table - * columns using the default mapping. - * - * @param tbl table properties - * @return A list of column names - * @throws org.apache.hadoop.hive.serde2.SerDeException - * - */ - protected List parseOrCreateColumnMapping(Properties tbl) throws SerDeException { - String prop = tbl.getProperty(CASSANDRA_COL_MAPPING); - - if (prop != null) { - return parseColumnMapping(prop); - } else { - String tblColumnStr = tbl.getProperty(Constants.LIST_COLUMNS); - - if (tblColumnStr != null) { - //auto-create - String mappingStr = createColumnMappingString(tblColumnStr); - - if (LOG.isDebugEnabled()) { - LOG.debug("table column string: " + tblColumnStr); - LOG.debug("Auto-created mapping string: " + mappingStr); - } - - return Arrays.asList(mappingStr.split(",")); - - } else { - throw new SerDeException("Can't find table column definitions"); - } - } - } - - /** - * Set the table mapping. We only support transposed mapping and regular table mapping for now. - * - * @throws org.apache.hadoop.hive.serde2.SerDeException - * - */ - protected void setTableMapping() throws SerDeException { - mapping = new CqlRegularTableMapping(cassandraColumnFamily, cassandraColumnNames, serdeParams, iKey); - } - - /** - * Trim the white spaces, new lines from the input array. 
- * - * @param input a input string array - * @return a trimmed string array - */ - protected static String[] trim(String[] input) { - String[] trimmed = new String[input.length]; - for (int i = 0; i < input.length; i++) { - trimmed[i] = input[i].trim(); - } - - return trimmed; - } - - @Override - public SerDeStats getSerDeStats() { - // TODO Auto-generated method stub - return null; - } - - /** - * @return the name of the cassandra keyspace as parsed from table properties - */ - public String getCassandraKeyspace() { - return cassandraKeyspace; - } - - /** - * @return the name of the cassandra columnfamily as parsed from table properties - */ - public String getCassandraColumnFamily() { - return cassandraColumnFamily; - } -} diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlRegularTableMapping.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlRegularTableMapping.java index 8f34057d..c2f71179 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlRegularTableMapping.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlRegularTableMapping.java @@ -18,13 +18,18 @@ public class CqlRegularTableMapping extends TableMapping { public CqlRegularTableMapping( String colFamily, List columnNames, - SerDeParameters serdeParams, int iKey) { - super(colFamily, columnNames, serdeParams, iKey); + SerDeParameters serdeParams) { + super(colFamily, columnNames, serdeParams); } @Override + public Writable getWritable(List fields, + List list, + List declaredFields) throws IOException { + return write(fields, list, declaredFields); + } + public Writable write( - byte[] keyBytes, List fields, List list, List declaredFields) throws IOException { diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlSerDe.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlSerDe.java index 9159b63b..b82f0fd0 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlSerDe.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/CqlSerDe.java @@ -12,17 +12,32 @@ import org.apache.cassandra.exceptions.SyntaxException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.cassandra.input.cql.LazyCqlRow; +import org.apache.hadoop.hive.cassandra.output.CassandraPut; import org.apache.hadoop.hive.cassandra.serde.CassandraLazyFactory; +import org.apache.hadoop.hive.cassandra.serde.AbstractCassandraSerDe; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class CqlSerDe extends AbstractCqlSerDe { +public class CqlSerDe extends AbstractCassandraSerDe { + + public static final Logger LOG = LoggerFactory.getLogger(CqlSerDe.class); + + public static final String 
CASSANDRA_COLUMN_FAMILY_PRIMARY_KEY = "cql.primarykey"; + + protected LazyCqlRow lazyCqlRow; + protected List cassandraColumnNamesText; public static final String CASSANDRA_VALIDATOR_TYPE = "cassandra.cf.validatorType"; // validator type @@ -30,6 +45,45 @@ public class CqlSerDe extends AbstractCqlSerDe { private List validatorType; + @Override + public void initialize(Configuration conf, Properties tbl) throws SerDeException { + initCassandraSerDeParameters(conf, tbl, getClass().getName()); + cachedObjectInspector = createObjectInspector(); + + lazyCqlRow = new LazyCqlRow( + (LazySimpleStructObjectInspector) cachedObjectInspector); + + if (LOG.isDebugEnabled()) { + LOG.debug("AbstractCassandraSerDe initialized with : columnNames = " + + StringUtils.join(serdeParams.getColumnNames(), ",") + + " columnTypes = " + + StringUtils.join(serdeParams.getColumnTypes(), ",") + + " cassandraColumnMapping = " + + cassandraColumnNames); + } + } + + /* + * + * @see org.apache.hadoop.hive.serde2.Deserializer#deserialize(org.apache.hadoop.io.Writable) + * Turns a Cassandra row into a Hive row. + */ + @Override + public Object deserialize(Writable w) throws SerDeException { + if (!(w instanceof MapWritable)) { + throw new SerDeException(getClass().getName() + ": expects MapWritable not " + w.getClass().getName()); + } + + MapWritable columnMap = (MapWritable) w; + lazyCqlRow.init(columnMap, cassandraColumnNames, cassandraColumnNamesText); + return lazyCqlRow; + } + + @Override + public Class getSerializedClass() { + return CassandraPut.class; + } + /** * Initialize the cassandra serialization and deserialization parameters from table properties and configuration. * @@ -51,8 +105,6 @@ protected void initCassandraSerDeParameters(Configuration job, Properties tbl, S cassandraColumnNamesText.add(new Text(columnName)); } - iKey = parseIndexOfKeyColumn(tbl); - serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName); validatorType = parseOrCreateValidatorType(tbl); @@ -87,7 +139,7 @@ protected void initCassandraSerDeParameters(Configuration job, Properties tbl, S } @Override - protected ObjectInspector createObjectInspector() { + public ObjectInspector createObjectInspector() { return CassandraLazyFactory.createLazyStructInspector( serdeParams.getColumnNames(), serdeParams.getColumnTypes(), @@ -160,4 +212,88 @@ private List parseValidatorType(List columnList) return types; } + /** + * Parses the cassandra columns mapping to identify the column names. + * Unlike thrift tables, CQL3 tables have no special :key column, so the + * mapping is simply the trimmed list of column names. + * + * @param columnMapping - the column mapping specification to be parsed + * @return a list of cassandra column names + */ + public static List parseColumnMapping(String columnMapping) { + assert StringUtils.isNotBlank(columnMapping); + String[] columnArray = columnMapping.split(","); + String[] trimmedColumnArray = trim(columnArray); + + return Arrays.asList(trimmedColumnArray); + } + + /* + * Creates the cassandra column mappings from the hive column names. + * This would be triggered when no cassandra.columns.mapping has been defined + * in the user query. + * + * Since CQL3 tables have no special column names such as row_key, the hive + * column names are used as the cassandra column names as-is.
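+ * + * For example, hive columns "message_id,author,body" are kept as the + * cassandra columns message_id, author and body.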
+ * + * @param tblColumnStr hive table column names + */ + public static String createColumnMappingString(String tblColumnStr) { + if (StringUtils.isBlank(tblColumnStr)) { + throw new IllegalArgumentException("table must have columns"); + } + + //CQL3 tables have no special :key column, so the hive column names are used as-is. + return tblColumnStr; + } + + /** + * Parse the column mapping from table properties. If cassandra.columns.mapping + * is defined in the property, use it to create the mapping. Otherwise, create the mapping from table + * columns using the default mapping. + * + * @param tbl table properties + * @return A list of column names + * @throws org.apache.hadoop.hive.serde2.SerDeException + * + */ + protected List parseOrCreateColumnMapping(Properties tbl) throws SerDeException { + String prop = tbl.getProperty(AbstractCassandraSerDe.CASSANDRA_COL_MAPPING); + + if (prop != null) { + return parseColumnMapping(prop); + } else { + String tblColumnStr = tbl.getProperty(Constants.LIST_COLUMNS); + + if (tblColumnStr != null) { + //auto-create + String mappingStr = createColumnMappingString(tblColumnStr); + + if (LOG.isDebugEnabled()) { + LOG.debug("table column string: " + tblColumnStr); + LOG.debug("Auto-created mapping string: " + mappingStr); + } + + return Arrays.asList(mappingStr.split(",")); + + } else { + throw new SerDeException("Can't find table column definitions"); + } + } + } + + /** + * Set the table mapping. Only regular (non-transposed) table mapping is + * supported for CQL tables. + * + * @throws org.apache.hadoop.hive.serde2.SerDeException + * + */ + protected void setTableMapping() throws SerDeException { + mapping = new CqlRegularTableMapping(cassandraColumnFamily, cassandraColumnNames, serdeParams); + } + }
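
The default column-mapping rule implemented above can be illustrated with a minimal standalone sketch (illustrative only, not part of the patch; the MappingDemo class and its main() harness are hypothetical, while ":key" mirrors the serde's CASSANDRA_KEY_COLUMN constant):

    public class MappingDemo {
        // Mirrors CASSANDRA_KEY_COLUMN in the thrift-side serde.
        static final String CASSANDRA_KEY_COLUMN = ":key";

        // Same rule as createColumnMappingString(String[]) for a
        // non-transposed table: the first hive column maps to the row key.
        static String defaultMapping(String[] colNames) {
            StringBuilder mappingStr = new StringBuilder(CASSANDRA_KEY_COLUMN);
            for (int i = 1; i < colNames.length; i++) {
                mappingStr.append(",").append(colNames[i]);
            }
            return mappingStr.toString();
        }

        public static void main(String[] args) {
            // Prints ":key,author,body"
            System.out.println(defaultMapping("message_id,author,body".split(",")));
        }
    }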