From 3289cd23ba22d770b0b5618d8abb17371ef2a8dd Mon Sep 17 00:00:00 2001
From: "Xu, Qinying (H&B, Herston)"
Date: Thu, 4 Apr 2024 10:26:14 +1000
Subject: [PATCH 1/7] use htsjdk.samtools.util.BlockCompressedInputStream to load .vcf.bgz

---
 .../variantspark/cli/args/SparkArgs.scala | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
index 3d9f6641..4c611c4b 100644
--- a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
+++ b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
@@ -3,6 +3,9 @@ package au.csiro.variantspark.cli.args
 import org.kohsuke.args4j.Option
 import au.csiro.pbdava.ssparkle.spark.SparkApp
 import org.apache.spark.rdd.RDD
+import htsjdk.samtools.util.BlockCompressedInputStream
+import org.apache.hadoop.fs.Path
+import java.io.File
 
 trait SparkArgs extends SparkApp {
 
@@ -10,7 +13,17 @@ trait SparkArgs extends SparkApp {
     aliases = Array("--spark-par"))
   val sparkPar: Int = 0
 
-  def textFile(inputFile: String): RDD[String] =
-    sc.textFile(inputFile, if (sparkPar > 0) sparkPar else sc.defaultParallelism)
-
+  def textFile(inputFile: String): RDD[String] = {
+    val input = new File(inputFile)
+    val isBGZ = input.getName.split('.').lastOption.getOrElse("").equalsIgnoreCase("bgz")
+    println(inputFile + " is loading to spark RDD " + isBGZ)
+    if (isBGZ) {
+      val path = new Path(inputFile)
+      val fs = path.getFileSystem(sc.hadoopConfiguration)
+      val bgzInputStream = new BlockCompressedInputStream(fs.open(path))
+      sc.parallelize(Stream.continually(bgzInputStream.readLine()).takeWhile(_ != null).toList)
+    } else {
+      sc.textFile(inputFile, if (sparkPar > 0) sparkPar else sc.defaultParallelism)
+    }
+  }
 }

From 6741f68de987ad301f87efdd7faa20ff9e42a40c Mon Sep 17 00:00:00 2001
From: "Xu, Qinying (H&B, Herston)"
Date: Thu, 4 Apr 2024 14:17:23 +1000
Subject: [PATCH 2/7] add method to check bgzip

---
 .../csiro/variantspark/utils/FileUtils.java | 28 +++++++++++++++++++
 1 file changed, 28 insertions(+)
 create mode 100644 src/main/java/au/csiro/variantspark/utils/FileUtils.java

diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
new file mode 100644
index 00000000..6df6f86f
--- /dev/null
+++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
@@ -0,0 +1,28 @@
+package au.csiro.variantspark.utils;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import htsjdk.samtools.util.BlockCompressedInputStream;
+
+public class FileUtils {
+
+    /**
+     *
+     * @param file: an input file
+     * @return true if input file is BGZIP by check the first two byte of input file
+     */
+    public static boolean isInputBGZ(final File file) {
+
+        try(final BlockCompressedInputStream bgzInputStream = new BlockCompressedInputStream(file)) {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(bgzInputStream));
+            String line = reader.readLine();
+            return line != null && !line.isEmpty();
+        } catch (IOException e) {
+            //file is not .vcf.bgz file
+            return false;
+        }
+    }
+
+}
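The two patches above read a BGZF-compressed VCF through htsjdk's BlockCompressedInputStream. A minimal standalone sketch of that reading pattern (outside Spark, with a hypothetical local path) looks like this; readLine() decompresses block after block and returns null at end of input, which is what the takeWhile(_ != null) loop in SparkArgs.textFile relies on:

    import htsjdk.samtools.util.BlockCompressedInputStream
    import java.io.File

    object BgzReadSketch {
      def main(args: Array[String]): Unit = {
        // hypothetical path; any BGZF-compressed VCF would do
        val in = new BlockCompressedInputStream(new File("data/sample.vcf.bgz"))
        try {
          // readLine() yields decompressed text lines and null once the last block is consumed
          val lines = Iterator.continually(in.readLine()).takeWhile(_ != null)
          println("header lines: " + lines.count(_.startsWith("#")))
        } finally {
          in.close()
        }
      }
    }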
From e58d7f84e3d39ea30ed5521f56154d9833032465 Mon Sep 17 00:00:00 2001
From: "Xu, Qinying (H&B, Herston)"
Date: Thu, 4 Apr 2024 17:01:24 +1000
Subject: [PATCH 3/7] add testing note to code

---
 .../csiro/variantspark/utils/FileUtils.java | 79 ++++++++++++++++++++++----
 1 file changed, 67 insertions(+), 12 deletions(-)

diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
index 6df6f86f..ff7e6839 100644
--- a/src/main/java/au/csiro/variantspark/utils/FileUtils.java
+++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
@@ -1,8 +1,7 @@
 package au.csiro.variantspark.utils;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.InputStreamReader;
+import java.io.*;
+import java.util.zip.GZIPInputStream;
 import java.io.IOException;
 import htsjdk.samtools.util.BlockCompressedInputStream;
 
@@ -14,15 +13,71 @@ public class FileUtils {
      * @return true if input file is BGZIP by check the first two byte of input file
      */
     public static boolean isInputBGZ(final File file) {
+
+        //.vcf.bgz is type of GZP file
+        //.vcf.gz is also GZP file but get java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184)
+        //.vcf.bz2 is not GZP file and get java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184)
+        //.vcf is not GZP file and get htsjdk.samtools.SAMFormatException: at header from java.io.BufferedReader.readLine(BufferedReader.java:389)
 
-        try(final BlockCompressedInputStream bgzInputStream = new BlockCompressedInputStream(file)) {
-            BufferedReader reader = new BufferedReader(new InputStreamReader(bgzInputStream));
-            String line = reader.readLine();
-            return line != null && !line.isEmpty();
-        } catch (IOException e) {
-            //file is not .vcf.bgz file
-            return false;
-        }
-    }
+        boolean isGzip = false;
+        try {
+            isGzip = isInputGZip(file); //ture if .bgz or .gz
+        } catch (IOException e) {}
+
+
+        //if not gzip file, do following check
+        if(isGzip) {
+
+            try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file))) {
+                bufferedInputStream.mark(100); // mark the current position
+                boolean isValid = BlockCompressedInputStream.isValidFile(bufferedInputStream);
+                bufferedInputStream.reset(); // reset back to the marked position
+                return isValid;
+            } catch (IOException e) {
+                // Handle the exception
+                return false;
+            }
+
+//            try(final BlockCompressedInputStream bgzInputStream = new BlockCompressedInputStream(file)) {
+//                System.out.println(" inside try block: start bufferReader ...");
+//                BufferedReader reader = new BufferedReader(new InputStreamReader(bgzInputStream));
+//                System.out.println(" inside try block: reader.readLine()... ");
+//                String line = reader.readLine();
+//                return line != null && !line.isEmpty();
+//            } catch (Exception e) {
+//                //file is not .vcf.bgz file
+//                //it will throw any type exception according to file type
+//                //hence we try to catch any type exception
+//                e.printStackTrace();
+//                return false;
+//            }
+        }
+
+        return false;
+    }
+
+    /**
+     *
+     * @param file: an input file
+     * @return true if input file is Gzip by check the first two byte of input file
+     * @throws IOException
+     */
+    public static boolean isInputGZip(final File file) throws IOException {
+        //final PushbackInputStream pb = new PushbackInputStream(input, 2);
+
+        try(final InputStream input = new FileInputStream(file)){
+            int header = input.read(); //read ID1
+            if(header == -1) return false;
+
+            int b = input.read(); //read ID2
+            if(b == -1) return false;
+
+            //ID2 * 256 + ID1 = 35615
+            if( ( (b << 8) | header) == GZIPInputStream.GZIP_MAGIC)
+                return true;
+        }
+
+        return false;
+    }
 
 }
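Patch 3's isInputGZip is a plain magic-number probe: a gzip member starts with ID1=0x1f and ID2=0x8b, and java.util.zip.GZIPInputStream.GZIP_MAGIC packs those two bytes as (ID2 << 8) | ID1 = 0x8b1f = 35615, which is what the comment "//ID2 * 256 + ID1 = 35615" refers to. A Scala rendering of the same two-byte check, for illustration only (the object and method names are made up):

    import java.io.{File, FileInputStream}
    import java.util.zip.GZIPInputStream

    object GzipMagicSketch {
      // A gzip stream begins with the bytes 0x1f 0x8b; GZIP_MAGIC packs them little-endian.
      def looksLikeGzip(file: File): Boolean = {
        val in = new FileInputStream(file)
        try {
          val id1 = in.read() // first byte, or -1 if the file is empty
          val id2 = in.read() // second byte
          id1 != -1 && id2 != -1 && ((id2 << 8) | id1) == GZIPInputStream.GZIP_MAGIC
        } finally in.close()
      }
    }

Both .vcf.gz and .vcf.bgz pass this test ("ture if .bgz or .gz" in the patch), so it only narrows the input down to some flavour of gzip; a BGZF-specific check still has to follow.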
"); +// String line = reader.readLine(); +// return line != null && !line.isEmpty(); +// } catch (Exception e) { +// //file is not .vcf.bgz file +// //it will throw any type exception according to file type +// //hence we try to catch any type exception +// e.printStackTrace(); +// return false; +// } + } + + return false; + } + + /** + * + * @param file: an input file + * @return true if input file is Gzip by check the first two byte of input file + * @throws IOException + */ + public static boolean isInputGZip(final File file) throws IOException { + //final PushbackInputStream pb = new PushbackInputStream(input, 2); + + try(final InputStream input = new FileInputStream(file)){ + int header = input.read(); //read ID1 + if(header == -1) return false; + + int b = input.read(); //read ID2 + if(b == -1) return false; + + //ID2 * 256 + ID1 = 35615 + if( ( (b << 8) | header) == GZIPInputStream.GZIP_MAGIC) + return true; + } + + return false; + } } From 7b6df964947160495e0016343317cbb9df855ada Mon Sep 17 00:00:00 2001 From: "Xu, Qinying (H&B, Herston)" Date: Thu, 4 Apr 2024 17:14:59 +1000 Subject: [PATCH 4/7] add code passed build --- .../csiro/variantspark/utils/FileUtils.java | 56 +++++-------------- .../variantspark/cli/args/SparkArgs.scala | 8 +-- 2 files changed, 19 insertions(+), 45 deletions(-) diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java index ff7e6839..e90e68e7 100644 --- a/src/main/java/au/csiro/variantspark/utils/FileUtils.java +++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java @@ -14,47 +14,21 @@ public class FileUtils { */ public static boolean isInputBGZ(final File file) { - //.vcf.bgz is type of GZP file - //.vcf.gz is also GZP file but get java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184) - //.vcf.bz2 is not GZP file and get java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184) - //.vcf is not GZP file and get htsjdk.samtools.SAMFormatException: at header from java.io.BufferedReader.readLine(BufferedReader.java:389) - - boolean isGzip = false; - try { - isGzip = isInputGZip(file); //ture if .bgz or .gz - } catch (IOException e) {} - - - //if not gzip file, do following check - if(isGzip) { - - try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file))) { - bufferedInputStream.mark(100); // mark the current position - boolean isValid = BlockCompressedInputStream.isValidFile(bufferedInputStream); - bufferedInputStream.reset(); // reset back to the marked position - return isValid; - } catch (IOException e) { - // Handle the exception - return false; - } - -// try(final BlockCompressedInputStream bgzInputStream = new BlockCompressedInputStream(file)) { -// System.out.println(" inside try block: start bufferReader ..."); -// BufferedReader reader = new BufferedReader(new InputStreamReader(bgzInputStream)); -// System.out.println(" inside try block: reader.readLine()... 
"); -// String line = reader.readLine(); -// return line != null && !line.isEmpty(); -// } catch (Exception e) { -// //file is not .vcf.bgz file -// //it will throw any type exception according to file type -// //hence we try to catch any type exception -// e.printStackTrace(); -// return false; -// } - } - - return false; - + /** + * .vcf.bgz is type of GZP file + * .vcf.gz is also GZP file but get java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184) + * .vcf.bz2 is not GZP file and get java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184) + * .vcf is not GZP file and get htsjdk.samtools.SAMFormatException: at header from java.io.BufferedReader.readLine(BufferedReader.java:389) + */ + try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(file))) { + bufferedInputStream.mark(100); // mark the current position + boolean isValid = BlockCompressedInputStream.isValidFile(bufferedInputStream); + bufferedInputStream.reset(); // reset back to the marked position + return isValid; + } catch (IOException e) { + // Handle the exception + return false; + } } /** diff --git a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala index 4c611c4b..ade9f4f5 100644 --- a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala +++ b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala @@ -2,6 +2,7 @@ package au.csiro.variantspark.cli.args import org.kohsuke.args4j.Option import au.csiro.pbdava.ssparkle.spark.SparkApp +import au.csiro.variantspark.utils._ import org.apache.spark.rdd.RDD import htsjdk.samtools.util.BlockCompressedInputStream import org.apache.hadoop.fs.Path @@ -14,10 +15,9 @@ trait SparkArgs extends SparkApp { val sparkPar: Int = 0 def textFile(inputFile: String): RDD[String] = { - val input = new File(inputFile) - val isBGZ = input.getName.split('.').lastOption.getOrElse("").equalsIgnoreCase("bgz") - println(inputFile + " is loading to spark RDD " + isBGZ) - if (isBGZ) { + val isBGZ = FileUtils.isInputBGZ(new File(inputFile)) + println(inputFile + " is loading to spark RDD, isBGZFile: " + isBGZ) + if (isBGZ ) { val path = new Path(inputFile) val fs = path.getFileSystem(sc.hadoopConfiguration) val bgzInputStream = new BlockCompressedInputStream(fs.open(path)) From da88fa9687f5f04d651ee690a6888732058cf69b Mon Sep 17 00:00:00 2001 From: "Xu, Qinying (H&B, Herston)" Date: Thu, 4 Apr 2024 17:45:50 +1000 Subject: [PATCH 5/7] code works --- .../java/au/csiro/variantspark/utils/FileUtils.java | 11 +++++------ .../au/csiro/variantspark/cli/args/SparkArgs.scala | 4 ++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java index e90e68e7..20ca1e48 100644 --- a/src/main/java/au/csiro/variantspark/utils/FileUtils.java +++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java @@ -12,15 +12,14 @@ public class FileUtils { * @param file: an input file * @return true if input file is BGZIP by check the first two byte of input file */ - public static boolean isInputBGZ(final File file) { - + public static boolean isBGZFile(String filePath) { /** - * .vcf.bgz is type of GZP file + * .vcf.bgz is type of GZP file, work well with BlockCompressedInputStream * .vcf.gz is also GZP file but get java.lang.OutOfMemoryError at java.io.InputStreamReader.read(InputStreamReader.java:184) * .vcf.bz2 is not 
From 3ed3b78ad920470ed024968c4abd642032b4cd2c Mon Sep 17 00:00:00 2001
From: "Xu, Qinying (H&B, Herston)"
Date: Thu, 4 Apr 2024 18:13:03 +1000
Subject: [PATCH 6/7] remove mark

---
 src/main/java/au/csiro/variantspark/utils/FileUtils.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
index 20ca1e48..582db5f5 100644
--- a/src/main/java/au/csiro/variantspark/utils/FileUtils.java
+++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
@@ -20,9 +20,9 @@ public static boolean isBGZFile(String filePath) {
          * .vcf is not GZP file and get htsjdk.samtools.SAMFormatException: at header from java.io.BufferedReader.readLine(BufferedReader.java:389)
          */
         try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(filePath))) {
-            bufferedInputStream.mark(100); // mark the current position
+            //bufferedInputStream.mark(100); // mark the current position
             boolean isValid = BlockCompressedInputStream.isValidFile(bufferedInputStream);
-            bufferedInputStream.reset(); // reset back to the marked position
+            //bufferedInputStream.reset(); // reset back to the marked position
             return isValid;
         } catch (IOException e) {
             // Handle the exception
From 47dda8f37e985170192c9407a35a1364d782b96d Mon Sep 17 00:00:00 2001
From: "Xu, Qinying (H&B, Herston)"
Date: Fri, 5 Apr 2024 10:56:50 +1000
Subject: [PATCH 7/7] add comments as reviewer suggested

---
 .../csiro/variantspark/utils/FileUtils.java | 28 +------------------
 .../variantspark/cli/args/SparkArgs.scala   |  6 +++-
 2 files changed, 6 insertions(+), 28 deletions(-)

diff --git a/src/main/java/au/csiro/variantspark/utils/FileUtils.java b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
index 582db5f5..1288bcad 100644
--- a/src/main/java/au/csiro/variantspark/utils/FileUtils.java
+++ b/src/main/java/au/csiro/variantspark/utils/FileUtils.java
@@ -20,37 +20,11 @@ public static boolean isBGZFile(String filePath) {
          * .vcf is not GZP file and get htsjdk.samtools.SAMFormatException: at header from java.io.BufferedReader.readLine(BufferedReader.java:389)
          */
         try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream(filePath))) {
-            //bufferedInputStream.mark(100); // mark the current position
             boolean isValid = BlockCompressedInputStream.isValidFile(bufferedInputStream);
-            //bufferedInputStream.reset(); // reset back to the marked position
             return isValid;
         } catch (IOException e) {
-            // Handle the exception
+            //handle exception for non proper bgzip file
             return false;
         }
     }
-
-    /**
-     *
-     * @param file: an input file
-     * @return true if input file is Gzip by check the first two byte of input file
-     * @throws IOException
-     */
-    public static boolean isInputGZip(final File file) throws IOException {
-        //final PushbackInputStream pb = new PushbackInputStream(input, 2);
-
-        try(final InputStream input = new FileInputStream(file)){
-            int header = input.read(); //read ID1
-            if(header == -1) return false;
-
-            int b = input.read(); //read ID2
-            if(b == -1) return false;
-
-            //ID2 * 256 + ID1 = 35615
-            if( ( (b << 8) | header) == GZIPInputStream.GZIP_MAGIC)
-                return true;
-        }
-
-        return false;
-    }
 }
diff --git a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
index 6e72a20a..1eac4ecc 100644
--- a/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
+++ b/src/main/scala/au/csiro/variantspark/cli/args/SparkArgs.scala
@@ -2,7 +2,7 @@ package au.csiro.variantspark.cli.args
 
 import org.kohsuke.args4j.Option
 import au.csiro.pbdava.ssparkle.spark.SparkApp
-import au.csiro.variantspark.utils._
+import au.csiro.variantspark.utils.FileUtils
 import org.apache.spark.rdd.RDD
 import htsjdk.samtools.util.BlockCompressedInputStream
 import org.apache.hadoop.fs.Path
@@ -18,11 +18,15 @@ trait SparkArgs extends SparkApp {
     val isBGZ = FileUtils.isBGZFile(inputFile)
     println(inputFile + " is loading to spark RDD, isBGZFile: " + isBGZ)
     if (isBGZ) {
+      // BGZIP file is compressed as blocks, requires specialized libraries htsjdk
       val path = new Path(inputFile)
       val fs = path.getFileSystem(sc.hadoopConfiguration)
       val bgzInputStream = new BlockCompressedInputStream(fs.open(path))
+      // each blocks can be decompressed independently and to be read in parallel
      sc.parallelize(Stream.continually(bgzInputStream.readLine()).takeWhile(_ != null).toList)
     } else {
+      // The standard GZIP libraries can handle files compressed as a whole
+      // load .vcf, .vcf.gz or .vcf.bz2 to RDD
       sc.textFile(inputFile, if (sparkPar > 0) sparkPar else sc.defaultParallelism)
     }
   }
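End to end, the final textFile behaves as in the sketch below (standalone, with a hypothetical local path and master, not part of the patch series): a BGZF file is decompressed on the driver through BlockCompressedInputStream and the resulting lines are parallelized, while every other input still goes through sc.textFile.

    import htsjdk.samtools.util.BlockCompressedInputStream
    import org.apache.hadoop.fs.Path
    import org.apache.spark.{SparkConf, SparkContext}

    object LoadBgzSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("bgz-load-sketch"))
        val path = new Path("data/sample.vcf.bgz") // hypothetical input
        val fs = path.getFileSystem(sc.hadoopConfiguration)
        val in = new BlockCompressedInputStream(fs.open(path))
        // same pattern as SparkArgs.textFile: read all decompressed lines on the driver,
        // then hand the collection to Spark as an RDD
        val rdd = sc.parallelize(Iterator.continually(in.readLine()).takeWhile(_ != null).toList)
        println("total lines: " + rdd.count() + ", header lines: " + rdd.filter(_.startsWith("#")).count())
        in.close()
        sc.stop()
      }
    }

As in the patch, the whole file is materialised on the driver before parallelize, so driver memory bounds the size of VCF this path can load.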