Process non-nullable Scala type before UDF #1471

Open · wants to merge 5 commits into main
8 changes: 4 additions & 4 deletions config/spark2kafka.properties
@@ -1,9 +1,9 @@
#Parameters you must configure
#==============================================================
-#The data source path should be a directory.
+#The data source should be a directory.
source.path =

-#The CSV Column Name.For example:sA=string,sB=integer,sC=boolean...
+#The csv col names.For example:sA=string,sB=integer,sC=boolean...
column.names =

#Primary keys.For example:sA=string,sB=integer,sC=boolean...
@@ -15,8 +15,8 @@ kafka.bootstrap.servers =
#Set your topic name.
topic.name =

-#Spark checkpoint path
-checkpoint =
+#Spark checkpoint
+checkpoint.path =
Contributor

Why add `.path`? If the intent is to unify the naming, the variable name used in Metadata should be changed to match as well.

Collaborator Author @wycccccc (Feb 22, 2023)

Mainly because if the shell script searches for `checkpoint`, it also matches the comment line above it, so I went ahead and switched to a single unified name.


#Parameters that can be selected for configuration
#==============================================================
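As a side note, the `column.names` value documented above is a comma-separated list of `name=type` entries. A hypothetical Scala sketch (not from this repo; names and helper are illustrative only) of splitting such a value into (name, type) pairs:

```scala
// Hypothetical illustration of parsing the documented "column.names" format,
// e.g. "sA=string,sB=integer,sC=boolean", into (name, type) pairs.
object ColumnNamesFormat {
  def parse(value: String): Seq[(String, String)] =
    value.split(",").toSeq.map { entry =>
      val Array(name, tpe) = entry.split("=", 2)
      (name.trim, tpe.trim)
    }

  def main(args: Array[String]): Unit = {
    // prints: List((sA,string), (sB,integer), (sC,boolean))
    println(parse("sA=string,sB=integer,sC=boolean"))
  }
}
```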
2 changes: 1 addition & 1 deletion docker/start_etl.sh
@@ -24,7 +24,7 @@ declare -r SPARK_VERSION=${SPARK_VERSION:-3.3.1}
declare -r LOCAL_PATH=$(cd -- "$(dirname -- "${DOCKER_FOLDER}")" &>/dev/null && pwd)
# ===============================[properties keys]=================================
declare -r SOURCE_KEY="source.path"
-declare -r CHECKPOINT_KEY="checkpoint"
+declare -r CHECKPOINT_KEY="checkpoint.path"
# ===============================[spark driver/executor resource]==================
declare -r RESOURCES_CONFIGS="${RESOURCES_CONFIGS:-"--conf spark.driver.memory=4g --conf spark.executor.memory=4g"}"
# ===================================[functions]===================================
17 changes: 12 additions & 5 deletions etl/src/main/scala/org/astraea/etl/DataFrameProcessor.scala
@@ -18,6 +18,7 @@ package org.astraea.etl

import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.sources.{IsNotNull, IsNull}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.astraea.common.json.JsonConverter
@@ -70,7 +71,17 @@ class DataFrameProcessor(dataFrame: DataFrame) {
      .withColumn(
        "value",
        defaultConverter(
-          map(cols.flatMap(c => List(lit(c.name), col(c.name))): _*)
+          map(
+            cols
+              .map(c =>
+                (
+                  lit(c.name),
+                  when(col(c.name).isNotNull, col(c.name))
+                )
+              )
+              .filter(_._2 != null)
+              .flatMap(c => List(c._1, c._2)): _*
+          )
        )
      )
      .withColumn(
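For readers of the hunk above, here is a minimal, self-contained sketch of the technique it uses (hypothetical column names `sA`/`sB` and a local SparkSession; not code from this PR). `when(col.isNotNull, col)` with no `otherwise` evaluates to null when the condition fails, so a null cell becomes a plain SQL null inside the map rather than a value the downstream UDF must coerce into a non-nullable Scala type:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch of the null-guarded map construction shown in the diff above.
object ValueMapSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val df = Seq(("a1", "b1"), ("a2", null)).toDF("sA", "sB")

    // Build key/value pairs for Spark's map(); nulls stay at the SQL level.
    val entries = Seq("sA", "sB")
      .flatMap(n => List(lit(n), when(col(n).isNotNull, col(n))))

    df.withColumn("value", map(entries: _*)).show(truncate = false)
    // the second row's map carries a SQL null: {sA -> a2, sB -> null}
  }
}
```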
@@ -171,10 +182,6 @@ object DataFrameProcessor {

  private def schema(columns: Seq[DataColumn]): StructType =
    StructType(columns.map { col =>
-      if (col.dataType != DataType.StringType)
Contributor

Are non-string types supported now?

Collaborator Author @wycccccc (Feb 22, 2023)

Yes, they are supported per my testing. Null can be handled while the data is still a column, but once it is passed into a UDF and converted back to certain Scala types, null handling is no longer supported.

-        throw new IllegalArgumentException(
-          "Sorry, only string type is currently supported.Because a problem(astraea #1286) has led to the need to wrap the non-nullable type."
-        )
      StructField(col.name, col.dataType.sparkType)
    })
}
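To illustrate the author's point above, here is a standalone sketch (assumes Spark 3.x with a local session; not part of the PR). A DataFrame column can carry nulls, but a UDF whose parameter is a primitive Scala `Int` cannot observe them: Spark yields null for those rows without invoking the closure, whereas an `Option[Int]` parameter lets the function handle the missing value itself:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch of the null-vs-primitive-UDF behaviour discussed in the thread above.
object PrimitiveUdfNullSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val df = Seq(Some(1), None, Some(3)).toDF("n")

    // Primitive parameter: Scala Int cannot hold null, so Spark returns
    // null for the None row instead of calling the function body.
    val plusOne = udf((n: Int) => n + 1)

    // Option parameter: the function decides what a missing value means.
    val plusOneOpt = udf((n: Option[Int]) => n.map(_ + 1))

    df.select($"n", plusOne($"n"), plusOneOpt($"n")).show()
  }
}
```

This is consistent with why the PR guards values with `when(isNotNull, ...)` at the column level before they ever reach the converter UDF.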
4 changes: 2 additions & 2 deletions etl/src/main/scala/org/astraea/etl/Metadata.scala
@@ -57,7 +57,7 @@ case class Metadata private (
object Metadata {
private[etl] val ARCHIVE_PATH = "archive.path"
private[etl] val SOURCE_PATH_KEY = "source.path"
-  private[etl] val CHECKPOINT_KEY = "checkpoint"
+  private[etl] val CHECKPOINT_KEY = "checkpoint.path"
private[etl] val COLUMN_NAME_KEY = "column.names"
private[etl] val COLUMN_TYPES_KEY = "column.types"
private[etl] val CLEAN_SOURCE = "clean.source"
@@ -71,7 +71,7 @@ object Metadata {

private[etl] val DEFAULT_PARTITIONS = 15
private[etl] val DEFAULT_REPLICAS = 1.toShort
-  private[etl] val DEFAULT_RECURSIVE = "ture"
+  private[etl] val DEFAULT_RECURSIVE = "true"
private[etl] val DEFAULT_CLEAN_SOURCE = "delete"

// Parameters needed to configure ETL.
etl/src/test/scala/org/astraea/etl/DataFrameProcessorTest.scala
@@ -28,7 +28,7 @@ import java.io._
import java.nio.file.Files
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
-import scala.jdk.CollectionConverters._
+import scala.jdk.CollectionConverters.CollectionHasAsScala

class DataFrameProcessorTest {
@Test