get_snowflake_path
aabbasi-hbo committed Nov 4, 2022
1 parent 46db910 commit 6fa8596
Showing 4 changed files with 16 additions and 11 deletions.
feathr_project/feathr/client.py (14 additions, 0 deletions)
@@ -225,6 +225,20 @@ def build_features(self, anchor_list: List[FeatureAnchor] = [], derived_feature_
         # Pretty print anchor_list
         if verbose and self.anchor_list:
             FeaturePrinter.pretty_print_anchors(self.anchor_list)
+
+    def get_snowflake_path(self, database: str, schema: str, dbtable: str = None, query: str = None) -> str:
+        """
+        Returns the Snowflake path given dataset location information.
+        Either dbtable or query must be specified, but not both.
+        """
+        if dbtable is not None and query is not None:
+            raise RuntimeError("Both dbtable and query are specified. Only one can be specified.")
+        if dbtable is None and query is None:
+            raise RuntimeError("One of dbtable or query must be specified.")
+        if dbtable:
+            return f"snowflake://snowflake_account/?sfDatabase={database}&sfSchema={schema}&dbtable={dbtable}"
+        else:
+            return f"snowflake://snowflake_account/?sfDatabase={database}&sfSchema={schema}&query={query}"

     def list_registered_features(self, project_name: str = None) -> List[str]:
         """List all the already registered features under the given project.
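For reference, a minimal usage sketch of the new helper, assuming an already-configured FeathrClient (the config path and the database, schema, and table names below are hypothetical):

from feathr import FeathrClient

client = FeathrClient(config_path="feathr_config.yaml")  # assumed local config

# Table-based location:
client.get_snowflake_path(database="MY_DB", schema="MY_SCHEMA", dbtable="MY_TABLE")
# -> "snowflake://snowflake_account/?sfDatabase=MY_DB&sfSchema=MY_SCHEMA&dbtable=MY_TABLE"

# Query-based location:
client.get_snowflake_path(database="MY_DB", schema="MY_SCHEMA", query="SELECT * FROM MY_TABLE")

# Supplying both dbtable and query, or neither, raises RuntimeError.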
@@ -711,8 +711,6 @@ private[offline] class DataSourceLoader extends JsonDeserializer[DataSource] {
   override def deserialize(p: JsonParser, ctxt: DeserializationContext): DataSource = {
     val codec = p.getCodec
     val node = codec.readTree[TreeNode](p).asInstanceOf[ObjectNode]
-    println(s"NODE: ${node.toPrettyString}")
-    println(s"NODE TYPE: ${Option(node.get("type"))}")
     // for now only HDFS can be set, in the future, here may allow more options
     // also to form a unified interface with online
     val dataSourceType = Option(node.get("type")) match {
@@ -734,15 +732,13 @@ private[offline] class DataSourceLoader extends JsonDeserializer[DataSource] {
     } else {
       SourceFormatType.FIXED_PATH
     }
-    println(s"SOURCE FORMAT TYPE: ${sourceFormatType}")
     /*
      * path here can be:
      *
      * 1. a HDFS path
      * 2. a placeholder with reserved string "PASSTHROUGH" for anchor defined pass-through features,
      *    since anchor defined pass-through features do not have path
      */
-    println(s"DATA SOURCE TYPE: ${dataSourceType}")
     val path: DataLocation = dataSourceType match {
       case "KAFKA" =>
         Option(node.get("config")) match {
@@ -757,7 +753,6 @@ private[offline] class DataSourceLoader extends JsonDeserializer[DataSource] {
case "SNOWFLAKE" =>
Option(node.get("config")) match {
case Some(field: ObjectNode) =>
println(s"FIELD: ${field}")
LocationUtils.getMapper().treeToValue(field, classOf[Snowflake])
case None => throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR,
s"Snowflake config is not defined for Snowflake source ${node.toPrettyString()}")
@@ -766,7 +761,6 @@ private[offline] class DataSourceLoader extends JsonDeserializer[DataSource] {
       }
       case _ => Option(node.get("location")) match {
         case Some(field: ObjectNode) =>
-          println(s"FIELD: ${field}")
           LocationUtils.getMapper().treeToValue(field, classOf[DataLocation])
         case None => throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR,
           s"Data location is not defined for data source ${node.toPrettyString()}")
@@ -783,7 +777,7 @@ private[offline] class DataSourceLoader extends JsonDeserializer[DataSource] {
       case _ => throw new FeathrConfigException(ErrorLabel.FEATHR_USER_ERROR,
         s"Illegal setting for timeWindowParameters ${node.toPrettyString()}, expected map")
     }
-    println(s"TIME WINDOW PARAMETER NODE: ${timeWindowParameterNode}")
+
     val timeWindowParameters = timeWindowParameterNode match {
       case Some(node: ObjectNode) =>
         if (node.has("timestamp")) { // legacy configurations
@@ -805,7 +799,6 @@ private[offline] class DataSourceLoader extends JsonDeserializer[DataSource] {
       }
       case None => null
     }
-    println(s"TIME WINDOW PARAMS: ${timeWindowParameters}")
     if (path.isInstanceOf[KafkaEndpoint]) {
       DataSource(path, sourceFormatType)
     } else {
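For context, the "SNOWFLAKE" branch above reads a source node's type and hands its config subtree to Jackson, which maps it onto the Snowflake case class shown in the next file. A hedged sketch of that shape, written as a Python dict standing in for the JSON node (field names beyond database and query are assumptions):

# Hypothetical Snowflake source definition: "type" and "config" come straight
# from the deserializer above; the inner field names other than "database"
# and "query" are assumptions based on the Snowflake case class.
snowflake_source = {
    "type": "SNOWFLAKE",
    "config": {
        "database": "MY_DB",
        "schema": "MY_SCHEMA",
        "dbtable": "MY_TABLE",
    },
}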
@@ -28,8 +28,7 @@ case class Snowflake(@JsonProperty("database") database: String,
@JsonProperty("query") query: String = "") extends DataLocation {

override def loadDf(ss: SparkSession, dataIOParameters: Map[String, String] = Map()): DataFrame = {
println(s"SNOWFLAKE PATH: ${getPathList}")
SparkIOUtils.createUnionDataFrame(getPathList, dataIOParameters, new JobConf(), List()) // The simple path is not responsible for handling custom data loaders.
SparkIOUtils.createUnionDataFrame(getPathList, dataIOParameters, new JobConf(), List())
}

override def writeDf(ss: SparkSession, df: DataFrame, header: Option[Header]): Unit = ???
@@ -27,7 +27,6 @@ class SnowflakeDataLoader(ss: SparkSession) {
     val uri = new URI(url)
     val charset = Charset.forName("UTF-8")
     val params = URLEncodedUtils.parse(uri.getQuery, charset).asScala
-    println(s"PARAMS: ${params}")
     params.foreach(x => {
       authParams = authParams.updated(x.getName, x.getValue)
     })
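On the loader side, SnowflakeDataLoader recovers these parameters from the query string of the snowflake:// URL that get_snowflake_path builds, via URLEncodedUtils.parse. A rough Python equivalent of that parsing step, for illustration only (hypothetical names):

from urllib.parse import parse_qsl, urlparse

path = "snowflake://snowflake_account/?sfDatabase=MY_DB&sfSchema=MY_SCHEMA&dbtable=MY_TABLE"

# Split the query string into name/value pairs, as the Scala code does
# when building authParams.
params = dict(parse_qsl(urlparse(path).query))
# {'sfDatabase': 'MY_DB', 'sfSchema': 'MY_SCHEMA', 'dbtable': 'MY_TABLE'}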
