Skip to content

Commit

Permalink
[Feature] DRC-916 Implement the new commands for upload/download dbt …
Browse files Browse the repository at this point in the history
…artifacts

Signed-off-by: Kent Huang <[email protected]>
  • Loading branch information
kentwelcome committed Nov 29, 2024
1 parent 5388fb4 commit 9018c22
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 21 deletions.
125 changes: 124 additions & 1 deletion recce/artifact.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,15 @@
import gzip
import os
import shutil
import tarfile
import tempfile

import requests
from rich.console import Console

from recce.git import hosting_repo, current_commit_hash, commit_hash_from_branch
from recce.state import s3_sse_c_headers
from recce.util.recce_cloud import RecceCloud, PresignedUrlMethod


def verify_artifact_path(target_path: str) -> bool:
Expand Down Expand Up @@ -32,4 +43,116 @@ def verify_artifact_path(target_path: str) -> bool:
def archive_artifact(target_path: str) -> str:
if verify_artifact_path(target_path) is False:
return None
pass

# prepare the temporary artifact path
tmp_dir = tempfile.mkdtemp()
artifact_tar_path = os.path.join(tmp_dir, 'dbt_artifact.tar')
artifact_tar_gz_path = artifact_tar_path + '.gz'

with tarfile.open(artifact_tar_path, 'w') as tar:
manifest_path = os.path.join(target_path, 'manifest.json')
catalog_path = os.path.join(target_path, 'catalog.json')
tar.add(manifest_path, arcname='manifest.json')
tar.add(catalog_path, arcname='catalog.json')

# Compress the tar file
with open(artifact_tar_path, 'rb') as f_in, gzip.open(artifact_tar_gz_path, 'wb') as f_out:
f_out.writelines(f_in)

# Clean up the temporary directory
try:
os.remove(artifact_tar_path)
except FileNotFoundError:
pass

return artifact_tar_gz_path


def upload_dbt_artifact(target_path: str, branch: str, token: str, password: str, debug: bool = False):
console = Console()
if verify_artifact_path(target_path) is False:
console.print(f"[[red]Error[/red]] Invalid target path: {target_path}")
console.print("Please provide a valid target path containing manifest.json and catalog.json.")
return 1

compress_file_path = archive_artifact(target_path)
repo = hosting_repo()

# Get the presigned URL for uploading the artifact
presigned_url = RecceCloud(token).get_presigned_url(
method=PresignedUrlMethod.UPLOAD,
repository=repo,
artifact_name='dbt_artifact.tar.gz',
sha=current_commit_hash(),
)

if debug:
console.print('Git information:')
console.print(f'Branch: {branch}')
console.print(f'Commit hash: {current_commit_hash()}')
console.print(f'GitHub repository: {repo}')
console.print(f'Artifact path: {compress_file_path}')
console.print(f'Presigned URL: {presigned_url}')

# Upload the compressed artifact
headers = s3_sse_c_headers(password)
response = requests.put(presigned_url, data=open(compress_file_path, 'rb').read(), headers=headers)
if response.status_code != 200:
raise Exception({response.text})

# Clean up the compressed artifact
try:
# Remove the compressed artifact
os.remove(compress_file_path)
# Clean up the temporary directory
os.rmdir(os.path.dirname(compress_file_path))
except FileNotFoundError:
pass


def download_dbt_artifact(target_path: str, branch: str, token: str, password: str,
force: bool = False,
debug: bool = False):
repo = hosting_repo()
sha = commit_hash_from_branch(branch)
if debug:
console = Console()
console.rule('Debug information:')
console.print(f'Git Branch: {branch}')
console.print(f'Git Commit hash: {sha}')
console.print(f'GitHub repository: {repo}')

if os.path.exists(target_path):
if not force:
raise Exception(f'Path {target_path} already exists. Please provide a new path.')
console.print(f'[[yellow]Warning[/yellow]] Removing existing path: {target_path}')
shutil.rmtree(target_path)

os.mkdir(target_path)

