diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java index cc4a379c..1046c138 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/cql/CqlManager.java @@ -6,15 +6,12 @@ import org.apache.hadoop.hive.cassandra.CassandraException; import org.apache.hadoop.hive.cassandra.CassandraProxyClient; import org.apache.hadoop.hive.cassandra.serde.AbstractColumnSerDe; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.metastore.api.Constants; import org.apache.thrift.TException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * A class to handle the transaction to cassandra backend database. @@ -23,6 +20,16 @@ public class CqlManager { final static public int DEFAULT_REPLICATION_FACTOR = 1; final static public String DEFAULT_STRATEGY = "org.apache.cassandra.locator.SimpleStrategy"; + final static Map hiveTypeToCqlType = new HashMap(); + + static{ + hiveTypeToCqlType.put(org.apache.hadoop.hive.serde.Constants.STRING_TYPE_NAME, "text"); + hiveTypeToCqlType.put(org.apache.hadoop.hive.serde.Constants.INT_TYPE_NAME, "int"); + hiveTypeToCqlType.put(org.apache.hadoop.hive.serde.Constants.BOOLEAN_TYPE_NAME, "boolean"); + hiveTypeToCqlType.put(org.apache.hadoop.hive.serde.Constants.DOUBLE_TYPE_NAME, "double"); + hiveTypeToCqlType.put(org.apache.hadoop.hive.serde.Constants.FLOAT_TYPE_NAME, "float"); + } + //Cassandra Host Name private final String host; @@ -208,7 +215,35 @@ public CfDef createColumnFamily() throws MetaException { CfDef cf = getCfDef(); try { cch.getClient().set_keyspace(keyspace); - cch.getClient().system_add_column_family(cf); + Properties properties = MetaStoreUtils.getSchema(tbl); + + String columnsStr = (String) properties.get(Constants.META_TABLE_COLUMNS); + String columnTypesStr = (String) properties.get(Constants.META_TABLE_COLUMN_TYPES); + + String[] columnNames = columnsStr.split(","); + String[] columnTypes = columnTypesStr.split(":"); + if(columnNames.length != columnTypes.length){ + throw new MetaException("Unable to create column family '" + columnFamilyName + ". Error: Column names count and column types count do not match"); + } + + StringBuilder queryBuilder = new StringBuilder("CREATE TABLE "); + queryBuilder.append(keyspace); + queryBuilder.append("."); + queryBuilder.append(columnFamilyName); + queryBuilder.append("("); + for(int i = 0; i < columnNames.length; i++) { + queryBuilder.append(columnNames[i]); + queryBuilder.append(" "); + queryBuilder.append(hiveTypeToCqlType.get(columnTypes[i])); + queryBuilder.append(","); + } + queryBuilder.append(" primary key ("); + //todo how do we specify composite keys ? + queryBuilder.append(columnNames[0]); + queryBuilder.append(")"); + queryBuilder.append(")"); + + cch.getClient().execute_cql3_query(ByteBufferUtil.bytes(queryBuilder.toString()), Compression.NONE, ConsistencyLevel.ONE); return cf; } catch (TException e) { throw new MetaException("Unable to create column family '" + columnFamilyName + "'. Error:" @@ -219,6 +254,12 @@ public CfDef createColumnFamily() throws MetaException { } catch (SchemaDisagreementException e) { throw new MetaException("Unable to create column family '" + columnFamilyName + "'. Error:" + e.getMessage()); + } catch (UnavailableException e) { + throw new MetaException("Unable to create column family '" + columnFamilyName + "'. Error:" + + e.getMessage()); + } catch (TimedOutException e) { + throw new MetaException("Unable to create column family '" + columnFamilyName + "'. Error:" + + e.getMessage()); } } diff --git a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/AbstractCqlSerDe.java b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/AbstractCqlSerDe.java index d0dd5b52..b3c2b79d 100644 --- a/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/AbstractCqlSerDe.java +++ b/cassandra-handler/src/main/java/org/apache/hadoop/hive/cassandra/serde/cql/AbstractCqlSerDe.java @@ -6,7 +6,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.cassandra.input.cql.LazyCqlRow; import org.apache.hadoop.hive.cassandra.output.CassandraPut; -import org.apache.hadoop.hive.cassandra.serde.RegularTableMapping; import org.apache.hadoop.hive.cassandra.serde.TableMapping; import org.apache.hadoop.hive.serde.Constants; import org.apache.hadoop.hive.serde2.SerDe;