Skip to content

Commit

Permalink
WIP
Browse files (browse the repository at this point in the history)
  • Loading branch information
ezvz committed Dec 1, 2023
1 parent d8839bc commit 2289824
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 17 deletions.
6 changes: 3 additions & 3 deletions api/py/test/sample/data/checkouts.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ds,ts,checkout_id,user_id,product_id
-"2023-11-01",1698890809000,111,123,111
-"2023-11-02",1698977209000,111,123,111
-"2023-11-03",1699063609000,111,123,111
+2023-11-01,2023-11-01 10:10:10,111,123,111
+2023-11-02,2023-11-02 12:12:12,111,123,111
+2023-11-03,2023-11-03 13:13:13,111,123,111
4 changes: 3 additions & 1 deletion api/py/test/sample/data/purchases.csv
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
ds,ts,purchase_id,user_id,product_id,purchase_price
-"2023-11-01",1698847070000,111,123,111,10
+2023-11-01,2023-11-01 14:10:10,111,123,111,10
+2023-11-02,2023-11-02 01:10:10,111,123,111,10
+2023-11-03,2023-11-03 03:10:10,111,123,111,10
184 changes: 184 additions & 0 deletions api/py/test/sample/production/joins/quickstart/training_set.v1
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"customJson": "{\"check_consistency\": false, \"lag\": 0, \"join_tags\": null, \"join_part_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_data.checkouts_ds\", \"spec\": \"data.checkouts/ds={{ ds }}\", \"start\": null, \"end\": null}",
"{\"name\": \"wait_for_data.purchases_ds\", \"spec\": \"data.purchases/ds={{ ds }}\", \"start\": null, \"end\": null}",
"{\"name\": \"wait_for_data.returns_ds\", \"spec\": \"data.returns/ds={{ ds }}\", \"start\": null, \"end\": null}",
"{\"name\": \"wait_for_data.users_ds\", \"spec\": \"data.users/ds={{ ds }}\", \"start\": null, \"end\": null}"
],
"tableProperties": {
Expand All @@ -30,6 +32,188 @@
}
},
"joinParts": [
{
"groupBy": {
"metaData": {
"name": "quickstart.purchases.v1",
"customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_data.purchases_ds\", \"spec\": \"data.purchases/ds={{ ds }}\", \"start\": null, \"end\": null}"
],
"team": "quickstart",
"offlineSchedule": "@daily"
},
"sources": [
{
"events": {
"table": "data.purchases",
"topic": "events/purchase_events",
"query": {
"selects": {
"user_id": "user_id",
"purchase_price": "purchase_price"
},
"timeColumn": "ts",
"setups": []
}
}
}
],
"keyColumns": [
"user_id"
],
"aggregations": [
{
"inputColumn": "purchase_price",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 3,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
},
{
"inputColumn": "purchase_price",
"operation": 6,
"argMap": {},
"windows": [
{
"length": 3,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
},
{
"inputColumn": "purchase_price",
"operation": 8,
"argMap": {},
"windows": [
{
"length": 3,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
}
]
}
},
{
"groupBy": {
"metaData": {
"name": "quickstart.returns.v1",
"customJson": "{\"lag\": 0, \"groupby_tags\": null, \"column_tags\": {}}",
"dependencies": [
"{\"name\": \"wait_for_data.returns_ds\", \"spec\": \"data.returns/ds={{ ds }}\", \"start\": null, \"end\": null}"
],
"team": "quickstart",
"offlineSchedule": "@daily"
},
"sources": [
{
"events": {
"table": "data.returns",
"topic": "return_events",
"query": {
"selects": {
"user_id": "user_id",
"refund_amt": "refund_amt"
},
"timeColumn": "ts",
"setups": []
}
}
}
],
"keyColumns": [
"user_id"
],
"aggregations": [
{
"inputColumn": "refund_amt",
"operation": 7,
"argMap": {},
"windows": [
{
"length": 3,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
},
{
"inputColumn": "refund_amt",
"operation": 6,
"argMap": {},
"windows": [
{
"length": 3,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
},
{
"inputColumn": "refund_amt",
"operation": 8,
"argMap": {},
"windows": [
{
"length": 3,
"timeUnit": 1
},
{
"length": 30,
"timeUnit": 1
},
{
"length": 90,
"timeUnit": 1
}
]
}
]
}
},
{
"groupBy": {
"metaData": {
Expand Down
1 change: 0 additions & 1 deletion api/py/test/sample/scripts/spark_submit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ $SPARK_SUBMIT_PATH \
--conf spark.port.maxRetries=20 \
--conf spark.task.maxFailures=20 \
--conf spark.stage.maxConsecutiveAttempts=12 \
--conf spark.maxRemoteBlockSizeFetchToMem=2G \
--conf spark.network.timeout=230s \
--conf spark.executor.heartbeatInterval=200s \
--conf spark.local.dir=${CHRONON_WORKING_DIR} \
Expand Down
13 changes: 1 addition & 12 deletions spark/src/main/scala/ai/chronon/spark/LocalDataLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,10 @@ object LocalDataLoader {
if (schema.fieldNames.contains("ts") && schema(schema.fieldIndex("ts")).dataType == StringType) {
df = df
.withColumnRenamed("ts", "ts_string")
-.withColumn("ts", date_format(col("ts_string"), "yyyy-MM-dd") * 1000)
+.withColumn("ts", unix_timestamp(date_format(col("ts_string"), "yyyy-MM-dd HH:mm:ss")) * 1000)
.drop("ts_string")
}

-// Spark schema inference converts ds yyyy-MM-dd to timestamps.
-// We need these to be read as strings
-if (schema.fieldNames.contains("ds") && schema(schema.fieldIndex("ds")).dataType ==
-TimestampType) {
-df = df
-.withColumnRenamed("ds", "ds_time")
-.withColumn("ds", unix_timestamp(col("ts_string")) * 1000)
-.drop("ds_time")
-}


println(s"Loading data from ${file.getPath} into $tableName. Sample data and schema shown below")
df.show(100)
println(df.schema.pretty)
Expand Down

0 comments on commit 2289824

Please sign in to comment.