Skip to content

Commit

Permalink
Support select outfile to openmldb online tables
Browse files Browse the repository at this point in the history
  • Loading branch information
tobegit3hub committed Nov 21, 2023
1 parent 72f752b commit 98cdcb4
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com._4paradigm.openmldb.batch.nodes

import com._4paradigm.hybridse.vm.PhysicalSelectIntoNode
import com._4paradigm.openmldb.batch.utils.HybridseUtil
import com._4paradigm.openmldb.batch.utils.{HybridseUtil, OpenmldbTableUtil}
import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance}
import org.slf4j.LoggerFactory

Expand All @@ -32,7 +32,7 @@ object SelectIntoPlan {
logger.debug("select {} rows", input.getDf().count())
input.getDf().show(10)
}

// write options don't need deepCopy, may have coalesce
val (format, options, mode, extra) = HybridseUtil.parseOptions(outPath, node)
if (input.getSchema.size == 0 && input.getDf().isEmpty) {
Expand All @@ -45,6 +45,25 @@ object SelectIntoPlan {
val dbt = HybridseUtil.hiveDest(outPath)
logger.info(s"offline select into: hive way, write mode[${mode}], out table ${dbt}")
input.getDf().write.format("hive").mode(mode).saveAsTable(dbt)
} else if (format == "openmldb") {

val (db, table) = HybridseUtil.getOpenmldbDbAndTable(outPath)

val createIfNotExists = extra.get("create_if_not_exists").get.toBoolean
if (createIfNotExists) {
logger.info("Try to create openmldb output table: " + table)

OpenmldbTableUtil.createOpenmldbTableFromDf(ctx.getOpenmldbSession, input.getDf(), db, table)
}

val writeOptions = Map(
"db" -> db,
"table" -> table,
"zkCluster" -> ctx.getConf.openmldbZkCluster,
"zkPath" -> ctx.getConf.openmldbZkRootPath)

input.getDf().write.options(writeOptions).format("openmldb").mode(mode).save()

} else {
logger.info("offline select into: format[{}], options[{}], write mode[{}], out path {}", format, options,
mode, outPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ object HybridseUtil {
// load data: read format, select into: write format
val format = if (file.toLowerCase().startsWith("hive://")) {
"hive"
} else if (file.toLowerCase().startsWith("openmldb://")) {
"openmldb"
} else {
parseOption(getOptionFromNode(node, "format"), "csv", getStringOrDefault).toLowerCase
}
Expand Down Expand Up @@ -254,6 +256,8 @@ object HybridseUtil {
extraOptions += ("sql" -> parseOption(getOptionFromNode(node, "sql"), "", getStringOrDefault))
extraOptions += ("writer_type") -> parseOption(getOptionFromNode(node, "writer_type"), "single", getStringOrDefault)

extraOptions += ("create_if_not_exists" -> parseOption(getOptionFromNode(node, "create_if_not_exists"), "true", getBoolOrDefault))

(format, options.toMap, mode, extraOptions.toMap)
}

Expand Down Expand Up @@ -451,6 +455,23 @@ object HybridseUtil {
path.substring(tableStartPos)
}

/**
 * Splits an `openmldb://<db>.<table>` path into its database and table names.
 *
 * The scheme prefix is matched case-insensitively; the db and table parts are returned
 * exactly as written in `path`.
 *
 * @param path destination path, e.g. `openmldb://db1.t1`
 * @return the `(db, table)` pair parsed from the path
 * @throws IllegalArgumentException if `path` does not start with `openmldb://` or the
 *                                  remainder is not exactly two dot-separated parts
 */
def getOpenmldbDbAndTable(path: String): (String, String) = {
  val prefix = "openmldb://"
  require(path.toLowerCase.startsWith(prefix), s"openmldb path must start with '$prefix', but got: $path")

  // openmldb://<table_pattern>
  val dbAndTableString = path.substring(prefix.length)

  // Split once and destructure; note that String.split discards trailing empty parts.
  dbAndTableString.split("\\.") match {
    case Array(db, table) => (db, table)
    case _ =>
      throw new IllegalArgumentException(
        s"openmldb path must be in the form of '$prefix<db>.<table>', but got: $path")
  }
}

private def hiveLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc],
loadDataSql: String = ""): DataFrame = {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object OpenmldbTableUtil {

val schema = df.schema

var createTableSql = s"CREATE TABLE $tableName ("
var createTableSql = s"CREATE TABLE IF NOT EXISTS $tableName ("
schema.map(structField => {
val colName = structField.name
val colType = DataTypeUtil.sparkTypeToString(structField.dataType)
Expand Down

0 comments on commit 98cdcb4

Please sign in to comment.