Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support select outfile to openmldb online tables #3616

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package com._4paradigm.openmldb.batch.nodes

import com._4paradigm.hybridse.vm.PhysicalSelectIntoNode
import com._4paradigm.openmldb.batch.utils.HybridseUtil
import com._4paradigm.openmldb.batch.utils.{HybridseUtil, OpenmldbTableUtil}
import com._4paradigm.openmldb.batch.{PlanContext, SparkInstance}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -45,6 +45,25 @@
val dbt = HybridseUtil.hiveDest(outPath)
logger.info(s"offline select into: hive way, write mode[${mode}], out table ${dbt}")
input.getDf().write.format("hive").mode(mode).saveAsTable(dbt)
} else if (format == "openmldb") {

val (db, table) = HybridseUtil.getOpenmldbDbAndTable(outPath)

val createIfNotExists = extra.get("create_if_not_exists").get.toBoolean

Check warning on line 52 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala#L52

Added line #L52 was not covered by tests
if (createIfNotExists) {
logger.info("Try to create openmldb output table: " + table)

Check warning on line 54 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala#L54

Added line #L54 was not covered by tests

OpenmldbTableUtil.createOpenmldbTableFromDf(ctx.getOpenmldbSession, input.getDf(), db, table)

Check warning on line 56 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala#L56

Added line #L56 was not covered by tests
}

val writeOptions = Map(
"db" -> db,
"table" -> table,
"zkCluster" -> ctx.getConf.openmldbZkCluster,
"zkPath" -> ctx.getConf.openmldbZkRootPath)

Check warning on line 63 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala#L59-L63

Added lines #L59 - L63 were not covered by tests

input.getDf().write.options(writeOptions).format("openmldb").mode(mode).save()

Check warning on line 65 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/nodes/SelectIntoPlan.scala#L65

Added line #L65 was not covered by tests

} else {
logger.info("offline select into: format[{}], options[{}], write mode[{}], out path {}", format, options,
mode, outPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@
// load data: read format, select into: write format
val format = if (file.toLowerCase().startsWith("hive://")) {
"hive"
} else if (file.toLowerCase().startsWith("openmldb://")) {
"openmldb"

Check warning on line 217 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/HybridseUtil.scala#L217

Added line #L217 was not covered by tests
} else {
parseOption(getOptionFromNode(node, "format"), "csv", getStringOrDefault).toLowerCase
}
Expand Down Expand Up @@ -252,7 +254,11 @@
// only for select into, "" means N/A
extraOptions += ("coalesce" -> parseOption(getOptionFromNode(node, "coalesce"), "0", getIntOrDefault))
extraOptions += ("sql" -> parseOption(getOptionFromNode(node, "sql"), "", getStringOrDefault))
extraOptions += ("writer_type") -> parseOption(getOptionFromNode(node, "writer_type"), "single", getStringOrDefault)
extraOptions += ("writer_type") -> parseOption(getOptionFromNode(node, "writer_type"), "single",
getStringOrDefault)

extraOptions += ("create_if_not_exists" -> parseOption(getOptionFromNode(node, "create_if_not_exists"),
"true", getBoolOrDefault))

(format, options.toMap, mode, extraOptions.toMap)
}
Expand Down Expand Up @@ -451,6 +457,19 @@
path.substring(tableStartPos)
}

/**
 * Splits an `openmldb://<db>.<table>` path into its database and table names.
 *
 * The scheme prefix is matched case-insensitively; the database and table names are
 * returned exactly as written in the path.
 *
 * @param path destination path of the form `openmldb://<db>.<table>`
 * @return the `(db, table)` pair
 * @throws IllegalArgumentException if the path does not start with `openmldb://`, or the
 *                                  remainder is not exactly two non-empty, dot-separated names
 */
def getOpenmldbDbAndTable(path: String): (String, String) = {
  val prefix = "openmldb://"
  require(path.toLowerCase.startsWith(prefix),
    s"Invalid openmldb path '$path', expected format openmldb://<db>.<table>")

  // A limit of -1 keeps trailing empty strings, so inputs such as "db.table." produce
  // three parts and are rejected instead of being silently truncated to two.
  val parts = path.substring(prefix.length).split("\\.", -1)
  require(parts.length == 2 && parts.forall(_.nonEmpty),
    s"Invalid openmldb path '$path', expected format openmldb://<db>.<table>")

  (parts(0), parts(1))
}

private def hiveLoad(openmldbSession: OpenmldbSession, file: String, columns: util.List[Common.ColumnDesc],
loadDataSql: String = ""): DataFrame = {
if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

val schema = df.schema

var createTableSql = s"CREATE TABLE $tableName ("
var createTableSql = s"CREATE TABLE IF NOT EXISTS $tableName ("

Check warning on line 39 in java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/OpenmldbTableUtil.scala

View check run for this annotation

Codecov / codecov/patch

java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/utils/OpenmldbTableUtil.scala#L39

Added line #L39 was not covered by tests
tobegit3hub marked this conversation as resolved.
Show resolved Hide resolved
schema.map(structField => {
val colName = structField.name
val colType = DataTypeUtil.sparkTypeToString(structField.dataType)
Expand Down
Loading