Added enhancement in the API to support Google BigTable #337

Open · wants to merge 1 commit into master
README.md: 16 changes (15 additions, 1 deletion)
@@ -88,6 +88,18 @@ The above defines a schema for a HBase table with name as table1, row key as key

Given a DataFrame with the specified schema, the above will create an HBase table with 5 regions and save the DataFrame into it. Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
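The HBase write call this paragraph refers to is collapsed above in this diff. For comparison with the BigTable variant below, a minimal sketch, assuming `data` and `catalog` are defined as earlier in the README:

    sc.parallelize(data).toDF.write.options(
      Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()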

### Write to Google BigTable to populate data

    sc.parallelize(data).toDF.write.options(
      Map(HBaseTableCatalog.tableType -> "bigtable", HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()

Given a DataFrame with the specified schema, the above will create a Google BigTable table with 5 regions and save the DataFrame into it.
Note that if HBaseTableCatalog.newTable is not specified, the table has to be pre-created.
HBaseTableCatalog.tableType -> "bigtable" must be set explicitly when writing to Google BigTable; otherwise the API assumes it is writing to an HBase table.
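Here `catalog` is the same JSON catalog string used for the plain HBase write; a minimal sketch of its shape (the column families and types are illustrative, following the test added in this PR):

    def catalog = s"""{
           |"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
           |"rowkey":"key1",
           |"columns":{
           |"col0":{"cf":"rowkey", "col":"key1", "type":"string"},
           |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"}
           |}
           |}""".stripMargin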


### Perform DataFrame operation on top of HBase table

    def withCatalog(cat: String): DataFrame = {
      sqlContext
        .read
        .options(Map(HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load()
    }


Note: The above remains the same when writing to Google BigTable.

### Complicated query

    val df = withCatalog(catalog)
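The rest of this example is collapsed in the diff. A minimal sketch of the kind of filter-and-select query this section builds, assuming columns col0, col1, and col4 from the catalog and that sqlContext.implicits._ is imported for the $ column syntax:

    val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") || $"col0" === "row005")
      .select("col0", "col1", "col4")
    s.show()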
HBaseRelation.scala
@@ -124,18 +124,26 @@ case class HBaseRelation(
def createTableIfNotExist() {
  val cfs = catalog.getColumnFamilies
  val connection = HBaseConnectionCache.getConnection(hbaseConf)

  // Get the table type from HBaseTableCatalog; expected values are "hbase" or "bigtable"
  val tableType = catalog.getTableType

  // Initialize the HBase table if necessary
  val admin = connection.getAdmin
  val isNameSpaceExist = try {
    // Google BigTable does not have namespaces, so skip getNamespaceDescriptor when tableType is "bigtable"
    if (tableType.equals("hbase")) {
      admin.getNamespaceDescriptor(catalog.namespace)
      true
    } else {
      false
    }
  } catch {
    case e: NamespaceNotFoundException => false
    case NonFatal(e) =>
      logError("Unexpected error", e)
      false
  }
  // Google BigTable does not have namespaces, so skip createNamespace as well
  if (!isNameSpaceExist && tableType.equals("hbase")) {
    admin.createNamespace(NamespaceDescriptor.create(catalog.namespace).build)
  }
  val tName = TableName.valueOf(s"${catalog.namespace}:${catalog.name}")
HBaseTableCatalog.scala
@@ -169,6 +169,11 @@ case class HBaseTableCatalog(
coderSet: Set[String],
val numReg: Int,
val splitRange: (String, String)) extends Logging {
// Table type: "hbase" (default) or "bigtable"
var tableType = "hbase"
def getTableType = this.tableType
// Setter to override the default value ("hbase") of tableType when writing to Google BigTable
def setTableType(tableType: String) = this.tableType = tableType

def toDataType = StructType(sMap.toFields)
def getField(name: String) = sMap.getField(name)
def getRowKey: Seq[Field] = row.fields
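A short sketch of the new accessor pair in use, assuming `catalog` is a catalog JSON string; the Map-based constructor mirrors the test at the bottom of this PR:

val cat = HBaseTableCatalog(Map(
  HBaseTableCatalog.tableCatalog -> catalog,
  HBaseTableCatalog.nameSpace -> "default",
  HBaseTableCatalog.newTable -> "3"))
cat.getTableType                  // "hbase" (the default)
cat.setTableType("bigtable")      // opt in to Google BigTable
cat.getTableType                  // "bigtable"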
@@ -234,6 +239,8 @@ object HBaseTableCatalog {
val rowKey = "rowkey"
// The key for the hbase table whose value specifies namespace and table name
val table = "table"
// The table type: "hbase" or Google "bigtable"
val tableType = "tableType"
// The namespace of hbase table
val nameSpace = "namespace"
// The name of hbase table
@@ -300,8 +307,9 @@ object HBaseTableCatalog {

val minSplit = parameters.get(minTableSplitPoint).getOrElse("aaaaaa")
val maxSplit = parameters.get(maxTableSplitPoint).getOrElse("zzzzzz")

val hbaseTableCatalog = HBaseTableCatalog(nSpace, tName, rKey, SchemaMap(schemaMap), tCoder, coderSet, numReg, (minSplit, maxSplit))
// Default to "hbase" when the tableType parameter is absent
hbaseTableCatalog.setTableType(parameters.get(tableType).getOrElse("hbase"))
hbaseTableCatalog
}

HBaseTableCatalogSuite.scala (new file)
@@ -0,0 +1,33 @@
package org.apache.spark.sql

import org.apache.spark.sql.execution.datasources.hbase.{HBaseTableCatalog, Logging}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}

class HBaseTableCatalogSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll with Logging {
def catalog = s"""{
|"table":{"namespace":"default", "name":"table1", "tableCoder":"PrimitiveType"},
|"rowkey":"key1:key2",
|"columns":{
|"col00":{"cf":"rowkey", "col":"key1", "type":"string", "length":"6"},
|"col01":{"cf":"rowkey", "col":"key2", "type":"int"},
|"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
|"col2":{"cf":"cf2", "col":"col2", "type":"double"},
|"col3":{"cf":"cf3", "col":"col3", "type":"float"},
|"col4":{"cf":"cf4", "col":"col4", "type":"int"},
|"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
|"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
|"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"},
|"col7":{"cf":"cf7", "col":"col7", "type":"string"}
|}
|}""".stripMargin

test("HBaseTableCatalog tableType class variable test") {
var hbasetablecatalogobject = HBaseTableCatalog(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.nameSpace -> "default",
HBaseTableCatalog.newTable -> "3"))
assert(hbasetablecatalogobject.getTableType.equals("hbase"))

hbasetablecatalogobject = HBaseTableCatalog(Map(HBaseTableCatalog.tableType -> "bigtable", HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.nameSpace -> "default",
HBaseTableCatalog.newTable -> "3"))
assert(hbasetablecatalogobject.getTableType.equals("bigtable"))
}
}