presigned_url = RecceCloud(token).get_presigned_url(
method=PresignedUrlMethod.DOWNLOAD,
repository=repo,
artifact_name='dbt_artifact.tar.gz',
sha=sha,
)
headers = s3_sse_c_headers(password)
response = requests.get(presigned_url, headers=headers)

if response.status_code != 200:
raise Exception(response.text)

tar_gz_file = os.path.join(target_path, 'dbt_artifact.tar.gz')
with open(tar_gz_file, 'wb') as f:
f.write(response.content)

with tarfile.open(tar_gz_file, 'r') as tar:
tar.extractall(path=target_path)

# Clean up the compressed artifact
try:
# Remove the compressed artifact
os.remove(tar_gz_file)
except FileNotFoundError:
pass
return 0
51 changes: 37 additions & 14 deletions recce/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@
import click
import uvicorn

from build.lib.recce.event.track import console
from recce import event
from recce.artifact import verify_artifact_path
from recce.artifact import upload_dbt_artifact, download_dbt_artifact
from recce.config import RecceConfig, RECCE_CONFIG_FILE, RECCE_ERROR_LOG_FILE
from recce.git import current_branch, current_commit_hash
from recce.git import current_branch
from recce.run import cli_run, check_github_ci_env
from recce.state import RecceStateLoader, RecceCloudStateManager
from recce.summary import generate_markdown_summary
Expand Down Expand Up @@ -570,36 +571,58 @@ def download(**kwargs):
@click.option('--branch', '-b', help='The branch of the provided artifact.', type=click.STRING,
envvar='GITHUB_HEAD_REF', default=current_branch())
@click.option('--target-path', help='dbt artifacts directory for your artifact.', type=click.STRING, default='target')
@click.option('--password', '-p', help='The password to encrypt the dbt artifact in cloud.', type=click.STRING,
envvar='RECCE_STATE_PASSWORD', required=True)
@add_options(recce_options)
def upload_artifact(**kwargs):
"""
Upload the DBT artifact to cloud
"""
from rich.console import Console
console = Console()
cloud_token = kwargs.get('cloud_token')
password = kwargs.get('password')
target_path = kwargs.get('target_path')
branch = kwargs.get('branch')

if verify_artifact_path(target_path) is False:
console.print(f"[[red]Error[/red]] Invalid target path: {target_path}")
console.print("Please provide a valid target path containing manifest.json and catalog.json.")
return 1

console.print(current_commit_hash())
pass
try:
rc = upload_dbt_artifact(target_path, branch=branch,
token=cloud_token, password=password,
debug=kwargs.get('debug', False))
except Exception as e:
console.print(f"[[red]Error[/red]] Failed to upload the DBT artifact to cloud.")
console.print(f"Reason: {e}")
rc = 1
return rc


