From 8323340bffc8119635f73424879d43b60c5b0844 Mon Sep 17 00:00:00 2001
From: Andrea T <15690844+telatin@users.noreply.github.com>
Date: Thu, 16 Jan 2025 15:01:22 +0000
Subject: [PATCH] update

---
 qimba/cli.py                   |  12 +-
 qimba/commands/dada2_split.py  | 187 +++++++++++++++++++++++
 qimba/commands/make_mapping.py | 214 ++++++++++++++++----------
 qimba/commands/merge.py        | 156 +++++++++++++++++++
 qimba/core.py                  |  44 +++++-
 qimba/formats.py               | 270 +++++++++++++++++++++++++++++++++
 6 files changed, 804 insertions(+), 79 deletions(-)
 create mode 100644 qimba/commands/dada2_split.py
 create mode 100644 qimba/commands/merge.py
 create mode 100644 qimba/formats.py

diff --git a/qimba/cli.py b/qimba/cli.py
index 5e27d86..59ae080 100644
--- a/qimba/cli.py
+++ b/qimba/cli.py
@@ -12,12 +12,20 @@
     'sample': {
         'name': 'Sample Management',
 #        'description': 'Commands for handling sample information and mapping files',
-        'commands': ['make-mapping', 'show-samples']
+        'commands': ['make-mapping',
+                     'show-samples']
     },
     'sequence': {
         'name': 'Sequence Processing',
 #        'description': 'Commands for processing sequence data',
-        'commands': ['derep']
+        'commands': [
+            'merge',
+            'derep']
+    },
+    'formats': {
+        'name': 'Format conversions and manipulation',
+#        'description': 'Commands for format conversion and manipulation',
+        'commands': ['dada2-split']
     },
     'file': {
         'name': 'File Operations',
diff --git a/qimba/commands/dada2_split.py b/qimba/commands/dada2_split.py
new file mode 100644
index 0000000..688801a
--- /dev/null
+++ b/qimba/commands/dada2_split.py
@@ -0,0 +1,187 @@
+# qimba/commands/dada2_split.py
+import click
+from pathlib import Path
+import sys
+from typing import List, Dict, TextIO, Tuple
+import csv
+
+class GroupedCommand(click.Command):
+    def format_options(self, ctx, formatter):
+        """Writes all the options into the formatter if they exist."""
+        opts = []
+        for param in self.get_params(ctx):
+            rv = param.get_help_record(ctx)
+            if rv is not None:
+                opts.append(rv)
+
+        if opts:
+            main_opts = [(x,y) for x,y in opts if x.startswith('-o')]
+            other_opts = [(x,y) for x,y in opts if not x.startswith('-o')]
+
+            if main_opts:
+                with formatter.section('Output Options'):
+                    formatter.write_dl(main_opts)
+            if other_opts:
+                with formatter.section('Other Options'):
+                    formatter.write_dl(other_opts)
+
+def validate_tsv(input_file: Path) -> Tuple[List[str], List[str]]:
+    """
+    Validate the DADA2 TSV format and return headers and sequences.
+
+    Returns:
+        Tuple of (headers, sequences)
+
+    Raises:
+        click.BadParameter: If file format is invalid
+    """
+    try:
+        with input_file.open() as f:
+            reader = csv.reader(f, delimiter='\t')
+
+            # Read and validate header
+            try:
+                headers = next(reader)
+            except StopIteration:
+                raise click.BadParameter("Input file is empty")
+
+            if len(headers) < 2:
+                raise click.BadParameter(
+                    "Invalid DADA2 format: TSV must have at least 2 columns "
+                    "(sequence and at least one sample)"
+                )
+
+            # Validate data rows and collect sequences
+            sequences = []
+            line_num = 1
+            for row in reader:
+                line_num += 1
+                if not row:
+                    continue
+
+                if len(row) != len(headers):
+                    raise click.BadParameter(
+                        f"Invalid DADA2 format: Line {line_num} has {len(row)} "
+                        f"fields, expected {len(headers)}"
+                    )
+
+                # Validate sequence (first column)
+                sequence = row[0]
+                if not sequence or not all(c in 'ACGTN' for c in sequence.upper()):
+                    raise click.BadParameter(
+                        f"Invalid sequence at line {line_num}: {sequence[:50]}..."
+ ) + + # Validate counts (remaining columns) + for i, count in enumerate(row[1:], 1): + if count and not count.isdigit(): + raise click.BadParameter( + f"Invalid count '{count}' in column {headers[i]} " + f"at line {line_num}" + ) + + sequences.append(sequence) + + if not sequences: + raise click.BadParameter("No valid sequences found in input file") + + return headers, sequences + + except (OSError, UnicodeDecodeError) as e: + raise click.BadParameter(f"Error reading input file: {e}") + +def write_fasta(sequences: List[str], counts: Dict[str, int], output_file: Path) -> None: + """Write sequences to FASTA format with ASV IDs.""" + try: + with output_file.open('w') as f: + for idx, seq in enumerate(sequences, 1): + f.write(f">ASV{idx} counts={counts[seq]}\n") + f.write(f"{seq}\n") + except OSError as e: + raise click.ClickException(f"Error writing FASTA file: {e}") + +def write_tsv(headers: List[str], data: List[List[str]], output_file: Path) -> None: + """Write simplified TSV with ASV IDs replacing sequences.""" + try: + with output_file.open('w') as f: + writer = csv.writer(f, delimiter='\t', lineterminator='\n') + writer.writerow(headers) + writer.writerows(data) + except OSError as e: + raise click.ClickException(f"Error writing TSV file: {e}") + +@click.command(cls=GroupedCommand) +@click.argument('input-file', type=click.Path(exists=True, dir_okay=False)) +@click.option('-o', '--output', required=True, + help='Output basename (without extension)') +@click.option('-v', '--verbose', is_flag=True, + help='Print detailed progress information') +def cli(input_file: str, output: str, verbose: bool) -> None: + """Split DADA2 TSV file into FASTA and simplified TSV. + + This command processes a DADA2-format TSV file containing sequences and their + counts across samples. It generates: + + 1. A FASTA file containing unique sequences with ASV IDs + 2. A simplified TSV file with ASV IDs replacing sequences + + The input TSV must have sequences in the first column and sample counts in + subsequent columns. Empty counts are treated as zeros. 
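+    A minimal input sketch (sample names are hypothetical; columns are
+    tab-separated):
+
+    \b
+        Sequence          SampleA    SampleB
+        ACGTACGTACGTACGT  10         0
+        TTGCAACGGCTATTGC  3          12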
+
+    Example usage:
+        qimba dada2-split input.tsv -o output
+        qimba dada2-split input.tsv -o output --verbose
+    """
+    input_path = Path(input_file)
+
+    if verbose:
+        click.echo(f"Processing {input_path}...")
+
+    # Validate input and get headers/sequences
+    headers, sequences = validate_tsv(input_path)
+
+    if verbose:
+        click.echo(f"Found {len(sequences)} unique sequences across {len(headers)-1} samples")
+
+    # Process input file
+    try:
+        with input_path.open() as f:
+            reader = csv.reader(f, delimiter='\t')
+            next(reader)  # Skip header
+
+            # Calculate total counts per sequence
+            seq_counts = {}
+            simplified_rows = []
+
+            # Use an explicit counter so empty rows do not advance the ASV
+            # numbering, keeping the TSV IDs in sync with write_fasta()
+            idx = 0
+            for row in reader:
+                if not row:
+                    continue
+                idx += 1
+
+                sequence = row[0]
+                counts = [int(count) if count else 0 for count in row[1:]]
+                seq_counts[sequence] = sum(counts)
+
+                # Replace sequence with ASV ID
+                simplified_rows.append([f"ASV{idx}"] + row[1:])
+
+                if verbose and idx % 1000 == 0:
+                    click.echo(f"Processed {idx} sequences...")
+
+    except (OSError, ValueError) as e:
+        raise click.ClickException(f"Error processing input file: {e}")
+
+    # Write output files
+    output_base = Path(output)
+    fasta_path = output_base.with_suffix('.fasta')
+    tsv_path = output_base.with_suffix('.tsv')
+
+    if verbose:
+        click.echo(f"Writing FASTA output to {fasta_path}...")
+    write_fasta(sequences, seq_counts, fasta_path)
+
+    if verbose:
+        click.echo(f"Writing TSV output to {tsv_path}...")
+    write_tsv(headers, simplified_rows, tsv_path)
+
+    if verbose:
+        click.echo("Processing complete!")
\ No newline at end of file
diff --git a/qimba/commands/make_mapping.py b/qimba/commands/make_mapping.py
index c3ffa93..f2e78ac 100644
--- a/qimba/commands/make_mapping.py
+++ b/qimba/commands/make_mapping.py
@@ -1,11 +1,9 @@
-# qimba/commands/make_mapping.py
 import click
 from pathlib import Path
 import sys
-from typing import Dict, List, Tuple, Optional, TextIO
-from collections import defaultdict
-import csv
 import re
+from typing import Optional, Tuple, Dict
+from ..formats import SampleSheet
 
 class GroupedCommand(click.Command):
     def format_options(self, ctx, formatter):
@@ -17,7 +15,7 @@ def format_options(self, ctx, formatter):
             opts.append(rv)
 
         if opts:
-            main_opts = [(x,y) for x,y in opts if x.startswith('-o')]
+            main_opts = [(x,y) for x,y in opts if x.startswith('-o') or x.startswith('-a')]
             pattern_opts = [(x,y) for x,y in opts if x in ['-e', '--ext', '-1', '--tag-for', '-2', '--tag-rev', '-s', '--strip']]
             other_opts = [(x,y) for x,y in opts if (x,y) not in main_opts and (x,y) not in pattern_opts]
@@ -32,6 +30,7 @@ def format_options(self, ctx, formatter):
                 with formatter.section('Other Options'):
                     formatter.write_dl(other_opts)
 
+
 def process_filename(filename: Path, extension: str, strip_str: str,
                      tag_for: str, tag_rev: str) -> Tuple[Optional[str], bool]:
     """
@@ -60,27 +59,83 @@ def process_filename(filename: Path, extension: str, strip_str: str,
         return None, False
 
-def write_mapping(samples: Dict[str, Dict[str, Path]],
-                  output: TextIO,
-                  input_dir: Path) -> None:
-    """Write the mapping file in TSV format."""
-    writer = csv.writer(output, delimiter='\t', lineterminator='\n')
+
+def rename_sample_id(sample_id: str, safe_char: str, prefix: str = "Sample",
+                     name_counts: dict = None) -> str:
+    """
+    Rename a sample ID according to specified rules.
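+
+    Example (hypothetical ID, safe_char='_'): "3-sample A" becomes
+    "Sample_3-sample A" after rule 1, then "Sample_3_sample_A" after rule 2.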
+ + Args: + sample_id: Original sample ID + safe_char: Character to replace non-alphanumeric characters with + prefix: Prefix to add for samples starting with digits + name_counts: Dictionary to track name occurrences for handling duplicates + + Returns: + Renamed sample ID + """ + if name_counts is None: + name_counts = {} + + new_name = sample_id + + # Rule 1: Prepend prefix if starts with digit + if new_name[0].isdigit(): + new_name = prefix + safe_char + new_name + + # Rule 2: Replace non-alphanumeric chars with safe_char + new_name = re.sub(r'[^a-zA-Z0-9]', safe_char, new_name) + + # Rule 3: Handle duplicates + base_name = new_name + if base_name in name_counts: + name_counts[base_name] += 1 + new_name = f"{base_name}{name_counts[base_name]}" + else: + name_counts[base_name] = 0 + + return new_name + + +def collect_read_files(input_path: Path, ext: str, strip_str: str, + tag_for: str, tag_rev: str) -> Dict[str, Dict[str, Path]]: + """ + Scan directory and collect read files, pairing R1 and R2 files by sample name. - # Write header - writer.writerow(['Sample ID', 'Forward', 'Reverse']) + Returns: + Dictionary mapping sample IDs to their forward and reverse read files + """ + # Dictionary to store file pairs: sample_id -> {'forward': path, 'reverse': path} + read_pairs = {} - # Write samples - for sample_id, files in sorted(samples.items()): - forward = files.get('forward', '') - reverse = files.get('reverse', '') + # First pass: collect all files and their types + for filepath in input_path.glob('**/*'): + if not filepath.is_file(): + continue + + sample_id, is_forward = process_filename(filepath, ext, strip_str, tag_for, tag_rev) - # Use relative paths from input directory - if forward: - forward = str(Path(forward).relative_to(input_dir)) - if reverse: - reverse = str(Path(reverse).relative_to(input_dir)) + if not sample_id: + continue + + # Initialize or update sample entry + if sample_id not in read_pairs: + read_pairs[sample_id] = {'forward': None, 'reverse': None} - writer.writerow([sample_id, forward, reverse]) + file_type = 'forward' if is_forward else 'reverse' + + # Check for duplicate files + if read_pairs[sample_id][file_type] is not None: + raise ValueError( + f"Duplicate {file_type} file found for sample {sample_id}:\n" + f" Existing: {read_pairs[sample_id][file_type]}\n" + f" New: {filepath}" + ) + + read_pairs[sample_id][file_type] = filepath + + return read_pairs + @click.command(cls=GroupedCommand) @click.argument('input-dir', type=click.Path(exists=True, file_okay=False)) @@ -94,10 +149,16 @@ def write_mapping(samples: Dict[str, Dict[str, Path]], help='Reverse read tag [default: _R2]') @click.option('-s', '--strip', default='', help='Additional string to strip from filenames') -def cli(input_dir, output, ext, tag_for, tag_rev, strip): +@click.option('-c', '--safe-char', default='_', + help='Safe character for sample names (default _)') +@click.option('-a', '--absolute', is_flag=True, + help='Use absolute paths in output') +@click.option('-k', '--dont-rename', is_flag=True, + help='Do not remove illegal chars from SampleIDs') +def cli(input_dir: str, output: str, ext: str, tag_for: str, tag_rev: str, + strip: str, safe_char: str, absolute: bool, dont_rename: bool): """Generate a sample mapping file from a directory of sequence files. - \b This command scans INPUT_DIR for sequence files and creates a mapping file based on the file naming pattern. Sample names are extracted by: 1. 
Removing the extension
@@ -105,66 +166,67 @@ def cli(input_dir, output, ext, tag_for, tag_rev, strip):
     3. Splitting on forward/reverse tags and taking the prefix
 
     Each sample must have an R1 file, R2 is optional.
-
-    Example usage:
-        qimba make-mapping data_dir -o mapping.tsv
-        qimba make-mapping data_dir -e .fq.gz -1 _1 -2 _2 -s _filtered
     """
     input_path = Path(input_dir)
-    samples = defaultdict(dict)
     errors = []
+    name_counts = {}  # For tracking duplicate renamed samples
 
-    # Scan directory
-    for filepath in input_path.glob('**/*'):
-        if not filepath.is_file():
-            continue
-
-        sample_id, is_forward = process_filename(
-            filepath, ext, strip, tag_for, tag_rev
-        )
+    try:
+        # Collect and pair read files
+        read_pairs = collect_read_files(input_path, ext, strip, tag_for, tag_rev)
 
-        if sample_id:
-            file_type = 'forward' if is_forward else 'reverse'
-
-            # Check for duplicate files
-            if file_type in samples[sample_id]:
-                errors.append(
-                    f"Duplicate {file_type} file found for sample {sample_id}:\n"
-                    f"  Existing: {samples[sample_id][file_type]}\n"
-                    f"  New: {filepath}"
+        # Create sample sheet
+        sample_sheet = SampleSheet()
+
+        # Process each sample
+        for sample_id, files in read_pairs.items():
+            # Skip samples without forward reads
+            if not files['forward']:
+                errors.append(f"Sample {sample_id} missing forward read file")
+                continue
+
+            # Rename sample if needed
+            final_id = (rename_sample_id(sample_id, safe_char, name_counts=name_counts)
+                        if not dont_rename else sample_id)
+
+            # Add to sample sheet
+            try:
+                sample_sheet.add_sample(
+                    final_id,
+                    files['forward'].absolute() if absolute else files['forward'],
+                    files['reverse'].absolute() if absolute and files['reverse'] else files['reverse']
                 )
-            else:
-                samples[sample_id][file_type] = filepath
-
-    # Validate samples
-    for sample_id, files in samples.items():
-        if 'forward' not in files:
-            errors.append(f"Sample {sample_id} missing forward read file")
-
-    # Report errors if any
-    if errors:
-        click.echo("Errors found:", err=True)
-        for error in errors:
-            click.echo(f"  {error}", err=True)
-        sys.exit(1)
-
-    if not samples:
-        click.echo(
-            f"No valid samples found in {input_dir} "
-            f"(extension: {ext}, forward: {tag_for}, reverse: {tag_rev})",
-            err=True
-        )
-        sys.exit(1)
-
-    # Write output
-    try:
+            except ValueError as e:
+                errors.append(str(e))
+
+        # Check if any samples were found
+        if len(sample_sheet) == 0:
+            click.echo(
+                f"No valid samples found in {input_dir} "
+                f"(extension: {ext}, forward: {tag_for}, reverse: {tag_rev})",
+                err=True
+            )
+            sys.exit(1)
+
+        # Report errors if any were found
+        if errors:
+            click.echo("Errors found:", err=True)
+            for error in errors:
+                click.echo(f"  {error}", err=True)
+            sys.exit(1)
+
+        sample_sheet = sample_sheet.sort()
+        # Write output
         if output:
-            with open(output, 'w') as f:
-                write_mapping(samples, f, input_path)
+            sample_sheet.save_to_file(output, absolute)
             click.echo(f"Created mapping file: {output}")
         else:
-            write_mapping(samples, sys.stdout, input_path)
+            print(str(sample_sheet))
 
-    except IOError as e:
-        click.echo(f"Error writing mapping file: {e}", err=True)
+    except (ValueError, IOError) as e:
+        click.echo(f"Error processing files: {e}", err=True)
         sys.exit(1)
+
+
+if __name__ == '__main__':
+    cli()
\ No newline at end of file
diff --git a/qimba/commands/merge.py b/qimba/commands/merge.py
new file mode 100644
index 0000000..c335e9c
--- /dev/null
+++ b/qimba/commands/merge.py
@@ -0,0 +1,156 @@
+# qimba/commands/merge.py
+import click
+from pathlib import Path
+import sys
+import subprocess
+from ..core import *
+from ..formats import *
+import tempfile
+import os
+
+class ThreadOption(click.Option):
+    def get_help_record(self, ctx):
+        """Customize the help text to include the config default."""
+        if ctx.obj is None or 'config' not in ctx.obj:
+            help_text = 'Number of threads (overrides config value)'
+        else:
+            config = ctx.obj['config']
+            default = config.get('qimba', 'threads', fallback='1')
+            help_text = f'Number of threads [default from config: {default}]'
+
+        self.help = help_text
+        return super().get_help_record(ctx)
+
+class GroupedCommand(click.Command):
+    def format_options(self, ctx, formatter):
+        """Writes all the options into the formatter if they exist."""
+        opts = []
+        for param in self.get_params(ctx):
+            rv = param.get_help_record(ctx)
+            if rv is not None:
+                opts.append(rv)
+
+        if opts:
+            # Split options into groups
+            main_opts = [(x,y) for x,y in opts if x.startswith('-i') or x.startswith('-o')]
+            runtime_opts = [(x,y) for x,y in opts if x.startswith('--threads') or x.startswith('--verbose')]
+            other_opts = [(x,y) for x,y in opts if (x,y) not in main_opts and (x,y) not in runtime_opts]
+
+            if main_opts:
+                with formatter.section('Main Arguments'):
+                    formatter.write_dl(main_opts)
+            if runtime_opts:
+                with formatter.section('Runtime Options'):
+                    formatter.write_dl(runtime_opts)
+            if other_opts:
+                with formatter.section('Other Options'):
+                    formatter.write_dl(other_opts)
+
+@click.command(cls=GroupedCommand)
+@click.option('-i', '--input-samplesheet', required=True,
+              type=click.Path(exists=True, dir_okay=False),
+              help='Input samplesheet')
+@click.option('-o', '--output', required=True,
+              type=click.Path(dir_okay=False),
+              help='Output FASTQ file')
+@click.option('--tmp-dir',
+              type=click.Path(file_okay=False),
+              help='Temporary directory (overrides config value)')
+@click.option('--threads', type=int,
+              cls=ThreadOption,
+              help='Number of threads (overrides config value)')
+@click.option('--verbose', is_flag=True,
+              help='Enable verbose output')
+def cli(input_samplesheet, output, tmp_dir, threads, verbose):
+    """Merge paired-end reads into a single file using USEARCH
+
+    \b
+    This command generates a merged FASTQ file.
+
+    Example usage:
+        qimba merge -i input.tsv -o merge.fastq --threads 8
+    """
+    # Get configuration
+    ctx = click.get_current_context()
+    config = ctx.obj['config']
+
+    # Determine threads to use
+    config_threads = config.get('qimba', 'threads', fallback='1')
+    config_tmpdir = config.get('qimba', 'tmpdir', fallback='/tmp/')
+    thread_count = str(threads) if threads else config_threads
+    runtime_tmpdir = Path(tmp_dir) if tmp_dir else Path(config_tmpdir)
+    runtime_tmpdir.mkdir(parents=True, exist_ok=True)
+
+    # Make a new temp dir
+    tmp_path = make_temp_dir(parent_dir=runtime_tmpdir)
+
+    # Prepare log directory using output path as reference
+    log_dir = tmp_path / 'logs'
+    log_dir.mkdir(parents=True, exist_ok=True)
+
+    if verbose:
+        click.echo(f"Input samples: {input_samplesheet}")
+        click.echo(f"Output file: {output}")
+        click.echo(f"Using {thread_count} threads")
+        click.echo(f"Temporary directory: {tmp_path}")
+        click.echo(f"Logdir: {log_dir}")
+
+    samplesheet = SampleSheet.load_from_file(input_samplesheet)
+
+    # Refuse to overwrite an existing output file
+    if os.path.exists(output):
+        click.echo(f"Error: {output} file already found", err=True)
+        sys.exit(1)
+
+    # Now cycle through all the samples, merging each R1/R2 pair into a
+    # temporary per-sample FASTQ file
+    outputs = []
+    for sample in samplesheet:
+
+        tmpOutput = os.path.join(tmp_path, sample.id + '.fastq')
+        job = Job(
+            command=[
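+                # usearch -fastq_mergepairs merges one R1/R2 pair per sample;
+                # -relabel prefixes read names with the sample ID so the
+                # pooled FASTQ stays traceable to its sample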
+                'usearch',
+                '-fastq_mergepairs',
+                sample.forward,
+                '-reverse',
+                sample.reverse,
+                '-relabel',
+                sample.id + '.',
+                '-fastqout',
+                tmpOutput,
+                '-threads',
+                thread_count
+            ],
+            required_input=[sample.forward, sample.reverse],
+            required_output=[tmpOutput],
+            log_stderr=log_dir / f"{Path(tmpOutput).stem}.merge.err",
+            log_stdout=log_dir / f"{Path(tmpOutput).stem}.merge.out"
+        )
+
+        try:
+            if verbose:
+                click.echo(f"Merging {sample.id}")
+            job.run()
+            outputs.extend(job.required_output)
+        except FileNotFoundError as e:
+            click.echo(f"Error: {e}", err=True)
+            sys.exit(1)
+        except subprocess.CalledProcessError as e:
+            click.echo(f"Error: Merging failed with code {e.returncode}", err=True)
+            if verbose:
+                click.echo(f"Check logs in: {log_dir}")
+            sys.exit(1)
+        except RuntimeError as e:
+            click.echo(f"Error: {e}", err=True)
+            if verbose:
+                click.echo(f"Check logs in: {log_dir}")
+            sys.exit(1)
+
+    with Path(output).open("wb") as outfile:
+        for filepath in outputs:
+            if verbose:
+                click.echo(f"Concatenating temporary file {filepath}")
+            # required_output entries may be plain strings, so coerce to Path
+            with Path(filepath).open('rb') as infile:
+                outfile.write(infile.read())
\ No newline at end of file
diff --git a/qimba/core.py b/qimba/core.py
index fb2e488..bad9c5a 100644
--- a/qimba/core.py
+++ b/qimba/core.py
@@ -5,7 +5,7 @@
 import shlex
 import sys
 from datetime import datetime
-
+import tempfile
 class Job:
     """
     Represents a system command to be executed with input/output validation and logging.
@@ -267,3 +267,45 @@ def __getitem__(self, sample_id: str) -> Dict[str, Any]:
     def __contains__(self, sample_id: str) -> bool:
         """Check if a sample exists."""
         return sample_id in self.samples
+
+def make_temp_dir(parent_dir='/my/temps', prefix='qimba_') -> Path:
+    """Create a temporary directory in the specified parent directory."""
+    parent = Path(parent_dir)
+    parent.mkdir(parents=True, exist_ok=True)
+    return Path(tempfile.mkdtemp(prefix=prefix, dir=parent))
+
+import re
+
+def extract_from_log(filepath: str, pattern: str) -> list:
+    """
+    Extract data from a log file using a regex pattern.
+
+    Args:
+        filepath (str): Path to the log file
+        pattern (str): Regular expression pattern with capture groups
+
+    Returns:
+        list: List of matched groups or empty list if no match found
+    """
+    try:
+        with open(filepath, 'r') as file:
+            content = file.read()
+            match = re.search(pattern, content)
+            if match:
+                # Convert matched groups to appropriate types (float or int)
+                return [float(group) if '.' 
in group else int(group) + for group in match.groups()] + return [] + except FileNotFoundError: + print(f"Error: File {filepath} not found") + return [] + except Exception as e: + print(f"Error processing file: {str(e)}") + return [] + +# Example usage: +# pattern = r"Merged \((\d+), ([.\d]+)%\)" +# results = extract_from_log("path/to/your/logfile.txt", pattern) +# if results: +# count, percentage = results +# print(f"Merged: {count} ({percentage}%)") \ No newline at end of file diff --git a/qimba/formats.py b/qimba/formats.py new file mode 100644 index 0000000..3b032b8 --- /dev/null +++ b/qimba/formats.py @@ -0,0 +1,270 @@ +from dataclasses import dataclass +from enum import Enum +from pathlib import Path +from typing import Dict, Iterator, Optional, Union, Any, List +import csv + +class SortBy(Enum): + """Enumeration for sort options.""" + SAMPLE_ID = "sample_id" + FORWARD_READ = "forward_read" + +@dataclass +class Sample: + """Represents a single sample with its forward and reverse reads and additional attributes.""" + id: str + forward: Path + reverse: Optional[Path] = None + attributes: Dict[str, str] = None + + def __post_init__(self): + """Initialize attributes dictionary if None.""" + if self.attributes is None: + self.attributes = {} + + def __str__(self) -> str: + """String representation including all attributes.""" + rev = str(self.reverse) if self.reverse else "" + return f"{self.id}\t{self.forward}\t{rev}" + + def get_attr(self, attr: str) -> Optional[str]: + """Get an attribute value.""" + return self.attributes.get(attr) + +class SampleSheet: + """Manages a collection of sequencing samples and their associated files.""" + REQUIRED_COLUMNS = {'Forward', 'Reverse'} + + def __init__(self): + """Initialize an empty sample sheet.""" + self._samples: Dict[str, Sample] = {} + self.columns: List[str] = [] + + def add_sample(self, sample_id: str, forward: Union[str, Path], + reverse: Optional[Union[str, Path]] = None, + attributes: Dict[str, str] = None) -> None: + """ + Add a new sample to the sample sheet. + + Args: + sample_id: Unique identifier for the sample + forward: Path to forward reads file + reverse: Optional path to reverse reads file + attributes: Optional dictionary of additional attributes + + Raises: + ValueError: If sample_id already exists + """ + if sample_id in self._samples: + raise ValueError(f"Sample {sample_id} already exists in sample sheet") + + # Convert strings to Path objects + fwd = Path(forward) if isinstance(forward, str) else forward + rev = Path(reverse) if isinstance(reverse, str) and reverse else reverse + + self._samples[sample_id] = Sample(sample_id, fwd, rev, attributes or {}) + + def get_sample_attr(self, sample_id: str, attr: str) -> Optional[str]: + """ + Get a sample's attribute value. + + Args: + sample_id: Sample identifier + attr: Attribute name + + Returns: + Attribute value if it exists, None otherwise + + Raises: + KeyError: If sample_id doesn't exist + """ + if sample_id not in self._samples: + raise KeyError(f"Sample not found: {sample_id}") + + return self._samples[sample_id].get_attr(attr) + + def get_samples_by_attr(self, attr: str, value: str) -> List[str]: + """ + Get all sample IDs that match a specific attribute value. 
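+
+        For example, get_samples_by_attr('Treatment', 'control') would return
+        the IDs of every sample whose (hypothetical) Treatment column equals
+        'control'.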
+ + Args: + attr: Attribute name + value: Attribute value to match + + Returns: + List of matching sample IDs + """ + return [ + sample.id for sample in self._samples.values() + if sample.get_attr(attr) == value + ] + + @classmethod + def load_from_file(cls, filepath: Union[str, Path]) -> 'SampleSheet': + """ + Create a new SampleSheet object from an existing mapping file. + + Args: + filepath: Path to the mapping file (TSV format) + + Returns: + A new SampleSheet object + + Raises: + ValueError: If required columns are missing or file format is invalid + """ + sheet = cls() + filepath = Path(filepath) + + with filepath.open() as f: + reader = csv.reader(f, delimiter='\t') + + # Get and validate headers + try: + headers = next(reader) + except StopIteration: + raise ValueError(f"Empty mapping file: {filepath}") + + # Store column names + sheet.columns = headers + + # First column is always sample ID + if not headers[0].strip(): + raise ValueError("First column must contain sample IDs") + + # Find required columns + col_idx = {col.strip(): idx for idx, col in enumerate(headers)} + missing_cols = cls.REQUIRED_COLUMNS - set(col_idx.keys()) + if missing_cols: + raise ValueError( + f"Missing required columns in mapping file: {', '.join(missing_cols)}" + ) + + # Process samples + for row_num, row in enumerate(reader, 2): # Start at 2 for human-readable line numbers + if len(row) != len(headers): + raise ValueError( + f"Line {row_num}: Expected {len(headers)} columns, got {len(row)}" + ) + + sample_id = row[0].strip() + if not sample_id: + continue + + # Get required fields + forward = row[col_idx['Forward']] + reverse = row[col_idx['Reverse']] + if not reverse.strip(): + reverse = None + + # Get additional attributes + attributes = { + col: row[idx] for col, idx in col_idx.items() + if col not in {'Forward', 'Reverse'} and col != headers[0] + } + + sheet.add_sample(sample_id, forward, reverse, attributes) + + return sheet + + def save_to_file(self, filepath: Union[str, Path], + absolute_paths: bool = False) -> None: + """ + Save the sample sheet to a file. 
+
+        Args:
+            filepath: Output file path
+            absolute_paths: Whether to use absolute paths for read files
+        """
+        filepath = Path(filepath)
+
+        with filepath.open('w') as f:
+            writer = csv.writer(f, delimiter='\t', lineterminator='\n')
+
+            # Fall back to default columns when the sheet was built in memory
+            # and no header was ever loaded from a file
+            columns = self.columns or ['SampleID', 'Forward', 'Reverse']
+
+            # Write header with all columns
+            writer.writerow(columns)
+
+            # Write samples
+            for sample in self:
+                # Prepare row with all attributes
+                row = [sample.id]
+                for col in columns[1:]:  # Skip sample ID column
+                    if col == 'Forward':
+                        value = str(sample.forward.absolute() if absolute_paths else sample.forward)
+                    elif col == 'Reverse':
+                        value = str(sample.reverse.absolute() if absolute_paths and sample.reverse
+                                    else sample.reverse or '')
+                    else:
+                        value = sample.get_attr(col) or ''
+                    row.append(value)
+                writer.writerow(row)
+
+    def remove_sample(self, sample_id: str) -> None:
+        """Remove a sample from the sample sheet."""
+        if sample_id not in self._samples:
+            raise KeyError(f"Sample {sample_id} not found in sample sheet")
+        del self._samples[sample_id]
+
+    def get_sample(self, sample_id: str) -> Sample:
+        """Retrieve a sample by its ID."""
+        if sample_id not in self._samples:
+            raise KeyError(f"Sample {sample_id} not found in sample sheet")
+        return self._samples[sample_id]
+
+    def sort(self, by: Union[str, SortBy] = SortBy.SAMPLE_ID) -> 'SampleSheet':
+        """Sort samples by specified criterion and return a new SampleSheet."""
+        # Convert string to enum if necessary
+        if isinstance(by, str):
+            try:
+                by = SortBy(by.lower())
+            except ValueError:
+                raise ValueError(f"Invalid sort criterion: {by}. "
+                                 f"Valid options are: {[e.value for e in SortBy]}")
+
+        # Create new sorted sample list
+        if by == SortBy.SAMPLE_ID:
+            sorted_samples = sorted(self._samples.values(), key=lambda x: x.id)
+        elif by == SortBy.FORWARD_READ:
+            sorted_samples = sorted(self._samples.values(), key=lambda x: str(x.forward))
+        else:
+            raise ValueError(f"Invalid sort criterion: {by}")
+
+        # Create new SampleSheet with sorted samples
+        new_sheet = SampleSheet()
+        new_sheet.columns = self.columns.copy()  # Preserve column order
+        for sample in sorted_samples:
+            new_sheet.add_sample(sample.id, sample.forward, sample.reverse, sample.attributes)
+
+        return new_sheet
+
+    def __iter__(self) -> Iterator[Sample]:
+        """Iterate over samples in the sheet."""
+        return iter(self._samples.values())
+
+    def __len__(self) -> int:
+        """Return the number of samples in the sheet."""
+        return len(self._samples)
+
+    def __str__(self) -> str:
+        """Return a string representation of the sample sheet."""
+        # If we have defined columns, use them
+        if self.columns:
+            lines = ['\t'.join(self.columns)]
+            for sample in self:
+                row = [sample.id]
+                for col in self.columns[1:]:
+                    if col == 'Forward':
+                        value = str(sample.forward)
+                    elif col == 'Reverse':
+                        value = str(sample.reverse or '')
+                    else:
+                        value = sample.get_attr(col) or ''
+                    row.append(value)
+                lines.append('\t'.join(row))
+        else:
+            # Fallback to basic format if no columns are defined
+            lines = ['SampleID\tForward\tReverse']
+            for sample in self:
+                rev = str(sample.reverse) if sample.reverse else ""
+                lines.append(f"{sample.id}\t{sample.forward}\t{rev}")
+        return '\n'.join(lines)
\ No newline at end of file
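
A minimal usage sketch of the SampleSheet API added in qimba/formats.py
(paths and sample names below are hypothetical):

    from qimba.formats import SampleSheet

    # Build a sheet in memory, sort it, and round-trip it through a mapping file
    sheet = SampleSheet()
    sheet.add_sample('SampleA', 'reads/A_R1.fastq.gz', 'reads/A_R2.fastq.gz')
    sheet.add_sample('SampleB', 'reads/B_R1.fastq.gz')  # reverse read is optional
    sheet.sort().save_to_file('mapping.tsv')

    loaded = SampleSheet.load_from_file('mapping.tsv')
    for sample in loaded:
        print(sample.id, sample.forward, sample.reverse)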