diff --git a/.gitignore b/.gitignore
index 12c8b054..fa439b61 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@
 bazel-*
 **/.ipynb_checkpoints
+shuffle*
\ No newline at end of file
diff --git a/baseline.sh b/baseline.sh
new file mode 100644
index 00000000..8e99f3fd
--- /dev/null
+++ b/baseline.sh
@@ -0,0 +1,23 @@
+sudo docker run --gpus 1 \
+  -v /home/${USER}:/home/${USER} \
+  google/deepvariant:"${BIN_VERSION}-gpu" \
+  /opt/deepvariant/bin/run_deepvariant \
+  --model_type WGS \
+  --ref "${REF}" \
+  --reads "${BAM_CHR20}" \
+  --regions "chr20" \
+  --output_vcf "${OUTPUT_DIR}/baseline.vcf.gz" \
+  --num_shards=4
+
+time sudo docker run -it \
+-v "${DATA_DIR}:${DATA_DIR}" \
+-v "${OUTPUT_DIR}:${OUTPUT_DIR}" \
+jmcdani20/hap.py:v0.3.12 /opt/hap.py/bin/hap.py \
+  "${TRUTH_VCF}" \
+  "${OUTPUT_DIR}/baseline.vcf.gz" \
+  -f "${TRUTH_BED}" \
+  -r "${REF}" \
+  -o "${OUTPUT_DIR}/chr20-calling_general.happy.output" \
+  -l chr20 \
+  --engine=vcfeval \
+  --pass-only
\ No newline at end of file
diff --git a/docs/add_alarm.sh b/docs/add_alarm.sh
new file mode 100644
index 00000000..fa12ecdd
--- /dev/null
+++ b/docs/add_alarm.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+
+# Variables
+host="my-ec2-instance"
+region="us-east-1"
+
+# Step 1: Launch the EC2 instance
+instance_id=$(aws ec2 run-instances \
+  --image-id ami-096ea6a12ea24a797 \
+  --count 1 \
+  --instance-type t4g.small \
+  --security-group-id sg-0b734813083db4ba2 \
+  --key-name gpu \
+  --block-device-mappings DeviceName=/dev/sda1,Ebs={VolumeSize=20} \
+  --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value='"${host}"'}]' \
+  --query "Instances[0].InstanceId" \
+  --output text \
+  --region $region \
+  --profile gpu)
+
+echo "Launched EC2 instance with ID: $instance_id"
+
+# Step 2: Create the CloudWatch alarm
+aws cloudwatch put-metric-alarm \
+  --alarm-name "CPUUtilization-Low-${instance_id}" \
+  --metric-name CPUUtilization \
+  --namespace AWS/EC2 \
+  --statistic Average \
+  --period 3600 \
+  --threshold 1 \
+  --comparison-operator LessThanOrEqualToThreshold \
+  --dimensions "Name=InstanceId,Value=${instance_id}" \
+  --evaluation-periods 2 \
+  --alarm-actions arn:aws:sns:us-east-1:940583394710:idle-instance-alarm \
+  --region $region \
+  --profile gpu
+
+echo "Alarm created for instance: $instance_id"
diff --git a/docs/deepvariant-training-case-study.md b/docs/deepvariant-training-case-study.md
index f6c97ed8..2a5c428d 100644
--- a/docs/deepvariant-training-case-study.md
+++ b/docs/deepvariant-training-case-study.md
@@ -27,32 +27,91 @@ accuracy comparing to the WGS model as a baseline:
 This tutorial is meant as an example for training; all the other processing in
 this tutorial were done serially with no pipeline optimization.
+## BAM processing
+
+Since PicoV3 is very low coverage, we only keep the BAM regions with coverage
+>2; it makes sense to require at least 3 reads so that a majority vote is
+possible. We restrict calling to those regions with a BED file, following the
+approach used in the PacBio examples.
+
+First, set up DeepVariant on AWS.
+
+Collect the BAM files that have ground-truth data available. Run variant
+calling with the generalized model first to see its performance on the regions
+with coverage >2, then see whether we can improve on that.
+
+```bash
+BAM_CHR1="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr1.bam"
+BAM_CHR20="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr20.bam"
+BAM_CHR21="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr21.bam"
+merged_bam="merged.bam"
+minimum_coverage=2
+coverage_bed="pass_threshold.bed"
+# why do we need this bed file?
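+# Likely answer: TRUTH_BED below is the GIAB high-confidence region set, and
+# intersecting it with the >2x intervals reported by `bedtools genomecov`
+# keeps only the parts of the confident regions where at least 3 reads are
+# available to vote, so labeling and hap.py evaluation can be limited to
+# adequately covered sites in this low-coverage data.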
+TRUTH_BED="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_nosomaticdel_chr.bed" +# https://bedtools.readthedocs.io/en/latest/content/tools/genomecov.html +samtools merge $merged_bam $BAM_CHR1 $BAM_CHR20 $BAM_CHR21 +bedtools genomecov -ibam $merged_bam -bg | \ + awk -v min_cov="$minimum_coverage" '$4 > min_cov {print $1, $2, $3}' | \ + bedtools intersect -a $TRUTH_BED -b - > $coverage_bed +``` + ## Request a machine For this case study, we use a [GPU machine] with 16 vCPUs. You can request this -machine on Google Cloud using the following command: +machine on AWS using the following command: ```bash +# public.ecr.aws/aws-genomics/google/deepvariant:1.4.0 +# https://cloud-images.ubuntu.com/locator/ec2/ +aws ec2 run-instances \ + --image-id ami-0c272455b0778ebeb \ # Replace with the correct AMI ID for Ubuntu 20.04 LTS in your region + --count 1 \ + --instance-type p3.2xlarge \ # p3 instances use Nvidia Tesla V100 GPUs, which is close to the Tesla P100 + --key-name MyKeyPair \ # Replace with your key pair name + --block-device-mappings DeviceName=/dev/sda1,Ebs={VolumeSize=300} \ + --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value='"${USER}-deepvariant-vm"'}]' \ + --region us-west-2 \ + --iam-instance-profile Name=gpu + --placement AvailabilityZone=us-west-2b +``` +```bash +aws ec2 run-instances \ + --image-id ami-0c272455b0778ebeb \ # Replace with the correct AMI ID for Ubuntu 20.04 LTS in your region + --count 1 \ + --instance-type t4g.small \ # just get a small instance to try it out first + --key-name gpu \ # Replace with your key pair name + --block-device-mappings DeviceName=/dev/sda1,Ebs={VolumeSize=20} \ + --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value='"${USER}-deepvariant-vm"'}]' \ + --region us-west-2 \ + --iam-instance-profile Name=gpu + --placement AvailabilityZone=us-west-2b +``` +```bash +# this actually works host="${USER}-deepvariant-vm" -zone="us-west1-b" - -gcloud compute instances create ${host} \ - --scopes "compute-rw,storage-full,cloud-platform" \ - --maintenance-policy "TERMINATE" \ - --accelerator=type=nvidia-tesla-p100,count=1 \ - --image-family "ubuntu-2004-lts" \ - --image-project "ubuntu-os-cloud" \ - --machine-type "n1-standard-16" \ - --boot-disk-size "300" \ - --zone "${zone}" \ - --min-cpu-platform "Intel Skylake" +region="us-east-1" +chmod 400 ~/gpu.pem +# this image id is not right +aws ec2 run-instances \ + --image-id ami-096ea6a12ea24a797 \ + --count 1 \ + --instance-type t4g.small \ + --security-group-id sg-0b734813083db4ba2 \ + --key-name gpu \ + --block-device-mappings DeviceName=/dev/sda1,Ebs={VolumeSize=20} \ + --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value='"${host}"'}]' \ + --region $region \ + --profile gpu ``` - After a minute or two, your VM should be ready and you can ssh into it using the following command: ```bash -gcloud compute ssh ${host} --zone ${zone} +aws ec2 stop-instances --instance-ids i-0e4f059f74edbb771 -profile gpu +# elastic IP +ssh -i "~/gpu.pem" ubuntu@54.156.141.144 +ssh gpu +# ssh -i ~/gpu.pem ubuntu@${host} ``` Once you have logged in, set the variables: @@ -60,7 +119,7 @@ Once you have logged in, set the variables: ```bash YOUR_PROJECT=REPLACE_WITH_YOUR_PROJECT OUTPUT_GCS_BUCKET=REPLACE_WITH_YOUR_GCS_BUCKET - +# might have to install gsutil to make sure the instance connect to deepvariant's standard files BUCKET="gs://deepvariant" VERSION="1.6.1" DOCKER_IMAGE="google/deepvariant:${VERSION}" @@ -113,6 +172,8 @@ gsutil -m cp 
-r "${DATA_BUCKET}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Io ### Download extra packages ```bash +snap install gh +gh auth login sudo apt -y update sudo apt -y install parallel curl -O https://raw.githubusercontent.com/google/deepvariant/r1.6.1/scripts/install_nvidia_docker.sh @@ -538,7 +599,7 @@ time sudo docker run -it \ jmcdani20/hap.py:v0.3.12 /opt/hap.py/bin/hap.py \ "${TRUTH_VCF}" \ "${OUTPUT_DIR}/test_set.vcf.gz" \ - -f "${TRUTH_BED}" \ + -f "${TRUTH_BED}" \ # this is important in my study to make sure coverage >2 -r "${REF}" \ -o "${OUTPUT_DIR}/chr20-calling.happy.output" \ -l chr20 \ @@ -588,7 +649,7 @@ sudo docker run --gpus all \ --output_vcf "${OUTPUT_DIR}/baseline.vcf.gz" \ --num_shards=${N_SHARDS} ``` - +baseline vcf run happy Baseline: | Type | TRUTH.TP | TRUTH.FN | QUERY.FP | METRIC.Recall | METRIC.Precision | METRIC.F1_Score | diff --git a/eval.sh b/eval.sh new file mode 100644 index 00000000..e0db35ca --- /dev/null +++ b/eval.sh @@ -0,0 +1,23 @@ +BASE="/home/${USER}/data/training-case-study" +OUTPUT_DIR="${BASE}/output" +model="/home/${USER}/data/model/model.ckpt" +TRAINING_DIR="${OUTPUT_DIR}/training_dir" +BIN_VERSION="1.4.0" +INPUT_DIR="${BASE}/input" +LOG_DIR="${OUTPUT_DIR}/logs" +DATA_DIR="${INPUT_DIR}/data" +REF="${DATA_DIR}/ucsc_hg19.fa" +BAM_CHR1="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr1.bam" +BAM_CHR20="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr20.bam" + +sudo docker run --gpus 1 \ + -v /home/${USER}:/home/${USER} \ + google/deepvariant:"${BIN_VERSION}-gpu" \ + /opt/deepvariant/bin/run_deepvariant \ + --model_type WGS \ + --customized_model "${TRAINING_DIR}/model.ckpt-50000" \ + --ref "${REF}" \ + --reads "${BAM_CHR20}" \ + --regions "chr20" \ + --output_vcf "${OUTPUT_DIR}/test_set.vcf.gz" \ + --num_shards=4 \ No newline at end of file diff --git a/index_too_old.sh b/index_too_old.sh new file mode 100644 index 00000000..83245f2b --- /dev/null +++ b/index_too_old.sh @@ -0,0 +1,2 @@ +#index is older than file +solve by touch the index files \ No newline at end of file diff --git a/make_example.sh b/make_example.sh new file mode 100644 index 00000000..f1a36f7c --- /dev/null +++ b/make_example.sh @@ -0,0 +1,54 @@ +# amd64 architecture, but your machine is running an arm64 architecture + +YOUR_PROJECT=takara +OUTPUT_GCS_BUCKET=REPLACE_WITH_YOUR_GCS_BUCKET +# might have to install gsutil to make sure the instance connect to deepvariant's standard files +BUCKET="gs://deepvariant" +VERSION="1.6.1" +DOCKER_IMAGE="google/deepvariant:${VERSION}" + +MODEL_BUCKET="${BUCKET}/models/DeepVariant/${VERSION}/DeepVariant-inception_v3-${VERSION}+data-wgs_standard" +GCS_PRETRAINED_WGS_MODEL="${MODEL_BUCKET}/model.ckpt" + +OUTPUT_BUCKET="${OUTPUT_GCS_BUCKET}/customized_training" +TRAINING_DIR="${OUTPUT_BUCKET}/training_dir" + +BASE="/home/ubuntu/data/training-case-study" +DATA_BUCKET=gs://deepvariant/training-case-study/BGISEQ-HG001 + +INPUT_DIR="${BASE}/input" +BIN_DIR="${INPUT_DIR}/bin" +DATA_DIR="${INPUT_DIR}/data" +OUTPUT_DIR="${BASE}/output2" +LOG_DIR="${OUTPUT_DIR}/logs" +SHUFFLE_SCRIPT_DIR="${HOME}/deepvariant/tools" + +REF="${DATA_DIR}/ucsc_hg19.fa" +BAM_CHR1="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr1.bam" +BAM_CHR20="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr20.bam" +BAM_CHR21="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr21.bam" +TRUTH_VCF="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_PGandRTGphasetransfer_chrs_FIXED.vcf.gz" 
+TRUTH_BED="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_nosomaticdel_chr.bed" + +N_SHARDS=7 +mkdir -p "${OUTPUT_DIR}" +mkdir -p "${BIN_DIR}" +mkdir -p "${DATA_DIR}" +mkdir -p "${LOG_DIR}" + +( time seq 0 $((N_SHARDS-1)) | \ + parallel --halt 2 --line-buffer \ + sudo docker run \ + -v ${HOME}:${HOME} \ + ${DOCKER_IMAGE} \ + make_examples \ + --mode training \ + --ref "${REF}" \ + --reads "${BAM_CHR1}" \ + --examples "${OUTPUT_DIR}/training_set.with_label.tfrecord@${N_SHARDS}.gz" \ + --truth_variants "${TRUTH_VCF}" \ + --confident_regions "${TRUTH_BED}" \ + --task {} \ + --regions "'chr1'" \ + --channels "insert_size" \ +) 2>&1 | tee "${LOG_DIR}/training_set.with_label.make_examples.log" \ No newline at end of file diff --git a/run_hap.sh b/run_hap.sh new file mode 100644 index 00000000..78b496bb --- /dev/null +++ b/run_hap.sh @@ -0,0 +1,22 @@ +BAM_CHR1="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr1.bam" +BAM_CHR20="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr20.bam" +BAM_CHR21="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr21.bam" +TRUTH_VCF="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_PGandRTGphasetransfer_chrs_FIXED.vcf.gz" +TRUTH_BED="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_nosomaticdel_chr.bed" +REF="${DATA_DIR}/ucsc_hg19.fa" + + +sudo docker pull jmcdani20/hap.py:v0.3.12 + +time sudo docker run -it \ +-v "${DATA_DIR}:${DATA_DIR}" \ +-v "${OUTPUT_DIR}:${OUTPUT_DIR}" \ +jmcdani20/hap.py:v0.3.12 /opt/hap.py/bin/hap.py \ + "${TRUTH_VCF}" \ + "${OUTPUT_DIR}/test_set.vcf.gz" \ + -f "${TRUTH_BED}" \ + -r "${REF}" \ + -o "${OUTPUT_DIR}/chr20-calling.happy.output" \ + -l chr20 \ + --engine=vcfeval \ + --pass-only \ No newline at end of file diff --git a/shuffle.sh b/shuffle.sh new file mode 100644 index 00000000..d0a4fef7 --- /dev/null +++ b/shuffle.sh @@ -0,0 +1,26 @@ +# +# git clone https://github.com/apache/beam-starter-python.git +# cd beam-starter-python +# python3 -m venv env +# source env/bin/activate + +# pip3 install setuptools --upgrade +# pip3 install apache_beam # installed 2.59.0 +# pip3 install tensorflow # For parsing tf.Example in shuffle_tfrecords_beam.py. 
+ +# play around with snappy will make it crash in local server +# python-snappy +# python3 -m pip install snappy +# source ../beam-starter-python/shiyi/bin/activate +YOUR_PROJECT=takara +BASE="/home/syin/lol/data/training-case-study" +OUTPUT_DIR="${BASE}/output2" +time python3 tools/shuffle_tfrecords_beam.py \ + --project="${YOUR_PROJECT}" \ + --input_pattern_list="${OUTPUT_DIR}"/training_set.with_label.tfrecord-?????-of-00007.gz \ + --output_pattern_prefix="${OUTPUT_DIR}/training_set.with_label.shuffled" \ + --output_dataset_name="HG001" \ + --output_dataset_config_pbtxt="${OUTPUT_DIR}/training_set.dataset_config.pbtxt" \ + --job_name=shuffle-tfrecords \ + --runner=DirectRunner \ + --direct_num_workers=32 diff --git a/shuffle_validation.sh b/shuffle_validation.sh new file mode 100644 index 00000000..4e3da2d8 --- /dev/null +++ b/shuffle_validation.sh @@ -0,0 +1,13 @@ +YOUR_PROJECT=takara +BASE="/home/syin/lol/data/training-case-study" +OUTPUT_DIR="${BASE}/output" +time python3 tools/shuffle_tfrecords_beam.py \ + --project="${YOUR_PROJECT}" \ + --input_pattern_list="${OUTPUT_DIR}"/validation_set.with_label.tfrecord-?????-of-?????.gz \ + --output_pattern_prefix="${OUTPUT_DIR}/2/validation_set.with_label.shuffled" \ + --output_dataset_name="HG001" \ + --output_dataset_config_pbtxt="${OUTPUT_DIR}/2/validation_set.dataset_config.pbtxt" \ + --job_name=shuffle-tfrecords \ + --runner=DirectRunner \ + --direct_num_workers=0 + # --direct_running_mode=multi_threading \ diff --git a/tools/beam_test.py b/tools/beam_test.py new file mode 100644 index 00000000..f974d583 --- /dev/null +++ b/tools/beam_test.py @@ -0,0 +1,44 @@ +# pylint: disable=line-too-long +r"""Hello World with Apache Beam and SparkRunner on AWS. + +To run on AWS using the Spark Runner: +1) Set up an AWS EMR cluster or use an existing Spark cluster. + +2) Upload the input file to an S3 bucket. + +3) Run the following command on your Spark cluster or submit it through EMR: + + python hello_world_beam_spark.py \ + --input_pattern_list="s3://your-bucket/input_data/hello_world.txt" \ + --output_pattern_prefix="s3://your-bucket/output_data/hello_world_output" \ + --runner=SparkRunner \ + --spark_master=yarn \ + --region=us-east-1 + +""" + +import argparse +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +def parse_cmdline(argv): + """Parse the commandline arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument('--input_pattern_list', help='Input file in S3.') + parser.add_argument('--output_pattern_prefix', help='Output file pattern in S3.') + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + +def run(argv=None): + known_args, pipeline_args = parse_cmdline(argv) + pipeline_options = PipelineOptions(pipeline_args) + + with beam.Pipeline(options=pipeline_options) as p: + # Read input file and prepend "Hello, World!" + (p + | 'ReadInput' >> beam.io.ReadFromText(known_args.input_pattern_list) + | 'AddHelloWorld' >> beam.Map(lambda x: f"Hello, World! 
{x}") + | 'WriteOutput' >> beam.io.WriteToText(known_args.output_pattern_prefix)) + +if __name__ == '__main__': + run() diff --git a/tools/beam_test_2024.py b/tools/beam_test_2024.py new file mode 100644 index 00000000..6d715a64 --- /dev/null +++ b/tools/beam_test_2024.py @@ -0,0 +1,40 @@ +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.io.tfrecordio import ReadFromTFRecord, WriteToTFRecord + +class ShuffleRecords(beam.PTransform): + """Custom PTransform to shuffle records.""" + def expand(self, pcoll): + return (pcoll + | "Pair with random key" >> beam.Map(lambda x: (hash(x), x)) + | "Shuffle" >> beam.Reshuffle() + | "Extract shuffled records" >> beam.Map(lambda x: x[1])) + +def run(input_tfrecord, output_prefix, output_shards): + # Define pipeline options + options = PipelineOptions( + runner='SparkRunner', + spark_master='yarn', + spark_master='yarn://ip-172-31-18-156.ec2.internal:8032', + temp_location='s3://deepvariant-training-data-shiyi-2024-11/training-case-study/tmp/', # Replace with your actual S3 path + ) + + # Create the pipeline + with beam.Pipeline(options=options) as p: + ( + p + | "Read TFRecords" >> ReadFromTFRecord(file_pattern=input_tfrecord) + | "Shuffle Records" >> ShuffleRecords() + | "Write to S3" >> WriteToTFRecord( + file_path_prefix=output_prefix, + num_shards=output_shards + ) + ) + +if __name__ == "__main__": + # Input and output locations + INPUT_TFRECORD = "s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/validation_set.with_label.tfrecord*.gz" + OUTPUT_PREFIX = "s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples" + OUTPUT_SHARDS = 10 + + run(INPUT_TFRECORD, OUTPUT_PREFIX, OUTPUT_SHARDS) diff --git a/tools/beam_test_local.py b/tools/beam_test_local.py new file mode 100644 index 00000000..ed3dc5b0 --- /dev/null +++ b/tools/beam_test_local.py @@ -0,0 +1,52 @@ +import os +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions +import argparse +import boto3 + +# Setup boto3 client +session = boto3.Session(profile_name='gpu') +s3_client = session.client('s3') + +# Specify the bucket and the prefix (path to your folder) +bucket_name = 'deepvariant-training-data-shiyi-2024-11' +prefix = 'training-case-study/output/training_set.with_label.tfrecord' + +# List files in the S3 path +response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) +files = [content['Key'] for content in response.get('Contents', [])] + +# Print the files +for file in files: + print(file) + + +def parse_cmdline(argv): + """Parse the commandline arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument('--output_pattern_prefix', help='Output file pattern in S3.') + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + +def run(): + # Hardcode the output path + output_pattern_prefix = "s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples" + input_pattern_prefix = "s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/validation_set.with_label.tfrecord*.gz" + + # Set pipeline options + pipeline_args = [ + '--runner=SparkRunner', + '--spark_master=local[*]' # Use all available cores + ] + + pipeline_options = PipelineOptions(pipeline_args) + + with beam.Pipeline(options=pipeline_options) as p: + # Example: create a list of strings and write to the specified output path + (p + | 'ReadInput' >> 
beam.io.ReadFromTFRecord(input_pattern_prefix) + | 'AddHelloWorld' >> beam.Map(lambda x: f"Hello, World! {x}") + | 'WriteOutput' >> beam.io.WriteToText(output_pattern_prefix)) + +if __name__ == '__main__': + run() diff --git a/tools/classic_create_emr_cluster.sh b/tools/classic_create_emr_cluster.sh new file mode 100644 index 00000000..dc71741f --- /dev/null +++ b/tools/classic_create_emr_cluster.sh @@ -0,0 +1,13 @@ +aws emr create-cluster \ + --name "My cluster" \ + --log-uri "s3://aws-logs-940583394710-us-east-1/elasticmapreduce" \ + --release-label "emr-7.5.0" \ + --service-role "arn:aws:iam::940583394710:role/EMR-default" \ + --unhealthy-node-replacement \ + --ec2-attributes '{"InstanceProfile":"ecsInstanceRole","EmrManagedMasterSecurityGroup":"sg-06aa995e5453d60e6","EmrManagedSlaveSecurityGroup":"sg-073b0c53976a0cae7","AdditionalMasterSecurityGroups":[],"AdditionalSlaveSecurityGroups":[],"SubnetId":"subnet-0ce2d946"}' \ + --applications Name=Hadoop Name=Hive Name=JupyterEnterpriseGateway Name=Livy Name=Spark \ + --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"TASK","Name":"Task - 1","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]}},{"InstanceCount":1,"InstanceGroupType":"MASTER","Name":"Primary","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]}},{"InstanceCount":1,"InstanceGroupType":"CORE","Name":"Core","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]}}]' \ + --scale-down-behavior "TERMINATE_AT_TASK_COMPLETION" \ + --auto-termination-policy '{"IdleTimeout":3600}' \ + --region "us-east-1" \ + --profile gpu \ No newline at end of file diff --git a/tools/create_emr_cluster.sh b/tools/create_emr_cluster.sh new file mode 100644 index 00000000..1c40d9b6 --- /dev/null +++ b/tools/create_emr_cluster.sh @@ -0,0 +1,14 @@ +aws emr create-cluster \ + --name "My cluster" \ + --log-uri "s3://aws-logs-940583394710-us-east-1/elasticmapreduce" \ + --release-label "emr-7.5.0" \ + --service-role "arn:aws:iam::940583394710:role/EMR-default" \ + --unhealthy-node-replacement \ + --ec2-attributes '{"InstanceProfile":"ecsInstanceRole","EmrManagedMasterSecurityGroup":"sg-06aa995e5453d60e6","EmrManagedSlaveSecurityGroup":"sg-073b0c53976a0cae7","AdditionalMasterSecurityGroups":["sg-0b734813083db4ba2"],"AdditionalSlaveSecurityGroups":[],"SubnetId":"subnet-0ce2d946"}' \ + --applications Name=Hadoop Name=Hive Name=JupyterEnterpriseGateway Name=Livy Name=Spark \ + --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"CORE","Name":"Core","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]}},{"InstanceCount":1,"InstanceGroupType":"TASK","Name":"Task - 1","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]}},{"InstanceCount":1,"InstanceGroupType":"MASTER","Name":"Primary","InstanceType":"m5.xlarge","EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"VolumeType":"gp2","SizeInGB":32},"VolumesPerInstance":2}]}}]' \ + --bootstrap-actions '[{"Args":[],"Name":"setup","Path":"s3://deepvariant-training-data-shiyi-2024-11/setup2.sh"}]' \ + --scale-down-behavior 
"TERMINATE_AT_TASK_COMPLETION" \ + --auto-termination-policy '{"IdleTimeout":3600}' \ + --region "us-east-1" \ + --profile gpu \ No newline at end of file diff --git a/tools/docker.sh b/tools/docker.sh new file mode 100644 index 00000000..2e0d0437 --- /dev/null +++ b/tools/docker.sh @@ -0,0 +1,4 @@ +# use docker to provide tensorflow and apache beam +spark-submit \ + --conf spark.kubernetes.container.image=shiyiyin/shuffle:latest \ + your_script.py diff --git a/tools/dockerfile b/tools/dockerfile new file mode 100644 index 00000000..8549500b --- /dev/null +++ b/tools/dockerfile @@ -0,0 +1,30 @@ +FROM amazoncorretto:8 + +# Update the system and install necessary packages +RUN yum -y update +RUN yum -y install yum-utils +RUN yum -y groupinstall development + +# Install Python 3 and related packages +RUN yum list python3* +RUN yum -y install python3 python3-dev python3-pip python3-virtualenv + +# Verify Python installation +RUN python -V +RUN python3 -V + +# Set environment variables for PySpark +ENV PYSPARK_DRIVER_PYTHON python3 +ENV PYSPARK_PYTHON python3 + +# Upgrade pip to the latest version +RUN pip3 install --upgrade pip + +# Install necessary Python libraries +RUN pip3 install "apache_beam[aws]==2.61.0" +#2.48 +RUN pip3 install tensorflow==2.18.0 + +# Verify the installation of Apache Beam and TensorFlow +RUN python3 -c "import apache_beam; import tensorflow as tf; print('Apache Beam version:', apache_beam.__version__); print('TensorFlow version:', tf.__version__)" + diff --git a/tools/penv.sh b/tools/penv.sh new file mode 100644 index 00000000..74898dd0 --- /dev/null +++ b/tools/penv.sh @@ -0,0 +1,21 @@ +# initialize a python virtual environment +python3 -m venv shuffle +source shuffle/bin/activate + +# optionally, ensure pip is up-to-date +pip3 install --upgrade pip + +# install the python packages +pip3 install setuptools --upgrade +pip3 install "apache_beam[aws]==2.61.0" # 2.51.0 didn't work in my run. +pip3 install tensorflow==2.18.0 # For parsing tf.Example in shuffle_tfrecords_beam.py. 
+ +# package the virtual environment into an archive +pip3 install venv-pack +venv-pack -f -o pyspark_venv.tar.gz + +# copy the archive to an S3 location +aws s3 cp pyspark_venv.tar.gz s3://deepvariant-training-data-shiyi-2024-11/ --profile gpu + +# optionally, remove the virtual environment directory +rm -fr pyspark_venvsource \ No newline at end of file diff --git a/tools/run.sh b/tools/run.sh new file mode 100644 index 00000000..2da2a568 --- /dev/null +++ b/tools/run.sh @@ -0,0 +1,39 @@ +s3://deepvariant-training-data-shiyi-2024-11/pyspark_venv.tar.gz +s3://deepvariant-training-data-shiyi-2024-11/shuffle_tfrecords_beam.py +--conf spark.archives=s3://deepvariant-training-data-shiyi-2024-11/pyspark_venv.tar.gz +--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python +--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python +--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python + +--input_pattern_list="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/training_set.with_label.shuffled-?????-of-?????.tfrecord.gz" +--output_pattern_prefix="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples" +--output_dataset_name="HG001" +--runner=SparkRunner +--spark_master=yarn +--region=us-east-1 + spark-submit --master yarn \ + --deploy-mode cluster \ + --conf 'spark.archives=s3://deepvariant-training-data-shiyi-2024-11/pyspark_venv.tar.gz#environment' \ + --conf 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python' \ + --conf 'spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python' \ + shuffle_tfrecords_beam.py \ + --input_pattern_list="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/training_set.with_label.shuffled-?????-of-?????.tfrecord.gz" \ + --output_pattern_prefix="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples" \ + --output_dataset_name="HG001" \ + --runner=SparkRunner \ + --spark_master=yarn \ + --region=us-east-1 + +python tools/shuffle_tfrecords_beam.py \ + --input_pattern_list="/path/to/local/tfrecords/*.tfrecord.gz" \ + --output_pattern_prefix="file:///path/to/output/training.examples" \ + --output_dataset_name="HG001" \ + --runner=DirectRunner + +sudo -u hadoop yarn logs -applicationId application_1733349531230_0001 + +#key learning +serverless EMR could be tested too +I tried add-step, but I should not use sparkrunner option, becuase I am using spark-submit + --steps Type=Spark,Name="SparkJob",ActionOnFailure=CONTINUE,\ +when I use python script, I could use sparkrunner option diff --git a/tools/run2.sh b/tools/run2.sh new file mode 100644 index 00000000..27fd2688 --- /dev/null +++ b/tools/run2.sh @@ -0,0 +1,17 @@ +aws emr add-steps \ + --cluster-id j-14NIKY3MAI1QU \ + --steps Type=Spark,Name="SparkJob",ActionOnFailure=CONTINUE,Args='[ + "--master", "yarn", + "--deploy-mode", "cluster", + '--conf', 'spark.archives=s3://deepvariant-training-data-shiyi-2024-11/pyspark_venv.tar.gz#environment', + '--conf', 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python', + '--conf', 'spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python', + "s3://deepvariant-training-data-shiyi-2024-11/shuffle_tfrecords_beam.py", + "--input_pattern_list=s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/training_set.with_label.tfrecord-?????-of-?????.gz", + 
"--output_pattern_prefix=s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples", + "--output_dataset_name=HG001", + "--runner=SparkRunner", + "--region=us-east-1" +]' \ + --region us-east-1 \ + --profile gpu diff --git a/tools/run3.sh b/tools/run3.sh new file mode 100644 index 00000000..a33ce147 --- /dev/null +++ b/tools/run3.sh @@ -0,0 +1,14 @@ +#https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html#using-virtualenv +#https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/using-python-libraries.html +aws emr add-steps \ + --cluster-id j-14NIKY3MAI1QU \ + --steps Type=Spark,Name="SparkJob",ActionOnFailure=CONTINUE,Args="[ + '--master', 'yarn', + '--deploy-mode', 'cluster', + '--conf', 'spark.archives=s3://deepvariant-training-data-shiyi-2024-11/pyspark_venv.tar.gz#environment', + '--conf', 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python', + '--conf', 'spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python', + 's3://deepvariant-training-data-shiyi-2024-11/test.py' + ]" \ + --region us-east-1 \ + --profile gpu diff --git a/tools/shuffle_tfrecords_beam.py b/tools/shuffle_tfrecords_beam.py index b6e3cd6e..29f510ba 100644 --- a/tools/shuffle_tfrecords_beam.py +++ b/tools/shuffle_tfrecords_beam.py @@ -1,70 +1,25 @@ -# Copyright 2018 Google LLC. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright notice, -# this list of conditions and the following disclaimer. -# -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# -# 3. Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. - # pylint: disable=line-too-long -r"""Shuffle tf.Example files using beam. +r"""Shuffle tf.Example files using Apache Beam with SparkRunner on AWS. -To run locally: -1) Install beam on your machine following the instructions at - https://beam.apache.org/get-started/quickstart-py/ +To run on AWS using the Spark Runner: +1) Set up an AWS EMR cluster or use an existing Spark cluster. -2) Copy any inputs to be on local disk. +2) Upload the input files to an S3 bucket. 
-3) Run - python path/to/shuffle_tfrecords_beam.py \ - --input_pattern_list="/tmp/some.examples-?????-of-00200.tfrecord.gz" \ - --output_pattern_prefix="/tmp/training.examples" \ +3) Run the following command on your Spark cluster or submit it through EMR: + + python tools/shuffle_tfrecords_beam.py \ + --input_pattern_list="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/training_set.with_label.tfrecord-?????-of-?????.gz" \ + --output_pattern_prefix="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples" \ --output_dataset_name="HG001" \ - --runner=DirectRunner - -To run on Google Cloud Dataflow Service: -1) Follow the Google Cloud Dataflow setup instructions at -https://beam.apache.org/documentation/runners/dataflow/ - -2) Upload this file to your GCE instance. - -3) Run - python shuffle_tfrecords_beam.py \ - --job_name=shuffle-tfrecords \ - --input_pattern_list="gs://YOUR_INPUT_BUCKET/A.tfrecord.gz" \ - --output_pattern_prefix="gs://YOUR_OUTPUT_BUCKET/training.examples" \ - --output_dataset_name="HG001" \ - --runner=DataflowRunner \ - --project=SET_YOUR_PROJECT_ID_HERE \ - --staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY \ - --temp_location=gs://YOUR_BUCKET_NAME/AND_TEMP_DIRECTORY - -4) (Optional) To monitor or cancel the job while it is running, you can -use either the Dataflow Monitoring Interface -https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf -or the Dataflow Command-line Interface -https://cloud.google.com/dataflow/pipelines/dataflow-command-line-intf + --runner=SparkRunner \ + --spark_master=yarn \ + --region=us-east-1 + +Make sure the AWS EMR cluster has access to read from and write to S3. + +You can monitor the Spark job using the Spark UI. + """ # pylint: enable=line-too-long @@ -91,9 +46,7 @@ def parse_cmdline(argv): The known arguments are required for this specific program to function, and the other pipeline arguments can be used to configure beam and the - specific beam backend being used. See - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py - for a list and description of the pipeline arguments accepted. + specific beam backend being used. Args: argv: List containing command-line arguments. @@ -106,10 +59,10 @@ def parse_cmdline(argv): parser.add_argument( '--input_pattern_list', - help='Comma-separated list of TFRecord filename patterns.') + help='Comma-separated list of TFRecord filename patterns in S3.') parser.add_argument( '--output_pattern_prefix', - help='Filename pattern for the output TFRecords.') + help='Filename pattern for the output TFRecords in S3.') parser.add_argument( '--output_dataset_config_pbtxt', help='Optional. If set, print out a human-readable version of ' @@ -128,7 +81,7 @@ def read_from_tfrecords_files(pipeline, input_filename_pattern_list): Args: pipeline: Beam pipeline object. - input_filename_pattern_list: List of filename patterns. + input_filename_pattern_list: List of filename patterns in S3. Returns: A PCollection of read tf.Examples. 
@@ -142,7 +95,7 @@ def read_from_tfrecords_files(pipeline, input_filename_pattern_list): def shuffle_records(input_examples): - """Shuffles the input_examples in a effectively random order.""" + """Shuffles the input_examples in an effectively random order.""" def sha1(input_bytes): """Returns the sha1 hash of input_bytes.""" @@ -157,7 +110,7 @@ def sha1(input_bytes): def count_records_per_label(input_examples): - """Shuffles the input_examples in a effectively random order.""" + """Counts records by label.""" def label_example(input_bytes): """Returns the label of input_example.""" @@ -185,20 +138,8 @@ def make_config_string(name, tfrecord_path, num_examples): def write_summary_string_to_file(pipeline, output_examples, input_pattern_list, dataset_name, output_pattern_prefix, output_filename): - """Writes a file summarizing the PCollection of Examples. - - Args: - pipeline: Beam pipeline object. - output_examples: PCollection of examples. - input_pattern_list: str. A comma-separated string of input files. - dataset_name: str. The name of the dataset to be written in the output. - output_pattern_prefix: str. The prefix of the sharded output files. - output_filename: the output text file that contains the summary that can be - parsed into DeepVariantDatasetConfig. - """ + """Writes a file summarizing the PCollection of Examples.""" - # Beam currently has no way to materialize pipeline values, so we have - # to construct the file entirely in Beam pipeline operations. comment_str = pipeline | 'CreateFileHeader' >> beam.Create( [COMMENT_HEADER.format(input_pattern_list, output_pattern_prefix)]) num_examples = ( @@ -216,25 +157,18 @@ def write_summary_string_to_file(pipeline, output_examples, input_pattern_list, | 'WriteToFile' >> beam.io.WriteToText( output_filename, shard_name_template='', - header='# Generated by shuffle_tfrecords_beam.py')) + header='# Generated by shuffle_tfrecords_beam_spark.py')) def main(argv=None): """Main entry point; defines and runs the pipeline.""" known_args, pipeline_args = parse_cmdline(argv) - # Copy over the example_info.json file before the pipeline starts. 
- example_info_json = '{}*example_info.json'.format( - known_args.input_pattern_list.split(',')[0]) - example_info_json_list = tf.io.gfile.glob(example_info_json) - if example_info_json_list and tf.io.gfile.exists(example_info_json_list[0]): - training_dir = os.path.dirname(known_args.output_pattern_prefix) - if not tf.io.gfile.exists(training_dir): - tf.io.gfile.makedirs(training_dir) - output_example_info_json = os.path.join(training_dir, 'example_info.json') - if not tf.io.gfile.exists(output_example_info_json): - tf.io.gfile.copy(example_info_json_list[0], output_example_info_json) pipeline_options = PipelineOptions(pipeline_args) + + # Ensure AWS-specific Spark configurations + # pipeline_options.view_as(PipelineOptions).add_experiment('use_s3') + with beam.Pipeline(options=pipeline_options) as p: input_examples = read_from_tfrecords_files( p, known_args.input_pattern_list.split(',')) diff --git a/tools/test.py b/tools/test.py new file mode 100644 index 00000000..1254dc8e --- /dev/null +++ b/tools/test.py @@ -0,0 +1,29 @@ +import sys +import os + +print(f"Python executable: {sys.executable}") +print(f"Working directory: {os.getcwd()}") + +# List the contents of the working directory +print("Directory contents:") +for item in os.listdir(os.getcwd()): + print(f" {item}") + + +import apache_beam as beam + +def test_apache_beam(): + # Create a simple pipeline that reads a collection, processes it, and prints the result + with beam.Pipeline() as pipeline: + result = ( + pipeline + | 'Create data' >> beam.Create(['Hello', 'Apache', 'Beam']) + | 'Format data' >> beam.Map(lambda word: f"Processed: {word}") + | 'Print result' >> beam.Map(print) + ) + +if __name__ == "__main__": + try: + test_apache_beam() + except Exception as e: + print(f"Error: {e}") diff --git a/tools/test.sh b/tools/test.sh new file mode 100644 index 00000000..afac3b4b --- /dev/null +++ b/tools/test.sh @@ -0,0 +1,7 @@ +export AWS_PROFILE=gpu + +python tools/beam_test_local.py \ + --runner=SparkRunner \ + --spark_master=local[*] + + diff --git a/tools/test2024.sh b/tools/test2024.sh new file mode 100644 index 00000000..f422b234 --- /dev/null +++ b/tools/test2024.sh @@ -0,0 +1,5 @@ +python beam_test_local.py --runner=SparkRunner \ + --spark_master=yarn \ + --temp_location=s3://deepvariant-training-data-shiyi-2024-11/tmp/apache_beam/ \ + --output=s3://deepvariant-training-data-shiyi-2024-11/output/ + diff --git a/tools/to_do b/tools/to_do new file mode 100644 index 00000000..ebc398fe --- /dev/null +++ b/tools/to_do @@ -0,0 +1,40 @@ +make smaller examples and test running spark locally +try to read more on why it needs pip access + +python tools/beam_test_local.py \ + --runner=SparkRunner \ + --spark_master=local[*] + +python -m apache_beam.examples.wordcount \ + --input s3://deepvariant-training-data-shiyi-2024-11/test.py \ + --output s3://deepvariant-training-data-shiyi-2024-11/wordcount/ \ + --runner SparkRunner + + spark-submit --master yarn \ + --deploy-mode cluster \ + --conf spark.kubernetes.container.image=shiyiyin/shuffle:latest \ + --conf spark.kubernetes.container.image.pullPolicy=Always \ + --conf spark.pyspark.python=python3 \ + --conf spark.pyspark.driver.python=python3 \ + --conf spark.executorEnv.PYTHONPATH="/usr/local/lib/python3.7/site-packages" \ + beam_test_local.py \ + --input_pattern_list="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/training_set.with_label.shuffled-*.gz" \ + 
--output_pattern_prefix="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples" \ + --runner=SparkRunner \ + --spark_master=yarn \ + --region=us-east-1 + + +python -c "import apache_beam as beam; print(beam.__version__)" + +python -c "import tensorflow as tf; print(tf.__version__)" + + + spark-submit --master yarn \ + --deploy-mode cluster \ + beam_test_local.py \ + --input_pattern_list="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/training_set.with_label.shuffled-*.gz" \ + --output_pattern_prefix="s3://deepvariant-training-data-shiyi-2024-11/training-case-study/output/2024_12_emr/training.examples" \ + --runner=SparkRunner \ + --spark_master=yarn \ + --region=us-east-1 \ No newline at end of file diff --git a/training.sh b/training.sh new file mode 100644 index 00000000..6c195280 --- /dev/null +++ b/training.sh @@ -0,0 +1,62 @@ +YOUR_PROJECT=takara +OUTPUT_GCS_BUCKET=REPLACE_WITH_YOUR_GCS_BUCKET +# might have to install gsutil to make sure the instance connect to deepvariant's standard files +BUCKET="gs://deepvariant" +VERSION="1.6.1" +DOCKER_IMAGE="google/deepvariant:${VERSION}" + +MODEL_BUCKET="${BUCKET}/models/DeepVariant/${VERSION}/DeepVariant-inception_v3-${VERSION}+data-wgs_standard" +GCS_PRETRAINED_WGS_MODEL="${MODEL_BUCKET}/model.ckpt" + +OUTPUT_BUCKET="${OUTPUT_GCS_BUCKET}/customized_training" +TRAINING_DIR="${OUTPUT_BUCKET}/training_dir" + +BASE="/home/ubuntu/data/training-case-study" +DATA_BUCKET=gs://deepvariant/training-case-study/BGISEQ-HG001 + +INPUT_DIR="${BASE}/input" +BIN_DIR="${INPUT_DIR}/bin" +DATA_DIR="${INPUT_DIR}/data" +OUTPUT_DIR="${BASE}/output" +LOG_DIR="${OUTPUT_DIR}/logs" +SHUFFLE_SCRIPT_DIR="${HOME}/deepvariant/tools" + +REF="${DATA_DIR}/ucsc_hg19.fa" +BAM_CHR1="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr1.bam" +BAM_CHR20="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr20.bam" +BAM_CHR21="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr21.bam" +TRUTH_VCF="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_PGandRTGphasetransfer_chrs_FIXED.vcf.gz" +TRUTH_BED="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_nosomaticdel_chr.bed" + +N_SHARDS=16 + +mkdir -p "${OUTPUT_DIR}" +mkdir -p "${BIN_DIR}" +mkdir -p "${DATA_DIR}" +mkdir -p "${LOG_DIR}" + +gsutil -m cp ${DATA_BUCKET}/BGISEQ_PE100_NA12878.sorted.chr*.bam* "${DATA_DIR}" +gsutil -m cp -r "${DATA_BUCKET}/ucsc_hg19.fa*" "${DATA_DIR}" +gsutil -m cp -r "${DATA_BUCKET}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_*" "${DATA_DIR}" + +docker pull ${DOCKER_IMAGE} # Standard CPU Docker Image. 
+# install gsutil +curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg +echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | sudo tee -a /etc/apt/sources.list.d/google-cloud-sdk.list +sudo apt-get update && sudo apt-get install google-cloud-cli +# install parallel +sudo apt -y update +sudo apt -y install parallel +# install docker +sudo apt-get install docker.io -y +sudo systemctl start docker +sudo docker run hello-world +sudo systemctl enable docker +sudo usermod -a -G docker $(whoami) +newgrp docker +# mount dev +mkdir /home/ubuntu/data/ +sudo mount /dev/nvme1n1 /home/ubuntu/data/ + + + diff --git a/training_real.sh b/training_real.sh new file mode 100644 index 00000000..f5e1eac0 --- /dev/null +++ b/training_real.sh @@ -0,0 +1,57 @@ +YOUR_PROJECT=takara +OUTPUT_GCS_BUCKET=REPLACE_WITH_YOUR_GCS_BUCKET +# might have to install gsutil to make sure the instance connect to deepvariant's standard files +BUCKET="gs://deepvariant" +VERSION="1.6.1" +DOCKER_IMAGE="google/deepvariant:${VERSION}" +BIN_VERSION="1.4.0" +sudo docker pull google/deepvariant:"${BIN_VERSION}-gpu" + +TRAINING_DIR="${OUTPUT_DIR}/training_dir" + +BASE="/home/ubuntu/data/training-case-study" +DATA_BUCKET=gs://deepvariant/training-case-study/BGISEQ-HG001 + +INPUT_DIR="${BASE}/input" +BIN_DIR="${INPUT_DIR}/bin" +DATA_DIR="${INPUT_DIR}/data" +OUTPUT_DIR="${BASE}/output" +LOG_DIR="${OUTPUT_DIR}/logs" +SHUFFLE_SCRIPT_DIR="${HOME}/deepvariant/tools" + + +BUCKET="gs://deepvariant" +BIN_VERSION="1.4.0" +MODEL_BUCKET="${BUCKET}/models/DeepVariant/${BIN_VERSION}/DeepVariant-inception_v3-${BIN_VERSION}+data-wgs_standard" +GCS_PRETRAINED_WGS_MODEL="${MODEL_BUCKET}/model.ckpt" +gsutil cp $GCS_PRETRAINED_WGS_MODEL data/model.ckpt + + +BASE="/home/${USER}/data/training-case-study" +OUTPUT_DIR="${BASE}/output" +model="/home/${USER}/data/model/model.ckpt" +TRAINING_DIR="${OUTPUT_DIR}/training_dir" +BIN_VERSION="1.4.0" +LOG_DIR="${OUTPUT_DIR}/logs" + +( time sudo docker run --gpus 1 \ + -v /home/${USER}:/home/${USER} \ + google/deepvariant:"${BIN_VERSION}-gpu" \ + /opt/deepvariant/bin/model_train \ + --dataset_config_pbtxt="${OUTPUT_DIR}/training_set.dataset_config.pbtxt" \ + --train_dir="${TRAINING_DIR}" \ + --model_name="inception_v3" \ + --number_of_steps=50000 \ + --save_interval_secs=300 \ + --batch_size=32 \ + --learning_rate=0.0005 \ + --start_from_checkpoint="${model}" \ +) > "${LOG_DIR}/train.log" 2>&1 & + +sudo docker run \ + -v /home/${USER}:/home/${USER} \ + google/deepvariant:"${BIN_VERSION}" \ + /opt/deepvariant/bin/model_eval \ + --dataset_config_pbtxt="${OUTPUT_DIR}/validation_set.dataset_config.pbtxt" \ + --checkpoint_dir="${TRAINING_DIR}" \ + --batch_size=512 > "${LOG_DIR}/eval.log" 2>&1 & \ No newline at end of file diff --git a/transfer.sh b/transfer.sh new file mode 100644 index 00000000..bfafcd78 --- /dev/null +++ b/transfer.sh @@ -0,0 +1,11 @@ +path="/home/ubuntu/data/training-case-study" +server="ubuntu@large" +key="~/.ssh/gpu.pem" +rsync -e "ssh -i $key" \ + -r \ + "$server:${path}" \ + /home/syin/lol/data + +# rsync -e "ssh -i $key" \ +# --list-only \ +# "$server:${path}" \ No newline at end of file diff --git a/transfer_up.sh b/transfer_up.sh new file mode 100644 index 00000000..dc6fe2c1 --- /dev/null +++ b/transfer_up.sh @@ -0,0 +1,7 @@ +path="/home/ubuntu/data/" +server="ubuntu@large" +key="~/.ssh/gpu.pem" +rsync -e "ssh -i $key" \ + -r \ + /home/syin/lol/data/training-case-study \ + 
"$server:${path}" diff --git a/validation.sh b/validation.sh new file mode 100644 index 00000000..0e88825d --- /dev/null +++ b/validation.sh @@ -0,0 +1,53 @@ +YOUR_PROJECT=takara +OUTPUT_GCS_BUCKET=REPLACE_WITH_YOUR_GCS_BUCKET +# might have to install gsutil to make sure the instance connect to deepvariant's standard files +BUCKET="gs://deepvariant" +VERSION="1.6.1" +DOCKER_IMAGE="google/deepvariant:${VERSION}" + +MODEL_BUCKET="${BUCKET}/models/DeepVariant/${VERSION}/DeepVariant-inception_v3-${VERSION}+data-wgs_standard" +GCS_PRETRAINED_WGS_MODEL="${MODEL_BUCKET}/model.ckpt" + +OUTPUT_BUCKET="${OUTPUT_GCS_BUCKET}/customized_training" +TRAINING_DIR="${OUTPUT_BUCKET}/training_dir" + +BASE="/home/ubuntu/data/training-case-study" +DATA_BUCKET=gs://deepvariant/training-case-study/BGISEQ-HG001 + +INPUT_DIR="${BASE}/input" +BIN_DIR="${INPUT_DIR}/bin" +DATA_DIR="${INPUT_DIR}/data" +OUTPUT_DIR="${BASE}/output" +LOG_DIR="${OUTPUT_DIR}/logs" +SHUFFLE_SCRIPT_DIR="${HOME}/deepvariant/tools" + +REF="${DATA_DIR}/ucsc_hg19.fa" +BAM_CHR1="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr1.bam" +BAM_CHR20="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr20.bam" +BAM_CHR21="${DATA_DIR}/BGISEQ_PE100_NA12878.sorted.chr21.bam" +TRUTH_VCF="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_PGandRTGphasetransfer_chrs_FIXED.vcf.gz" +TRUTH_BED="${DATA_DIR}/HG001_GRCh37_GIAB_highconf_CG-IllFB-IllGATKHC-Ion-10X-SOLID_CHROM1-X_v.3.3.2_highconf_nosomaticdel_chr.bed" + +N_SHARDS=16 + +mkdir -p "${OUTPUT_DIR}" +mkdir -p "${BIN_DIR}" +mkdir -p "${DATA_DIR}" +mkdir -p "${LOG_DIR}" + +( time seq 0 $((N_SHARDS-1)) | \ + parallel --halt 2 --line-buffer \ + sudo docker run \ + -v /home/${USER}:/home/${USER} \ + ${DOCKER_IMAGE} \ + make_examples \ + --mode training \ + --ref "${REF}" \ + --reads "${BAM_CHR21}" \ + --examples "${OUTPUT_DIR}/validation_set.with_label.tfrecord@${N_SHARDS}.gz" \ + --truth_variants "${TRUTH_VCF}" \ + --confident_regions "${TRUTH_BED}" \ + --task {} \ + --regions "'chr21'" \ + --channels "insert_size" \ +) 2>&1 | tee "${LOG_DIR}/validation_set.with_label.make_examples.log" \ No newline at end of file