@cloud.command(cls=TrackCommand)
@click.option('--cloud-token', help='The token used by Recce Cloud.', type=click.STRING,
envvar='GITHUB_TOKEN')
@click.option('--branch', '-b', help='The branch of the selected artifact.', type=click.STRING,
envvar='GITHUB_BASE_REF', default=current_branch())
@click.option('--target-path', help='dbt artifacts directory for your artifact.', type=click.STRING,
@click.option('--target-path', help='The dbt artifacts directory for your artifact.', type=click.STRING,
default='target-base')
@click.option('--password', '-p', help='The password to encrypt the dbt artifact in cloud.', type=click.STRING,
envvar='RECCE_STATE_PASSWORD')
@click.option('--force', '-f', help='Bypasses the confirmation prompt. Download the artifact directly.',
is_flag=True)b
@add_options(recce_options)
def download_artifact(**kwargs):
"""
Download the DBT artifact to cloud
"""

pass
cloud_token = kwargs.get('cloud_token')
password = kwargs.get('password')
target_path = kwargs.get('target_path')
branch = kwargs.get('branch')
try:
rc = download_dbt_artifact(target_path, branch=branch, token=cloud_token, password=password,
force=kwargs.get('force', False),
debug=kwargs.get('debug', False))
except Exception as e:
console.print(f"[[red]Error[/red]] Failed to download the DBT artifact from cloud.")
console.print(f"Reason: {e}")
rc = 1
return rc


@cli.group('github', short_help='GitHub related commands', hidden=True)
Expand Down
14 changes: 14 additions & 0 deletions recce/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ def current_commit_hash(short_length: int = 7, short: bool = False):
return None


def commit_hash_from_branch(branch: str, short_length: int = 7, short: bool = False):
try:
repo = Repo(search_parent_directories=True)
commit = repo.commit(branch)
if short:
return commit.hexsha[:short_length]
return commit.hexsha
except TypeError:
# happened when HEAD is a detached symbolic reference
return None
except Exception:
return None


def hosting_repo(remote: str = 'origin'):
try:
repo = Repo(search_parent_directories=True)
Expand Down
11 changes: 8 additions & 3 deletions recce/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,9 @@ def _export_state_to_recce_cloud(self, metadata: dict = None) -> Union[str, None
import requests

presigned_url = RecceCloud(token=self.cloud_options.get('token')).get_presigned_url(
method=PresignedUrlMethod.UPLOAD, pr_info=self.pr_info, artifact_name=RECCE_STATE_COMPRESSED_FILE,
method=PresignedUrlMethod.UPLOAD, repository=self.pr_info.repository,
artifact_name=RECCE_STATE_COMPRESSED_FILE,
pr_id=self.pr_info.id,
metadata=metadata)
compress_passwd = self.cloud_options.get('password')
headers = s3_sse_c_headers(compress_passwd)
Expand Down Expand Up @@ -514,7 +516,9 @@ def _upload_state_to_recce_cloud(self, state: RecceState, metadata: dict = None)
import requests

presigned_url = RecceCloud(token=self.cloud_options.get('token')).get_presigned_url(
method=PresignedUrlMethod.UPLOAD, pr_info=self.pr_info, artifact_name=RECCE_STATE_COMPRESSED_FILE,
method=PresignedUrlMethod.UPLOAD, repository=self.pr_info.repository,
artifact_name=RECCE_STATE_COMPRESSED_FILE,
pr_id=self.pr_info.id,
metadata=metadata)

compress_passwd = self.cloud_options.get('password')
Expand Down Expand Up @@ -591,7 +595,8 @@ def _download_state_from_recce_cloud(self, filepath):
import requests

presigned_url = RecceCloud(token=self.cloud_options.get('token')).get_presigned_url(
method=PresignedUrlMethod.DOWNLOAD, pr_info=self.pr_info, artifact_name=RECCE_STATE_COMPRESSED_FILE)
method=PresignedUrlMethod.DOWNLOAD, repository=self.pr_info.repository,
artifact_name=RECCE_STATE_COMPRESSED_FILE, pr_id=self.pr_info.id)

password = self.cloud_options.get('password')
if password is None:
Expand Down
13 changes: 10 additions & 3 deletions recce/util/recce_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,17 @@ def _request(self, method, url, data=None):

def get_presigned_url(self,
method: PresignedUrlMethod,
pr_info: PullRequestInfo,
repository: str,
artifact_name: str,
metadata: dict = None) -> str:
api_url = f'{self.base_url}/{pr_info.repository}/pulls/{pr_info.id}/artifacts/{method}?artifact_name={artifact_name}&enable_ssec=true'
metadata: dict = None,
pr_id: int = None,
sha: str = None) -> str:
if pr_id is not None:
api_url = f'{self.base_url}/{repository}/pulls/{pr_id}/artifacts/{method}?artifact_name={artifact_name}&enable_ssec=true'
elif sha is not None:
api_url = f'{self.base_url}/{repository}/commits/{sha}/artifacts/{method}?artifact_name={artifact_name}&enable_ssec=true'
else:
raise ValueError('Either pr_id or sha must be provided.')
response = self._request('POST', api_url, data=metadata)
if response.status_code != 200:
raise RecceCloudException(
Expand Down

0 comments on commit 9018c22

Please sign in to comment.