Source code for ancify.evaluate

"""Phase 3: Evaluate ancestral calls against a reference and/or VCF variants.

All evaluation steps are optional and driven by the ``evaluation`` block
in the pipeline configuration file:

* **Coverage statistics** are always produced (no extra data needed).
* **Reference comparison** requires a directory of reference ancestral
  FASTA files (e.g. Ensembl EPO) and a filename pattern.
* **VCF comparison** requires a directory of VCF files and a filename
  pattern, plus the ``scikit-allel`` package.
"""

import logging
from pathlib import Path
from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np

from .utils import read_fasta, chrom_id, VALID_ALLELES

logger = logging.getLogger(__name__)


[docs] def compute_coverage_stats(sequence): """Compute coverage and confidence statistics for an ancestral sequence.""" arr = np.array(list(sequence), dtype="U1") upper = np.char.upper(arr) valid = np.isin(upper, list(VALID_ALLELES)) high_conf = np.isin(arr, list(VALID_ALLELES)) low_conf = valid & ~high_conf total = len(arr) return { "total_positions": total, "high_confidence": int(np.sum(high_conf)), "low_confidence": int(np.sum(low_conf)), "missing": total - int(np.sum(valid)), "prop_nonmissing": float(np.sum(valid) / total), "prop_high_confidence": float(np.sum(high_conf) / total), }
[docs] def compare_to_reference(test_seq, ref_seq, positions=None): """Compare two ancestral sequences, optionally at specific positions.""" test = np.array(list(test_seq), dtype="U1") ref = np.array(list(ref_seq), dtype="U1") if positions is not None: test = test[positions] ref = ref[positions] alleles = list(VALID_ALLELES) tu = np.char.upper(test) ru = np.char.upper(ref) tv = np.isin(tu, alleles) rv = np.isin(ru, alleles) both = tv & rv n_both = int(np.sum(both)) agree = tu[both] == ru[both] return { "test_nonmissing": float(np.mean(tv)), "ref_nonmissing": float(np.mean(rv)), "both_nonmissing": n_both, "agreement_rate": float(np.mean(agree)) if n_both > 0 else float("nan"), "disagreement_rate": float(1 - np.mean(agree)) if n_both > 0 else float("nan"), }
[docs] def compare_to_vcf(ancestral_seq, vcf_path): """Compare ancestral calls against VCF REF/ALT alleles. Requires ``scikit-allel`` (install with ``pip install scikit-allel``). """ try: import allel except ImportError: raise ImportError( "scikit-allel is required for VCF evaluation. " "Install with: pip install 'ancify[evaluate]'" ) vcf = allel.read_vcf( str(vcf_path), fields=["variants/POS", "variants/REF", "variants/ALT"], ) if vcf is None: return None pos = vcf["variants/POS"] ref = vcf["variants/REF"] alt = vcf["variants/ALT"] if alt.ndim == 2: alt = alt[:, 0] anc = np.array(list(ancestral_seq), dtype="U1")[pos - 1] anc_upper = np.char.upper(anc) alleles = list(VALID_ALLELES) valid = np.isin(anc_upper, alleles) matches_ref = anc_upper[valid] == ref[valid] matches_alt = anc_upper[valid] == alt[valid] matches_either = matches_ref | matches_alt return { "num_variants": len(pos), "prop_nonmissing": float(np.mean(valid)), "matches_ref": float(np.mean(matches_ref)), "matches_alt": float(np.mean(matches_alt)), "matches_either": float(np.mean(matches_either)), }
def _format_pattern(pattern, chrom): """Substitute ``{chrom}`` and ``{chrom_id}`` placeholders.""" return pattern.format(chrom=chrom, chrom_id=chrom_id(chrom)) def _evaluate_chromosome(args): """Worker: evaluate one chromosome.""" chrom, anc_path, ref_path, vcf_path, summary_path = args _, anc_seq = read_fasta(anc_path) results = {"chromosome": chrom} results["coverage"] = compute_coverage_stats(anc_seq) if ref_path and Path(ref_path).exists(): _, ref_seq = read_fasta(ref_path) results["reference_comparison"] = compare_to_reference(anc_seq, ref_seq) if vcf_path and Path(vcf_path).exists(): vcf_stats = compare_to_vcf(anc_seq, vcf_path) if vcf_stats: results["vcf_comparison"] = vcf_stats if summary_path: Path(summary_path).parent.mkdir(parents=True, exist_ok=True) with open(summary_path, "w") as f: for section, stats in results.items(): if isinstance(stats, dict): f.write(f"[{section}]\n") for k, v in stats.items(): f.write(f" {k}: {v}\n") f.write("\n") return results
[docs] def run_evaluation(config): """Execute Phase 3: evaluate ancestral calls for every chromosome. Writes per-chromosome summary files to ``<output_dir>/evaluation/``. """ chromosomes = config.resolve_chromosomes() out_dir = Path(config.output_dir) / "evaluation" out_dir.mkdir(parents=True, exist_ok=True) eval_cfg = config.evaluation tasks = [] for chrom in chromosomes: anc_path = str(Path(config.output_dir) / f"{chrom}.fa") ref_path = None if eval_cfg and eval_cfg.reference_dir: ref_path = str( Path(eval_cfg.reference_dir) / _format_pattern(eval_cfg.reference_pattern, chrom) ) vcf_path = None if eval_cfg and eval_cfg.vcf_dir: vcf_path = str( Path(eval_cfg.vcf_dir) / _format_pattern(eval_cfg.vcf_pattern, chrom) ) summary_path = str(out_dir / f"{chrom}.evaluation.txt") tasks.append((chrom, anc_path, ref_path, vcf_path, summary_path)) logger.info("Phase 3: evaluating %d chromosomes", len(tasks)) all_results = [] with ProcessPoolExecutor(max_workers=config.num_cpus) as pool: futures = {pool.submit(_evaluate_chromosome, t): t for t in tasks} for future in as_completed(futures): result = future.result() all_results.append(result) logger.info(" Completed %s", result["chromosome"]) logger.info("Phase 3 complete.") return all_results