diff --git a/components/snowflake/component.yaml b/components/snowflake/component.yaml new file mode 100644 index 00000000000..3c6bb13d145 --- /dev/null +++ b/components/snowflake/component.yaml @@ -0,0 +1,156 @@ +# PIPELINE DEFINITION +# Name: snowflake-unload-op +# Description: Run COPY in snowflake to unload data to GCS bucket. +# output_gcs_path: the location to land the data +# sf_storage_integration: the Snowflake Storage Integration name +# query: the query to execute in the COPY command +# sf_user: the snowflake username +# sf_password: the snowflake password +# sf_warehouse: the snowflake warehouse name +# sf_database: the database to use +# Inputs: +# output_gcs_path: str +# query: str +# sf_account: str +# sf_database: str +# sf_password: str +# sf_storage_integration: str +# sf_user: str +# sf_warehouse: str +# Outputs: +# Output: str +components: + comp-snowflake-unload-op: + executorLabel: exec-snowflake-unload-op + inputDefinitions: + parameters: + output_gcs_path: + parameterType: STRING + query: + parameterType: STRING + sf_account: + parameterType: STRING + sf_database: + parameterType: STRING + sf_password: + parameterType: STRING + sf_storage_integration: + parameterType: STRING + sf_user: + parameterType: STRING + sf_warehouse: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-snowflake-unload-op: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - snowflake_unload_op + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.7.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'snowflake-connector-python:3.12.3'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef snowflake_unload_op(\n output_gcs_path: str,\n sf_storage_integration:\ + \ str,\n query: str,\n sf_user: str,\n sf_password: str,\n sf_account:\ + \ str,\n sf_warehouse: str,\n sf_database: str\n ) -> str:\n \ + \ \"\"\"\n Run COPY in snowflake to unload data to GCS bucket.\n\n \ + \ output_gcs_path: the location to land the data\n sf_storage_integration:\ + \ the Snowflake Storage Integration name\n query: the query to execute\ + \ in the COPY command\n sf_user: the snowflake username\n sf_password:\ + \ the snowflake password\n sf_warehouse: the snowflake warehouse name\n\ + \ sf_database: the database to use\n \"\"\"\n import snowflake.connector\n\ + \ conn = snowflake.connector.connect(user=sf_user,\n \ + \ password=sf_password,\n \ + \ account=sf_account,\n \ + \ role=\"ACCOUNTADMIN\")\n\n conn.cursor().execute(f\"USE WAREHOUSE\ + \ {sf_warehouse};\")\n conn.cursor().execute(f\"USE DATABASE {sf_database};\"\ + )\n result = conn.cursor().execute(f\"\"\"\n COPY INTO 'gcs://{output_gcs_path}'\n\ + \ FROM ({query})\n FILE_FORMAT = (TYPE = CSV COMPRESSION=NONE)\n \ + \ STORAGE_INTEGRATION = {sf_storage_integration}\n HEADER = TRUE\n\ + \ \"\"\")\n _ = result.fetchall()\n if output_gcs_path.endswith(\"\ + /\"):\n return output_gcs_path + \"data_0_0_0.csv\"\n else:\n\ + \ return output_gcs_path\n\n" + image: python:3.11 +pipelineInfo: + name: snowflake-unload-op +root: + dag: + outputs: + parameters: + Output: + valueFromParameter: + outputParameterKey: Output + producerSubtask: snowflake-unload-op + tasks: + snowflake-unload-op: + cachingOptions: + enableCache: true + componentRef: + name: comp-snowflake-unload-op + inputs: + parameters: + output_gcs_path: + componentInputParameter: output_gcs_path + query: + componentInputParameter: query + sf_account: + componentInputParameter: sf_account + sf_database: + componentInputParameter: sf_database + sf_password: + componentInputParameter: sf_password + sf_storage_integration: + componentInputParameter: sf_storage_integration + sf_user: + componentInputParameter: sf_user + sf_warehouse: + componentInputParameter: sf_warehouse + taskInfo: + name: snowflake-unload-op + inputDefinitions: + parameters: + output_gcs_path: + parameterType: STRING + query: + parameterType: STRING + sf_account: + parameterType: STRING + sf_database: + parameterType: STRING + sf_password: + parameterType: STRING + sf_storage_integration: + parameterType: STRING + sf_user: + parameterType: STRING + sf_warehouse: + parameterType: STRING + outputDefinitions: + parameters: + Output: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.7.0 diff --git a/components/snowflake/snowflake_unload_data.py b/components/snowflake/snowflake_unload_data.py new file mode 100644 index 00000000000..6da2faefef0 --- /dev/null +++ b/components/snowflake/snowflake_unload_data.py @@ -0,0 +1,56 @@ +""" +This is a KFP component doing "unload data to GCS bucket" operation + from the Snowflake database. +""" +from kfp import compiler +from kfp.dsl import component + +@component( + base_image="python:3.11", + packages_to_install=["snowflake-connector-python:3.12.3"] +) +def snowflake_unload_op( + output_gcs_path: str, + sf_storage_integration: str, + query: str, + sf_user: str, + sf_password: str, + sf_account: str, + sf_warehouse: str, + sf_database: str + ) -> str: + """ + Run COPY in snowflake to unload data to GCS bucket. + + output_gcs_path: the location to land the data + sf_storage_integration: the Snowflake Storage Integration name + query: the query to execute in the COPY command + sf_user: the snowflake username + sf_password: the snowflake password + sf_warehouse: the snowflake warehouse name + sf_database: the database to use + """ + import snowflake.connector + conn = snowflake.connector.connect(user=sf_user, + password=sf_password, + account=sf_account, + role="ACCOUNTADMIN") + + conn.cursor().execute(f"USE WAREHOUSE {sf_warehouse};") + conn.cursor().execute(f"USE DATABASE {sf_database};") + result = conn.cursor().execute(f""" + COPY INTO 'gcs://{output_gcs_path}' + FROM ({query}) + FILE_FORMAT = (TYPE = CSV COMPRESSION=NONE) + STORAGE_INTEGRATION = {sf_storage_integration} + HEADER = TRUE + """) + _ = result.fetchall() + if output_gcs_path.endswith("/"): + return output_gcs_path + "data_0_0_0.csv" + else: + return output_gcs_path + +if __name__ == "__main__": + compiler.Compiler().compile(pipeline_func=snowflake_unload_op, + package_path="component.yaml")