Source code for ancify.config

"""Configuration loading and validation for ancify."""

from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional

import yaml


[docs] @dataclass class OutgroupSpec: """Specification for a single outgroup species.""" name: str alignment: str
[docs] @dataclass class EvaluationConfig: """Optional evaluation settings.""" reference_dir: Optional[str] = None reference_pattern: str = "{chrom}.fa" vcf_dir: Optional[str] = None vcf_pattern: str = "{chrom}.vcf.gz"
[docs] @dataclass class PipelineConfig: """Complete pipeline configuration.""" focal_species: str chromosome_lengths: str outgroups_inner: List[OutgroupSpec] outgroups_outer: List[OutgroupSpec] chromosomes: Optional[List[str]] = None work_dir: str = "." output_dir: str = "./ancestral_calls" min_inner_freq: int = 1 min_outer_freq: int = 1 num_cpus: int = 4 backend: str = "auto" gpu_devices: Optional[List[int]] = None method: str = "voting" tree: Optional[str] = None ml_model_path: Optional[str] = None ml_training_reference: Optional[str] = None ml_high_threshold: float = 0.8 ml_low_threshold: float = 0.5 substitution_model: str = "JC69" model_kappa: float = 2.0 model_base_freqs: Optional[List[float]] = None model_rates: Optional[List[float]] = None likelihood_high_threshold: float = 0.8 likelihood_low_threshold: float = 0.5 evaluation: Optional[EvaluationConfig] = None
[docs] def resolve_chromosomes(self): """Return the list of chromosomes to process. If *chromosomes* was not set explicitly, reads all chromosome names from the chromosome-lengths file. """ if self.chromosomes is None: from .utils import read_chromosome_lengths lengths = read_chromosome_lengths(self.chromosome_lengths) self.chromosomes = list(lengths.keys()) return self.chromosomes
@property def all_outgroups(self) -> "List[OutgroupSpec]": """All outgroup species (inner + outer), deduplicated by name.""" seen: set = set() result: list = [] for og in self.outgroups_inner + self.outgroups_outer: if og.name not in seen: seen.add(og.name) result.append(og) return result
[docs] def load_config(path): """Load a YAML configuration file and return a PipelineConfig.""" with open(path) as f: raw = yaml.safe_load(f) inner = [OutgroupSpec(**s) for s in raw["outgroups"]["inner"]] outer = [OutgroupSpec(**s) for s in raw["outgroups"]["outer"]] eval_cfg = None if raw.get("evaluation"): eval_cfg = EvaluationConfig(**raw["evaluation"]) tree_raw = raw.get("tree") if tree_raw is not None: tree_path = Path(tree_raw) if tree_path.suffix in (".nwk", ".tree", ".newick") or ( "(" not in tree_raw and tree_path.is_file() ): tree_raw = tree_path.read_text().strip() cfg = PipelineConfig( focal_species=raw["focal_species"], chromosome_lengths=raw["chromosome_lengths"], outgroups_inner=inner, outgroups_outer=outer, chromosomes=raw.get("chromosomes"), work_dir=raw.get("work_dir", "."), output_dir=raw.get("output_dir", "./ancestral_calls"), min_inner_freq=raw.get("min_inner_freq", 1), min_outer_freq=raw.get("min_outer_freq", 1), num_cpus=raw.get("num_cpus", 4), backend=raw.get("backend", "auto"), gpu_devices=raw.get("gpu_devices"), method=raw.get("method", "voting"), tree=tree_raw, ml_model_path=raw.get("ml_model_path"), ml_training_reference=raw.get("ml_training_reference"), ml_high_threshold=float(raw.get("ml_high_threshold", 0.8)), ml_low_threshold=float(raw.get("ml_low_threshold", 0.5)), substitution_model=raw.get("substitution_model", "JC69"), model_kappa=float(raw.get("model_kappa", 2.0)), model_base_freqs=raw.get("model_base_freqs"), model_rates=raw.get("model_rates"), likelihood_high_threshold=float(raw.get("likelihood_high_threshold", 0.8)), likelihood_low_threshold=float(raw.get("likelihood_low_threshold", 0.5)), evaluation=eval_cfg, ) validate_config(cfg) return cfg
[docs] def validate_config(cfg): """Check that a PipelineConfig is self-consistent. Raises ValueError with a descriptive message on any problem. """ if cfg.method not in ("voting", "parsimony", "ml", "likelihood"): raise ValueError( f"Unknown method: {cfg.method!r} " "(must be 'voting', 'parsimony', 'ml', or 'likelihood')" ) if cfg.method == "voting": if not cfg.outgroups_inner: raise ValueError("At least one inner outgroup species is required.") if not cfg.outgroups_outer: raise ValueError("At least one outer outgroup species is required.") elif cfg.method == "ml": if not cfg.outgroups_inner: raise ValueError("At least one inner outgroup species is required.") if not cfg.outgroups_outer: raise ValueError("At least one outer outgroup species is required.") if cfg.ml_model_path and not Path(cfg.ml_model_path).exists(): raise ValueError( f"ML model file not found: {cfg.ml_model_path}" ) if not (0.0 <= cfg.ml_low_threshold <= cfg.ml_high_threshold <= 1.0): raise ValueError( "ML thresholds must satisfy 0 <= low <= high <= 1; " f"got low={cfg.ml_low_threshold}, high={cfg.ml_high_threshold}" ) elif cfg.method == "likelihood": all_outgroups = cfg.outgroups_inner + cfg.outgroups_outer if not all_outgroups: raise ValueError( "At least one outgroup species is required for likelihood." ) valid_models = ("JC69", "K80", "HKY85", "GTR") if cfg.substitution_model.upper() not in valid_models: raise ValueError( f"Unknown substitution_model: {cfg.substitution_model!r} " f"(must be one of {valid_models})" ) if cfg.substitution_model.upper() == "GTR": if cfg.model_rates is None or len(cfg.model_rates) != 6: raise ValueError( "substitution_model 'GTR' requires 'model_rates' " "with exactly 6 exchangeability rates [AC, AG, AT, CG, CT, GT]" ) if cfg.model_base_freqs is not None: if len(cfg.model_base_freqs) != 4: raise ValueError( "'model_base_freqs' must have exactly 4 values [A, C, G, T]" ) total = sum(cfg.model_base_freqs) if abs(total - 1.0) > 0.01: raise ValueError( f"'model_base_freqs' must sum to ~1.0 (got {total:.4f})" ) if not (0.0 <= cfg.likelihood_low_threshold <= cfg.likelihood_high_threshold <= 1.0): raise ValueError( "Likelihood thresholds must satisfy 0 <= low <= high <= 1; " f"got low={cfg.likelihood_low_threshold}, high={cfg.likelihood_high_threshold}" ) else: all_outgroups = cfg.outgroups_inner + cfg.outgroups_outer if not all_outgroups: raise ValueError( "At least one outgroup species is required for parsimony." ) if not Path(cfg.chromosome_lengths).exists(): raise ValueError( f"Chromosome lengths file not found: {cfg.chromosome_lengths}" ) for og in cfg.outgroups_inner + cfg.outgroups_outer: if not Path(og.alignment).exists(): raise ValueError( f"Alignment file not found for {og.name}: {og.alignment}" ) if cfg.method in ("parsimony", "likelihood"): if not cfg.tree: raise ValueError( f"method {cfg.method!r} requires a 'tree' field " "(Newick string or path to .nwk file)" ) from .parsimony import get_leaf_names, parse_newick tree = parse_newick(cfg.tree) leaf_names = set(get_leaf_names(tree)) outgroup_names = { og.name for og in cfg.outgroups_inner + cfg.outgroups_outer } missing = leaf_names - outgroup_names if missing: raise ValueError( f"Tree leaf names not found in outgroups: {sorted(missing)}" ) extra = outgroup_names - leaf_names if extra: raise ValueError( f"Outgroup names not found as tree leaves: {sorted(extra)}" )